Compare commits


11 commits

6182d47b43 [#81] remove schema from preset_s3 and k6 load s3 scenarios
2023-07-14 11:36:10 +00:00
ff6814e15d [#72] Add option --prepare-locally
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2023-07-07 13:16:54 +03:00
56235f5e90 [#72] Update dependencies
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2023-07-06 12:14:52 +03:00
f633f9a64a [#79] client: Remove bufSize field
Use constant value instead.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-07-06 11:27:33 +03:00
42f1881580 [#79] object put: Add chunk size parameter
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-07-06 11:27:33 +03:00
4972bb928e [#79] xk6: Update node and SDK-Go
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-07-05 15:37:06 +03:00
a1f5738d2f [#77] Use writecache in local scenarios
Signed-off-by: Alejandro Lopez <a.lopez@yadro.com>
2023-06-30 12:50:42 +00:00
8e99d08aa4 [#12] Allow using multiple endpoints for presets
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-06-28 20:21:43 +03:00
ba04c682cb [#13] Allow to use english text in the payload
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-06-27 11:14:05 +00:00
3525d5b4e3 [#15] go.mod: Tidy
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-06-27 11:14:05 +00:00
62d7b78131 [#73] preset: Allow to sleep before putting objects
For large networks block propagation may take some time.
If we do not wait enough, putting objects can fail for some containers.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-06-25 13:14:15 +03:00
23 changed files with 496 additions and 1887 deletions

@@ -47,10 +47,11 @@ Create native client with `connect` method. Arguments:
 - hex encoded private key (empty value produces random key)
 - dial timeout in seconds (0 for the default value)
 - stream timeout in seconds (0 for the default value)
+- generate object header on the client side (for big object - split locally too)
 ```js
 import native from 'k6/x/frostfs/native';
-const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0)
+const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0, false)
 ```
 ### Methods
@@ -98,13 +99,13 @@ Credentials are taken from default AWS configuration files and ENVs.
 ```js
 import s3 from 'k6/x/frostfs/s3';
-const s3_cli = s3.connect("http://s3.frostfs.devenv:8080")
+const s3_cli = s3.connect("https://s3.frostfs.devenv:8080")
 ```
 You can also provide additional options:
 ```js
 import s3 from 'k6/x/frostfs/s3';
-const s3_cli = s3.connect("http://s3.frostfs.devenv:8080", {'no_verify_ssl': 'true', 'timeout': '60s'})
+const s3_cli = s3.connect("https://s3.frostfs.devenv:8080", {'no_verify_ssl': 'true', 'timeout': '60s'})
 ```
 * `no_verify_ss` - Bool. If `true` - skip verifying the s3 certificate chain and host name (useful if s3 uses self-signed certificates)
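For readers following the README changes above, a minimal sketch that combines the two updated `connect` signatures; the endpoints are the same devenv placeholders used in the README, and the trailing boolean is the new client-side header generation flag:

```js
import native from 'k6/x/frostfs/native';
import s3 from 'k6/x/frostfs/s3';

// Fifth argument: generate the object header on the client side
// (big objects are also split locally), as documented above.
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0, true);

// Options shown above: skip certificate verification and set a request timeout.
const s3_cli = s3.connect("https://s3.frostfs.devenv:8080", {'no_verify_ssl': 'true', 'timeout': '60s'});
```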

@@ -3,7 +3,7 @@ import { fail } from "k6";
 import { uuidv4 } from '../scenarios/libs/k6-utils-1.4.0.js';
 const payload = open('../go.sum', 'b');
-const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "1dd37fba80fec4e6a6f13fd708d8dcb3b29def768017052f6c930fa1c5d90bbb", 0, 0)
+const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "1dd37fba80fec4e6a6f13fd708d8dcb3b29def768017052f6c930fa1c5d90bbb", 0, 0, false)
 export const options = {
 stages: [

@@ -3,7 +3,7 @@ import { uuidv4 } from '../scenarios/libs/k6-utils-1.4.0.js';
 const payload = open('../go.sum', 'b');
 const container = "AjSxSNNXbJUDPqqKYm1VbFVDGCakbpUNH8aGjPmGAH3B"
-const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0)
+const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0, false)
 const frostfs_obj = frostfs_cli.onsite(container, payload)
 export const options = {

go.mod (125 changed lines)

@@ -3,47 +3,50 @@ module git.frostfs.info/TrueCloudLab/xk6-frostfs
 go 1.19
 require (
-git.frostfs.info/TrueCloudLab/frostfs-node v0.22.2-0.20230522084814-731bf5d0ee66
+git.frostfs.info/TrueCloudLab/frostfs-node v0.22.2-0.20230704155826-b520a3049e6f
-git.frostfs.info/TrueCloudLab/frostfs-s3-gw v0.24.1-0.20230403110435-01afa1cae425
+git.frostfs.info/TrueCloudLab/frostfs-s3-gw v0.27.0-rc.2
-git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230519144724-f5b23eb22569
+git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230705125206-769f6eec0565
 git.frostfs.info/TrueCloudLab/tzhash v1.8.0
-github.com/aws/aws-sdk-go-v2 v1.16.3
+github.com/aws/aws-sdk-go-v2 v1.18.1
-github.com/aws/aws-sdk-go-v2/config v1.15.5
+github.com/aws/aws-sdk-go-v2/config v1.18.27
-github.com/aws/aws-sdk-go-v2/service/s3 v1.26.9
+github.com/aws/aws-sdk-go-v2/service/s3 v1.36.0
-github.com/dop251/goja v0.0.0-20230427124612-428fc442ff5f
+github.com/dop251/goja v0.0.0-20230626124041-ba8a63e79201
+github.com/go-loremipsum/loremipsum v1.1.3
 github.com/google/uuid v1.3.0
 github.com/joho/godotenv v1.5.1
-github.com/nspcc-dev/neo-go v0.101.1
+github.com/nspcc-dev/neo-go v0.101.2
-github.com/panjf2000/ants/v2 v2.5.0
+github.com/panjf2000/ants/v2 v2.8.0
-github.com/sirupsen/logrus v1.9.2
+github.com/sirupsen/logrus v1.9.3
-github.com/stretchr/testify v1.8.3
+github.com/stretchr/testify v1.8.4
-go.etcd.io/bbolt v1.3.6
+go.etcd.io/bbolt v1.3.7
-go.k6.io/k6 v0.44.2-0.20230524054758-add1a5fe5019
+go.k6.io/k6 v0.45.0
 go.uber.org/zap v1.24.0
-golang.org/x/sys v0.8.0
+golang.org/x/sys v0.10.0
 )
 require (
-git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230519114017-0c67b8fefa41 // indirect
+git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230602142716-68021b910acb // indirect
 git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 // indirect
+git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6 // indirect
-git.frostfs.info/TrueCloudLab/hrw v1.2.0 // indirect
+git.frostfs.info/TrueCloudLab/hrw v1.2.1 // indirect
 git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 // indirect
-github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230512164433-5d1fd1a340c9 // indirect
+github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
-github.com/aws/aws-sdk-go v1.44.6 // indirect
+github.com/aws/aws-sdk-go v1.44.296 // indirect
-github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.1 // indirect
+github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 // indirect
-github.com/aws/aws-sdk-go-v2/credentials v1.12.0 // indirect
+github.com/aws/aws-sdk-go-v2/credentials v1.13.26 // indirect
-github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.4 // indirect
+github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.4 // indirect
-github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.10 // indirect
+github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.34 // indirect
-github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.4 // indirect
+github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.28 // indirect
-github.com/aws/aws-sdk-go-v2/internal/ini v1.3.11 // indirect
+github.com/aws/aws-sdk-go-v2/internal/ini v1.3.35 // indirect
-github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.1 // indirect
+github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.26 // indirect
-github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.1 // indirect
+github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.11 // indirect
-github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.5 // indirect
+github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.29 // indirect
-github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.4 // indirect
+github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.28 // indirect
-github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.4 // indirect
+github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.14.3 // indirect
-github.com/aws/aws-sdk-go-v2/service/sso v1.11.4 // indirect
+github.com/aws/aws-sdk-go-v2/service/sso v1.12.12 // indirect
+github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.12 // indirect
-github.com/aws/aws-sdk-go-v2/service/sts v1.16.4 // indirect
+github.com/aws/aws-sdk-go-v2/service/sts v1.19.2 // indirect
-github.com/aws/smithy-go v1.11.2 // indirect
+github.com/aws/smithy-go v1.13.5 // indirect
 github.com/beorn7/perks v1.0.1 // indirect
 github.com/bluele/gcache v0.0.2 // indirect
 github.com/cenkalti/backoff/v4 v4.2.1 // indirect
@@ -57,65 +60,67 @@ require (
 github.com/go-logr/stdr v1.2.2 // indirect
 github.com/go-sourcemap/sourcemap v2.1.4-0.20211119122758-180fcef48034+incompatible // indirect
 github.com/golang/protobuf v1.5.3 // indirect
-github.com/google/pprof v0.0.0-20230510103437-eeec1cb781c3 // indirect
+github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8 // indirect
 github.com/gorilla/mux v1.8.0 // indirect
-github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.2 // indirect
+github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
 github.com/hashicorp/golang-lru v0.6.0 // indirect
-github.com/hashicorp/golang-lru/v2 v2.0.2 // indirect
+github.com/hashicorp/golang-lru/v2 v2.0.4 // indirect
 github.com/hashicorp/hcl v1.0.0 // indirect
 github.com/jmespath/go-jmespath v0.4.0 // indirect
 github.com/josharian/intern v1.0.0 // indirect
-github.com/klauspost/compress v1.16.5 // indirect
+github.com/klauspost/compress v1.16.7 // indirect
 github.com/magiconair/properties v1.8.7 // indirect
 github.com/mailru/easyjson v0.7.7 // indirect
 github.com/mattn/go-colorable v0.1.13 // indirect
 github.com/mattn/go-isatty v0.0.19 // indirect
 github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
 github.com/minio/highwayhash v1.0.2 // indirect
-github.com/minio/sio v0.3.0 // indirect
+github.com/minio/sio v0.3.1 // indirect
 github.com/mitchellh/mapstructure v1.5.0 // indirect
 github.com/mr-tron/base58 v1.2.0 // indirect
 github.com/mstoykov/atlas v0.0.0-20220811071828-388f114305dd // indirect
 github.com/nats-io/jwt/v2 v2.4.1 // indirect
-github.com/nats-io/nats.go v1.25.0 // indirect
+github.com/nats-io/nats.go v1.27.1 // indirect
 github.com/nats-io/nkeys v0.4.4 // indirect
 github.com/nats-io/nuid v1.0.1 // indirect
 github.com/nspcc-dev/rfc6979 v0.2.0 // indirect
 github.com/onsi/ginkgo v1.16.5 // indirect
 github.com/onsi/gomega v1.20.2 // indirect
-github.com/pelletier/go-toml/v2 v2.0.7 // indirect
+github.com/pelletier/go-toml/v2 v2.0.8 // indirect
 github.com/pmezard/go-difflib v1.0.0 // indirect
-github.com/prometheus/client_golang v1.15.1 // indirect
+github.com/prometheus/client_golang v1.16.0 // indirect
 github.com/prometheus/client_model v0.4.0 // indirect
 github.com/prometheus/common v0.44.0 // indirect
-github.com/prometheus/procfs v0.10.0 // indirect
+github.com/prometheus/procfs v0.11.0 // indirect
 github.com/serenize/snaker v0.0.0-20201027110005-a7ad2135616e // indirect
-github.com/spaolacci/murmur3 v1.1.0 // indirect
 github.com/spf13/afero v1.9.5 // indirect
-github.com/spf13/cast v1.5.0 // indirect
+github.com/spf13/cast v1.5.1 // indirect
 github.com/spf13/jwalterweatherman v1.1.0 // indirect
 github.com/spf13/pflag v1.0.5 // indirect
-github.com/spf13/viper v1.15.0 // indirect
+github.com/spf13/viper v1.16.0 // indirect
 github.com/subosito/gotenv v1.4.2 // indirect
+github.com/twmb/murmur3 v1.1.8 // indirect
-go.opentelemetry.io/otel v1.15.1 // indirect
+go.opentelemetry.io/otel v1.16.0 // indirect
-go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.15.1 // indirect
+go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.16.0 // indirect
-go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.15.1 // indirect
+go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.16.0 // indirect
-go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.15.1 // indirect
+go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.16.0 // indirect
-go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.15.1 // indirect
+go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.16.0 // indirect
+go.opentelemetry.io/otel/metric v1.16.0 // indirect
-go.opentelemetry.io/otel/sdk v1.15.1 // indirect
+go.opentelemetry.io/otel/sdk v1.16.0 // indirect
-go.opentelemetry.io/otel/trace v1.15.1 // indirect
+go.opentelemetry.io/otel/trace v1.16.0 // indirect
-go.opentelemetry.io/proto/otlp v0.19.0 // indirect
+go.opentelemetry.io/proto/otlp v0.20.0 // indirect
 go.uber.org/atomic v1.11.0 // indirect
 go.uber.org/multierr v1.11.0 // indirect
-golang.org/x/crypto v0.9.0 // indirect
+golang.org/x/crypto v0.11.0 // indirect
-golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc // indirect
+golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect
-golang.org/x/net v0.10.0 // indirect
+golang.org/x/net v0.12.0 // indirect
-golang.org/x/sync v0.2.0 // indirect
+golang.org/x/sync v0.3.0 // indirect
-golang.org/x/text v0.9.0 // indirect
+golang.org/x/text v0.11.0 // indirect
 golang.org/x/time v0.3.0 // indirect
-google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
+google.golang.org/genproto/googleapis/api v0.0.0-20230629202037-9506855d4529 // indirect
+google.golang.org/genproto/googleapis/rpc v0.0.0-20230629202037-9506855d4529 // indirect
-google.golang.org/grpc v1.55.0 // indirect
+google.golang.org/grpc v1.56.1 // indirect
-google.golang.org/protobuf v1.30.0 // indirect
+google.golang.org/protobuf v1.31.0 // indirect
 gopkg.in/guregu/null.v3 v3.5.0 // indirect
 gopkg.in/ini.v1 v1.67.0 // indirect
 gopkg.in/yaml.v3 v3.0.1 // indirect

go.sum (1801 changed lines)

File diff suppressed because it is too large.

@@ -1,6 +1,8 @@
 package datagen
 import (
+"strings"
 "go.k6.io/k6/js/modules"
 )
@@ -36,7 +38,7 @@ func (d *Datagen) Exports() modules.Exports {
 return modules.Exports{Default: d}
 }
-func (d *Datagen) Generator(size int) *Generator {
+func (d *Datagen) Generator(size int, typ string) *Generator {
-g := NewGenerator(d.vu, size)
+g := NewGenerator(d.vu, size, strings.ToLower(typ))
 return &g
 }

@@ -1,12 +1,14 @@
 package datagen
 import (
+"bytes"
 "crypto/sha256"
 "encoding/hex"
 "math/rand"
 "time"
 "github.com/dop251/goja"
+"github.com/go-loremipsum/loremipsum"
 "go.k6.io/k6/js/modules"
 )
@@ -24,6 +26,7 @@ type (
 size int
 rand *rand.Rand
 buf []byte
+typ string
 offset int
 }
@@ -36,19 +39,53 @@ type (
 // TailSize specifies number of extra random bytes in the buffer tail.
 const TailSize = 1024
-func NewGenerator(vu modules.VU, size int) Generator {
+var payloadTypes = []string{
+"text",
+"random",
+"",
+}
+func NewGenerator(vu modules.VU, size int, typ string) Generator {
 if size <= 0 {
 panic("size should be positive")
 }
+var found bool
+for i := range payloadTypes {
+if payloadTypes[i] == typ {
+found = true
+break
+}
+}
+if !found {
+vu.InitEnv().Logger.Info("Unknown payload type '%s', random will be used.", typ)
+}
 r := rand.New(rand.NewSource(time.Now().UnixNano()))
 buf := make([]byte, size+TailSize)
-r.Read(buf)
-return Generator{
+g := Generator{
 vu: vu,
 size: size,
 rand: r,
 buf: buf,
+typ: typ,
 }
+g.fillBuffer()
+return g
 }
+func (g *Generator) fillBuffer() {
+switch g.typ {
+case "text":
+li := loremipsum.New()
+b := bytes.NewBuffer(g.buf[:0])
+for b.Len() < g.size+TailSize {
+b.WriteString(li.Paragraph())
+b.WriteRune('\n')
+}
+g.buf = b.Bytes()
+default:
+rand.Read(g.buf) // Per docs, err is always nil here
+}
+}
@@ -66,9 +103,9 @@ func (g *Generator) GenPayload(calcHash bool) GenPayloadResponse {
 }
 func (g *Generator) nextSlice() []byte {
-if g.offset >= TailSize {
+if g.offset+g.size >= len(g.buf) {
 g.offset = 0
-g.rand.Read(g.buf) // Per docs, err is always nil here
+g.fillBuffer()
 }
 result := g.buf[g.offset : g.offset+g.size]
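The scenario scripts later in this compare consume this generator through the k6 datagen module; a minimal sketch of the new payload-type argument from the script side, assuming the module is importable as `k6/x/frostfs/datagen` (the import path is not shown in this diff):

```js
import datagen from 'k6/x/frostfs/datagen'; // assumed import path

// "text" fills the buffer with lorem-ipsum paragraphs; "random" or an empty
// string keeps the previous random-bytes behaviour.
const generator = datagen.generator(4 * 1024, "text");

export default function () {
    // The hash field is only computed when the argument is true.
    const { payload, hash } = generator.genPayload(false);
}
```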

@@ -16,25 +16,25 @@ func TestGenerator(t *testing.T) {
 t.Run("fails on negative size", func(t *testing.T) {
 require.Panics(t, func() {
-_ = NewGenerator(vu, -1)
+_ = NewGenerator(vu, -1, "")
 })
 })
 t.Run("fails on zero size", func(t *testing.T) {
 require.Panics(t, func() {
-_ = NewGenerator(vu, 0)
+_ = NewGenerator(vu, 0, "")
 })
 })
 t.Run("creates slice of specified size", func(t *testing.T) {
 size := 10
-g := NewGenerator(vu, size)
+g := NewGenerator(vu, size, "")
 slice := g.nextSlice()
 require.Len(t, slice, size)
 })
 t.Run("creates a different slice on each call", func(t *testing.T) {
-g := NewGenerator(vu, 1000)
+g := NewGenerator(vu, 1000, "")
 slice1 := g.nextSlice()
 slice2 := g.nextSlice()
 // Each slice should be unique (assuming that 1000 random bytes will never coincide
@@ -43,7 +43,7 @@ func TestGenerator(t *testing.T) {
 })
 t.Run("keeps generating slices after consuming entire tail", func(t *testing.T) {
-g := NewGenerator(vu, 1000)
+g := NewGenerator(vu, 1000, "")
 initialSlice := g.nextSlice()
 for i := 0; i < TailSize; i++ {
 g.nextSlice()

@@ -292,17 +292,20 @@ func storageEngineOptionsFromConfig(c *config.Config, debug bool) ([]engine.Opti
 // write cache
 if wc := sc.WriteCache(); wc.Enabled() {
-opts = append(opts, shard.WithWriteCacheOptions(
-writecache.WithPath(wc.Path()),
-writecache.WithMaxBatchSize(wc.BoltDB().MaxBatchSize()),
-writecache.WithMaxBatchDelay(wc.BoltDB().MaxBatchDelay()),
-writecache.WithMaxObjectSize(wc.MaxObjectSize()),
-writecache.WithSmallObjectSize(wc.SmallObjectSize()),
-writecache.WithFlushWorkersCount(wc.WorkersNumber()),
-writecache.WithMaxCacheSize(wc.SizeLimit()),
-writecache.WithNoSync(wc.NoSync()),
-writecache.WithLogger(&logger.Logger{Logger: log}),
-))
+opts = append(opts,
+shard.WithWriteCache(true),
+shard.WithWriteCacheOptions(
+writecache.WithPath(wc.Path()),
+writecache.WithMaxBatchSize(wc.BoltDB().MaxBatchSize()),
+writecache.WithMaxBatchDelay(wc.BoltDB().MaxBatchDelay()),
+writecache.WithMaxObjectSize(wc.MaxObjectSize()),
+writecache.WithSmallObjectSize(wc.SmallObjectSize()),
+writecache.WithFlushWorkersCount(wc.WorkersNumber()),
+writecache.WithMaxCacheSize(wc.SizeLimit()),
+writecache.WithNoSync(wc.NoSync()),
+writecache.WithLogger(&logger.Logger{Logger: log}),
+),
+)
 }
 // tree

@@ -31,11 +31,11 @@ import (
 type (
 Client struct {
 vu modules.VU
 key ecdsa.PrivateKey
 tok session.Object
 cli *client.Client
-bufsize int
+prepareLocally bool
 }
 PutResponse struct {
@@ -66,30 +66,18 @@ type (
 }
 PreparedObject struct {
 vu modules.VU
 key ecdsa.PrivateKey
 cli *client.Client
-bufsize int
 hdr object.Object
 payload []byte
+prepareLocally bool
 }
 )
 const defaultBufferSize = 64 * 1024
-func (c *Client) SetBufferSize(size int) {
-if size < 0 {
-panic("buffer size must be positive")
-}
-if size == 0 {
-c.bufsize = defaultBufferSize
-} else {
-c.bufsize = size
-}
-}
-func (c *Client) Put(containerID string, headers map[string]string, payload goja.ArrayBuffer) PutResponse {
+func (c *Client) Put(containerID string, headers map[string]string, payload goja.ArrayBuffer, chunkSize int) PutResponse {
 cliContainerID := parseContainerID(containerID)
 tok := c.tok
@@ -116,7 +104,7 @@ func (c *Client) Put(containerID string, headers map[string]string, payload goja
 o.SetOwnerID(&owner)
 o.SetAttributes(attrs...)
-resp, err := put(c.vu, c.bufsize, c.cli, &tok, &o, payload.Bytes())
+resp, err := put(c.vu, c.cli, c.prepareLocally, &tok, &o, payload.Bytes(), chunkSize)
 if err != nil {
 return PutResponse{Success: false, Error: err.Error()}
 }
@@ -176,7 +164,7 @@ func (c *Client) Get(containerID, objectID string) GetResponse {
 prm.WithinSession(tok)
 var objSize = 0
-err = get(c.cli, prm, c.vu.Context(), c.bufsize, func(data []byte) {
+err = get(c.cli, prm, c.vu.Context(), func(data []byte) {
 objSize += len(data)
 })
 if err != nil {
@@ -194,10 +182,9 @@ func get(
 cli *client.Client,
 prm client.PrmObjectGet,
 ctx context.Context,
-bufSize int,
 onDataChunk func(chunk []byte),
 ) error {
-var buf = make([]byte, bufSize)
+var buf = make([]byte, defaultBufferSize)
 objectReader, err := cli.ObjectGetInit(ctx, prm)
 if err != nil {
@@ -245,7 +232,7 @@ func (c *Client) VerifyHash(containerID, objectID, expectedHash string) VerifyHa
 prm.WithinSession(tok)
 hasher := sha256.New()
-err = get(c.cli, prm, c.vu.Context(), c.bufsize, func(data []byte) {
+err = get(c.cli, prm, c.vu.Context(), func(data []byte) {
 hasher.Write(data)
 })
 if err != nil {
@@ -381,13 +368,12 @@ func (c *Client) Onsite(containerID string, payload goja.ArrayBuffer) PreparedOb
 }
 return PreparedObject{
 vu: c.vu,
 key: c.key,
 cli: c.cli,
-bufsize: c.bufsize,
 hdr: *obj,
 payload: data,
+prepareLocally: c.prepareLocally,
 }
 }
@@ -413,7 +399,7 @@ func (p PreparedObject) Put(headers map[string]string) PutResponse {
 return PutResponse{Success: false, Error: err.Error()}
 }
-_, err = put(p.vu, p.bufsize, p.cli, nil, &obj, p.payload)
+_, err = put(p.vu, p.cli, p.prepareLocally, nil, &obj, p.payload, 0)
 if err != nil {
 return PutResponse{Success: false, Error: err.Error()}
 }
@@ -421,8 +407,18 @@ func (p PreparedObject) Put(headers map[string]string) PutResponse {
 return PutResponse{Success: true, ObjectID: id.String()}
 }
-func put(vu modules.VU, bufSize int, cli *client.Client, tok *session.Object,
-hdr *object.Object, payload []byte) (*client.ResObjectPut, error) {
+type epochSource uint64
+func (s epochSource) CurrentEpoch() uint64 {
+return uint64(s)
+}
+func put(vu modules.VU, cli *client.Client, prepareLocally bool, tok *session.Object,
+hdr *object.Object, payload []byte, chunkSize int) (*client.ResObjectPut, error) {
+bufSize := defaultBufferSize
+if chunkSize > 0 {
+bufSize = chunkSize
+}
 buf := make([]byte, bufSize)
 rdr := bytes.NewReader(payload)
 sz := rdr.Size()
@@ -434,6 +430,18 @@ func put(vu modules.VU, bufSize int, cli *client.Client, tok *session.Object,
 if tok != nil {
 prm.WithinSession(*tok)
 }
+if chunkSize > 0 {
+prm.SetGRPCPayloadChunkLen(chunkSize)
+}
+if prepareLocally {
+res, err := cli.NetworkInfo(vu.Context(), client.PrmNetworkInfo{})
+if err != nil {
+return nil, err
+}
+prm.WithObjectMaxSize(res.Info().MaxObjectSize())
+prm.WithEpochSource(epochSource(res.Info().CurrentEpoch()))
+prm.WithoutHomomorphicHash(true)
+}
 objectWriter, err := cli.ObjectPutInit(vu.Context(), prm)
 if err != nil {
@@ -441,21 +449,21 @@ func put(vu modules.VU, bufSize int, cli *client.Client, tok *session.Object,
 return nil, err
 }
-if !objectWriter.WriteHeader(*hdr) {
+if !objectWriter.WriteHeader(vu.Context(), *hdr) {
 stats.Report(vu, objPutFails, 1)
-_, err = objectWriter.Close()
+_, err = objectWriter.Close(vu.Context())
 return nil, err
 }
 n, _ := rdr.Read(buf)
 for n > 0 {
-if !objectWriter.WritePayloadChunk(buf[:n]) {
+if !objectWriter.WritePayloadChunk(vu.Context(), buf[:n]) {
 break
 }
 n, _ = rdr.Read(buf)
 }
-resp, err := objectWriter.Close()
+resp, err := objectWriter.Close(vu.Context())
 if err != nil {
 stats.Report(vu, objPutFails, 1)
 return nil, err

@@ -51,7 +51,7 @@ func (n *Native) Exports() modules.Exports {
 return modules.Exports{Default: n}
 }
-func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTimeout int) (*Client, error) {
+func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTimeout int, prepareLocally bool) (*Client, error) {
 var (
 cli client.Client
 pk *keys.PrivateKey
@@ -133,10 +133,10 @@ func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTime
 cnrPutDuration, _ = registry.NewMetric("frostfs_cnr_put_duration", metrics.Trend, metrics.Time)
 return &Client{
 vu: n.vu,
 key: pk.PrivateKey,
 tok: tok,
 cli: &cli,
-bufsize: defaultBufferSize,
+prepareLocally: prepareLocally,
 }, nil
 }

@@ -116,7 +116,7 @@ func (s *Local) Connect(configFile string, params map[string]string, bucketMappi
 ng: ng,
 pos: *nodePosition,
 size: *nodeCount,
-})
+}, zap.L())
 rc := rawclient.New(ng,
 rawclient.WithKey(key.PrivateKey),

@@ -24,7 +24,10 @@ const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json";
 // Select random gRPC endpoint for current VU
 const grpc_endpoints = __ENV.GRPC_ENDPOINTS.split(',');
 const grpc_endpoint = grpc_endpoints[Math.floor(Math.random() * grpc_endpoints.length)];
-const grpc_client = native.connect(grpc_endpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5, __ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60);
+const grpc_client = native.connect(grpc_endpoint, '',
+__ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5,
+__ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60,
+__ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === "true" : false);
 const log = logging.new().withField("endpoint", grpc_endpoint);
 const registry_enabled = !!__ENV.REGISTRY_FILE;
@@ -47,11 +50,12 @@ if (registry_enabled && delete_age) {
 }
-const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE));
+const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE), __ENV.PAYLOAD_TYPE || "");
 const scenarios = {};
 const write_vu_count = parseInt(__ENV.WRITERS || '0');
+const write_grpc_chunk_size = 1024 * parseInt(__ENV.GRPC_CHUNK_SIZE || '0')
 if (write_vu_count > 0) {
 scenarios.write = {
 executor: 'constant-vus',
@@ -129,7 +133,7 @@ export function obj_write() {
 const container = container_list[Math.floor(Math.random() * container_list.length)];
 const { payload, hash } = generator.genPayload(registry_enabled);
-const resp = grpc_client.put(container, headers, payload);
+const resp = grpc_client.put(container, headers, payload, write_grpc_chunk_size);
 if (!resp.success) {
 log.withField("cid", container).error(resp.error);
 return;
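Taken together with the client changes above, a hedged sketch of the new write path as a stand-alone k6 script, reusing the environment variables introduced in this scenario (PREPARE_LOCALLY, GRPC_CHUNK_SIZE, PAYLOAD_TYPE); the container ID is a placeholder and the datagen import path is assumed:

```js
import native from 'k6/x/frostfs/native';
import datagen from 'k6/x/frostfs/datagen'; // assumed import path

const prepare_locally = __ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === "true" : false;
const grpc_client = native.connect(__ENV.GRPC_ENDPOINTS.split(',')[0], '', 5, 60, prepare_locally);
const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE), __ENV.PAYLOAD_TYPE || "");
// 0 keeps the default 64 KiB client buffer and skips setting a gRPC chunk length.
const write_grpc_chunk_size = 1024 * parseInt(__ENV.GRPC_CHUNK_SIZE || '0');

export default function () {
    const { payload } = generator.genPayload(false);
    // Fourth argument is the new per-request gRPC payload chunk size.
    const resp = grpc_client.put("<container-id>", {}, payload, write_grpc_chunk_size);
    if (!resp.success) {
        console.log(resp.error);
    }
}
```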

@@ -24,7 +24,10 @@ const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json";
 // Select random gRPC endpoint for current VU
 const grpc_endpoints = __ENV.GRPC_ENDPOINTS.split(',');
 const grpc_endpoint = grpc_endpoints[Math.floor(Math.random() * grpc_endpoints.length)];
-const grpc_client = native.connect(grpc_endpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5, __ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60);
+const grpc_client = native.connect(grpc_endpoint, '',
+__ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5,
+__ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60,
+__ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === "true" : false);
 const log = logging.new().withField("endpoint", grpc_endpoint);
 const registry_enabled = !!__ENV.REGISTRY_FILE;
@@ -55,6 +58,7 @@ const time_unit = __ENV.TIME_UNIT || '1s';
 const pre_alloc_write_vus = parseInt(__ENV.PRE_ALLOC_WRITERS || '0');
 const max_write_vus = parseInt(__ENV.MAX_WRITERS || pre_alloc_write_vus);
 const write_rate = parseInt(__ENV.WRITE_RATE || '0');
+const write_grpc_chunk_size = 1024 * parseInt(__ENV.GRPC_CHUNK_SIZE || '0')
 if (write_rate > 0) {
 scenarios.write = {
 executor: 'constant-arrival-rate',
@@ -154,7 +158,7 @@ export function obj_write() {
 const container = container_list[Math.floor(Math.random() * container_list.length)];
 const { payload, hash } = generator.genPayload(registry_enabled);
-const resp = grpc_client.put(container, headers, payload);
+const resp = grpc_client.put(container, headers, payload, write_grpc_chunk_size);
 if (!resp.success) {
 log.withField("cid", container).error(resp.error);
 return;

@@ -31,7 +31,7 @@ const obj_registry = registry_enabled ? registry.open(__ENV.REGISTRY_FILE) : und
 const duration = __ENV.DURATION;
-const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE));
+const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE), __ENV.PAYLOAD_TYPE || "");
 const scenarios = {};

@@ -3,15 +3,15 @@ import uuid
 from helpers.cmd import execute_cmd
-def create_bucket(endpoint, versioning, location):
+def create_bucket(endpoint, versioning, location, no_verify_ssl):
 if location:
 location = f"--create-bucket-configuration 'LocationConstraint={location}'"
 bucket_name = str(uuid.uuid4())
+no_verify_ssl_str = "--no-verify-ssl" if no_verify_ssl else ""
-cmd_line = f"aws --no-verify-ssl s3api create-bucket --bucket {bucket_name} " \
-f"--endpoint http://{endpoint} {location}"
-cmd_line_ver = f"aws --no-verify-ssl s3api put-bucket-versioning --bucket {bucket_name} " \
-f"--versioning-configuration Status=Enabled --endpoint http://{endpoint} "
+cmd_line = f"aws {no_verify_ssl_str} s3api create-bucket --bucket {bucket_name} " \
+f"--endpoint {endpoint} {location}"
+cmd_line_ver = f"aws {no_verify_ssl_str} s3api put-bucket-versioning --bucket {bucket_name} " \
+f"--versioning-configuration Status=Enabled --endpoint {endpoint} "
 out, success = execute_cmd(cmd_line)
@@ -28,18 +28,19 @@ def create_bucket(endpoint, versioning, location):
 else:
 print(f" > Bucket versioning has been applied.")
+print(f"Created bucket: {bucket_name} via endpoint {endpoint}")
 return bucket_name
-def upload_object(bucket, payload_filepath, endpoint):
+def upload_object(bucket, payload_filepath, endpoint, no_verify_ssl):
 object_name = str(uuid.uuid4())
+no_verify_ssl_str = "--no-verify-ssl" if no_verify_ssl else ""
-cmd_line = f"aws --no-verify-ssl s3api put-object --bucket {bucket} --key {object_name} " \
-f"--body {payload_filepath} --endpoint http://{endpoint}"
+cmd_line = f"aws {no_verify_ssl_str} s3api put-object --bucket {bucket} --key {object_name} " \
+f"--body {payload_filepath} --endpoint {endpoint}"
 out, success = execute_cmd(cmd_line)
 if not success:
 print(f" > Object {object_name} has not been uploaded.")
 return False
-else:
-return object_name
+return bucket, endpoint, object_name

@@ -12,19 +12,19 @@ def create_container(endpoint, policy, wallet_file, wallet_config):
 if not success:
 print(f" > Container has not been created:\n{output}")
 return False
-else:
-try:
-fst_str = output.split('\n')[0]
-except Exception:
-print(f"Got empty output: {output}")
-return False
-splitted = fst_str.split(": ")
-if len(splitted) != 2:
-raise ValueError(f"no CID was parsed from command output: \t{fst_str}")
-print(f"Created container: {splitted[1]}")
-return splitted[1]
+try:
+fst_str = output.split('\n')[0]
+except Exception:
+print(f"Got empty output: {output}")
+return False
+splitted = fst_str.split(": ")
+if len(splitted) != 2:
+raise ValueError(f"no CID was parsed from command output: \t{fst_str}")
+print(f"Created container: {splitted[1]} via endpoint {endpoint}")
+return splitted[1]
 def upload_object(container, payload_filepath, endpoint, wallet_file, wallet_config):
@@ -36,17 +36,17 @@ def upload_object(container, payload_filepath, endpoint, wallet_file, wallet_con
 if not success:
 print(f" > Object {object_name} has not been uploaded:\n{output}")
 return False
-else:
 try:
 # taking second string from command output
 snd_str = output.split('\n')[1]
 except Exception:
 print(f"Got empty input: {output}")
 return False
 splitted = snd_str.split(": ")
 if len(splitted) != 2:
 raise Exception(f"no OID was parsed from command output: \t{snd_str}")
-return splitted[1]
+return container, endpoint, splitted[1]
 def get_object(cid, oid, endpoint, out_filepath, wallet_file, wallet_config):

@@ -1,14 +1,15 @@
 #!/usr/bin/python3
 import argparse
+from itertools import cycle
 import json
 import random
 import sys
 import tempfile
+import time
 from argparse import Namespace
 from concurrent.futures import ProcessPoolExecutor
 from helpers.cmd import random_payload
 from helpers.frostfs_cli import create_container, upload_object
@@ -28,17 +29,19 @@ parser.add_argument(
 help="Container placement policy",
 default="REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
 )
-parser.add_argument('--endpoint', help='Node address')
+parser.add_argument('--endpoint', help='Nodes addresses separated by comma.')
 parser.add_argument('--update', help='Save existed containers')
-parser.add_argument('--ignore-errors', help='Ignore preset errors')
+parser.add_argument('--ignore-errors', help='Ignore preset errors', action='store_true')
 parser.add_argument('--workers', help='Count of workers in preset. Max = 50, Default = 50', default=50)
+parser.add_argument('--sleep', help='Time to sleep between container creation and object PUT (in seconds), '
+'Default = 8', default=8)
 args: Namespace = parser.parse_args()
 print(args)
 def main():
-container_list = []
+containers = []
 objects_list = []
 endpoints = args.endpoint.split(',')
@@ -48,63 +51,73 @@ def main():
 workers = int(args.workers)
 objects_per_container = int(args.preload_obj)
-ignore_errors = True if args.ignore_errors else False
+ignore_errors = args.ignore_errors
 if args.update:
 # Open file
 with open(args.out) as f:
 data_json = json.load(f)
-container_list = data_json['containers']
-containers_count = len(container_list)
+containers = data_json['containers']
+containers_count = len(containers)
 else:
 containers_count = int(args.containers)
 print(f"Create containers: {containers_count}")
 with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
-containers_runs = {executor.submit(create_container, endpoints[random.randrange(len(endpoints))],
-args.policy, wallet, wallet_config): _ for _ in range(containers_count)}
+containers_runs = [executor.submit(create_container, endpoint, args.policy, wallet, wallet_config)
+for _, endpoint in
+zip(range(containers_count), cycle(endpoints))]
 for run in containers_runs:
-if run.result():
-container_list.append(run.result())
+container_id = run.result()
+if container_id:
+containers.append(container_id)
 print("Create containers: Completed")
-print(f" > Containers: {container_list}")
-if containers_count == 0 or len(container_list) != containers_count:
-print(f"Containers mismatch in preset: expected {containers_count}, created {len(container_list)}")
+print(f" > Containers: {containers}")
+if containers_count == 0 or len(containers) != containers_count:
+print(f"Containers mismatch in preset: expected {containers_count}, created {len(containers)}")
 if not ignore_errors:
 sys.exit(ERROR_WRONG_CONTAINERS_COUNT)
+if args.sleep != 0:
+print(f"Sleep for {args.sleep} seconds")
+time.sleep(args.sleep)
 print(f"Upload objects to each container: {args.preload_obj} ")
 payload_file = tempfile.NamedTemporaryFile()
 random_payload(payload_file, args.size)
 print(" > Create random payload: Completed")
-for container in container_list:
-print(f" > Upload objects for container {container}")
-with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
-objects_runs = {executor.submit(upload_object, container, payload_file.name,
-endpoints[random.randrange(len(endpoints))], wallet, wallet_config): _ for _ in range(objects_per_container)}
+total_objects = objects_per_container * containers_count
+with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
+objects_runs = [executor.submit(upload_object, container, payload_file.name,
+endpoint, wallet, wallet_config)
+for _, container, endpoint in
+zip(range(total_objects), cycle(containers), cycle(endpoints))]
 for run in objects_runs:
-if run.result():
-objects_list.append({'container': container, 'object': run.result()})
-print(f" > Upload objects for container {container}: Completed")
+result = run.result()
+if run.result:
+container_id = result[0]
+endpoint = result[1]
+object_id = result[2]
+objects_list.append({'container': container_id, 'object': object_id})
+print(f" > Uploaded object {object_id} for container {container_id} via endpoint {endpoint}.")
 print("Upload objects to each container: Completed")
-total_objects = objects_per_container * containers_count
 if total_objects > 0 and len(objects_list) != total_objects:
 print(f"Objects mismatch in preset: expected {total_objects}, created {len(objects_list)}")
 if not ignore_errors:
 sys.exit(ERROR_WRONG_OBJECTS_COUNT)
-data = {'containers': container_list, 'objects': objects_list, 'obj_size': args.size + " Kb"}
+data = {'containers': containers, 'objects': objects_list, 'obj_size': args.size + " Kb"}
 with open(args.out, 'w+') as f:
 json.dump(data, f, ensure_ascii=False, indent=2)
 print("Result:")
-print(f" > Total Containers has been created: {len(container_list)}.")
+print(f" > Total Containers has been created: {len(containers)}.")
 print(f" > Total Objects has been created: {len(objects_list)}.")

@@ -1,9 +1,11 @@
 #!/usr/bin/python3
 import argparse
+from itertools import cycle
 import json
 import sys
 import tempfile
+import time
 from concurrent.futures import ProcessPoolExecutor
 from helpers.cmd import random_payload
@@ -15,13 +17,16 @@ parser.add_argument('--size', help='Upload objects size in kb.')
 parser.add_argument('--buckets', help='Number of buckets to create.')
 parser.add_argument('--out', help='JSON file with output.')
 parser.add_argument('--preload_obj', help='Number of pre-loaded objects.')
-parser.add_argument('--endpoint', help='S3 Gateway address.')
+parser.add_argument('--endpoint', help='S3 Gateways addresses separated by comma.')
 parser.add_argument('--update', help='True/False, False by default. Save existed buckets from target file (--out). '
 'New buckets will not be created.')
 parser.add_argument('--location', help='AWS location. Will be empty, if has not be declared.', default="")
 parser.add_argument('--versioning', help='True/False, False by default.')
-parser.add_argument('--ignore-errors', help='Ignore preset errors')
+parser.add_argument('--ignore-errors', help='Ignore preset errors', action='store_true')
+parser.add_argument('--no-verify-ssl', help='Ignore SSL verifications', action='store_true')
 parser.add_argument('--workers', help='Count of workers in preset. Max = 50, Default = 50', default=50)
+parser.add_argument('--sleep', help='Time to sleep between container creation and object PUT (in seconds), '
+'Default = 8', default=8)
 args = parser.parse_args()
 print(args)
@@ -31,9 +36,12 @@ ERROR_WRONG_OBJECTS_COUNT = 2
 MAX_WORKERS = 50
 def main():
-bucket_list = []
+buckets = []
 objects_list = []
-ignore_errors = True if args.ignore_errors else False
+ignore_errors = args.ignore_errors
+no_verify_ssl = args.no_verify_ssl
+endpoints = args.endpoint.split(',')
 workers = int(args.workers)
 objects_per_bucket = int(args.preload_obj)
@@ -42,60 +50,68 @@ def main():
 # Open file
 with open(args.out) as f:
 data_json = json.load(f)
-bucket_list = data_json['buckets']
-buckets_count = len(bucket_list)
+buckets = data_json['buckets']
+buckets_count = len(buckets)
 # Get CID list
 else:
 buckets_count = int(args.buckets)
 print(f"Create buckets: {buckets_count}")
 with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
-buckets_runs = {executor.submit(create_bucket, args.endpoint, args.versioning,
-args.location): _ for _ in range(buckets_count)}
+buckets_runs = [executor.submit(create_bucket, endpoint, args.versioning, args.location, no_verify_ssl)
+for _, endpoint in
+zip(range(buckets_count), cycle(endpoints))]
 for run in buckets_runs:
-if run.result():
-bucket_list.append(run.result())
+bucket_name = run.result()
+if bucket_name:
+buckets.append(bucket_name)
 print("Create buckets: Completed")
-print(f" > Buckets: {bucket_list}")
-if buckets_count == 0 or len(bucket_list) != buckets_count:
-print(f"Buckets mismatch in preset: expected {buckets_count}, created {len(bucket_list)}")
+print(f" > Buckets: {buckets}")
+if buckets_count == 0 or len(buckets) != buckets_count:
+print(f"Buckets mismatch in preset: expected {buckets_count}, created {len(buckets)}")
 if not ignore_errors:
 sys.exit(ERROR_WRONG_CONTAINERS_COUNT)
+if args.sleep != 0:
+print(f"Sleep for {args.sleep} seconds")
+time.sleep(args.sleep)
 print(f"Upload objects to each bucket: {objects_per_bucket} ")
 payload_file = tempfile.NamedTemporaryFile()
 random_payload(payload_file, args.size)
 print(" > Create random payload: Completed")
-for bucket in bucket_list:
-print(f" > Upload objects for bucket {bucket}")
-with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
-objects_runs = {executor.submit(upload_object, bucket, payload_file.name,
-args.endpoint): _ for _ in range(objects_per_bucket)}
-for run in objects_runs:
-if run.result():
-objects_list.append({'bucket': bucket, 'object': run.result()})
-print(f" > Upload objects for bucket {bucket}: Completed")
-print("Upload objects to each bucket: Completed")
 total_objects = objects_per_bucket * buckets_count
+with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
+objects_runs = [executor.submit(upload_object, bucket, payload_file.name, endpoint, no_verify_ssl)
+for _, bucket, endpoint in
+zip(range(total_objects), cycle(buckets), cycle(endpoints))]
+for run in objects_runs:
+result = run.result()
+if run.result:
+bucket = result[0]
+endpoint = result[1]
+object_id = result[2]
+objects_list.append({'bucket': bucket, 'object': object_id})
+print(f" > Uploaded object {object_id} for bucket {bucket} via endpoint {endpoint}.")
 if total_objects > 0 and len(objects_list) != total_objects:
 print(f"Objects mismatch in preset: expected {total_objects}, created {len(objects_list)}")
 if not ignore_errors:
 sys.exit(ERROR_WRONG_OBJECTS_COUNT)
-data = {'buckets': bucket_list, 'objects': objects_list, 'obj_size': args.size + " Kb"}
+data = {'buckets': buckets, 'objects': objects_list, 'obj_size': args.size + " Kb"}
 with open(args.out, 'w+') as f:
 json.dump(data, f, ensure_ascii=False, indent=2)
 print("Result:")
-print(f" > Total Buckets has been created: {len(bucket_list)}.")
+print(f" > Total Buckets has been created: {len(buckets)}.")
 print(f" > Total Objects has been created: {len(objects_list)}.")

@@ -18,6 +18,7 @@ Scenarios `grpc.js`, `local.js`, `http.js` and `s3.js` support the following opt
 * `SLEEP_WRITE` - time interval (in seconds) between writing VU iterations.
 * `SLEEP_READ` - time interval (in seconds) between reading VU iterations.
 * `SELECTION_SIZE` - size of batch to select for deletion (default: 1000).
+* `PAYLOAD_TYPE` - type of an object payload ("random" or "text", default: "random").
 Additionally, the profiling extension can be enabled to generate CPU and memory profiles which can be inspected with `go tool pprof file.prof`:
 ```shell

@@ -21,10 +21,12 @@ const bucket_list = new SharedArray('bucket_list', function () {
 const read_size = JSON.parse(open(__ENV.PREGEN_JSON)).obj_size;
 const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json";
+const no_verify_ssl = __ENV.NO_VERIFY_SSL || "true";
+const connection_args = {no_verify_ssl: no_verify_ssl}
 // Select random S3 endpoint for current VU
 const s3_endpoints = __ENV.S3_ENDPOINTS.split(',');
 const s3_endpoint = s3_endpoints[Math.floor(Math.random() * s3_endpoints.length)];
-const s3_client = s3.connect(`http://${s3_endpoint}`);
+const s3_client = s3.connect(s3_endpoint, connection_args);
 const log = logging.new().withField("endpoint", s3_endpoint);
 const registry_enabled = !!__ENV.REGISTRY_FILE;
@@ -46,7 +48,7 @@ if (registry_enabled && delete_age) {
 );
 }
-const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE));
+const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE), __ENV.PAYLOAD_TYPE || "");
 const scenarios = {};

@@ -24,7 +24,9 @@ const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json";
 // Select random S3 endpoint for current VU
 const s3_endpoints = __ENV.S3_ENDPOINTS.split(',');
 const s3_endpoint = s3_endpoints[Math.floor(Math.random() * s3_endpoints.length)];
-const s3_client = s3.connect(`http://${s3_endpoint}`);
+const no_verify_ssl = __ENV.NO_VERIFY_SSL || "true";
+const connection_args = {no_verify_ssl: no_verify_ssl}
+const s3_client = s3.connect(s3_endpoint, connection_args);
 const log = logging.new().withField("endpoint", s3_endpoint);
 const registry_enabled = !!__ENV.REGISTRY_FILE;

@@ -34,16 +34,21 @@ if (__ENV.GRPC_ENDPOINTS) {
 const grpcEndpoints = __ENV.GRPC_ENDPOINTS.split(',');
 const grpcEndpoint = grpcEndpoints[Math.floor(Math.random() * grpcEndpoints.length)];
 log = log.withField("endpoint", grpcEndpoint);
-grpc_client = native.connect(grpcEndpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 0, __ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 0);
+grpc_client = native.connect(grpcEndpoint, '',
+__ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 0,
+__ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 0,
+__ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === "true" : false, '');
 }
 // Connect to random S3 endpoint
 let s3_client = undefined;
 if (__ENV.S3_ENDPOINTS) {
+const no_verify_ssl = __ENV.NO_VERIFY_SSL || "true";
+const connection_args = {no_verify_ssl: no_verify_ssl}
 const s3_endpoints = __ENV.S3_ENDPOINTS.split(',');
 const s3_endpoint = s3_endpoints[Math.floor(Math.random() * s3_endpoints.length)];
 log = log.withField("endpoint", s3_endpoint);
-s3_client = s3.connect(`http://${s3_endpoint}`);
+s3_client = s3.connect(s3_endpoint, connection_args);
 }
// We will attempt to verify every object in "created" status. The scenario will execute // We will attempt to verify every object in "created" status. The scenario will execute