[#14]: Add local target support #35

Merged
fyrchik merged 1 commit from ale64bit/xk6-frostfs:feature/14-local_targets into master 2023-03-23 11:58:11 +00:00
10 changed files with 1555 additions and 17 deletions

@@ -69,6 +69,25 @@ const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0)
It returns a dictionary with a `success` boolean flag, an `object_id` string and
an `error` string.
## Local
Create a local client with the `connect` method. Arguments:
- local path to the frostfs storage node configuration file
- hex-encoded private key (an empty value produces a random key)
```js
import local from 'k6/x/frostfs/local';
const local_client = local.connect("/path/to/config.yaml", "")
```
### Methods
- `put(container_id, headers, payload)`. Returns a dictionary with a `success`
boolean flag, an `object_id` string, and an `error` string.
- `get(container_id, object_id)`. Returns a dictionary with a `success` boolean
flag and an `error` string.
- `delete(container_id, object_id)`. Returns a dictionary with a `success` boolean
flag and an `error` string.
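A minimal end-to-end sketch combining these methods; the config path and the `example` header are placeholders, and the container ID is the sample one from `examples/local.js`:
```js
import local from 'k6/x/frostfs/local';

const payload = open('../go.sum', 'b'); // any binary payload
const client = local.connect('/path/to/config.yaml', '');

export default function () {
  const cid = '6BVPPXQewRJ6J5EYmAPLczXxNocS7ikyF7amS2esWQnb';
  const put = client.put(cid, {example: 'header'}, payload); // {success, object_id, error}
  if (!put.success) {
    console.log(put.error);
    return;
  }
  const get = client.get(cid, put.object_id); // {success, error}
  if (get.success) {
    client.delete(cid, put.object_id); // {success, error}
  }
}
```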
## S3
Create an S3 client with the `connect` method. Arguments:

examples/local.js (new file)

@@ -0,0 +1,24 @@
import {uuidv4} from 'https://jslib.k6.io/k6-utils/1.2.0/index.js';
fyrchik marked this conversation as resolved (outdated):

Can we clone it? See #19 for `scenarios/libs`. Also it feels `k6`-related, can't we import it without `https`?

Possibly, but this is what all examples were doing, so I copy-pasted it blindly. Let's refactor it in a separate PR since it touches a bunch of other files.
import local from 'k6/x/frostfs/local';
const payload = open('../go.sum', 'b');
const local_cli = local.connect("/path/to/config.yaml", "")
export const options = {
  stages: [
    {duration: '30s', target: 10},
  ],
};
export default function () {
  let headers = {
    'unique_header': uuidv4()
  }
  const container_id = '6BVPPXQewRJ6J5EYmAPLczXxNocS7ikyF7amS2esWQnb';
  let resp = local_cli.put(container_id, headers, payload)
  if (resp.success) {
    local_cli.get(container_id, resp.object_id)
  } else {
    console.log(resp.error)
  }
}

@@ -3,6 +3,7 @@ package xk6_frostfs
import (
_ "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/datagen"
_ "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/env"
_ "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/local"
_ "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/logging"
_ "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/native"
_ "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/registry"

go.mod

@@ -3,6 +3,7 @@ module git.frostfs.info/TrueCloudLab/xk6-frostfs
go 1.17
require (
git.frostfs.info/TrueCloudLab/frostfs-node v0.22.2-0.20230313113918-4e244686cf03
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230307124721-94476f905599
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
github.com/aws/aws-sdk-go-v2 v1.16.3
@@ -12,10 +13,12 @@ require (
github.com/google/uuid v1.3.0
github.com/joho/godotenv v1.5.1
github.com/nspcc-dev/neo-go v0.100.1
github.com/panjf2000/ants/v2 v2.4.0
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.8.1
go.etcd.io/bbolt v1.3.6
go.k6.io/k6 v0.38.2
go.uber.org/zap v1.24.0
)
require (
@@ -42,30 +45,46 @@ require (
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
github.com/dlclark/regexp2 v1.4.1-0.20201116162257-a2a8dda75c91 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-sourcemap/sourcemap v2.1.4-0.20211119122758-180fcef48034+incompatible // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/hashicorp/golang-lru v0.6.0 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.1 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/klauspost/compress v1.15.13 // indirect
github.com/magiconair/properties v1.8.6 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/nspcc-dev/rfc6979 v0.2.0 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.0.5 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/serenize/snaker v0.0.0-20201027110005-a7ad2135616e // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/spf13/afero v1.1.2 // indirect
github.com/spf13/afero v1.9.2 // indirect
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.14.0 // indirect
github.com/subosito/gotenv v1.4.1 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/crypto v0.4.0 // indirect
golang.org/x/exp v0.0.0-20221227203929-1b447090c38c // indirect
golang.org/x/net v0.3.0 // indirect
golang.org/x/net v0.4.0 // indirect
golang.org/x/sys v0.3.0 // indirect
golang.org/x/text v0.5.0 // indirect
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 // indirect
google.golang.org/genproto v0.0.0-20200903010400-9bfcb5116336 // indirect
google.golang.org/grpc v1.48.0 // indirect
golang.org/x/time v0.1.0 // indirect
google.golang.org/genproto v0.0.0-20221227171554-f9683d7f8bef // indirect
google.golang.org/grpc v1.51.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/guregu/null.v3 v3.3.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

go.sum

File diff suppressed because it is too large.

internal/local/client.go (new file)

@@ -0,0 +1,153 @@
package local
import (
"crypto/ecdsa"
"fmt"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats"
"github.com/dop251/goja"
"go.k6.io/k6/js/modules"
"go.k6.io/k6/metrics"
)
type Client struct {
vu modules.VU
key ecdsa.PrivateKey
ng *engine.StorageEngine
}
type PutResponse struct {
Success bool
ObjectID string
Error string
}
type GetResponse struct {
Success bool
Error string
}
type DeleteResponse struct {
Success bool
Error string
}
func (c *Client) Put(containerID string, headers map[string]string, payload goja.ArrayBuffer) PutResponse {
sz := len(payload.Bytes())
attrs := make([]object.Attribute, len(headers))
{
ind := 0
for k, v := range headers {
attrs[ind].SetKey(k)
attrs[ind].SetValue(v)
ind++
}
}
ownerID := &user.ID{}
user.IDFromKey(ownerID, c.key.PublicKey)
obj := object.New()
obj.SetContainerID(mustParseContainerID(containerID))
obj.SetOwnerID(ownerID) // needed for metabase bucket name
obj.SetAttributes(attrs...)
obj.SetPayload(payload.Bytes())
obj.SetPayloadSize(uint64(len(payload.Bytes())))
object.CalculateAndSetPayloadChecksum(obj) // needed for metabase key
id, err := object.CalculateID(obj)
if err != nil {
return PutResponse{Error: fmt.Sprintf("calculating id: %v", err)}
}
obj.SetID(id)
if err := object.CalculateAndSetSignature(c.key, obj); err != nil {
return PutResponse{Error: fmt.Sprintf("calculating signature: %v", err)}
}
var req engine.PutPrm
req.WithObject(obj)
start := time.Now()
if _, err := c.ng.Put(req); err != nil {
stats.Report(c.vu, objPutFails, 1)
return PutResponse{Error: err.Error()}
}
stats.Report(c.vu, objPutTotal, 1)
stats.ReportDataSent(c.vu, float64(sz))
stats.Report(c.vu, objPutDuration, metrics.D(time.Since(start)))
return PutResponse{
Success: true,
ObjectID: id.EncodeToString(),
}
}
func (c *Client) Get(containerID, objectID string) GetResponse {
var addr oid.Address
addr.SetContainer(mustParseContainerID(containerID))
addr.SetObject(mustParseObjectID(objectID))
var req engine.GetPrm
req.WithAddress(addr)
start := time.Now()
resp, err := c.ng.Get(req)
if err != nil {
stats.Report(c.vu, objGetFails, 1)
return GetResponse{Error: err.Error()}
}
stats.Report(c.vu, objGetTotal, 1)
stats.Report(c.vu, objGetDuration, metrics.D(time.Since(start)))
stats.ReportDataReceived(c.vu, float64(len(resp.Object().Payload())))
return GetResponse{Success: true}
}
func (c *Client) Delete(containerID, objectID string) DeleteResponse {
var addr oid.Address
addr.SetContainer(mustParseContainerID(containerID))
addr.SetObject(mustParseObjectID(objectID))
var req engine.DeletePrm
req.WithAddress(addr)
start := time.Now()
if _, err := c.ng.Delete(req); err != nil {
stats.Report(c.vu, objDeleteFails, 1)
return DeleteResponse{Error: err.Error()}
}
stats.Report(c.vu, objDeleteTotal, 1)
stats.Report(c.vu, objDeleteDuration, metrics.D(time.Since(start)))
return DeleteResponse{Success: true}
}
func mustParseContainerID(strContainerID string) cid.ID {
var containerID cid.ID
err := containerID.DecodeString(strContainerID)
if err != nil {
panic(fmt.Sprintf("parsing container id %q: %v", strContainerID, err))
}
return containerID
}
func mustParseObjectID(strObjectID string) oid.ID {
var cliObjectID oid.ID
err := cliObjectID.DecodeString(strObjectID)
if err != nil {
panic(fmt.Sprintf("parsing object id %q: %v", strObjectID, err))
}
return cliObjectID
}

internal/local/local.go (new file)

@@ -0,0 +1,296 @@
package local
import (
"errors"
"fmt"
"sync"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
engineconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine"
shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
blobovniczaconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza"
fstreeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
metabase "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/panjf2000/ants/v2"
"go.etcd.io/bbolt"
"go.k6.io/k6/js/modules"
"go.k6.io/k6/metrics"
"go.uber.org/zap"
)
// RootModule is the global module object type. It is instantiated once per test
// run and will be used to create k6/x/frostfs/local module instances for each VU.
type RootModule struct {
mu sync.Mutex
// configFile is the name of the configuration file used during one test.
configFile string
// ng is the engine instance used during one test, corresponding to the configFile. Each VU
fyrchik marked this conversation as resolved (outdated):

We do not need to support multiple configuration files here, 1 is enough. Or do you have any scenario in mind?

Done. Turned it into a singleton. (The original reason was that `Connect` is called multiple times and you can't have more than one instance of the engine running.)
// gets the same engine instance.
ng *engine.StorageEngine
}
// Local represents an instance of the module for every VU.
type Local struct {
vu modules.VU
resolveEngine func(string) (*engine.StorageEngine, error)
}
// Ensure the interfaces are implemented correctly.
var (
_ modules.Module = &RootModule{}
_ modules.Instance = &Local{}
objPutTotal, objPutFails, objPutDuration *metrics.Metric
objGetTotal, objGetFails, objGetDuration *metrics.Metric
objDeleteTotal, objDeleteFails, objDeleteDuration *metrics.Metric
)
func init() {
modules.Register("k6/x/frostfs/local", &RootModule{})
}
// NewModuleInstance implements the modules.Module interface and returns
// a new instance for each VU.
func (r *RootModule) NewModuleInstance(vu modules.VU) modules.Instance {
mi := &Local{
vu: vu,
resolveEngine: r.getOrCreateEngine,
}
return mi
}
// getOrCreateEngine returns the current engine instance for the given configuration file,
// creating a new one if none exists. Note that the identity of configuration files is their
// file name for the purposes of test runs.
func (r *RootModule) getOrCreateEngine(configFile string) (*engine.StorageEngine, error) {
r.mu.Lock()
defer r.mu.Unlock()
if len(configFile) == 0 {
return nil, errors.New("configFile cannot be empty")
}
// Create and initialize engine for the given configFile if it doesn't exist already
if r.ng == nil {
r.configFile = configFile
appCfg := config.New(config.Prm{}, config.WithConfigFile(configFile))
ngOpts, shardOpts := storageEngineOptionsFromConfig(appCfg)
r.ng = engine.New(ngOpts...)
for i, opts := range shardOpts {
if _, err := r.ng.AddShard(opts...); err != nil {
return nil, fmt.Errorf("adding shard %d: %v", i, err)
}
}
if err := r.ng.Open(); err != nil {
return nil, fmt.Errorf("opening engine: %v", err)
}
if err := r.ng.Init(); err != nil {
return nil, fmt.Errorf("initializing engine: %v", err)
}
} else if configFile != r.configFile {
return nil, fmt.Errorf("getOrCreateEngine called with mismatching configFile after engine was initialized: got %q, want %q", configFile, r.configFile)
}
return r.ng, nil
}
// Exports implements the modules.Instance interface and returns the exports
// of the JS module.
func (s *Local) Exports() modules.Exports {
return modules.Exports{Default: s}
}
func (s *Local) Connect(configFile, hexKey string) (*Client, error) {
ng, err := s.resolveEngine(configFile)
if err != nil {
return nil, fmt.Errorf("connecting to engine for config %q: %v", configFile, err)
}
key, err := parseOrCreateKey(hexKey)
if err != nil {
return nil, fmt.Errorf("creating key: %v", err)
}
// register metrics
registry := metrics.NewRegistry()
objPutTotal, _ = registry.NewMetric("local_obj_put_total", metrics.Counter)
objPutFails, _ = registry.NewMetric("local_obj_put_fails", metrics.Counter)
objPutDuration, _ = registry.NewMetric("local_obj_put_duration", metrics.Trend, metrics.Time)
objGetTotal, _ = registry.NewMetric("local_obj_get_total", metrics.Counter)
objGetFails, _ = registry.NewMetric("local_obj_get_fails", metrics.Counter)
objGetDuration, _ = registry.NewMetric("local_obj_get_duration", metrics.Trend, metrics.Time)
objDeleteTotal, _ = registry.NewMetric("local_obj_delete_total", metrics.Counter)
objDeleteFails, _ = registry.NewMetric("local_obj_delete_fails", metrics.Counter)
objDeleteDuration, _ = registry.NewMetric("local_obj_delete_duration", metrics.Trend, metrics.Time)
return &Client{
vu: s.vu,
key: key.PrivateKey,
ng: ng,
}, nil
}
type epochState struct{}
func (epochState) CurrentEpoch() uint64 { return 0 }
// storageEngineOptionsFromConfig loads a configuration file and returns the corresponding
// engine and shard options to recreate an engine usable with an existing storage instance.
// This makes sure that the local loader uses the same engine configuration as the one that
// preloaded the storage (if any), by using the same configuration file.
//
// Note that the configuration file only needs to contain the storage-specific sections.
func storageEngineOptionsFromConfig(c *config.Config) ([]engine.Option, [][]shard.Option) {
log := zap.L()
ngOpts := []engine.Option{
engine.WithErrorThreshold(engineconfig.ShardErrorThreshold(c)),
engine.WithShardPoolSize(engineconfig.ShardPoolSize(c)),
engine.WithLogger(&logger.Logger{Logger: log}),
}
var shOpts [][]shard.Option
engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error {
opts := []shard.Option{
shard.WithRefillMetabase(sc.RefillMetabase()),
shard.WithMode(sc.Mode()),
shard.WithLogger(&logger.Logger{Logger: log}),
}
// substorages
{
var substorages []blobstor.SubStorage
for _, scfg := range sc.BlobStor().Storages() {
switch scfg.Type() {
case blobovniczatree.Type:
cfg := blobovniczaconfig.From((*config.Config)(scfg))
ss := blobstor.SubStorage{
Storage: blobovniczatree.NewBlobovniczaTree(
blobovniczatree.WithRootPath(scfg.Path()),
blobovniczatree.WithPermissions(scfg.Perm()),
blobovniczatree.WithBlobovniczaSize(cfg.Size()),
blobovniczatree.WithBlobovniczaShallowDepth(cfg.ShallowDepth()),
blobovniczatree.WithBlobovniczaShallowWidth(cfg.ShallowWidth()),
blobovniczatree.WithOpenedCacheSize(cfg.OpenedCacheSize()),
blobovniczatree.WithLogger(&logger.Logger{Logger: log}),
),
Policy: func(_ *objectSDK.Object, data []byte) bool {
return uint64(len(data)) < sc.SmallSizeLimit()
},
}
substorages = append(substorages, ss)
case fstree.Type:
cfg := fstreeconfig.From((*config.Config)(scfg))
ss := blobstor.SubStorage{
Storage: fstree.New(
fstree.WithPath(scfg.Path()),
fstree.WithPerm(scfg.Perm()),
fstree.WithDepth(cfg.Depth()),
fstree.WithNoSync(cfg.NoSync()),
),
Policy: func(_ *objectSDK.Object, data []byte) bool {
return true
},
}
substorages = append(substorages, ss)
default:
return fmt.Errorf("invalid storage type: %s", scfg.Type())
}
}
opts = append(opts, shard.WithBlobStorOptions(
blobstor.WithCompressObjects(sc.Compress()),
blobstor.WithUncompressableContentTypes(sc.UncompressableContentTypes()),
blobstor.WithStorages(substorages),
blobstor.WithLogger(&logger.Logger{Logger: log}),
))
}
// write cache
if wc := sc.WriteCache(); wc.Enabled() {
opts = append(opts, shard.WithWriteCacheOptions(
writecache.WithPath(wc.Path()),
writecache.WithMaxBatchSize(wc.BoltDB().MaxBatchSize()),
writecache.WithMaxBatchDelay(wc.BoltDB().MaxBatchDelay()),
writecache.WithMaxObjectSize(wc.MaxObjectSize()),
writecache.WithSmallObjectSize(wc.SmallObjectSize()),
writecache.WithFlushWorkersCount(wc.WorkersNumber()),
writecache.WithMaxCacheSize(wc.SizeLimit()),
writecache.WithNoSync(wc.NoSync()),
writecache.WithLogger(&logger.Logger{Logger: log}),
))
}
// tree
if config.BoolSafe(c.Sub("tree"), "enabled") {
pr := sc.Pilorama()
opts = append(opts, shard.WithPiloramaOptions(
pilorama.WithPath(pr.Path()),
pilorama.WithPerm(pr.Perm()),
pilorama.WithMaxBatchSize(pr.MaxBatchSize()),
pilorama.WithMaxBatchDelay(pr.MaxBatchDelay()),
pilorama.WithNoSync(pr.NoSync()),
))
}
// metabase
{
mb := sc.Metabase()
opts = append(opts, shard.WithMetaBaseOptions(
metabase.WithPath(mb.Path()),
metabase.WithPermissions(mb.BoltDB().Perm()),
metabase.WithMaxBatchSize(mb.BoltDB().MaxBatchSize()),
metabase.WithMaxBatchDelay(mb.BoltDB().MaxBatchDelay()),
metabase.WithBoltDBOptions(&bbolt.Options{
Timeout: 1 * time.Second,
}),
metabase.WithEpochState(epochState{}),
metabase.WithLogger(&logger.Logger{Logger: log}),
))
}
// GC
{
gc := sc.GC()
opts = append(opts,
shard.WithGCRemoverSleepInterval(gc.RemoverSleepInterval()),
shard.WithRemoverBatchSize(gc.RemoverBatchSize()),
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)
if err != nil {
panic(err)
}
return pool
}),
)
}
shOpts = append(shOpts, opts)
return nil
})
return ngOpts, shOpts
}
func parseOrCreateKey(hexKeyStr string) (*keys.PrivateKey, error) {
if hexKeyStr != "" {
return keys.NewPrivateKeyFromHex(hexKeyStr)
}
return keys.NewPrivateKey()
}
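As the resolved thread above notes, the engine is a singleton per test run: the first `connect` call creates and initializes it, and every later call must pass the same configuration file. A small sketch of the resulting contract (paths are placeholders):
```js
import local from 'k6/x/frostfs/local';

// The first call creates and initializes the engine for this config file.
const a = local.connect('/path/to/config.yaml', '');
// Later calls (from any VU) reuse the same engine instance.
const b = local.connect('/path/to/config.yaml', '');
// Passing a different config file in the same run fails with the
// "mismatching configFile" error from getOrCreateEngine.
```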

@@ -0,0 +1,24 @@
# This configuration can be used for the local scenario when testing locally.
storage:
  shard_num: 1
  shard:
    0:
      metabase:
        path: /tmp/k6_local/metabase
        perm: 0600
      blobstor:
        - path: /tmp/k6_local/blobovnicza
          type: blobovnicza
          perm: 0600
          opened_cache_capacity: 32
          depth: 1
          width: 1
        - path: /tmp/k6_local/fstree
          type: fstree
          perm: 0600
          depth: 4
      writecache:
        enabled: false
      gc:
        remover_batch_size: 100
        remover_sleep_interval: 1m
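To exercise this configuration, point the module at it via `connect`; the path below is a placeholder (`scenarios/local.js` reads the same path from the `CONFIG_FILE` env variable instead):
```js
import local from 'k6/x/frostfs/local';

// An empty hex key means a random private key is generated.
const client = local.connect('/path/to/this/config.yaml', '');
```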

scenarios/local.js (new file)

@@ -0,0 +1,160 @@
import datagen from 'k6/x/frostfs/datagen';
import local from 'k6/x/frostfs/local';
import logging from 'k6/x/frostfs/logging';
import registry from 'k6/x/frostfs/registry';
import { SharedArray } from 'k6/data';
import { textSummary } from './libs/k6-summary-0.0.2.js';
const obj_list = new SharedArray('obj_list', function () {
return JSON.parse(open(__ENV.PREGEN_JSON)).objects;
});
const container_list = new SharedArray('container_list', function () {
return JSON.parse(open(__ENV.PREGEN_JSON)).containers;
});
const read_size = JSON.parse(open(__ENV.PREGEN_JSON)).obj_size;
const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json";
const config_file = __ENV.CONFIG_FILE;
const local_client = local.connect(config_file, '');
const log = logging.new().withField("config", config_file);
const registry_enabled = !!__ENV.REGISTRY_FILE;
const obj_registry = registry_enabled ? registry.open(__ENV.REGISTRY_FILE) : undefined;
const duration = __ENV.DURATION;
const delete_age = __ENV.DELETE_AGE ? parseInt(__ENV.DELETE_AGE) : undefined;
let obj_to_delete_selector = undefined;
fyrchik marked this conversation as resolved (outdated):

Current main use case is PUT/GET, can we delete DELETE-related stuff for now?

I see. But it already works :) no harm in keeping it?
if (registry_enabled && delete_age) {
obj_to_delete_selector = registry.getSelector(
__ENV.REGISTRY_FILE,
"obj_to_delete",
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0,
{
status: "created",
age: delete_age,
}
);
}
const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE));
const scenarios = {};
const write_vu_count = parseInt(__ENV.WRITERS || '0');
if (write_vu_count > 0) {
scenarios.write = {
executor: 'constant-vus',
vus: write_vu_count,
duration: `${duration}s`,
exec: 'obj_write',
gracefulStop: '5s',
};
}
const read_vu_count = parseInt(__ENV.READERS || '0');
if (read_vu_count > 0) {
scenarios.read = {
executor: 'constant-vus',
vus: read_vu_count,
duration: `${duration}s`,
exec: 'obj_read',
gracefulStop: '5s',
};
}
const delete_vu_count = parseInt(__ENV.DELETERS || '0');
if (delete_vu_count > 0) {
if (!obj_to_delete_selector) {
throw new Error('Positive DELETE worker number without a proper object selector');
}
scenarios.delete = {
executor: 'constant-vus',
vus: delete_vu_count,
duration: `${duration}s`,
exec: 'obj_delete',
gracefulStop: '5s',
};
}
export const options = {
scenarios,
setupTimeout: '5s',
};
export function setup() {
const total_vu_count = write_vu_count + read_vu_count + delete_vu_count;
console.log(`Pregenerated containers: ${container_list.length}`);
console.log(`Pregenerated read object size: ${read_size}`);
console.log(`Pregenerated total objects: ${obj_list.length}`);
console.log(`Reading VUs: ${read_vu_count}`);
console.log(`Writing VUs: ${write_vu_count}`);
console.log(`Deleting VUs: ${delete_vu_count}`);
console.log(`Total VUs: ${total_vu_count}`);
}
export function teardown(data) {
if (obj_registry) {
obj_registry.close();
}
}
export function handleSummary(data) {
return {
'stdout': textSummary(data, { indent: ' ', enableColors: false }),
[summary_json]: JSON.stringify(data),
};
}
export function obj_write() {
const headers = {
unique_header: uuidv4()
fyrchik marked this conversation as resolved (outdated):

We need `SLEEP_*` variables for network load, not to overload a system. It is not applicable here (we have no timeout for operations) and is deprecated anyway. Can we remove this?

Done.
};
const container = container_list[Math.floor(Math.random() * container_list.length)];
const { payload, hash } = generator.genPayload(registry_enabled);
const resp = local_client.put(container, headers, payload);
if (!resp.success) {
log.withField("cid", container).error(resp.error);
return;
}
if (obj_registry) {
obj_registry.addObject(container, resp.object_id, "", "", hash);
}
}
export function obj_read() {
fyrchik marked this conversation as resolved (outdated):

@anikeev-yadro, do we need the registry for local load? The only case is to have GET on a loaded cluster, but then we would also need to support merging registries from different nodes (or do we do local load on 1 node only?)

@fyrchik I think we need to keep the registry. We will think about merging in the future.

The registry is also used, e.g., for the delete workload when there are no pregenerated objects. So it's probably worth keeping at least for local tests.
const obj = obj_list[Math.floor(Math.random() * obj_list.length)];
const resp = local_client.get(obj.container, obj.object)
if (!resp.success) {
log.withFields({cid: obj.container, oid: obj.object}).error(resp.error);
}
}
export function obj_delete() {
const obj = obj_to_delete_selector.nextObject();
if (!obj) {
return;
}
const resp = local_client.delete(obj.c_id, obj.o_id);
if (!resp.success) {
// Log errors; code 2052 (object already deleted) is not filtered out here
log.withFields({cid: obj.c_id, oid: obj.o_id}).error(resp.error);
return;
}
obj_registry.deleteObject(obj.id);
}
export function uuidv4() {
return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
let r = Math.random() * 16 | 0, v = c === 'x' ? r : (r & 0x3 | 0x8);
return v.toString(16);
});
}
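For reference, the fields this scenario reads from `PREGEN_JSON` imply a file shaped roughly like the sketch below (inferred from the accesses above; the actual preset script may emit more fields, and the IDs shown are placeholders):
```js
// Hypothetical contents of the file passed via -e PREGEN_JSON=...
const pregen = {
  obj_size: 1024, // becomes read_size
  containers: ['6BVPPXQewRJ6J5EYmAPLczXxNocS7ikyF7amS2esWQnb'],
  objects: [
    {container: '6BVPPXQewRJ6J5EYmAPLczXxNocS7ikyF7amS2esWQnb', object: '<object_id>'},
  ],
};
```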

@@ -8,9 +8,9 @@
$ ./k6 run -e ENV_FILE=.env some-scenario.js
```
## Common options for gRPC, local, HTTP, S3 scenarios:
Scenarios `grpc.js`, `local.js`, `http.js` and `s3.js` support the following options:
* `DURATION` - duration of scenario in seconds.
* `READERS` - number of VUs performing read operations.
* `WRITERS` - number of VUs performing write operations.
@@ -48,6 +48,26 @@ Options (in addition to the common options):
* `DIAL_TIMEOUT` - timeout to connect to a node (in seconds).
* `STREAM_TIMEOUT` - timeout for a single stream message for `PUT`/`GET` operations (in seconds).
## Local
1. Create pre-generated containers or objects:
The tests will use all pre-created containers for PUT operations and all pre-created objects for READ operations. There is no dedicated script to preset the local scenario, so we use the same script as for gRPC:
```shell
$ ./scenarios/preset/preset_grpc.py --size 1024 --containers 1 --out grpc.json --endpoint host1:8080 --preload_obj 500
```
2. Execute scenario with options:
```shell
$ ./k6 run -e DURATION=60 -e WRITE_OBJ_SIZE=8192 -e READERS=20 -e WRITERS=20 -e DELETERS=30 -e DELETE_AGE=10 -e REGISTRY_FILE=registry.bolt -e CONFIG_FILE=/path/to/config.yaml -e PREGEN_JSON=./grpc.json scenarios/local.js
```
Options (in addition to the common options):
* `CONFIG_FILE` - path to the local configuration file used for the storage node. Only the storage configuration section is used.
* `DELETERS` - number of VUs performing delete operations (using deleters requires that options `DELETE_AGE` and `REGISTRY_FILE` are specified as well).
* `DELETE_AGE` - age of an object in seconds before which it cannot be deleted. This parameter can be used to control how many objects we have in the system under load.
## HTTP
1. Create pre-generated containers or objects: