package local import ( "context" "errors" "fmt" "sync" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config" engineconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine" shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard" blobovniczaconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza" fstreeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" metabase "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" writecache "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/config" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/local/rawclient" "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/panjf2000/ants/v2" "go.etcd.io/bbolt" "go.k6.io/k6/js/modules" "go.k6.io/k6/metrics" "go.uber.org/zap" "golang.org/x/sys/unix" ) // RootModule is the global module object type. It is instantiated once per test // run and will be used to create k6/x/frostfs/local module instances for each VU. type RootModule struct { mu sync.Mutex // configFile is the name of the configuration file used during one test. configFile string // configDir is the name of the configuration directory used during one test. configDir string // ng is the engine instance used during one test, corresponding to the configFile. Each VU // gets the same engine instance. ng *engine.StorageEngine l Limiter } // Local represents an instance of the module for every VU. type Local struct { vu modules.VU ResolveEngine func(context.Context, string, string, bool, int64) (*engine.StorageEngine, Limiter, error) } // Ensure the interfaces are implemented correctly. var ( _ modules.Module = &RootModule{} _ modules.Instance = &Local{} objPutTotal, objPutFails, objPutDuration *metrics.Metric objGetTotal, objGetFails, objGetDuration *metrics.Metric objDeleteTotal, objDeleteFails, objDeleteDuration *metrics.Metric ) func init() { modules.Register("k6/x/frostfs/local", &RootModule{}) } // NewModuleInstance implements the modules.Module interface and returns // a new instance for each VU. func (r *RootModule) NewModuleInstance(vu modules.VU) modules.Instance { return NewLocalModuleInstance(vu, r.GetOrCreateEngine) } func NewLocalModuleInstance(vu modules.VU, resolveEngine func(context.Context, string, string, bool, int64) (*engine.StorageEngine, Limiter, error)) *Local { return &Local{ vu: vu, ResolveEngine: resolveEngine, } } // checkResourceLimits checks the current limit on NOFILE. // // The usual default is around 1024 and this is too low for production clusters where a value of // about 65536 is needed in order to not run into errors because of attempting to open too many files. // This is needed for the local storage engine scenarios, where the user running the scenario is not // necessarily the service user, for which the limits are preconfigured correctly. // // See: https://k6.io/docs/misc/fine-tuning-os/ func checkResourceLimits() error { const ( minNofileLimit = 1 << 16 ) rlimit := &unix.Rlimit{} if err := unix.Getrlimit(unix.RLIMIT_NOFILE, rlimit); err != nil { return fmt.Errorf("getting resource limits: %v", err) } if rlimit.Cur < minNofileLimit { return fmt.Errorf("nofile limit is too low: %d", rlimit.Cur) } return nil } // GetOrCreateEngine returns the current engine instance for the given configuration file or directory, // creating a new one if none exists. Note that the identity of configuration files is their // file name for the purposes of test runs. func (r *RootModule) GetOrCreateEngine(ctx context.Context, configFile string, configDir string, debug bool, maxSizeGB int64) (*engine.StorageEngine, Limiter, error) { r.mu.Lock() defer r.mu.Unlock() if len(configFile) == 0 && len(configDir) == 0 { return nil, nil, errors.New("provide configFile or configDir") } if r.l == nil { r.l = NewLimiter(maxSizeGB) } // Create and initialize engine for the given configFile if it doesn't exist already if r.ng == nil { r.configFile = configFile r.configDir = configDir appCfg := config.New(configFile, configDir, "") ngOpts, shardOpts, err := storageEngineOptionsFromConfig(appCfg, debug, r.l) if err != nil { return nil, nil, fmt.Errorf("creating engine options from config: %v", err) } if err := checkResourceLimits(); err != nil { return nil, nil, err } r.ng = engine.New(ngOpts...) for i, opts := range shardOpts { if _, err := r.ng.AddShard(ctx, opts...); err != nil { return nil, nil, fmt.Errorf("adding shard %d: %v", i, err) } } if err := r.ng.Open(ctx); err != nil { return nil, nil, fmt.Errorf("opening engine: %v", err) } if err := r.ng.Init(ctx); err != nil { return nil, nil, fmt.Errorf("initializing engine: %v", err) } } else if configFile != r.configFile { return nil, nil, fmt.Errorf("GetOrCreateEngine called with mismatching configFile after engine was "+ "initialized: got %q, want %q", configFile, r.configFile) } else if configDir != r.configDir { return nil, nil, fmt.Errorf("GetOrCreateEngine called with mismatching configDir after engine was "+ "initialized: got %q, want %q", configDir, r.configDir) } return r.ng, r.l, nil } // Exports implements the modules.Instance interface and returns the exports // of the JS module. func (s *Local) Exports() modules.Exports { return modules.Exports{Default: s} } func (s *Local) VU() modules.VU { return s.vu } func (s *Local) Connect(configFile, configDir, hexKey string, debug bool, maxSizeGB int64) (*Client, error) { ng, l, err := s.ResolveEngine(s.VU().Context(), configFile, configDir, debug, maxSizeGB) if err != nil { return nil, fmt.Errorf("connecting to engine for config - file %q dir %q: %v", configFile, configDir, err) } key, err := ParseOrCreateKey(hexKey) if err != nil { return nil, fmt.Errorf("creating key: %v", err) } // Register metrics. registry := metrics.NewRegistry() objPutTotal, _ = registry.NewMetric("local_obj_put_total", metrics.Counter) objPutFails, _ = registry.NewMetric("local_obj_put_fails", metrics.Counter) objPutDuration, _ = registry.NewMetric("local_obj_put_duration", metrics.Trend, metrics.Time) objGetTotal, _ = registry.NewMetric("local_obj_get_total", metrics.Counter) objGetFails, _ = registry.NewMetric("local_obj_get_fails", metrics.Counter) objGetDuration, _ = registry.NewMetric("local_obj_get_duration", metrics.Trend, metrics.Time) objDeleteTotal, _ = registry.NewMetric("local_obj_delete_total", metrics.Counter) objDeleteFails, _ = registry.NewMetric("local_obj_delete_fails", metrics.Counter) objDeleteDuration, _ = registry.NewMetric("local_obj_delete_duration", metrics.Trend, metrics.Time) // Create raw client backed by local storage engine. rc := rawclient.New(ng, rawclient.WithKey(key.PrivateKey), rawclient.WithPutHandler(func(sz uint64, err error, dt time.Duration) { if err != nil { stats.Report(s.vu, objPutFails, 1) } else { stats.Report(s.vu, objPutTotal, 1) stats.ReportDataSent(s.vu, float64(sz)) stats.Report(s.vu, objPutDuration, metrics.D(dt)) } }), rawclient.WithGetHandler(func(sz uint64, err error, dt time.Duration) { if err != nil { stats.Report(s.vu, objGetFails, 1) } else { stats.Report(s.vu, objGetTotal, 1) stats.Report(s.vu, objGetDuration, metrics.D(dt)) stats.ReportDataReceived(s.vu, float64(sz)) } }), rawclient.WithDeleteHandler(func(err error, dt time.Duration) { if err != nil { stats.Report(s.vu, objDeleteFails, 1) } else { stats.Report(s.vu, objDeleteTotal, 1) stats.Report(s.vu, objDeleteDuration, metrics.D(dt)) } }), ) return &Client{vu: s.vu, rc: rc, l: l}, nil } type epochState struct{} func (epochState) CurrentEpoch() uint64 { return 0 } // storageEngineOptionsFromConfig loads a configuration file and returns the corresponding // engine and shard options to recreate an engine usable with an existing storage instance. // This makes sure that the local loader uses the same engine configuration as the one that // preloaded the storage (if any), by using the same configuration file. // // Note that the configuration file only needs to contain the storage-specific sections. func storageEngineOptionsFromConfig(c *config.Config, debug bool, l Limiter) ([]engine.Option, [][]shard.Option, error) { log := zap.L() if debug { var err error log, err = zap.NewDevelopment() if err != nil { return nil, nil, fmt.Errorf("creating development logger: %v", err) } } ngOpts := []engine.Option{ engine.WithErrorThreshold(engineconfig.ShardErrorThreshold(c)), engine.WithShardPoolSize(engineconfig.ShardPoolSize(c)), engine.WithLogger(&logger.Logger{Logger: log}), engine.WithMetrics(l), } var shOpts [][]shard.Option engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { opts := []shard.Option{ shard.WithRefillMetabase(sc.RefillMetabase()), shard.WithMode(sc.Mode()), shard.WithLogger(&logger.Logger{Logger: log}), } // substorages { var substorages []blobstor.SubStorage for _, scfg := range sc.BlobStor().Storages() { switch scfg.Type() { case blobovniczatree.Type: cfg := blobovniczaconfig.From((*config.Config)(scfg)) ss := blobstor.SubStorage{ Storage: blobovniczatree.NewBlobovniczaTree( blobovniczatree.WithRootPath(scfg.Path()), blobovniczatree.WithPermissions(scfg.Perm()), blobovniczatree.WithBlobovniczaSize(cfg.Size()), blobovniczatree.WithBlobovniczaShallowDepth(cfg.ShallowDepth()), blobovniczatree.WithBlobovniczaShallowWidth(cfg.ShallowWidth()), blobovniczatree.WithOpenedCacheSize(cfg.OpenedCacheSize()), blobovniczatree.WithLogger(&logger.Logger{Logger: log}), ), Policy: func(_ *objectSDK.Object, data []byte) bool { return uint64(len(data)) < sc.SmallSizeLimit() }, } substorages = append(substorages, ss) case fstree.Type: cfg := fstreeconfig.From((*config.Config)(scfg)) ss := blobstor.SubStorage{ Storage: fstree.New( fstree.WithPath(scfg.Path()), fstree.WithPerm(scfg.Perm()), fstree.WithDepth(cfg.Depth()), fstree.WithNoSync(cfg.NoSync()), ), Policy: func(_ *objectSDK.Object, data []byte) bool { return true }, } substorages = append(substorages, ss) default: return fmt.Errorf("invalid storage type: %s", scfg.Type()) } } opts = append(opts, shard.WithBlobStorOptions( blobstor.WithCompressObjects(sc.Compress()), blobstor.WithUncompressableContentTypes(sc.UncompressableContentTypes()), blobstor.WithStorages(substorages), blobstor.WithLogger(&logger.Logger{Logger: log}), )) } // write cache if wc := sc.WriteCache(); wc.Enabled() { opts = append(opts, shard.WithWriteCache(true), shard.WithWriteCacheOptions( writecache.Options{ Type: writecache.TypeBBolt, BBoltOptions: []writecachebbolt.Option{ writecachebbolt.WithPath(wc.Path()), writecachebbolt.WithMaxBatchSize(wc.BoltDB().MaxBatchSize()), writecachebbolt.WithMaxBatchDelay(wc.BoltDB().MaxBatchDelay()), writecachebbolt.WithMaxObjectSize(wc.MaxObjectSize()), writecachebbolt.WithSmallObjectSize(wc.SmallObjectSize()), writecachebbolt.WithFlushWorkersCount(wc.WorkersNumber()), writecachebbolt.WithMaxCacheSize(wc.SizeLimit()), writecachebbolt.WithNoSync(wc.NoSync()), writecachebbolt.WithLogger(&logger.Logger{Logger: log}), }, }, ), ) } // tree if config.BoolSafe(c.Sub("tree"), "enabled") { pr := sc.Pilorama() opts = append(opts, shard.WithPiloramaOptions( pilorama.WithPath(pr.Path()), pilorama.WithPerm(pr.Perm()), pilorama.WithMaxBatchSize(pr.MaxBatchSize()), pilorama.WithMaxBatchDelay(pr.MaxBatchDelay()), pilorama.WithNoSync(pr.NoSync()), )) } // metabase { mb := sc.Metabase() opts = append(opts, shard.WithMetaBaseOptions( metabase.WithPath(mb.Path()), metabase.WithPermissions(mb.BoltDB().Perm()), metabase.WithMaxBatchSize(mb.BoltDB().MaxBatchSize()), metabase.WithMaxBatchDelay(mb.BoltDB().MaxBatchDelay()), metabase.WithBoltDBOptions(&bbolt.Options{ Timeout: 1 * time.Second, }), metabase.WithEpochState(epochState{}), metabase.WithLogger(&logger.Logger{Logger: log}), )) } // GC { gc := sc.GC() opts = append(opts, shard.WithGCRemoverSleepInterval(gc.RemoverSleepInterval()), shard.WithRemoverBatchSize(gc.RemoverBatchSize()), shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool { pool, err := ants.NewPool(sz) if err != nil { panic(err) } return pool }), ) } shOpts = append(shOpts, opts) return nil }) return ngOpts, shOpts, nil } // ParseOrCreateKey parses the provided key as a hex string or creates a fresh one if empty. func ParseOrCreateKey(hexKeyStr string) (*keys.PrivateKey, error) { if hexKeyStr != "" { return keys.NewPrivateKeyFromHex(hexKeyStr) } return keys.NewPrivateKey() }