forked from TrueCloudLab/xk6-frostfs
Compare commits
11 commits
lorem-ipsu
...
master
Author | SHA1 | Date | |
---|---|---|---|
6182d47b43 | |||
ff6814e15d | |||
56235f5e90 | |||
f633f9a64a | |||
42f1881580 | |||
4972bb928e | |||
a1f5738d2f | |||
8e99d08aa4 | |||
ba04c682cb | |||
3525d5b4e3 | |||
62d7b78131 |
23 changed files with 496 additions and 1887 deletions
|
@ -47,10 +47,11 @@ Create native client with `connect` method. Arguments:
|
|||
- hex encoded private key (empty value produces random key)
|
||||
- dial timeout in seconds (0 for the default value)
|
||||
- stream timeout in seconds (0 for the default value)
|
||||
- generate object header on the client side (for big object - split locally too)
|
||||
|
||||
```js
|
||||
import native from 'k6/x/frostfs/native';
|
||||
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0)
|
||||
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0, false)
|
||||
```
|
||||
|
||||
### Methods
|
||||
|
@ -98,13 +99,13 @@ Credentials are taken from default AWS configuration files and ENVs.
|
|||
|
||||
```js
|
||||
import s3 from 'k6/x/frostfs/s3';
|
||||
const s3_cli = s3.connect("http://s3.frostfs.devenv:8080")
|
||||
const s3_cli = s3.connect("https://s3.frostfs.devenv:8080")
|
||||
```
|
||||
|
||||
You can also provide additional options:
|
||||
```js
|
||||
import s3 from 'k6/x/frostfs/s3';
|
||||
const s3_cli = s3.connect("http://s3.frostfs.devenv:8080", {'no_verify_ssl': 'true', 'timeout': '60s'})
|
||||
const s3_cli = s3.connect("https://s3.frostfs.devenv:8080", {'no_verify_ssl': 'true', 'timeout': '60s'})
|
||||
```
|
||||
|
||||
* `no_verify_ss` - Bool. If `true` - skip verifying the s3 certificate chain and host name (useful if s3 uses self-signed certificates)
|
||||
|
|
|
@ -3,7 +3,7 @@ import { fail } from "k6";
|
|||
import { uuidv4 } from '../scenarios/libs/k6-utils-1.4.0.js';
|
||||
|
||||
const payload = open('../go.sum', 'b');
|
||||
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "1dd37fba80fec4e6a6f13fd708d8dcb3b29def768017052f6c930fa1c5d90bbb", 0, 0)
|
||||
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "1dd37fba80fec4e6a6f13fd708d8dcb3b29def768017052f6c930fa1c5d90bbb", 0, 0, false)
|
||||
|
||||
export const options = {
|
||||
stages: [
|
||||
|
|
|
@ -3,7 +3,7 @@ import { uuidv4 } from '../scenarios/libs/k6-utils-1.4.0.js';
|
|||
|
||||
const payload = open('../go.sum', 'b');
|
||||
const container = "AjSxSNNXbJUDPqqKYm1VbFVDGCakbpUNH8aGjPmGAH3B"
|
||||
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0)
|
||||
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0, false)
|
||||
const frostfs_obj = frostfs_cli.onsite(container, payload)
|
||||
|
||||
export const options = {
|
||||
|
|
125
go.mod
125
go.mod
|
@ -3,47 +3,50 @@ module git.frostfs.info/TrueCloudLab/xk6-frostfs
|
|||
go 1.19
|
||||
|
||||
require (
|
||||
git.frostfs.info/TrueCloudLab/frostfs-node v0.22.2-0.20230522084814-731bf5d0ee66
|
||||
git.frostfs.info/TrueCloudLab/frostfs-s3-gw v0.24.1-0.20230403110435-01afa1cae425
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230519144724-f5b23eb22569
|
||||
git.frostfs.info/TrueCloudLab/frostfs-node v0.22.2-0.20230704155826-b520a3049e6f
|
||||
git.frostfs.info/TrueCloudLab/frostfs-s3-gw v0.27.0-rc.2
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230705125206-769f6eec0565
|
||||
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
|
||||
github.com/aws/aws-sdk-go-v2 v1.16.3
|
||||
github.com/aws/aws-sdk-go-v2/config v1.15.5
|
||||
github.com/aws/aws-sdk-go-v2/service/s3 v1.26.9
|
||||
github.com/dop251/goja v0.0.0-20230427124612-428fc442ff5f
|
||||
github.com/aws/aws-sdk-go-v2 v1.18.1
|
||||
github.com/aws/aws-sdk-go-v2/config v1.18.27
|
||||
github.com/aws/aws-sdk-go-v2/service/s3 v1.36.0
|
||||
github.com/dop251/goja v0.0.0-20230626124041-ba8a63e79201
|
||||
github.com/go-loremipsum/loremipsum v1.1.3
|
||||
github.com/google/uuid v1.3.0
|
||||
github.com/joho/godotenv v1.5.1
|
||||
github.com/nspcc-dev/neo-go v0.101.1
|
||||
github.com/panjf2000/ants/v2 v2.5.0
|
||||
github.com/sirupsen/logrus v1.9.2
|
||||
github.com/stretchr/testify v1.8.3
|
||||
go.etcd.io/bbolt v1.3.6
|
||||
go.k6.io/k6 v0.44.2-0.20230524054758-add1a5fe5019
|
||||
github.com/nspcc-dev/neo-go v0.101.2
|
||||
github.com/panjf2000/ants/v2 v2.8.0
|
||||
github.com/sirupsen/logrus v1.9.3
|
||||
github.com/stretchr/testify v1.8.4
|
||||
go.etcd.io/bbolt v1.3.7
|
||||
go.k6.io/k6 v0.45.0
|
||||
go.uber.org/zap v1.24.0
|
||||
golang.org/x/sys v0.8.0
|
||||
golang.org/x/sys v0.10.0
|
||||
)
|
||||
|
||||
require (
|
||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230519114017-0c67b8fefa41 // indirect
|
||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230602142716-68021b910acb // indirect
|
||||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 // indirect
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.0 // indirect
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6 // indirect
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1 // indirect
|
||||
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 // indirect
|
||||
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230512164433-5d1fd1a340c9 // indirect
|
||||
github.com/aws/aws-sdk-go v1.44.6 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.1 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/credentials v1.12.0 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.4 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.10 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.4 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.11 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.1 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.1 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.5 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.4 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.4 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/sso v1.11.4 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/sts v1.16.4 // indirect
|
||||
github.com/aws/smithy-go v1.11.2 // indirect
|
||||
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
|
||||
github.com/aws/aws-sdk-go v1.44.296 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/credentials v1.13.26 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.4 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.34 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.28 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.35 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.26 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.11 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.29 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.28 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.14.3 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/sso v1.12.12 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.12 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/sts v1.19.2 // indirect
|
||||
github.com/aws/smithy-go v1.13.5 // indirect
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/bluele/gcache v0.0.2 // indirect
|
||||
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
|
||||
|
@ -57,65 +60,67 @@ require (
|
|||
github.com/go-logr/stdr v1.2.2 // indirect
|
||||
github.com/go-sourcemap/sourcemap v2.1.4-0.20211119122758-180fcef48034+incompatible // indirect
|
||||
github.com/golang/protobuf v1.5.3 // indirect
|
||||
github.com/google/pprof v0.0.0-20230510103437-eeec1cb781c3 // indirect
|
||||
github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8 // indirect
|
||||
github.com/gorilla/mux v1.8.0 // indirect
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.2 // indirect
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
|
||||
github.com/hashicorp/golang-lru v0.6.0 // indirect
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.2 // indirect
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.4 // indirect
|
||||
github.com/hashicorp/hcl v1.0.0 // indirect
|
||||
github.com/jmespath/go-jmespath v0.4.0 // indirect
|
||||
github.com/josharian/intern v1.0.0 // indirect
|
||||
github.com/klauspost/compress v1.16.5 // indirect
|
||||
github.com/klauspost/compress v1.16.7 // indirect
|
||||
github.com/magiconair/properties v1.8.7 // indirect
|
||||
github.com/mailru/easyjson v0.7.7 // indirect
|
||||
github.com/mattn/go-colorable v0.1.13 // indirect
|
||||
github.com/mattn/go-isatty v0.0.19 // indirect
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
|
||||
github.com/minio/highwayhash v1.0.2 // indirect
|
||||
github.com/minio/sio v0.3.0 // indirect
|
||||
github.com/minio/sio v0.3.1 // indirect
|
||||
github.com/mitchellh/mapstructure v1.5.0 // indirect
|
||||
github.com/mr-tron/base58 v1.2.0 // indirect
|
||||
github.com/mstoykov/atlas v0.0.0-20220811071828-388f114305dd // indirect
|
||||
github.com/nats-io/jwt/v2 v2.4.1 // indirect
|
||||
github.com/nats-io/nats.go v1.25.0 // indirect
|
||||
github.com/nats-io/nats.go v1.27.1 // indirect
|
||||
github.com/nats-io/nkeys v0.4.4 // indirect
|
||||
github.com/nats-io/nuid v1.0.1 // indirect
|
||||
github.com/nspcc-dev/rfc6979 v0.2.0 // indirect
|
||||
github.com/onsi/ginkgo v1.16.5 // indirect
|
||||
github.com/onsi/gomega v1.20.2 // indirect
|
||||
github.com/pelletier/go-toml/v2 v2.0.7 // indirect
|
||||
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/prometheus/client_golang v1.15.1 // indirect
|
||||
github.com/prometheus/client_golang v1.16.0 // indirect
|
||||
github.com/prometheus/client_model v0.4.0 // indirect
|
||||
github.com/prometheus/common v0.44.0 // indirect
|
||||
github.com/prometheus/procfs v0.10.0 // indirect
|
||||
github.com/prometheus/procfs v0.11.0 // indirect
|
||||
github.com/serenize/snaker v0.0.0-20201027110005-a7ad2135616e // indirect
|
||||
github.com/spaolacci/murmur3 v1.1.0 // indirect
|
||||
github.com/spf13/afero v1.9.5 // indirect
|
||||
github.com/spf13/cast v1.5.0 // indirect
|
||||
github.com/spf13/cast v1.5.1 // indirect
|
||||
github.com/spf13/jwalterweatherman v1.1.0 // indirect
|
||||
github.com/spf13/pflag v1.0.5 // indirect
|
||||
github.com/spf13/viper v1.15.0 // indirect
|
||||
github.com/spf13/viper v1.16.0 // indirect
|
||||
github.com/subosito/gotenv v1.4.2 // indirect
|
||||
go.opentelemetry.io/otel v1.15.1 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.15.1 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.15.1 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.15.1 // indirect
|
||||
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.15.1 // indirect
|
||||
go.opentelemetry.io/otel/sdk v1.15.1 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.15.1 // indirect
|
||||
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
|
||||
github.com/twmb/murmur3 v1.1.8 // indirect
|
||||
go.opentelemetry.io/otel v1.16.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.16.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.16.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.16.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.16.0 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.16.0 // indirect
|
||||
go.opentelemetry.io/otel/sdk v1.16.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.16.0 // indirect
|
||||
go.opentelemetry.io/proto/otlp v0.20.0 // indirect
|
||||
go.uber.org/atomic v1.11.0 // indirect
|
||||
go.uber.org/multierr v1.11.0 // indirect
|
||||
golang.org/x/crypto v0.9.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc // indirect
|
||||
golang.org/x/net v0.10.0 // indirect
|
||||
golang.org/x/sync v0.2.0 // indirect
|
||||
golang.org/x/text v0.9.0 // indirect
|
||||
golang.org/x/crypto v0.11.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect
|
||||
golang.org/x/net v0.12.0 // indirect
|
||||
golang.org/x/sync v0.3.0 // indirect
|
||||
golang.org/x/text v0.11.0 // indirect
|
||||
golang.org/x/time v0.3.0 // indirect
|
||||
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
|
||||
google.golang.org/grpc v1.55.0 // indirect
|
||||
google.golang.org/protobuf v1.30.0 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20230629202037-9506855d4529 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20230629202037-9506855d4529 // indirect
|
||||
google.golang.org/grpc v1.56.1 // indirect
|
||||
google.golang.org/protobuf v1.31.0 // indirect
|
||||
gopkg.in/guregu/null.v3 v3.5.0 // indirect
|
||||
gopkg.in/ini.v1 v1.67.0 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
package datagen
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"go.k6.io/k6/js/modules"
|
||||
)
|
||||
|
||||
|
@ -36,7 +38,7 @@ func (d *Datagen) Exports() modules.Exports {
|
|||
return modules.Exports{Default: d}
|
||||
}
|
||||
|
||||
func (d *Datagen) Generator(size int) *Generator {
|
||||
g := NewGenerator(d.vu, size)
|
||||
func (d *Datagen) Generator(size int, typ string) *Generator {
|
||||
g := NewGenerator(d.vu, size, strings.ToLower(typ))
|
||||
return &g
|
||||
}
|
||||
|
|
|
@ -1,12 +1,14 @@
|
|||
package datagen
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
"github.com/dop251/goja"
|
||||
"github.com/go-loremipsum/loremipsum"
|
||||
"go.k6.io/k6/js/modules"
|
||||
)
|
||||
|
||||
|
@ -24,6 +26,7 @@ type (
|
|||
size int
|
||||
rand *rand.Rand
|
||||
buf []byte
|
||||
typ string
|
||||
offset int
|
||||
}
|
||||
|
||||
|
@ -36,19 +39,53 @@ type (
|
|||
// TailSize specifies number of extra random bytes in the buffer tail.
|
||||
const TailSize = 1024
|
||||
|
||||
func NewGenerator(vu modules.VU, size int) Generator {
|
||||
var payloadTypes = []string{
|
||||
"text",
|
||||
"random",
|
||||
"",
|
||||
}
|
||||
|
||||
func NewGenerator(vu modules.VU, size int, typ string) Generator {
|
||||
if size <= 0 {
|
||||
panic("size should be positive")
|
||||
}
|
||||
|
||||
var found bool
|
||||
for i := range payloadTypes {
|
||||
if payloadTypes[i] == typ {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
vu.InitEnv().Logger.Info("Unknown payload type '%s', random will be used.", typ)
|
||||
}
|
||||
|
||||
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
buf := make([]byte, size+TailSize)
|
||||
r.Read(buf)
|
||||
return Generator{
|
||||
g := Generator{
|
||||
vu: vu,
|
||||
size: size,
|
||||
rand: r,
|
||||
buf: buf,
|
||||
typ: typ,
|
||||
}
|
||||
g.fillBuffer()
|
||||
return g
|
||||
}
|
||||
|
||||
func (g *Generator) fillBuffer() {
|
||||
switch g.typ {
|
||||
case "text":
|
||||
li := loremipsum.New()
|
||||
b := bytes.NewBuffer(g.buf[:0])
|
||||
for b.Len() < g.size+TailSize {
|
||||
b.WriteString(li.Paragraph())
|
||||
b.WriteRune('\n')
|
||||
}
|
||||
g.buf = b.Bytes()
|
||||
default:
|
||||
rand.Read(g.buf) // Per docs, err is always nil here
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -66,9 +103,9 @@ func (g *Generator) GenPayload(calcHash bool) GenPayloadResponse {
|
|||
}
|
||||
|
||||
func (g *Generator) nextSlice() []byte {
|
||||
if g.offset >= TailSize {
|
||||
if g.offset+g.size >= len(g.buf) {
|
||||
g.offset = 0
|
||||
g.rand.Read(g.buf) // Per docs, err is always nil here
|
||||
g.fillBuffer()
|
||||
}
|
||||
|
||||
result := g.buf[g.offset : g.offset+g.size]
|
||||
|
|
|
@ -16,25 +16,25 @@ func TestGenerator(t *testing.T) {
|
|||
|
||||
t.Run("fails on negative size", func(t *testing.T) {
|
||||
require.Panics(t, func() {
|
||||
_ = NewGenerator(vu, -1)
|
||||
_ = NewGenerator(vu, -1, "")
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("fails on zero size", func(t *testing.T) {
|
||||
require.Panics(t, func() {
|
||||
_ = NewGenerator(vu, 0)
|
||||
_ = NewGenerator(vu, 0, "")
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("creates slice of specified size", func(t *testing.T) {
|
||||
size := 10
|
||||
g := NewGenerator(vu, size)
|
||||
g := NewGenerator(vu, size, "")
|
||||
slice := g.nextSlice()
|
||||
require.Len(t, slice, size)
|
||||
})
|
||||
|
||||
t.Run("creates a different slice on each call", func(t *testing.T) {
|
||||
g := NewGenerator(vu, 1000)
|
||||
g := NewGenerator(vu, 1000, "")
|
||||
slice1 := g.nextSlice()
|
||||
slice2 := g.nextSlice()
|
||||
// Each slice should be unique (assuming that 1000 random bytes will never coincide
|
||||
|
@ -43,7 +43,7 @@ func TestGenerator(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("keeps generating slices after consuming entire tail", func(t *testing.T) {
|
||||
g := NewGenerator(vu, 1000)
|
||||
g := NewGenerator(vu, 1000, "")
|
||||
initialSlice := g.nextSlice()
|
||||
for i := 0; i < TailSize; i++ {
|
||||
g.nextSlice()
|
||||
|
|
|
@ -292,17 +292,20 @@ func storageEngineOptionsFromConfig(c *config.Config, debug bool) ([]engine.Opti
|
|||
|
||||
// write cache
|
||||
if wc := sc.WriteCache(); wc.Enabled() {
|
||||
opts = append(opts, shard.WithWriteCacheOptions(
|
||||
writecache.WithPath(wc.Path()),
|
||||
writecache.WithMaxBatchSize(wc.BoltDB().MaxBatchSize()),
|
||||
writecache.WithMaxBatchDelay(wc.BoltDB().MaxBatchDelay()),
|
||||
writecache.WithMaxObjectSize(wc.MaxObjectSize()),
|
||||
writecache.WithSmallObjectSize(wc.SmallObjectSize()),
|
||||
writecache.WithFlushWorkersCount(wc.WorkersNumber()),
|
||||
writecache.WithMaxCacheSize(wc.SizeLimit()),
|
||||
writecache.WithNoSync(wc.NoSync()),
|
||||
writecache.WithLogger(&logger.Logger{Logger: log}),
|
||||
))
|
||||
opts = append(opts,
|
||||
shard.WithWriteCache(true),
|
||||
shard.WithWriteCacheOptions(
|
||||
writecache.WithPath(wc.Path()),
|
||||
writecache.WithMaxBatchSize(wc.BoltDB().MaxBatchSize()),
|
||||
writecache.WithMaxBatchDelay(wc.BoltDB().MaxBatchDelay()),
|
||||
writecache.WithMaxObjectSize(wc.MaxObjectSize()),
|
||||
writecache.WithSmallObjectSize(wc.SmallObjectSize()),
|
||||
writecache.WithFlushWorkersCount(wc.WorkersNumber()),
|
||||
writecache.WithMaxCacheSize(wc.SizeLimit()),
|
||||
writecache.WithNoSync(wc.NoSync()),
|
||||
writecache.WithLogger(&logger.Logger{Logger: log}),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
// tree
|
||||
|
|
|
@ -31,11 +31,11 @@ import (
|
|||
|
||||
type (
|
||||
Client struct {
|
||||
vu modules.VU
|
||||
key ecdsa.PrivateKey
|
||||
tok session.Object
|
||||
cli *client.Client
|
||||
bufsize int
|
||||
vu modules.VU
|
||||
key ecdsa.PrivateKey
|
||||
tok session.Object
|
||||
cli *client.Client
|
||||
prepareLocally bool
|
||||
}
|
||||
|
||||
PutResponse struct {
|
||||
|
@ -66,30 +66,18 @@ type (
|
|||
}
|
||||
|
||||
PreparedObject struct {
|
||||
vu modules.VU
|
||||
key ecdsa.PrivateKey
|
||||
cli *client.Client
|
||||
bufsize int
|
||||
|
||||
hdr object.Object
|
||||
payload []byte
|
||||
vu modules.VU
|
||||
key ecdsa.PrivateKey
|
||||
cli *client.Client
|
||||
hdr object.Object
|
||||
payload []byte
|
||||
prepareLocally bool
|
||||
}
|
||||
)
|
||||
|
||||
const defaultBufferSize = 64 * 1024
|
||||
|
||||
func (c *Client) SetBufferSize(size int) {
|
||||
if size < 0 {
|
||||
panic("buffer size must be positive")
|
||||
}
|
||||
if size == 0 {
|
||||
c.bufsize = defaultBufferSize
|
||||
} else {
|
||||
c.bufsize = size
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) Put(containerID string, headers map[string]string, payload goja.ArrayBuffer) PutResponse {
|
||||
func (c *Client) Put(containerID string, headers map[string]string, payload goja.ArrayBuffer, chunkSize int) PutResponse {
|
||||
cliContainerID := parseContainerID(containerID)
|
||||
|
||||
tok := c.tok
|
||||
|
@ -116,7 +104,7 @@ func (c *Client) Put(containerID string, headers map[string]string, payload goja
|
|||
o.SetOwnerID(&owner)
|
||||
o.SetAttributes(attrs...)
|
||||
|
||||
resp, err := put(c.vu, c.bufsize, c.cli, &tok, &o, payload.Bytes())
|
||||
resp, err := put(c.vu, c.cli, c.prepareLocally, &tok, &o, payload.Bytes(), chunkSize)
|
||||
if err != nil {
|
||||
return PutResponse{Success: false, Error: err.Error()}
|
||||
}
|
||||
|
@ -176,7 +164,7 @@ func (c *Client) Get(containerID, objectID string) GetResponse {
|
|||
prm.WithinSession(tok)
|
||||
|
||||
var objSize = 0
|
||||
err = get(c.cli, prm, c.vu.Context(), c.bufsize, func(data []byte) {
|
||||
err = get(c.cli, prm, c.vu.Context(), func(data []byte) {
|
||||
objSize += len(data)
|
||||
})
|
||||
if err != nil {
|
||||
|
@ -194,10 +182,9 @@ func get(
|
|||
cli *client.Client,
|
||||
prm client.PrmObjectGet,
|
||||
ctx context.Context,
|
||||
bufSize int,
|
||||
onDataChunk func(chunk []byte),
|
||||
) error {
|
||||
var buf = make([]byte, bufSize)
|
||||
var buf = make([]byte, defaultBufferSize)
|
||||
|
||||
objectReader, err := cli.ObjectGetInit(ctx, prm)
|
||||
if err != nil {
|
||||
|
@ -245,7 +232,7 @@ func (c *Client) VerifyHash(containerID, objectID, expectedHash string) VerifyHa
|
|||
prm.WithinSession(tok)
|
||||
|
||||
hasher := sha256.New()
|
||||
err = get(c.cli, prm, c.vu.Context(), c.bufsize, func(data []byte) {
|
||||
err = get(c.cli, prm, c.vu.Context(), func(data []byte) {
|
||||
hasher.Write(data)
|
||||
})
|
||||
if err != nil {
|
||||
|
@ -381,13 +368,12 @@ func (c *Client) Onsite(containerID string, payload goja.ArrayBuffer) PreparedOb
|
|||
}
|
||||
|
||||
return PreparedObject{
|
||||
vu: c.vu,
|
||||
key: c.key,
|
||||
cli: c.cli,
|
||||
bufsize: c.bufsize,
|
||||
|
||||
hdr: *obj,
|
||||
payload: data,
|
||||
vu: c.vu,
|
||||
key: c.key,
|
||||
cli: c.cli,
|
||||
hdr: *obj,
|
||||
payload: data,
|
||||
prepareLocally: c.prepareLocally,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -413,7 +399,7 @@ func (p PreparedObject) Put(headers map[string]string) PutResponse {
|
|||
return PutResponse{Success: false, Error: err.Error()}
|
||||
}
|
||||
|
||||
_, err = put(p.vu, p.bufsize, p.cli, nil, &obj, p.payload)
|
||||
_, err = put(p.vu, p.cli, p.prepareLocally, nil, &obj, p.payload, 0)
|
||||
if err != nil {
|
||||
return PutResponse{Success: false, Error: err.Error()}
|
||||
}
|
||||
|
@ -421,8 +407,18 @@ func (p PreparedObject) Put(headers map[string]string) PutResponse {
|
|||
return PutResponse{Success: true, ObjectID: id.String()}
|
||||
}
|
||||
|
||||
func put(vu modules.VU, bufSize int, cli *client.Client, tok *session.Object,
|
||||
hdr *object.Object, payload []byte) (*client.ResObjectPut, error) {
|
||||
type epochSource uint64
|
||||
|
||||
func (s epochSource) CurrentEpoch() uint64 {
|
||||
return uint64(s)
|
||||
}
|
||||
|
||||
func put(vu modules.VU, cli *client.Client, prepareLocally bool, tok *session.Object,
|
||||
hdr *object.Object, payload []byte, chunkSize int) (*client.ResObjectPut, error) {
|
||||
bufSize := defaultBufferSize
|
||||
if chunkSize > 0 {
|
||||
bufSize = chunkSize
|
||||
}
|
||||
buf := make([]byte, bufSize)
|
||||
rdr := bytes.NewReader(payload)
|
||||
sz := rdr.Size()
|
||||
|
@ -434,6 +430,18 @@ func put(vu modules.VU, bufSize int, cli *client.Client, tok *session.Object,
|
|||
if tok != nil {
|
||||
prm.WithinSession(*tok)
|
||||
}
|
||||
if chunkSize > 0 {
|
||||
prm.SetGRPCPayloadChunkLen(chunkSize)
|
||||
}
|
||||
if prepareLocally {
|
||||
res, err := cli.NetworkInfo(vu.Context(), client.PrmNetworkInfo{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
prm.WithObjectMaxSize(res.Info().MaxObjectSize())
|
||||
prm.WithEpochSource(epochSource(res.Info().CurrentEpoch()))
|
||||
prm.WithoutHomomorphicHash(true)
|
||||
}
|
||||
|
||||
objectWriter, err := cli.ObjectPutInit(vu.Context(), prm)
|
||||
if err != nil {
|
||||
|
@ -441,21 +449,21 @@ func put(vu modules.VU, bufSize int, cli *client.Client, tok *session.Object,
|
|||
return nil, err
|
||||
}
|
||||
|
||||
if !objectWriter.WriteHeader(*hdr) {
|
||||
if !objectWriter.WriteHeader(vu.Context(), *hdr) {
|
||||
stats.Report(vu, objPutFails, 1)
|
||||
_, err = objectWriter.Close()
|
||||
_, err = objectWriter.Close(vu.Context())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
n, _ := rdr.Read(buf)
|
||||
for n > 0 {
|
||||
if !objectWriter.WritePayloadChunk(buf[:n]) {
|
||||
if !objectWriter.WritePayloadChunk(vu.Context(), buf[:n]) {
|
||||
break
|
||||
}
|
||||
n, _ = rdr.Read(buf)
|
||||
}
|
||||
|
||||
resp, err := objectWriter.Close()
|
||||
resp, err := objectWriter.Close(vu.Context())
|
||||
if err != nil {
|
||||
stats.Report(vu, objPutFails, 1)
|
||||
return nil, err
|
||||
|
|
|
@ -51,7 +51,7 @@ func (n *Native) Exports() modules.Exports {
|
|||
return modules.Exports{Default: n}
|
||||
}
|
||||
|
||||
func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTimeout int) (*Client, error) {
|
||||
func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTimeout int, prepareLocally bool) (*Client, error) {
|
||||
var (
|
||||
cli client.Client
|
||||
pk *keys.PrivateKey
|
||||
|
@ -133,10 +133,10 @@ func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTime
|
|||
cnrPutDuration, _ = registry.NewMetric("frostfs_cnr_put_duration", metrics.Trend, metrics.Time)
|
||||
|
||||
return &Client{
|
||||
vu: n.vu,
|
||||
key: pk.PrivateKey,
|
||||
tok: tok,
|
||||
cli: &cli,
|
||||
bufsize: defaultBufferSize,
|
||||
vu: n.vu,
|
||||
key: pk.PrivateKey,
|
||||
tok: tok,
|
||||
cli: &cli,
|
||||
prepareLocally: prepareLocally,
|
||||
}, nil
|
||||
}
|
||||
|
|
|
@ -116,7 +116,7 @@ func (s *Local) Connect(configFile string, params map[string]string, bucketMappi
|
|||
ng: ng,
|
||||
pos: *nodePosition,
|
||||
size: *nodeCount,
|
||||
})
|
||||
}, zap.L())
|
||||
|
||||
rc := rawclient.New(ng,
|
||||
rawclient.WithKey(key.PrivateKey),
|
||||
|
|
|
@ -24,7 +24,10 @@ const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json";
|
|||
// Select random gRPC endpoint for current VU
|
||||
const grpc_endpoints = __ENV.GRPC_ENDPOINTS.split(',');
|
||||
const grpc_endpoint = grpc_endpoints[Math.floor(Math.random() * grpc_endpoints.length)];
|
||||
const grpc_client = native.connect(grpc_endpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5, __ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60);
|
||||
const grpc_client = native.connect(grpc_endpoint, '',
|
||||
__ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5,
|
||||
__ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60,
|
||||
__ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === "true" : false);
|
||||
const log = logging.new().withField("endpoint", grpc_endpoint);
|
||||
|
||||
const registry_enabled = !!__ENV.REGISTRY_FILE;
|
||||
|
@ -47,11 +50,12 @@ if (registry_enabled && delete_age) {
|
|||
}
|
||||
|
||||
|
||||
const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE));
|
||||
const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE), __ENV.PAYLOAD_TYPE || "");
|
||||
|
||||
const scenarios = {};
|
||||
|
||||
const write_vu_count = parseInt(__ENV.WRITERS || '0');
|
||||
const write_grpc_chunk_size = 1024 * parseInt(__ENV.GRPC_CHUNK_SIZE || '0')
|
||||
if (write_vu_count > 0) {
|
||||
scenarios.write = {
|
||||
executor: 'constant-vus',
|
||||
|
@ -129,7 +133,7 @@ export function obj_write() {
|
|||
const container = container_list[Math.floor(Math.random() * container_list.length)];
|
||||
|
||||
const { payload, hash } = generator.genPayload(registry_enabled);
|
||||
const resp = grpc_client.put(container, headers, payload);
|
||||
const resp = grpc_client.put(container, headers, payload, write_grpc_chunk_size);
|
||||
if (!resp.success) {
|
||||
log.withField("cid", container).error(resp.error);
|
||||
return;
|
||||
|
|
|
@ -24,7 +24,10 @@ const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json";
|
|||
// Select random gRPC endpoint for current VU
|
||||
const grpc_endpoints = __ENV.GRPC_ENDPOINTS.split(',');
|
||||
const grpc_endpoint = grpc_endpoints[Math.floor(Math.random() * grpc_endpoints.length)];
|
||||
const grpc_client = native.connect(grpc_endpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5, __ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60);
|
||||
const grpc_client = native.connect(grpc_endpoint, '',
|
||||
__ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5,
|
||||
__ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60,
|
||||
__ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === "true" : false);
|
||||
const log = logging.new().withField("endpoint", grpc_endpoint);
|
||||
|
||||
const registry_enabled = !!__ENV.REGISTRY_FILE;
|
||||
|
@ -55,6 +58,7 @@ const time_unit = __ENV.TIME_UNIT || '1s';
|
|||
const pre_alloc_write_vus = parseInt(__ENV.PRE_ALLOC_WRITERS || '0');
|
||||
const max_write_vus = parseInt(__ENV.MAX_WRITERS || pre_alloc_write_vus);
|
||||
const write_rate = parseInt(__ENV.WRITE_RATE || '0');
|
||||
const write_grpc_chunk_size = 1024 * parseInt(__ENV.GRPC_CHUNK_SIZE || '0')
|
||||
if (write_rate > 0) {
|
||||
scenarios.write = {
|
||||
executor: 'constant-arrival-rate',
|
||||
|
@ -154,7 +158,7 @@ export function obj_write() {
|
|||
const container = container_list[Math.floor(Math.random() * container_list.length)];
|
||||
|
||||
const { payload, hash } = generator.genPayload(registry_enabled);
|
||||
const resp = grpc_client.put(container, headers, payload);
|
||||
const resp = grpc_client.put(container, headers, payload, write_grpc_chunk_size);
|
||||
if (!resp.success) {
|
||||
log.withField("cid", container).error(resp.error);
|
||||
return;
|
||||
|
|
|
@ -31,7 +31,7 @@ const obj_registry = registry_enabled ? registry.open(__ENV.REGISTRY_FILE) : und
|
|||
|
||||
const duration = __ENV.DURATION;
|
||||
|
||||
const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE));
|
||||
const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE), __ENV.PAYLOAD_TYPE || "");
|
||||
|
||||
const scenarios = {};
|
||||
|
||||
|
|
|
@ -3,15 +3,15 @@ import uuid
|
|||
from helpers.cmd import execute_cmd
|
||||
|
||||
|
||||
def create_bucket(endpoint, versioning, location):
|
||||
def create_bucket(endpoint, versioning, location, no_verify_ssl):
|
||||
if location:
|
||||
location = f"--create-bucket-configuration 'LocationConstraint={location}'"
|
||||
bucket_name = str(uuid.uuid4())
|
||||
|
||||
cmd_line = f"aws --no-verify-ssl s3api create-bucket --bucket {bucket_name} " \
|
||||
f"--endpoint http://{endpoint} {location}"
|
||||
cmd_line_ver = f"aws --no-verify-ssl s3api put-bucket-versioning --bucket {bucket_name} " \
|
||||
f"--versioning-configuration Status=Enabled --endpoint http://{endpoint} "
|
||||
no_verify_ssl_str = "--no-verify-ssl" if no_verify_ssl else ""
|
||||
cmd_line = f"aws {no_verify_ssl_str} s3api create-bucket --bucket {bucket_name} " \
|
||||
f"--endpoint {endpoint} {location}"
|
||||
cmd_line_ver = f"aws {no_verify_ssl_str} s3api put-bucket-versioning --bucket {bucket_name} " \
|
||||
f"--versioning-configuration Status=Enabled --endpoint {endpoint} "
|
||||
|
||||
out, success = execute_cmd(cmd_line)
|
||||
|
||||
|
@ -27,19 +27,20 @@ def create_bucket(endpoint, versioning, location):
|
|||
print(f" > Bucket versioning has not been applied for bucket {bucket_name}:\n{out}")
|
||||
else:
|
||||
print(f" > Bucket versioning has been applied.")
|
||||
|
||||
|
||||
print(f"Created bucket: {bucket_name} via endpoint {endpoint}")
|
||||
return bucket_name
|
||||
|
||||
|
||||
def upload_object(bucket, payload_filepath, endpoint):
|
||||
def upload_object(bucket, payload_filepath, endpoint, no_verify_ssl):
|
||||
object_name = str(uuid.uuid4())
|
||||
|
||||
cmd_line = f"aws --no-verify-ssl s3api put-object --bucket {bucket} --key {object_name} " \
|
||||
f"--body {payload_filepath} --endpoint http://{endpoint}"
|
||||
no_verify_ssl_str = "--no-verify-ssl" if no_verify_ssl else ""
|
||||
cmd_line = f"aws {no_verify_ssl_str} s3api put-object --bucket {bucket} --key {object_name} " \
|
||||
f"--body {payload_filepath} --endpoint {endpoint}"
|
||||
out, success = execute_cmd(cmd_line)
|
||||
|
||||
if not success:
|
||||
print(f" > Object {object_name} has not been uploaded.")
|
||||
return False
|
||||
else:
|
||||
return object_name
|
||||
|
||||
return bucket, endpoint, object_name
|
||||
|
|
|
@ -12,19 +12,19 @@ def create_container(endpoint, policy, wallet_file, wallet_config):
|
|||
if not success:
|
||||
print(f" > Container has not been created:\n{output}")
|
||||
return False
|
||||
else:
|
||||
try:
|
||||
fst_str = output.split('\n')[0]
|
||||
except Exception:
|
||||
print(f"Got empty output: {output}")
|
||||
return False
|
||||
splitted = fst_str.split(": ")
|
||||
if len(splitted) != 2:
|
||||
raise ValueError(f"no CID was parsed from command output: \t{fst_str}")
|
||||
|
||||
try:
|
||||
fst_str = output.split('\n')[0]
|
||||
except Exception:
|
||||
print(f"Got empty output: {output}")
|
||||
return False
|
||||
splitted = fst_str.split(": ")
|
||||
if len(splitted) != 2:
|
||||
raise ValueError(f"no CID was parsed from command output: \t{fst_str}")
|
||||
|
||||
print(f"Created container: {splitted[1]}")
|
||||
print(f"Created container: {splitted[1]} via endpoint {endpoint}")
|
||||
|
||||
return splitted[1]
|
||||
return splitted[1]
|
||||
|
||||
|
||||
def upload_object(container, payload_filepath, endpoint, wallet_file, wallet_config):
|
||||
|
@ -36,17 +36,17 @@ def upload_object(container, payload_filepath, endpoint, wallet_file, wallet_con
|
|||
if not success:
|
||||
print(f" > Object {object_name} has not been uploaded:\n{output}")
|
||||
return False
|
||||
else:
|
||||
try:
|
||||
# taking second string from command output
|
||||
snd_str = output.split('\n')[1]
|
||||
except Exception:
|
||||
print(f"Got empty input: {output}")
|
||||
return False
|
||||
splitted = snd_str.split(": ")
|
||||
if len(splitted) != 2:
|
||||
raise Exception(f"no OID was parsed from command output: \t{snd_str}")
|
||||
return splitted[1]
|
||||
|
||||
try:
|
||||
# taking second string from command output
|
||||
snd_str = output.split('\n')[1]
|
||||
except Exception:
|
||||
print(f"Got empty input: {output}")
|
||||
return False
|
||||
splitted = snd_str.split(": ")
|
||||
if len(splitted) != 2:
|
||||
raise Exception(f"no OID was parsed from command output: \t{snd_str}")
|
||||
return container, endpoint, splitted[1]
|
||||
|
||||
|
||||
def get_object(cid, oid, endpoint, out_filepath, wallet_file, wallet_config):
|
||||
|
|
|
@ -1,14 +1,15 @@
|
|||
#!/usr/bin/python3
|
||||
|
||||
import argparse
|
||||
from itertools import cycle
|
||||
import json
|
||||
import random
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
|
||||
from argparse import Namespace
|
||||
from concurrent.futures import ProcessPoolExecutor
|
||||
|
||||
from helpers.cmd import random_payload
|
||||
from helpers.frostfs_cli import create_container, upload_object
|
||||
|
||||
|
@ -28,17 +29,19 @@ parser.add_argument(
|
|||
help="Container placement policy",
|
||||
default="REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
|
||||
)
|
||||
parser.add_argument('--endpoint', help='Node address')
|
||||
parser.add_argument('--endpoint', help='Nodes addresses separated by comma.')
|
||||
parser.add_argument('--update', help='Save existed containers')
|
||||
parser.add_argument('--ignore-errors', help='Ignore preset errors')
|
||||
parser.add_argument('--ignore-errors', help='Ignore preset errors', action='store_true')
|
||||
parser.add_argument('--workers', help='Count of workers in preset. Max = 50, Default = 50', default=50)
|
||||
parser.add_argument('--sleep', help='Time to sleep between container creation and object PUT (in seconds), '
|
||||
'Default = 8', default=8)
|
||||
|
||||
args: Namespace = parser.parse_args()
|
||||
print(args)
|
||||
|
||||
|
||||
def main():
|
||||
container_list = []
|
||||
containers = []
|
||||
objects_list = []
|
||||
|
||||
endpoints = args.endpoint.split(',')
|
||||
|
@ -48,63 +51,73 @@ def main():
|
|||
workers = int(args.workers)
|
||||
objects_per_container = int(args.preload_obj)
|
||||
|
||||
ignore_errors = True if args.ignore_errors else False
|
||||
ignore_errors = args.ignore_errors
|
||||
if args.update:
|
||||
# Open file
|
||||
with open(args.out) as f:
|
||||
data_json = json.load(f)
|
||||
container_list = data_json['containers']
|
||||
containers_count = len(container_list)
|
||||
containers = data_json['containers']
|
||||
containers_count = len(containers)
|
||||
else:
|
||||
containers_count = int(args.containers)
|
||||
print(f"Create containers: {containers_count}")
|
||||
with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
|
||||
containers_runs = {executor.submit(create_container, endpoints[random.randrange(len(endpoints))],
|
||||
args.policy, wallet, wallet_config): _ for _ in range(containers_count)}
|
||||
containers_runs = [executor.submit(create_container, endpoint, args.policy, wallet, wallet_config)
|
||||
for _, endpoint in
|
||||
zip(range(containers_count), cycle(endpoints))]
|
||||
|
||||
for run in containers_runs:
|
||||
if run.result():
|
||||
container_list.append(run.result())
|
||||
container_id = run.result()
|
||||
if container_id:
|
||||
containers.append(container_id)
|
||||
|
||||
print("Create containers: Completed")
|
||||
|
||||
print(f" > Containers: {container_list}")
|
||||
if containers_count == 0 or len(container_list) != containers_count:
|
||||
print(f"Containers mismatch in preset: expected {containers_count}, created {len(container_list)}")
|
||||
print(f" > Containers: {containers}")
|
||||
if containers_count == 0 or len(containers) != containers_count:
|
||||
print(f"Containers mismatch in preset: expected {containers_count}, created {len(containers)}")
|
||||
if not ignore_errors:
|
||||
sys.exit(ERROR_WRONG_CONTAINERS_COUNT)
|
||||
|
||||
if args.sleep != 0:
|
||||
print(f"Sleep for {args.sleep} seconds")
|
||||
time.sleep(args.sleep)
|
||||
|
||||
print(f"Upload objects to each container: {args.preload_obj} ")
|
||||
payload_file = tempfile.NamedTemporaryFile()
|
||||
random_payload(payload_file, args.size)
|
||||
print(" > Create random payload: Completed")
|
||||
|
||||
for container in container_list:
|
||||
print(f" > Upload objects for container {container}")
|
||||
with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
|
||||
objects_runs = {executor.submit(upload_object, container, payload_file.name,
|
||||
endpoints[random.randrange(len(endpoints))], wallet, wallet_config): _ for _ in range(objects_per_container)}
|
||||
|
||||
for run in objects_runs:
|
||||
if run.result():
|
||||
objects_list.append({'container': container, 'object': run.result()})
|
||||
print(f" > Upload objects for container {container}: Completed")
|
||||
total_objects = objects_per_container * containers_count
|
||||
with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
|
||||
objects_runs = [executor.submit(upload_object, container, payload_file.name,
|
||||
endpoint, wallet, wallet_config)
|
||||
for _, container, endpoint in
|
||||
zip(range(total_objects), cycle(containers), cycle(endpoints))]
|
||||
|
||||
for run in objects_runs:
|
||||
result = run.result()
|
||||
if run.result:
|
||||
container_id = result[0]
|
||||
endpoint = result[1]
|
||||
object_id = result[2]
|
||||
objects_list.append({'container': container_id, 'object': object_id})
|
||||
print(f" > Uploaded object {object_id} for container {container_id} via endpoint {endpoint}.")
|
||||
|
||||
print("Upload objects to each container: Completed")
|
||||
|
||||
total_objects = objects_per_container * containers_count
|
||||
if total_objects > 0 and len(objects_list) != total_objects:
|
||||
print(f"Objects mismatch in preset: expected {total_objects}, created {len(objects_list)}")
|
||||
if not ignore_errors:
|
||||
sys.exit(ERROR_WRONG_OBJECTS_COUNT)
|
||||
|
||||
data = {'containers': container_list, 'objects': objects_list, 'obj_size': args.size + " Kb"}
|
||||
data = {'containers': containers, 'objects': objects_list, 'obj_size': args.size + " Kb"}
|
||||
|
||||
with open(args.out, 'w+') as f:
|
||||
json.dump(data, f, ensure_ascii=False, indent=2)
|
||||
|
||||
print("Result:")
|
||||
print(f" > Total Containers has been created: {len(container_list)}.")
|
||||
print(f" > Total Containers has been created: {len(containers)}.")
|
||||
print(f" > Total Objects has been created: {len(objects_list)}.")
|
||||
|
||||
|
||||
|
|
|
@ -1,9 +1,11 @@
|
|||
#!/usr/bin/python3
|
||||
|
||||
import argparse
|
||||
from itertools import cycle
|
||||
import json
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
from concurrent.futures import ProcessPoolExecutor
|
||||
|
||||
from helpers.cmd import random_payload
|
||||
|
@ -15,13 +17,16 @@ parser.add_argument('--size', help='Upload objects size in kb.')
|
|||
parser.add_argument('--buckets', help='Number of buckets to create.')
|
||||
parser.add_argument('--out', help='JSON file with output.')
|
||||
parser.add_argument('--preload_obj', help='Number of pre-loaded objects.')
|
||||
parser.add_argument('--endpoint', help='S3 Gateway address.')
|
||||
parser.add_argument('--endpoint', help='S3 Gateways addresses separated by comma.')
|
||||
parser.add_argument('--update', help='True/False, False by default. Save existed buckets from target file (--out). '
|
||||
'New buckets will not be created.')
|
||||
parser.add_argument('--location', help='AWS location. Will be empty, if has not be declared.', default="")
|
||||
parser.add_argument('--versioning', help='True/False, False by default.')
|
||||
parser.add_argument('--ignore-errors', help='Ignore preset errors')
|
||||
parser.add_argument('--ignore-errors', help='Ignore preset errors', action='store_true')
|
||||
parser.add_argument('--no-verify-ssl', help='Ignore SSL verifications', action='store_true')
|
||||
parser.add_argument('--workers', help='Count of workers in preset. Max = 50, Default = 50', default=50)
|
||||
parser.add_argument('--sleep', help='Time to sleep between container creation and object PUT (in seconds), '
|
||||
'Default = 8', default=8)
|
||||
|
||||
args = parser.parse_args()
|
||||
print(args)
|
||||
|
@ -31,9 +36,12 @@ ERROR_WRONG_OBJECTS_COUNT = 2
|
|||
MAX_WORKERS = 50
|
||||
|
||||
def main():
|
||||
bucket_list = []
|
||||
buckets = []
|
||||
objects_list = []
|
||||
ignore_errors = True if args.ignore_errors else False
|
||||
ignore_errors = args.ignore_errors
|
||||
no_verify_ssl = args.no_verify_ssl
|
||||
|
||||
endpoints = args.endpoint.split(',')
|
||||
|
||||
workers = int(args.workers)
|
||||
objects_per_bucket = int(args.preload_obj)
|
||||
|
@ -42,60 +50,68 @@ def main():
|
|||
# Open file
|
||||
with open(args.out) as f:
|
||||
data_json = json.load(f)
|
||||
bucket_list = data_json['buckets']
|
||||
buckets_count = len(bucket_list)
|
||||
buckets = data_json['buckets']
|
||||
buckets_count = len(buckets)
|
||||
# Get CID list
|
||||
else:
|
||||
buckets_count = int(args.buckets)
|
||||
print(f"Create buckets: {buckets_count}")
|
||||
|
||||
with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
|
||||
buckets_runs = {executor.submit(create_bucket, args.endpoint, args.versioning,
|
||||
args.location): _ for _ in range(buckets_count)}
|
||||
buckets_runs = [executor.submit(create_bucket, endpoint, args.versioning, args.location, no_verify_ssl)
|
||||
for _, endpoint in
|
||||
zip(range(buckets_count), cycle(endpoints))]
|
||||
|
||||
for run in buckets_runs:
|
||||
if run.result():
|
||||
bucket_list.append(run.result())
|
||||
bucket_name = run.result()
|
||||
if bucket_name:
|
||||
buckets.append(bucket_name)
|
||||
|
||||
print("Create buckets: Completed")
|
||||
|
||||
print(f" > Buckets: {bucket_list}")
|
||||
if buckets_count == 0 or len(bucket_list) != buckets_count:
|
||||
print(f"Buckets mismatch in preset: expected {buckets_count}, created {len(bucket_list)}")
|
||||
print(f" > Buckets: {buckets}")
|
||||
if buckets_count == 0 or len(buckets) != buckets_count:
|
||||
print(f"Buckets mismatch in preset: expected {buckets_count}, created {len(buckets)}")
|
||||
if not ignore_errors:
|
||||
sys.exit(ERROR_WRONG_CONTAINERS_COUNT)
|
||||
|
||||
if args.sleep != 0:
|
||||
print(f"Sleep for {args.sleep} seconds")
|
||||
time.sleep(args.sleep)
|
||||
|
||||
print(f"Upload objects to each bucket: {objects_per_bucket} ")
|
||||
payload_file = tempfile.NamedTemporaryFile()
|
||||
random_payload(payload_file, args.size)
|
||||
print(" > Create random payload: Completed")
|
||||
|
||||
for bucket in bucket_list:
|
||||
print(f" > Upload objects for bucket {bucket}")
|
||||
with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
|
||||
objects_runs = {executor.submit(upload_object, bucket, payload_file.name,
|
||||
args.endpoint): _ for _ in range(objects_per_bucket)}
|
||||
|
||||
for run in objects_runs:
|
||||
if run.result():
|
||||
objects_list.append({'bucket': bucket, 'object': run.result()})
|
||||
print(f" > Upload objects for bucket {bucket}: Completed")
|
||||
|
||||
print("Upload objects to each bucket: Completed")
|
||||
|
||||
total_objects = objects_per_bucket * buckets_count
|
||||
|
||||
with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
|
||||
objects_runs = [executor.submit(upload_object, bucket, payload_file.name, endpoint, no_verify_ssl)
|
||||
for _, bucket, endpoint in
|
||||
zip(range(total_objects), cycle(buckets), cycle(endpoints))]
|
||||
|
||||
for run in objects_runs:
|
||||
result = run.result()
|
||||
if run.result:
|
||||
bucket = result[0]
|
||||
endpoint = result[1]
|
||||
object_id = result[2]
|
||||
objects_list.append({'bucket': bucket, 'object': object_id})
|
||||
print(f" > Uploaded object {object_id} for bucket {bucket} via endpoint {endpoint}.")
|
||||
|
||||
if total_objects > 0 and len(objects_list) != total_objects:
|
||||
print(f"Objects mismatch in preset: expected {total_objects}, created {len(objects_list)}")
|
||||
if not ignore_errors:
|
||||
sys.exit(ERROR_WRONG_OBJECTS_COUNT)
|
||||
|
||||
data = {'buckets': bucket_list, 'objects': objects_list, 'obj_size': args.size + " Kb"}
|
||||
data = {'buckets': buckets, 'objects': objects_list, 'obj_size': args.size + " Kb"}
|
||||
|
||||
with open(args.out, 'w+') as f:
|
||||
json.dump(data, f, ensure_ascii=False, indent=2)
|
||||
|
||||
print("Result:")
|
||||
print(f" > Total Buckets has been created: {len(bucket_list)}.")
|
||||
print(f" > Total Buckets has been created: {len(buckets)}.")
|
||||
print(f" > Total Objects has been created: {len(objects_list)}.")
|
||||
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@ Scenarios `grpc.js`, `local.js`, `http.js` and `s3.js` support the following opt
|
|||
* `SLEEP_WRITE` - time interval (in seconds) between writing VU iterations.
|
||||
* `SLEEP_READ` - time interval (in seconds) between reading VU iterations.
|
||||
* `SELECTION_SIZE` - size of batch to select for deletion (default: 1000).
|
||||
* `PAYLOAD_TYPE` - type of an object payload ("random" or "text", default: "random").
|
||||
|
||||
Additionally, the profiling extension can be enabled to generate CPU and memory profiles which can be inspected with `go tool pprof file.prof`:
|
||||
```shell
|
||||
|
|
|
@ -21,10 +21,12 @@ const bucket_list = new SharedArray('bucket_list', function () {
|
|||
const read_size = JSON.parse(open(__ENV.PREGEN_JSON)).obj_size;
|
||||
const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json";
|
||||
|
||||
const no_verify_ssl = __ENV.NO_VERIFY_SSL || "true";
|
||||
const connection_args = {no_verify_ssl: no_verify_ssl}
|
||||
// Select random S3 endpoint for current VU
|
||||
const s3_endpoints = __ENV.S3_ENDPOINTS.split(',');
|
||||
const s3_endpoint = s3_endpoints[Math.floor(Math.random() * s3_endpoints.length)];
|
||||
const s3_client = s3.connect(`http://${s3_endpoint}`);
|
||||
const s3_client = s3.connect(s3_endpoint, connection_args);
|
||||
const log = logging.new().withField("endpoint", s3_endpoint);
|
||||
|
||||
const registry_enabled = !!__ENV.REGISTRY_FILE;
|
||||
|
@ -46,7 +48,7 @@ if (registry_enabled && delete_age) {
|
|||
);
|
||||
}
|
||||
|
||||
const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE));
|
||||
const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE), __ENV.PAYLOAD_TYPE || "");
|
||||
|
||||
const scenarios = {};
|
||||
|
||||
|
|
|
@ -24,7 +24,9 @@ const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json";
|
|||
// Select random S3 endpoint for current VU
|
||||
const s3_endpoints = __ENV.S3_ENDPOINTS.split(',');
|
||||
const s3_endpoint = s3_endpoints[Math.floor(Math.random() * s3_endpoints.length)];
|
||||
const s3_client = s3.connect(`http://${s3_endpoint}`);
|
||||
const no_verify_ssl = __ENV.NO_VERIFY_SSL || "true";
|
||||
const connection_args = {no_verify_ssl: no_verify_ssl}
|
||||
const s3_client = s3.connect(s3_endpoint, connection_args);
|
||||
const log = logging.new().withField("endpoint", s3_endpoint);
|
||||
|
||||
const registry_enabled = !!__ENV.REGISTRY_FILE;
|
||||
|
|
|
@ -34,16 +34,21 @@ if (__ENV.GRPC_ENDPOINTS) {
|
|||
const grpcEndpoints = __ENV.GRPC_ENDPOINTS.split(',');
|
||||
const grpcEndpoint = grpcEndpoints[Math.floor(Math.random() * grpcEndpoints.length)];
|
||||
log = log.withField("endpoint", grpcEndpoint);
|
||||
grpc_client = native.connect(grpcEndpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 0, __ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 0);
|
||||
grpc_client = native.connect(grpcEndpoint, '',
|
||||
__ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 0,
|
||||
__ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 0,
|
||||
__ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === "true" : false, '');
|
||||
}
|
||||
|
||||
// Connect to random S3 endpoint
|
||||
let s3_client = undefined;
|
||||
if (__ENV.S3_ENDPOINTS) {
|
||||
const no_verify_ssl = __ENV.NO_VERIFY_SSL || "true";
|
||||
const connection_args = {no_verify_ssl: no_verify_ssl}
|
||||
const s3_endpoints = __ENV.S3_ENDPOINTS.split(',');
|
||||
const s3_endpoint = s3_endpoints[Math.floor(Math.random() * s3_endpoints.length)];
|
||||
log = log.withField("endpoint", s3_endpoint);
|
||||
s3_client = s3.connect(`http://${s3_endpoint}`);
|
||||
s3_client = s3.connect(s3_endpoint, connection_args);
|
||||
}
|
||||
|
||||
// We will attempt to verify every object in "created" status. The scenario will execute
|
||||
|
|
Loading…
Reference in a new issue