[#14]: Add local target support #35
19
README.md
|
@ -69,6 +69,25 @@ const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0)
|
|||
It returns dictionary with `success` boolean flag, `object_id` string and
|
||||
`error` string.
|
||||
|
||||
## Local
|
||||
|
||||
Create a local client with `connect` method. Arguments:
|
||||
- local path to frostfs storage node configuration file
|
||||
- hex encoded private key (empty value produces random key)
|
||||
|
||||
```js
|
||||
import local from 'k6/x/frostfs/local';
|
||||
const local_client = local.connect("/path/to/config.yaml", "")
|
||||
```
|
||||
|
||||
### Methods
|
||||
- `put(container_id, headers, payload)`. Returns dictionary with `success`
|
||||
boolean flag, `object_id` string, and `error` string.
|
||||
- `get(container_id, object_id)`. Returns dictionary with `success` boolean
|
||||
flag, and `error` string.
|
||||
- `delete(container_id, object_id)`. Returns dictionary with `success` boolean
|
||||
flag, and `error` string.
|
||||
|
||||
## S3
|
||||
|
||||
Create s3 client with `connect` method. Arguments:
|
||||
|
|
24
examples/local.js
Normal file
|
@ -0,0 +1,24 @@
|
|||
import {uuidv4} from 'https://jslib.k6.io/k6-utils/1.2.0/index.js';
|
||||
fyrchik marked this conversation as resolved
Outdated
|
||||
import local from 'k6/x/frostfs/local';
|
||||
|
||||
const payload = open('../go.sum', 'b');
|
||||
const local_cli = local.connect("/path/to/config.yaml", "")
|
||||
|
||||
export const options = {
|
||||
stages: [
|
||||
{duration: '30s', target: 10},
|
||||
],
|
||||
};
|
||||
|
||||
export default function () {
|
||||
let headers = {
|
||||
'unique_header': uuidv4()
|
||||
}
|
||||
const container_id = '6BVPPXQewRJ6J5EYmAPLczXxNocS7ikyF7amS2esWQnb';
|
||||
let resp = local_cli.put(container_id, headers, payload)
|
||||
if (resp.success) {
|
||||
local_cli.get(container_id, resp.object_id)
|
||||
} else {
|
||||
console.log(resp.error)
|
||||
}
|
||||
}
|
|
@ -3,6 +3,7 @@ package xk6_frostfs
|
|||
import (
|
||||
_ "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/datagen"
|
||||
_ "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/env"
|
||||
_ "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/local"
|
||||
_ "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/logging"
|
||||
_ "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/native"
|
||||
_ "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/registry"
|
||||
|
|
31
go.mod
|
@ -3,6 +3,7 @@ module git.frostfs.info/TrueCloudLab/xk6-frostfs
|
|||
go 1.17
|
||||
|
||||
require (
|
||||
git.frostfs.info/TrueCloudLab/frostfs-node v0.22.2-0.20230313113918-4e244686cf03
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230307124721-94476f905599
|
||||
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
|
||||
github.com/aws/aws-sdk-go-v2 v1.16.3
|
||||
|
@ -12,10 +13,12 @@ require (
|
|||
github.com/google/uuid v1.3.0
|
||||
github.com/joho/godotenv v1.5.1
|
||||
github.com/nspcc-dev/neo-go v0.100.1
|
||||
github.com/panjf2000/ants/v2 v2.4.0
|
||||
github.com/sirupsen/logrus v1.8.1
|
||||
github.com/stretchr/testify v1.8.1
|
||||
go.etcd.io/bbolt v1.3.6
|
||||
go.k6.io/k6 v0.38.2
|
||||
go.uber.org/zap v1.24.0
|
||||
)
|
||||
|
||||
require (
|
||||
|
@ -42,30 +45,46 @@ require (
|
|||
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
|
||||
github.com/dlclark/regexp2 v1.4.1-0.20201116162257-a2a8dda75c91 // indirect
|
||||
github.com/fatih/color v1.13.0 // indirect
|
||||
github.com/fsnotify/fsnotify v1.6.0 // indirect
|
||||
github.com/go-sourcemap/sourcemap v2.1.4-0.20211119122758-180fcef48034+incompatible // indirect
|
||||
github.com/golang/protobuf v1.5.2 // indirect
|
||||
github.com/hashicorp/golang-lru v0.6.0 // indirect
|
||||
github.com/kr/pretty v0.3.0 // indirect
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.1 // indirect
|
||||
github.com/hashicorp/hcl v1.0.0 // indirect
|
||||
github.com/klauspost/compress v1.15.13 // indirect
|
||||
github.com/magiconair/properties v1.8.6 // indirect
|
||||
github.com/mailru/easyjson v0.7.7 // indirect
|
||||
github.com/mattn/go-colorable v0.1.12 // indirect
|
||||
github.com/mattn/go-isatty v0.0.14 // indirect
|
||||
github.com/mitchellh/mapstructure v1.5.0 // indirect
|
||||
github.com/mr-tron/base58 v1.2.0 // indirect
|
||||
github.com/nspcc-dev/rfc6979 v0.2.0 // indirect
|
||||
github.com/nxadm/tail v1.4.8 // indirect
|
||||
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c // indirect
|
||||
github.com/pelletier/go-toml v1.9.5 // indirect
|
||||
github.com/pelletier/go-toml/v2 v2.0.5 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/serenize/snaker v0.0.0-20201027110005-a7ad2135616e // indirect
|
||||
github.com/spaolacci/murmur3 v1.1.0 // indirect
|
||||
github.com/spf13/afero v1.1.2 // indirect
|
||||
github.com/spf13/afero v1.9.2 // indirect
|
||||
github.com/spf13/cast v1.5.0 // indirect
|
||||
github.com/spf13/jwalterweatherman v1.1.0 // indirect
|
||||
github.com/spf13/pflag v1.0.5 // indirect
|
||||
github.com/spf13/viper v1.14.0 // indirect
|
||||
github.com/subosito/gotenv v1.4.1 // indirect
|
||||
go.uber.org/atomic v1.10.0 // indirect
|
||||
go.uber.org/multierr v1.9.0 // indirect
|
||||
golang.org/x/crypto v0.4.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20221227203929-1b447090c38c // indirect
|
||||
golang.org/x/net v0.3.0 // indirect
|
||||
golang.org/x/net v0.4.0 // indirect
|
||||
golang.org/x/sys v0.3.0 // indirect
|
||||
golang.org/x/text v0.5.0 // indirect
|
||||
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 // indirect
|
||||
google.golang.org/genproto v0.0.0-20200903010400-9bfcb5116336 // indirect
|
||||
google.golang.org/grpc v1.48.0 // indirect
|
||||
golang.org/x/time v0.1.0 // indirect
|
||||
google.golang.org/genproto v0.0.0-20221227171554-f9683d7f8bef // indirect
|
||||
google.golang.org/grpc v1.51.0 // indirect
|
||||
google.golang.org/protobuf v1.28.1 // indirect
|
||||
gopkg.in/guregu/null.v3 v3.3.0 // indirect
|
||||
gopkg.in/ini.v1 v1.67.0 // indirect
|
||||
gopkg.in/yaml.v2 v2.4.0 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
)
|
||||
|
|
153
internal/local/client.go
Normal file
|
@ -0,0 +1,153 @@
|
|||
package local
|
||||
|
||||
import (
|
||||
"crypto/ecdsa"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats"
|
||||
"github.com/dop251/goja"
|
||||
"go.k6.io/k6/js/modules"
|
||||
"go.k6.io/k6/metrics"
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
vu modules.VU
|
||||
key ecdsa.PrivateKey
|
||||
ng *engine.StorageEngine
|
||||
}
|
||||
|
||||
type PutResponse struct {
|
||||
Success bool
|
||||
ObjectID string
|
||||
Error string
|
||||
}
|
||||
|
||||
type GetResponse struct {
|
||||
Success bool
|
||||
Error string
|
||||
}
|
||||
|
||||
type DeleteResponse struct {
|
||||
Success bool
|
||||
Error string
|
||||
}
|
||||
|
||||
func (c *Client) Put(containerID string, headers map[string]string, payload goja.ArrayBuffer) PutResponse {
|
||||
sz := len(payload.Bytes())
|
||||
|
||||
attrs := make([]object.Attribute, len(headers))
|
||||
{
|
||||
ind := 0
|
||||
for k, v := range headers {
|
||||
attrs[ind].SetKey(k)
|
||||
attrs[ind].SetValue(v)
|
||||
ind++
|
||||
}
|
||||
}
|
||||
|
||||
ownerID := &user.ID{}
|
||||
user.IDFromKey(ownerID, c.key.PublicKey)
|
||||
|
||||
obj := object.New()
|
||||
obj.SetContainerID(mustParseContainerID(containerID))
|
||||
obj.SetOwnerID(ownerID) // needed for metabase bucket name
|
||||
obj.SetAttributes(attrs...)
|
||||
obj.SetPayload(payload.Bytes())
|
||||
obj.SetPayloadSize(uint64(len(payload.Bytes())))
|
||||
object.CalculateAndSetPayloadChecksum(obj) // needed for metabase key
|
||||
|
||||
id, err := object.CalculateID(obj)
|
||||
if err != nil {
|
||||
return PutResponse{Error: fmt.Sprintf("calculating id: %v", err)}
|
||||
}
|
||||
obj.SetID(id)
|
||||
|
||||
if err := object.CalculateAndSetSignature(c.key, obj); err != nil {
|
||||
return PutResponse{Error: fmt.Sprintf("calculating signature: %v", err)}
|
||||
}
|
||||
|
||||
var req engine.PutPrm
|
||||
req.WithObject(obj)
|
||||
|
||||
start := time.Now()
|
||||
|
||||
if _, err := c.ng.Put(req); err != nil {
|
||||
stats.Report(c.vu, objPutFails, 1)
|
||||
return PutResponse{Error: err.Error()}
|
||||
}
|
||||
|
||||
stats.Report(c.vu, objPutTotal, 1)
|
||||
stats.ReportDataSent(c.vu, float64(sz))
|
||||
stats.Report(c.vu, objPutDuration, metrics.D(time.Since(start)))
|
||||
|
||||
return PutResponse{
|
||||
Success: true,
|
||||
ObjectID: id.EncodeToString(),
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) Get(containerID, objectID string) GetResponse {
|
||||
var addr oid.Address
|
||||
addr.SetContainer(mustParseContainerID(containerID))
|
||||
addr.SetObject(mustParseObjectID(objectID))
|
||||
|
||||
var req engine.GetPrm
|
||||
req.WithAddress(addr)
|
||||
|
||||
start := time.Now()
|
||||
resp, err := c.ng.Get(req)
|
||||
if err != nil {
|
||||
stats.Report(c.vu, objGetFails, 1)
|
||||
return GetResponse{Error: err.Error()}
|
||||
}
|
||||
|
||||
stats.Report(c.vu, objGetTotal, 1)
|
||||
stats.Report(c.vu, objGetDuration, metrics.D(time.Since(start)))
|
||||
stats.ReportDataReceived(c.vu, float64(len(resp.Object().Payload())))
|
||||
|
||||
return GetResponse{Success: true}
|
||||
}
|
||||
|
||||
func (c *Client) Delete(containerID, objectID string) DeleteResponse {
|
||||
var addr oid.Address
|
||||
addr.SetContainer(mustParseContainerID(containerID))
|
||||
addr.SetObject(mustParseObjectID(objectID))
|
||||
|
||||
var req engine.DeletePrm
|
||||
req.WithAddress(addr)
|
||||
|
||||
start := time.Now()
|
||||
if _, err := c.ng.Delete(req); err != nil {
|
||||
stats.Report(c.vu, objDeleteFails, 1)
|
||||
return DeleteResponse{Error: err.Error()}
|
||||
}
|
||||
|
||||
stats.Report(c.vu, objDeleteTotal, 1)
|
||||
stats.Report(c.vu, objDeleteDuration, metrics.D(time.Since(start)))
|
||||
|
||||
return DeleteResponse{Success: true}
|
||||
}
|
||||
|
||||
func mustParseContainerID(strContainerID string) cid.ID {
|
||||
var containerID cid.ID
|
||||
err := containerID.DecodeString(strContainerID)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("parsing container id %q: %v", strContainerID, err))
|
||||
}
|
||||
return containerID
|
||||
}
|
||||
|
||||
func mustParseObjectID(strObjectID string) oid.ID {
|
||||
var cliObjectID oid.ID
|
||||
err := cliObjectID.DecodeString(strObjectID)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("parsing object id %q: %v", strObjectID, err))
|
||||
}
|
||||
return cliObjectID
|
||||
}
|
296
internal/local/local.go
Normal file
|
@ -0,0 +1,296 @@
|
|||
package local
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||
engineconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine"
|
||||
shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
|
||||
blobovniczaconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza"
|
||||
fstreeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
metabase "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.k6.io/k6/js/modules"
|
||||
"go.k6.io/k6/metrics"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// RootModule is the global module object type. It is instantiated once per test
|
||||
// run and will be used to create k6/x/frostfs/local module instances for each VU.
|
||||
type RootModule struct {
|
||||
mu sync.Mutex
|
||||
// configFile is the name of the configuration file used during one test.
|
||||
configFile string
|
||||
// ng is the engine instance used during one test, corresponding to the configFile. Each VU
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
We do not need to support multiple configuration files here, 1 is enough. We do not need to support multiple configuration files here, 1 is enough.
Or do you have any scenario in mind?
ale64bit
commented
done. Turned it into a singleton. (the original reason was that done. Turned it into a singleton. (the original reason was that `Connect` is called multiple times and you can't have more than an instance of the engine running).
|
||||
// gets the same engine instance.
|
||||
ng *engine.StorageEngine
|
||||
}
|
||||
|
||||
// Local represents an instance of the module for every VU.
|
||||
type Local struct {
|
||||
vu modules.VU
|
||||
resolveEngine func(string) (*engine.StorageEngine, error)
|
||||
}
|
||||
|
||||
// Ensure the interfaces are implemented correctly.
|
||||
var (
|
||||
_ modules.Module = &RootModule{}
|
||||
_ modules.Instance = &Local{}
|
||||
|
||||
objPutTotal, objPutFails, objPutDuration *metrics.Metric
|
||||
objGetTotal, objGetFails, objGetDuration *metrics.Metric
|
||||
objDeleteTotal, objDeleteFails, objDeleteDuration *metrics.Metric
|
||||
)
|
||||
|
||||
func init() {
|
||||
modules.Register("k6/x/frostfs/local", &RootModule{})
|
||||
}
|
||||
|
||||
// NewModuleInstance implements the modules.Module interface and returns
|
||||
// a new instance for each VU.
|
||||
func (r *RootModule) NewModuleInstance(vu modules.VU) modules.Instance {
|
||||
mi := &Local{
|
||||
vu: vu,
|
||||
resolveEngine: r.getOrCreateEngine,
|
||||
}
|
||||
return mi
|
||||
}
|
||||
|
||||
// getOrCreateEngine returns the current engine instance for the given configuration file,
|
||||
// creating a new one if none exists. Note that the identity of configuration files is their
|
||||
// file name for the purposes of test runs.
|
||||
func (r *RootModule) getOrCreateEngine(configFile string) (*engine.StorageEngine, error) {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
|
||||
if len(configFile) == 0 {
|
||||
return nil, errors.New("configFile cannot be empty")
|
||||
}
|
||||
|
||||
// Create and initialize engine for the given configFile if it doesn't exist already
|
||||
if r.ng == nil {
|
||||
r.configFile = configFile
|
||||
appCfg := config.New(config.Prm{}, config.WithConfigFile(configFile))
|
||||
ngOpts, shardOpts := storageEngineOptionsFromConfig(appCfg)
|
||||
r.ng = engine.New(ngOpts...)
|
||||
for i, opts := range shardOpts {
|
||||
if _, err := r.ng.AddShard(opts...); err != nil {
|
||||
return nil, fmt.Errorf("adding shard %d: %v", i, err)
|
||||
}
|
||||
}
|
||||
if err := r.ng.Open(); err != nil {
|
||||
return nil, fmt.Errorf("opening engine: %v", err)
|
||||
}
|
||||
if err := r.ng.Init(); err != nil {
|
||||
return nil, fmt.Errorf("initializing engine: %v", err)
|
||||
}
|
||||
} else if configFile != r.configFile {
|
||||
return nil, fmt.Errorf("getOrCreateEngine called with mismatching configFile after engine was initialized: got %q, want %q", configFile, r.configFile)
|
||||
}
|
||||
|
||||
return r.ng, nil
|
||||
}
|
||||
|
||||
// Exports implements the modules.Instance interface and returns the exports
|
||||
// of the JS module.
|
||||
func (s *Local) Exports() modules.Exports {
|
||||
return modules.Exports{Default: s}
|
||||
}
|
||||
|
||||
func (s *Local) Connect(configFile, hexKey string) (*Client, error) {
|
||||
ng, err := s.resolveEngine(configFile)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("connecting to engine for config %q: %v", configFile, err)
|
||||
}
|
||||
|
||||
key, err := parseOrCreateKey(hexKey)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("creating key: %v", err)
|
||||
}
|
||||
|
||||
// register metrics
|
||||
registry := metrics.NewRegistry()
|
||||
objPutTotal, _ = registry.NewMetric("local_obj_put_total", metrics.Counter)
|
||||
objPutFails, _ = registry.NewMetric("local_obj_put_fails", metrics.Counter)
|
||||
objPutDuration, _ = registry.NewMetric("local_obj_put_duration", metrics.Trend, metrics.Time)
|
||||
|
||||
objGetTotal, _ = registry.NewMetric("local_obj_get_total", metrics.Counter)
|
||||
objGetFails, _ = registry.NewMetric("local_obj_get_fails", metrics.Counter)
|
||||
objGetDuration, _ = registry.NewMetric("local_obj_get_duration", metrics.Trend, metrics.Time)
|
||||
|
||||
objDeleteTotal, _ = registry.NewMetric("local_obj_delete_total", metrics.Counter)
|
||||
objDeleteFails, _ = registry.NewMetric("local_obj_delete_fails", metrics.Counter)
|
||||
objDeleteDuration, _ = registry.NewMetric("local_obj_delete_duration", metrics.Trend, metrics.Time)
|
||||
|
||||
return &Client{
|
||||
vu: s.vu,
|
||||
key: key.PrivateKey,
|
||||
ng: ng,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type epochState struct{}
|
||||
|
||||
func (epochState) CurrentEpoch() uint64 { return 0 }
|
||||
|
||||
// storageEngineOptionsFromConfig loads a configuration file and returns the corresponding
|
||||
// engine and shard options to recreate an engine usable with an existing storage instance.
|
||||
// This makes sure that the local loader uses the same engine configuration as the one that
|
||||
// preloaded the storage (if any), by using the same configuration file.
|
||||
//
|
||||
// Note that the configuration file only needs to contain the storage-specific sections.
|
||||
func storageEngineOptionsFromConfig(c *config.Config) ([]engine.Option, [][]shard.Option) {
|
||||
log := zap.L()
|
||||
|
||||
ngOpts := []engine.Option{
|
||||
engine.WithErrorThreshold(engineconfig.ShardErrorThreshold(c)),
|
||||
engine.WithShardPoolSize(engineconfig.ShardPoolSize(c)),
|
||||
engine.WithLogger(&logger.Logger{Logger: log}),
|
||||
}
|
||||
|
||||
var shOpts [][]shard.Option
|
||||
|
||||
engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error {
|
||||
opts := []shard.Option{
|
||||
shard.WithRefillMetabase(sc.RefillMetabase()),
|
||||
shard.WithMode(sc.Mode()),
|
||||
shard.WithLogger(&logger.Logger{Logger: log}),
|
||||
}
|
||||
|
||||
// substorages
|
||||
{
|
||||
var substorages []blobstor.SubStorage
|
||||
for _, scfg := range sc.BlobStor().Storages() {
|
||||
switch scfg.Type() {
|
||||
case blobovniczatree.Type:
|
||||
cfg := blobovniczaconfig.From((*config.Config)(scfg))
|
||||
ss := blobstor.SubStorage{
|
||||
Storage: blobovniczatree.NewBlobovniczaTree(
|
||||
blobovniczatree.WithRootPath(scfg.Path()),
|
||||
blobovniczatree.WithPermissions(scfg.Perm()),
|
||||
blobovniczatree.WithBlobovniczaSize(cfg.Size()),
|
||||
blobovniczatree.WithBlobovniczaShallowDepth(cfg.ShallowDepth()),
|
||||
blobovniczatree.WithBlobovniczaShallowWidth(cfg.ShallowWidth()),
|
||||
blobovniczatree.WithOpenedCacheSize(cfg.OpenedCacheSize()),
|
||||
blobovniczatree.WithLogger(&logger.Logger{Logger: log}),
|
||||
),
|
||||
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
||||
return uint64(len(data)) < sc.SmallSizeLimit()
|
||||
},
|
||||
}
|
||||
substorages = append(substorages, ss)
|
||||
case fstree.Type:
|
||||
cfg := fstreeconfig.From((*config.Config)(scfg))
|
||||
ss := blobstor.SubStorage{
|
||||
Storage: fstree.New(
|
||||
fstree.WithPath(scfg.Path()),
|
||||
fstree.WithPerm(scfg.Perm()),
|
||||
fstree.WithDepth(cfg.Depth()),
|
||||
fstree.WithNoSync(cfg.NoSync()),
|
||||
),
|
||||
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
||||
return true
|
||||
},
|
||||
}
|
||||
substorages = append(substorages, ss)
|
||||
default:
|
||||
return fmt.Errorf("invalid storage type: %s", scfg.Type())
|
||||
}
|
||||
}
|
||||
opts = append(opts, shard.WithBlobStorOptions(
|
||||
blobstor.WithCompressObjects(sc.Compress()),
|
||||
blobstor.WithUncompressableContentTypes(sc.UncompressableContentTypes()),
|
||||
blobstor.WithStorages(substorages),
|
||||
blobstor.WithLogger(&logger.Logger{Logger: log}),
|
||||
))
|
||||
}
|
||||
|
||||
// write cache
|
||||
if wc := sc.WriteCache(); wc.Enabled() {
|
||||
opts = append(opts, shard.WithWriteCacheOptions(
|
||||
writecache.WithPath(wc.Path()),
|
||||
writecache.WithMaxBatchSize(wc.BoltDB().MaxBatchSize()),
|
||||
writecache.WithMaxBatchDelay(wc.BoltDB().MaxBatchDelay()),
|
||||
writecache.WithMaxObjectSize(wc.MaxObjectSize()),
|
||||
writecache.WithSmallObjectSize(wc.SmallObjectSize()),
|
||||
writecache.WithFlushWorkersCount(wc.WorkersNumber()),
|
||||
writecache.WithMaxCacheSize(wc.SizeLimit()),
|
||||
writecache.WithNoSync(wc.NoSync()),
|
||||
writecache.WithLogger(&logger.Logger{Logger: log}),
|
||||
))
|
||||
}
|
||||
|
||||
// tree
|
||||
if config.BoolSafe(c.Sub("tree"), "enabled") {
|
||||
pr := sc.Pilorama()
|
||||
opts = append(opts, shard.WithPiloramaOptions(
|
||||
pilorama.WithPath(pr.Path()),
|
||||
pilorama.WithPerm(pr.Perm()),
|
||||
pilorama.WithMaxBatchSize(pr.MaxBatchSize()),
|
||||
pilorama.WithMaxBatchDelay(pr.MaxBatchDelay()),
|
||||
pilorama.WithNoSync(pr.NoSync()),
|
||||
))
|
||||
}
|
||||
|
||||
// metabase
|
||||
{
|
||||
mb := sc.Metabase()
|
||||
opts = append(opts, shard.WithMetaBaseOptions(
|
||||
metabase.WithPath(mb.Path()),
|
||||
metabase.WithPermissions(mb.BoltDB().Perm()),
|
||||
metabase.WithMaxBatchSize(mb.BoltDB().MaxBatchSize()),
|
||||
metabase.WithMaxBatchDelay(mb.BoltDB().MaxBatchDelay()),
|
||||
metabase.WithBoltDBOptions(&bbolt.Options{
|
||||
Timeout: 1 * time.Second,
|
||||
}),
|
||||
metabase.WithEpochState(epochState{}),
|
||||
metabase.WithLogger(&logger.Logger{Logger: log}),
|
||||
))
|
||||
}
|
||||
|
||||
// GC
|
||||
{
|
||||
gc := sc.GC()
|
||||
opts = append(opts,
|
||||
shard.WithGCRemoverSleepInterval(gc.RemoverSleepInterval()),
|
||||
shard.WithRemoverBatchSize(gc.RemoverBatchSize()),
|
||||
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
|
||||
pool, err := ants.NewPool(sz)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return pool
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
shOpts = append(shOpts, opts)
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return ngOpts, shOpts
|
||||
}
|
||||
|
||||
func parseOrCreateKey(hexKeyStr string) (*keys.PrivateKey, error) {
|
||||
if hexKeyStr != "" {
|
||||
return keys.NewPrivateKeyFromHex(hexKeyStr)
|
||||
}
|
||||
return keys.NewPrivateKey()
|
||||
}
|
24
scenarios/files/local_storage.yaml
Normal file
|
@ -0,0 +1,24 @@
|
|||
# This configuration can be used for the local scenario when testing locally.
|
||||
storage:
|
||||
shard_num: 1
|
||||
shard:
|
||||
0:
|
||||
metabase:
|
||||
path: /tmp/k6_local/metabase
|
||||
perm: 0600
|
||||
blobstor:
|
||||
- path: /tmp/k6_local/blobovnicza
|
||||
type: blobovnicza
|
||||
perm: 0600
|
||||
opened_cache_capacity: 32
|
||||
depth: 1
|
||||
width: 1
|
||||
- path: /tmp/k6_local/fstree
|
||||
type: fstree
|
||||
perm: 0600
|
||||
depth: 4
|
||||
writecache:
|
||||
enabled: false
|
||||
gc:
|
||||
remover_batch_size: 100
|
||||
remover_sleep_interval: 1m
|
160
scenarios/local.js
Normal file
|
@ -0,0 +1,160 @@
|
|||
import datagen from 'k6/x/frostfs/datagen';
|
||||
import local from 'k6/x/frostfs/local';
|
||||
import logging from 'k6/x/frostfs/logging';
|
||||
import registry from 'k6/x/frostfs/registry';
|
||||
import { SharedArray } from 'k6/data';
|
||||
import { textSummary } from './libs/k6-summary-0.0.2.js';
|
||||
|
||||
const obj_list = new SharedArray('obj_list', function () {
|
||||
return JSON.parse(open(__ENV.PREGEN_JSON)).objects;
|
||||
});
|
||||
|
||||
const container_list = new SharedArray('container_list', function () {
|
||||
return JSON.parse(open(__ENV.PREGEN_JSON)).containers;
|
||||
});
|
||||
|
||||
const read_size = JSON.parse(open(__ENV.PREGEN_JSON)).obj_size;
|
||||
const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json";
|
||||
|
||||
const config_file = __ENV.CONFIG_FILE;
|
||||
const local_client = local.connect(config_file, '');
|
||||
const log = logging.new().withField("config", config_file);
|
||||
|
||||
const registry_enabled = !!__ENV.REGISTRY_FILE;
|
||||
const obj_registry = registry_enabled ? registry.open(__ENV.REGISTRY_FILE) : undefined;
|
||||
|
||||
const duration = __ENV.DURATION;
|
||||
|
||||
const delete_age = __ENV.DELETE_AGE ? parseInt(__ENV.DELETE_AGE) : undefined;
|
||||
let obj_to_delete_selector = undefined;
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Current main usecase is PUT/GET, can we delete DELETE-related stuff for now? Current main usecase is PUT/GET, can we delete DELETE-related stuff for now?
ale64bit
commented
I see. But it already works :) no harm in keeping it? I see. But it already works :) no harm in keeping it?
|
||||
if (registry_enabled && delete_age) {
|
||||
obj_to_delete_selector = registry.getSelector(
|
||||
__ENV.REGISTRY_FILE,
|
||||
"obj_to_delete",
|
||||
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0,
|
||||
{
|
||||
status: "created",
|
||||
age: delete_age,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE));
|
||||
|
||||
const scenarios = {};
|
||||
|
||||
const write_vu_count = parseInt(__ENV.WRITERS || '0');
|
||||
if (write_vu_count > 0) {
|
||||
scenarios.write = {
|
||||
executor: 'constant-vus',
|
||||
vus: write_vu_count,
|
||||
duration: `${duration}s`,
|
||||
exec: 'obj_write',
|
||||
gracefulStop: '5s',
|
||||
};
|
||||
}
|
||||
|
||||
const read_vu_count = parseInt(__ENV.READERS || '0');
|
||||
if (read_vu_count > 0) {
|
||||
scenarios.read = {
|
||||
executor: 'constant-vus',
|
||||
vus: read_vu_count,
|
||||
duration: `${duration}s`,
|
||||
exec: 'obj_read',
|
||||
gracefulStop: '5s',
|
||||
};
|
||||
}
|
||||
|
||||
const delete_vu_count = parseInt(__ENV.DELETERS || '0');
|
||||
if (delete_vu_count > 0) {
|
||||
if (!obj_to_delete_selector) {
|
||||
throw new Error('Positive DELETE worker number without a proper object selector');
|
||||
}
|
||||
|
||||
scenarios.delete = {
|
||||
executor: 'constant-vus',
|
||||
vus: delete_vu_count,
|
||||
duration: `${duration}s`,
|
||||
exec: 'obj_delete',
|
||||
gracefulStop: '5s',
|
||||
};
|
||||
}
|
||||
|
||||
export const options = {
|
||||
scenarios,
|
||||
setupTimeout: '5s',
|
||||
};
|
||||
|
||||
export function setup() {
|
||||
const total_vu_count = write_vu_count + read_vu_count + delete_vu_count;
|
||||
|
||||
console.log(`Pregenerated containers: ${container_list.length}`);
|
||||
console.log(`Pregenerated read object size: ${read_size}`);
|
||||
console.log(`Pregenerated total objects: ${obj_list.length}`);
|
||||
console.log(`Reading VUs: ${read_vu_count}`);
|
||||
console.log(`Writing VUs: ${write_vu_count}`);
|
||||
console.log(`Deleting VUs: ${delete_vu_count}`);
|
||||
console.log(`Total VUs: ${total_vu_count}`);
|
||||
}
|
||||
|
||||
export function teardown(data) {
|
||||
if (obj_registry) {
|
||||
obj_registry.close();
|
||||
}
|
||||
}
|
||||
|
||||
export function handleSummary(data) {
|
||||
return {
|
||||
'stdout': textSummary(data, { indent: ' ', enableColors: false }),
|
||||
[summary_json]: JSON.stringify(data),
|
||||
};
|
||||
}
|
||||
|
||||
export function obj_write() {
|
||||
const headers = {
|
||||
unique_header: uuidv4()
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
We need We need `SLEEP_*` variables for network load, not to overload a system.
It is not applicable here (we have no timeout for operations) and is deprecated anyway.
Can we remove this?
ale64bit
commented
done done
|
||||
};
|
||||
const container = container_list[Math.floor(Math.random() * container_list.length)];
|
||||
|
||||
const { payload, hash } = generator.genPayload(registry_enabled);
|
||||
const resp = local_client.put(container, headers, payload);
|
||||
if (!resp.success) {
|
||||
log.withField("cid", container).error(resp.error);
|
||||
return;
|
||||
}
|
||||
|
||||
if (obj_registry) {
|
||||
obj_registry.addObject(container, resp.object_id, "", "", hash);
|
||||
}
|
||||
}
|
||||
|
||||
export function obj_read() {
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
@anikeev-yadro , do we need registry for local load? The only case is to have GET on a loaded cluster, but then we need also to support merging registries from different nodes (or do we do local load on 1 node only?) @anikeev-yadro , do we need registry for local load? The only case is to have GET on a loaded cluster, but then we need also to support merging registries from different nodes (or do we do local load on 1 node only?)
anikeev-yadro
commented
@fyrchik I think we need to keep the registry. We will think about merge in future. @fyrchik I think we need to keep the registry. We will think about merge in future.
ale64bit
commented
The registry is used also for e.g. delete workload when there are no pregenerated objects. So probably worth keeping it at least for local tests. The registry is used also for e.g. delete workload when there are no pregenerated objects. So probably worth keeping it at least for local tests.
|
||||
const obj = obj_list[Math.floor(Math.random() * obj_list.length)];
|
||||
const resp = local_client.get(obj.container, obj.object)
|
||||
if (!resp.success) {
|
||||
log.withFields({cid: obj.container, oid: obj.object}).error(resp.error);
|
||||
}
|
||||
}
|
||||
|
||||
export function obj_delete() {
|
||||
const obj = obj_to_delete_selector.nextObject();
|
||||
if (!obj) {
|
||||
return;
|
||||
}
|
||||
|
||||
const resp = local_client.delete(obj.c_id, obj.o_id);
|
||||
if (!resp.success) {
|
||||
// Log errors except (2052 - object already deleted)
|
||||
log.withFields({cid: obj.c_id, oid: obj.o_id}).error(resp.error);
|
||||
return;
|
||||
}
|
||||
|
||||
obj_registry.deleteObject(obj.id);
|
||||
}
|
||||
|
||||
export function uuidv4() {
|
||||
return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
|
||||
let r = Math.random() * 16 | 0, v = c === 'x' ? r : (r & 0x3 | 0x8);
|
||||
return v.toString(16);
|
||||
});
|
||||
}
|
|
@ -8,9 +8,9 @@
|
|||
$ ./k6 run -e ENV_FILE=.env some-scenario.js
|
||||
```
|
||||
|
||||
## Common options for gRPC, HTTP, S3 scenarios:
|
||||
## Common options for gRPC, local, HTTP, S3 scenarios:
|
||||
|
||||
Scenarios `grpc.js`, `http.js` and `s3.js` support the following options:
|
||||
Scenarios `grpc.js`, `local.js`, `http.js` and `s3.js` support the following options:
|
||||
* `DURATION` - duration of scenario in seconds.
|
||||
* `READERS` - number of VUs performing read operations.
|
||||
* `WRITERS` - number of VUs performing write operations.
|
||||
|
@ -48,6 +48,26 @@ Options (in addition to the common options):
|
|||
* `DIAL_TIMEOUT` - timeout to connect to a node (in seconds).
|
||||
* `STREAM_TIMEOUT` - timeout for a single stream message for `PUT`/`GET` operations (in seconds).
|
||||
|
||||
## Local
|
||||
|
||||
1. Create pre-generated containers or objects:
|
||||
|
||||
The tests will use all pre-created containers for PUT operations and all pre-created objects for READ operations. There is no dedicated script to preset HTTP scenario, so we use the same script as for gRPC:
|
||||
```shell
|
||||
$ ./scenarios/preset/preset_grpc.py --size 1024 --containers 1 --out grpc.json --endpoint host1:8080 --preload_obj 500
|
||||
```
|
||||
|
||||
2. Execute scenario with options:
|
||||
|
||||
```shell
|
||||
$ ./k6 run -e DURATION=60 -e WRITE_OBJ_SIZE=8192 -e READERS=20 -e WRITERS=20 -e DELETERS=30 -e DELETE_AGE=10 -e REGISTRY_FILE=registry.bolt -e CONFIG_FILE=/path/to/config.yaml -e PREGEN_JSON=./grpc.json scenarios/local.js
|
||||
```
|
||||
|
||||
Options (in addition to the common options):
|
||||
* `CONFIG_FILE` - path to the local configuration file used for the storage node. Only the storage configuration section is used.
|
||||
* `DELETERS` - number of VUs performing delete operations (using deleters requires that options `DELETE_AGE` and `REGISTRY_FILE` are specified as well).
|
||||
* `DELETE_AGE` - age of object in seconds before which it can not be deleted. This parameter can be used to control how many objects we have in the system under load.
|
||||
|
||||
## HTTP
|
||||
|
||||
1. Create pre-generated containers or objects:
|
||||
|
|
Can we clone it? See #19 for
scenarios/libs
.Also it feels
k6
-related, can't we import it withouthttps
?Possibly, but this is what all examples were doing so I copy-pasted it blindly. Let's refactor it in a separate PR since it touches a bunch of other files.