Dmitrii Stepanov
e9edca3e79
Add `data` metrics to measure payload rate. Rename `total` metrics to `success`, because these metrics count successful operations, not the total number of operations. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
393 lines
14 KiB
Go
393 lines
14 KiB
Go
package local
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
|
engineconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine"
|
|
shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
|
|
blobovniczaconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza"
|
|
fstreeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
|
metabase "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
|
writecache "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/config"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/local/rawclient"
|
|
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats"
|
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
|
"github.com/panjf2000/ants/v2"
|
|
"go.etcd.io/bbolt"
|
|
"go.k6.io/k6/js/modules"
|
|
"go.k6.io/k6/metrics"
|
|
"go.uber.org/zap"
|
|
"golang.org/x/sys/unix"
|
|
)
|
|
|
|
// RootModule is the global module object type. It is instantiated once per test
|
|
// run and will be used to create k6/x/frostfs/local module instances for each VU.
|
|
type RootModule struct {
|
|
mu sync.Mutex
|
|
// configFile is the name of the configuration file used during one test.
|
|
configFile string
|
|
// configDir is the name of the configuration directory used during one test.
|
|
configDir string
|
|
// ng is the engine instance used during one test, corresponding to the configFile. Each VU
|
|
// gets the same engine instance.
|
|
ng *engine.StorageEngine
|
|
l Limiter
|
|
}
|
|
|
|
// Local represents an instance of the module for every VU.
|
|
type Local struct {
|
|
vu modules.VU
|
|
ResolveEngine func(context.Context, string, string, bool, int64) (*engine.StorageEngine, Limiter, error)
|
|
}
|
|
|
|
// Ensure the interfaces are implemented correctly.
|
|
var (
|
|
_ modules.Module = &RootModule{}
|
|
_ modules.Instance = &Local{}
|
|
|
|
objPutSuccess, objPutFails, objPutDuration, objPutData *metrics.Metric
|
|
objGetSuccess, objGetFails, objGetDuration, objGetData *metrics.Metric
|
|
objDeleteSuccess, objDeleteFails, objDeleteDuration *metrics.Metric
|
|
)
|
|
|
|
func init() {
|
|
modules.Register("k6/x/frostfs/local", &RootModule{})
|
|
}
|
|
|
|
// NewModuleInstance implements the modules.Module interface and returns
|
|
// a new instance for each VU.
|
|
func (r *RootModule) NewModuleInstance(vu modules.VU) modules.Instance {
|
|
return NewLocalModuleInstance(vu, r.GetOrCreateEngine)
|
|
}
|
|
|
|
func NewLocalModuleInstance(vu modules.VU, resolveEngine func(context.Context, string, string, bool, int64) (*engine.StorageEngine, Limiter, error)) *Local {
|
|
return &Local{
|
|
vu: vu,
|
|
ResolveEngine: resolveEngine,
|
|
}
|
|
}
|
|
|
|
// checkResourceLimits checks the current limit on NOFILE.
|
|
//
|
|
// The usual default is around 1024 and this is too low for production clusters where a value of
|
|
// about 65536 is needed in order to not run into errors because of attempting to open too many files.
|
|
// This is needed for the local storage engine scenarios, where the user running the scenario is not
|
|
// necessarily the service user, for which the limits are preconfigured correctly.
|
|
//
|
|
// See: https://k6.io/docs/misc/fine-tuning-os/
|
|
func checkResourceLimits() error {
|
|
const (
|
|
minNofileLimit = 1 << 16
|
|
)
|
|
rlimit := &unix.Rlimit{}
|
|
if err := unix.Getrlimit(unix.RLIMIT_NOFILE, rlimit); err != nil {
|
|
return fmt.Errorf("getting resource limits: %v", err)
|
|
}
|
|
if rlimit.Cur < minNofileLimit {
|
|
return fmt.Errorf("nofile limit is too low: %d", rlimit.Cur)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetOrCreateEngine returns the current engine instance for the given configuration file or directory,
|
|
// creating a new one if none exists. Note that the identity of configuration files is their
|
|
// file name for the purposes of test runs.
|
|
func (r *RootModule) GetOrCreateEngine(ctx context.Context, configFile string, configDir string, debug bool, maxSizeGB int64) (*engine.StorageEngine, Limiter, error) {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
|
|
if len(configFile) == 0 && len(configDir) == 0 {
|
|
return nil, nil, errors.New("provide configFile or configDir")
|
|
}
|
|
|
|
if r.l == nil {
|
|
r.l = NewLimiter(maxSizeGB)
|
|
}
|
|
// Create and initialize engine for the given configFile if it doesn't exist already
|
|
if r.ng == nil {
|
|
r.configFile = configFile
|
|
r.configDir = configDir
|
|
appCfg := config.New(configFile, configDir, "")
|
|
ngOpts, shardOpts, err := storageEngineOptionsFromConfig(appCfg, debug, r.l)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("creating engine options from config: %v", err)
|
|
}
|
|
if err := checkResourceLimits(); err != nil {
|
|
return nil, nil, err
|
|
}
|
|
r.ng = engine.New(ngOpts...)
|
|
for i, opts := range shardOpts {
|
|
if _, err := r.ng.AddShard(ctx, opts...); err != nil {
|
|
return nil, nil, fmt.Errorf("adding shard %d: %v", i, err)
|
|
}
|
|
}
|
|
if err := r.ng.Open(ctx); err != nil {
|
|
return nil, nil, fmt.Errorf("opening engine: %v", err)
|
|
}
|
|
if err := r.ng.Init(ctx); err != nil {
|
|
return nil, nil, fmt.Errorf("initializing engine: %v", err)
|
|
}
|
|
} else if configFile != r.configFile {
|
|
return nil, nil, fmt.Errorf("GetOrCreateEngine called with mismatching configFile after engine was "+
|
|
"initialized: got %q, want %q", configFile, r.configFile)
|
|
} else if configDir != r.configDir {
|
|
return nil, nil, fmt.Errorf("GetOrCreateEngine called with mismatching configDir after engine was "+
|
|
"initialized: got %q, want %q", configDir, r.configDir)
|
|
}
|
|
|
|
return r.ng, r.l, nil
|
|
}
|
|
|
|
// Exports implements the modules.Instance interface and returns the exports
|
|
// of the JS module.
|
|
func (s *Local) Exports() modules.Exports {
|
|
return modules.Exports{Default: s}
|
|
}
|
|
|
|
// VU returns the VU this module instance was created for.
func (s *Local) VU() modules.VU {
	return s.vu
}
|
|
|
|
func (s *Local) Connect(configFile, configDir, hexKey string, debug bool, maxSizeGB int64) (*Client, error) {
|
|
ng, l, err := s.ResolveEngine(s.VU().Context(), configFile, configDir, debug, maxSizeGB)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("connecting to engine for config - file %q dir %q: %v", configFile, configDir, err)
|
|
}
|
|
|
|
key, err := ParseOrCreateKey(hexKey)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("creating key: %v", err)
|
|
}
|
|
|
|
// Register metrics.
|
|
objPutSuccess, _ = stats.Registry.NewMetric("local_obj_put_success", metrics.Counter)
|
|
objPutFails, _ = stats.Registry.NewMetric("local_obj_put_fails", metrics.Counter)
|
|
objPutDuration, _ = stats.Registry.NewMetric("local_obj_put_duration", metrics.Trend, metrics.Time)
|
|
objPutData, _ = stats.Registry.NewMetric("local_obj_put_bytes", metrics.Counter, metrics.Data)
|
|
|
|
objGetSuccess, _ = stats.Registry.NewMetric("local_obj_get_success", metrics.Counter)
|
|
objGetFails, _ = stats.Registry.NewMetric("local_obj_get_fails", metrics.Counter)
|
|
objGetDuration, _ = stats.Registry.NewMetric("local_obj_get_duration", metrics.Trend, metrics.Time)
|
|
objGetData, _ = stats.Registry.NewMetric("local_obj_get_bytes", metrics.Counter, metrics.Data)
|
|
|
|
objDeleteSuccess, _ = stats.Registry.NewMetric("local_obj_delete_success", metrics.Counter)
|
|
objDeleteFails, _ = stats.Registry.NewMetric("local_obj_delete_fails", metrics.Counter)
|
|
objDeleteDuration, _ = stats.Registry.NewMetric("local_obj_delete_duration", metrics.Trend, metrics.Time)
|
|
|
|
// Create raw client backed by local storage engine.
|
|
rc := rawclient.New(ng,
|
|
rawclient.WithKey(key.PrivateKey),
|
|
rawclient.WithPutHandler(func(sz uint64, err error, dt time.Duration) {
|
|
if err != nil {
|
|
stats.Report(s.vu, objPutFails, 1)
|
|
} else {
|
|
stats.Report(s.vu, objPutSuccess, 1)
|
|
stats.ReportDataSent(s.vu, float64(sz))
|
|
stats.Report(s.vu, objPutDuration, metrics.D(dt))
|
|
stats.Report(s.vu, objPutData, float64(sz))
|
|
}
|
|
}),
|
|
rawclient.WithGetHandler(func(sz uint64, err error, dt time.Duration) {
|
|
if err != nil {
|
|
stats.Report(s.vu, objGetFails, 1)
|
|
} else {
|
|
stats.Report(s.vu, objGetSuccess, 1)
|
|
stats.Report(s.vu, objGetDuration, metrics.D(dt))
|
|
stats.ReportDataReceived(s.vu, float64(sz))
|
|
stats.Report(s.vu, objGetData, float64(sz))
|
|
}
|
|
}),
|
|
rawclient.WithDeleteHandler(func(err error, dt time.Duration) {
|
|
if err != nil {
|
|
stats.Report(s.vu, objDeleteFails, 1)
|
|
} else {
|
|
stats.Report(s.vu, objDeleteSuccess, 1)
|
|
stats.Report(s.vu, objDeleteDuration, metrics.D(dt))
|
|
}
|
|
}),
|
|
)
|
|
return &Client{vu: s.vu, rc: rc, l: l}, nil
|
|
}
|
|
|
|
// epochState is a stub epoch source; it is handed to the metabase through
// metabase.WithEpochState.
type epochState struct{}

// CurrentEpoch always reports epoch 0.
func (epochState) CurrentEpoch() uint64 {
	const fixedEpoch uint64 = 0
	return fixedEpoch
}
|
|
|
|
// storageEngineOptionsFromConfig loads a configuration file and returns the corresponding
|
|
// engine and shard options to recreate an engine usable with an existing storage instance.
|
|
// This makes sure that the local loader uses the same engine configuration as the one that
|
|
// preloaded the storage (if any), by using the same configuration file.
|
|
//
|
|
// Note that the configuration file only needs to contain the storage-specific sections.
|
|
func storageEngineOptionsFromConfig(c *config.Config, debug bool, l Limiter) ([]engine.Option, [][]shard.Option, error) {
|
|
log := zap.L()
|
|
if debug {
|
|
var err error
|
|
log, err = zap.NewDevelopment()
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("creating development logger: %v", err)
|
|
}
|
|
}
|
|
|
|
ngOpts := []engine.Option{
|
|
engine.WithErrorThreshold(engineconfig.ShardErrorThreshold(c)),
|
|
engine.WithShardPoolSize(engineconfig.ShardPoolSize(c)),
|
|
engine.WithLogger(&logger.Logger{Logger: log}),
|
|
engine.WithMetrics(l),
|
|
}
|
|
|
|
var shOpts [][]shard.Option
|
|
|
|
err := engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error {
|
|
opts := []shard.Option{
|
|
shard.WithRefillMetabase(sc.RefillMetabase()),
|
|
shard.WithMode(sc.Mode()),
|
|
shard.WithLogger(&logger.Logger{Logger: log}),
|
|
}
|
|
|
|
// substorages
|
|
{
|
|
var substorages []blobstor.SubStorage
|
|
for _, scfg := range sc.BlobStor().Storages() {
|
|
switch scfg.Type() {
|
|
case blobovniczatree.Type:
|
|
cfg := blobovniczaconfig.From((*config.Config)(scfg))
|
|
ss := blobstor.SubStorage{
|
|
Storage: blobovniczatree.NewBlobovniczaTree(
|
|
blobovniczatree.WithRootPath(scfg.Path()),
|
|
blobovniczatree.WithPermissions(scfg.Perm()),
|
|
blobovniczatree.WithBlobovniczaSize(cfg.Size()),
|
|
blobovniczatree.WithBlobovniczaShallowDepth(cfg.ShallowDepth()),
|
|
blobovniczatree.WithBlobovniczaShallowWidth(cfg.ShallowWidth()),
|
|
blobovniczatree.WithOpenedCacheSize(cfg.OpenedCacheSize()),
|
|
blobovniczatree.WithLogger(&logger.Logger{Logger: log}),
|
|
),
|
|
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
|
return uint64(len(data)) < sc.SmallSizeLimit()
|
|
},
|
|
}
|
|
substorages = append(substorages, ss)
|
|
case fstree.Type:
|
|
cfg := fstreeconfig.From((*config.Config)(scfg))
|
|
ss := blobstor.SubStorage{
|
|
Storage: fstree.New(
|
|
fstree.WithPath(scfg.Path()),
|
|
fstree.WithPerm(scfg.Perm()),
|
|
fstree.WithDepth(cfg.Depth()),
|
|
fstree.WithNoSync(cfg.NoSync()),
|
|
),
|
|
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
|
return true
|
|
},
|
|
}
|
|
substorages = append(substorages, ss)
|
|
default:
|
|
return fmt.Errorf("invalid storage type: %s", scfg.Type())
|
|
}
|
|
}
|
|
opts = append(opts, shard.WithBlobStorOptions(
|
|
blobstor.WithCompressObjects(sc.Compress()),
|
|
blobstor.WithUncompressableContentTypes(sc.UncompressableContentTypes()),
|
|
blobstor.WithStorages(substorages),
|
|
blobstor.WithLogger(&logger.Logger{Logger: log}),
|
|
))
|
|
}
|
|
|
|
// write cache
|
|
if wc := sc.WriteCache(); wc.Enabled() {
|
|
opts = append(opts,
|
|
shard.WithWriteCache(true),
|
|
shard.WithWriteCacheOptions(
|
|
writecache.Options{
|
|
Type: writecache.TypeBBolt,
|
|
BBoltOptions: []writecachebbolt.Option{
|
|
writecachebbolt.WithPath(wc.Path()),
|
|
writecachebbolt.WithMaxBatchSize(wc.BoltDB().MaxBatchSize()),
|
|
writecachebbolt.WithMaxBatchDelay(wc.BoltDB().MaxBatchDelay()),
|
|
writecachebbolt.WithMaxObjectSize(wc.MaxObjectSize()),
|
|
writecachebbolt.WithSmallObjectSize(wc.SmallObjectSize()),
|
|
writecachebbolt.WithFlushWorkersCount(wc.WorkersNumber()),
|
|
writecachebbolt.WithMaxCacheSize(wc.SizeLimit()),
|
|
writecachebbolt.WithNoSync(wc.NoSync()),
|
|
writecachebbolt.WithLogger(&logger.Logger{Logger: log}),
|
|
},
|
|
},
|
|
),
|
|
)
|
|
}
|
|
|
|
// tree
|
|
if config.BoolSafe(c.Sub("tree"), "enabled") {
|
|
pr := sc.Pilorama()
|
|
opts = append(opts, shard.WithPiloramaOptions(
|
|
pilorama.WithPath(pr.Path()),
|
|
pilorama.WithPerm(pr.Perm()),
|
|
pilorama.WithMaxBatchSize(pr.MaxBatchSize()),
|
|
pilorama.WithMaxBatchDelay(pr.MaxBatchDelay()),
|
|
pilorama.WithNoSync(pr.NoSync()),
|
|
))
|
|
}
|
|
|
|
// metabase
|
|
{
|
|
mb := sc.Metabase()
|
|
opts = append(opts, shard.WithMetaBaseOptions(
|
|
metabase.WithPath(mb.Path()),
|
|
metabase.WithPermissions(mb.BoltDB().Perm()),
|
|
metabase.WithMaxBatchSize(mb.BoltDB().MaxBatchSize()),
|
|
metabase.WithMaxBatchDelay(mb.BoltDB().MaxBatchDelay()),
|
|
metabase.WithBoltDBOptions(&bbolt.Options{
|
|
Timeout: 1 * time.Second,
|
|
}),
|
|
metabase.WithEpochState(epochState{}),
|
|
metabase.WithLogger(&logger.Logger{Logger: log}),
|
|
))
|
|
}
|
|
|
|
// GC
|
|
{
|
|
gc := sc.GC()
|
|
opts = append(opts,
|
|
shard.WithGCRemoverSleepInterval(gc.RemoverSleepInterval()),
|
|
shard.WithRemoverBatchSize(gc.RemoverBatchSize()),
|
|
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
|
|
pool, err := ants.NewPool(sz)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
return pool
|
|
}),
|
|
)
|
|
}
|
|
|
|
shOpts = append(shOpts, opts)
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("iterate shards: %w", err)
|
|
}
|
|
return ngOpts, shOpts, nil
|
|
}
|
|
|
|
// ParseOrCreateKey parses the provided key as a hex string or creates a fresh one if empty.
|
|
func ParseOrCreateKey(hexKeyStr string) (*keys.PrivateKey, error) {
|
|
if hexKeyStr != "" {
|
|
return keys.NewPrivateKeyFromHex(hexKeyStr)
|
|
}
|
|
return keys.NewPrivateKey()
|
|
}
|