package local import ( "context" "errors" "fmt" "sync" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config" engineconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine" shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard" blobovniczaconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza" fstreeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" metabase "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/local/rawclient" "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/panjf2000/ants/v2" "go.etcd.io/bbolt" "go.k6.io/k6/js/modules" "go.k6.io/k6/metrics" "go.uber.org/zap" ) // RootModule is the global module object type. It is instantiated once per test // run and will be used to create k6/x/frostfs/local module instances for each VU. type RootModule struct { mu sync.Mutex // configFile is the name of the configuration file used during one test. configFile string // ng is the engine instance used during one test, corresponding to the configFile. Each VU // gets the same engine instance. ng *engine.StorageEngine } // Local represents an instance of the module for every VU. type Local struct { vu modules.VU ResolveEngine func(context.Context, string) (*engine.StorageEngine, error) } // Ensure the interfaces are implemented correctly. var ( _ modules.Module = &RootModule{} _ modules.Instance = &Local{} objPutTotal, objPutFails, objPutDuration *metrics.Metric objGetTotal, objGetFails, objGetDuration *metrics.Metric objDeleteTotal, objDeleteFails, objDeleteDuration *metrics.Metric ) func init() { modules.Register("k6/x/frostfs/local", &RootModule{}) } // NewModuleInstance implements the modules.Module interface and returns // a new instance for each VU. func (r *RootModule) NewModuleInstance(vu modules.VU) modules.Instance { return NewLocalModuleInstance(vu, r.GetOrCreateEngine) } func NewLocalModuleInstance(vu modules.VU, resolveEngine func(context.Context, string) (*engine.StorageEngine, error)) *Local { return &Local{ vu: vu, ResolveEngine: resolveEngine, } } // GetOrCreateEngine returns the current engine instance for the given configuration file, // creating a new one if none exists. Note that the identity of configuration files is their // file name for the purposes of test runs. func (r *RootModule) GetOrCreateEngine(ctx context.Context, configFile string) (*engine.StorageEngine, error) { r.mu.Lock() defer r.mu.Unlock() if len(configFile) == 0 { return nil, errors.New("configFile cannot be empty") } // Create and initialize engine for the given configFile if it doesn't exist already if r.ng == nil { r.configFile = configFile appCfg := config.New(config.Prm{}, config.WithConfigFile(configFile)) ngOpts, shardOpts := storageEngineOptionsFromConfig(appCfg) r.ng = engine.New(ngOpts...) for i, opts := range shardOpts { if _, err := r.ng.AddShard(opts...); err != nil { return nil, fmt.Errorf("adding shard %d: %v", i, err) } } if err := r.ng.Open(); err != nil { return nil, fmt.Errorf("opening engine: %v", err) } if err := r.ng.Init(); err != nil { return nil, fmt.Errorf("initializing engine: %v", err) } } else if configFile != r.configFile { return nil, fmt.Errorf("GetOrCreateEngine called with mismatching configFile after engine was initialized: got %q, want %q", configFile, r.configFile) } return r.ng, nil } // Exports implements the modules.Instance interface and returns the exports // of the JS module. func (s *Local) Exports() modules.Exports { return modules.Exports{Default: s} } func (s *Local) VU() modules.VU { return s.vu } func (s *Local) Connect(configFile, hexKey string) (*Client, error) { ng, err := s.ResolveEngine(s.VU().Context(), configFile) if err != nil { return nil, fmt.Errorf("connecting to engine for config %q: %v", configFile, err) } key, err := ParseOrCreateKey(hexKey) if err != nil { return nil, fmt.Errorf("creating key: %v", err) } // Register metrics. registry := metrics.NewRegistry() objPutTotal, _ = registry.NewMetric("local_obj_put_total", metrics.Counter) objPutFails, _ = registry.NewMetric("local_obj_put_fails", metrics.Counter) objPutDuration, _ = registry.NewMetric("local_obj_put_duration", metrics.Trend, metrics.Time) objGetTotal, _ = registry.NewMetric("local_obj_get_total", metrics.Counter) objGetFails, _ = registry.NewMetric("local_obj_get_fails", metrics.Counter) objGetDuration, _ = registry.NewMetric("local_obj_get_duration", metrics.Trend, metrics.Time) objDeleteTotal, _ = registry.NewMetric("local_obj_delete_total", metrics.Counter) objDeleteFails, _ = registry.NewMetric("local_obj_delete_fails", metrics.Counter) objDeleteDuration, _ = registry.NewMetric("local_obj_delete_duration", metrics.Trend, metrics.Time) // Create raw client backed by local storage engine. rc := rawclient.New(ng, rawclient.WithKey(key.PrivateKey), rawclient.WithPutHandler(func(sz uint64, err error, dt time.Duration) { if err != nil { stats.Report(s.vu, objPutFails, 1) } else { stats.Report(s.vu, objPutTotal, 1) stats.ReportDataSent(s.vu, float64(sz)) stats.Report(s.vu, objPutDuration, metrics.D(dt)) } }), rawclient.WithGetHandler(func(sz uint64, err error, dt time.Duration) { if err != nil { stats.Report(s.vu, objGetFails, 1) } else { stats.Report(s.vu, objGetTotal, 1) stats.Report(s.vu, objGetDuration, metrics.D(dt)) stats.ReportDataReceived(s.vu, float64(sz)) } }), rawclient.WithDeleteHandler(func(err error, dt time.Duration) { if err != nil { stats.Report(s.vu, objDeleteFails, 1) } else { stats.Report(s.vu, objDeleteTotal, 1) stats.Report(s.vu, objDeleteDuration, metrics.D(dt)) } }), ) return &Client{rc}, nil } type epochState struct{} func (epochState) CurrentEpoch() uint64 { return 0 } // storageEngineOptionsFromConfig loads a configuration file and returns the corresponding // engine and shard options to recreate an engine usable with an existing storage instance. // This makes sure that the local loader uses the same engine configuration as the one that // preloaded the storage (if any), by using the same configuration file. // // Note that the configuration file only needs to contain the storage-specific sections. func storageEngineOptionsFromConfig(c *config.Config) ([]engine.Option, [][]shard.Option) { log := zap.L() ngOpts := []engine.Option{ engine.WithErrorThreshold(engineconfig.ShardErrorThreshold(c)), engine.WithShardPoolSize(engineconfig.ShardPoolSize(c)), engine.WithLogger(&logger.Logger{Logger: log}), } var shOpts [][]shard.Option engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { opts := []shard.Option{ shard.WithRefillMetabase(sc.RefillMetabase()), shard.WithMode(sc.Mode()), shard.WithLogger(&logger.Logger{Logger: log}), } // substorages { var substorages []blobstor.SubStorage for _, scfg := range sc.BlobStor().Storages() { switch scfg.Type() { case blobovniczatree.Type: cfg := blobovniczaconfig.From((*config.Config)(scfg)) ss := blobstor.SubStorage{ Storage: blobovniczatree.NewBlobovniczaTree( blobovniczatree.WithRootPath(scfg.Path()), blobovniczatree.WithPermissions(scfg.Perm()), blobovniczatree.WithBlobovniczaSize(cfg.Size()), blobovniczatree.WithBlobovniczaShallowDepth(cfg.ShallowDepth()), blobovniczatree.WithBlobovniczaShallowWidth(cfg.ShallowWidth()), blobovniczatree.WithOpenedCacheSize(cfg.OpenedCacheSize()), blobovniczatree.WithLogger(&logger.Logger{Logger: log}), ), Policy: func(_ *objectSDK.Object, data []byte) bool { return uint64(len(data)) < sc.SmallSizeLimit() }, } substorages = append(substorages, ss) case fstree.Type: cfg := fstreeconfig.From((*config.Config)(scfg)) ss := blobstor.SubStorage{ Storage: fstree.New( fstree.WithPath(scfg.Path()), fstree.WithPerm(scfg.Perm()), fstree.WithDepth(cfg.Depth()), fstree.WithNoSync(cfg.NoSync()), ), Policy: func(_ *objectSDK.Object, data []byte) bool { return true }, } substorages = append(substorages, ss) default: return fmt.Errorf("invalid storage type: %s", scfg.Type()) } } opts = append(opts, shard.WithBlobStorOptions( blobstor.WithCompressObjects(sc.Compress()), blobstor.WithUncompressableContentTypes(sc.UncompressableContentTypes()), blobstor.WithStorages(substorages), blobstor.WithLogger(&logger.Logger{Logger: log}), )) } // write cache if wc := sc.WriteCache(); wc.Enabled() { opts = append(opts, shard.WithWriteCacheOptions( writecache.WithPath(wc.Path()), writecache.WithMaxBatchSize(wc.BoltDB().MaxBatchSize()), writecache.WithMaxBatchDelay(wc.BoltDB().MaxBatchDelay()), writecache.WithMaxObjectSize(wc.MaxObjectSize()), writecache.WithSmallObjectSize(wc.SmallObjectSize()), writecache.WithFlushWorkersCount(wc.WorkersNumber()), writecache.WithMaxCacheSize(wc.SizeLimit()), writecache.WithNoSync(wc.NoSync()), writecache.WithLogger(&logger.Logger{Logger: log}), )) } // tree if config.BoolSafe(c.Sub("tree"), "enabled") { pr := sc.Pilorama() opts = append(opts, shard.WithPiloramaOptions( pilorama.WithPath(pr.Path()), pilorama.WithPerm(pr.Perm()), pilorama.WithMaxBatchSize(pr.MaxBatchSize()), pilorama.WithMaxBatchDelay(pr.MaxBatchDelay()), pilorama.WithNoSync(pr.NoSync()), )) } // metabase { mb := sc.Metabase() opts = append(opts, shard.WithMetaBaseOptions( metabase.WithPath(mb.Path()), metabase.WithPermissions(mb.BoltDB().Perm()), metabase.WithMaxBatchSize(mb.BoltDB().MaxBatchSize()), metabase.WithMaxBatchDelay(mb.BoltDB().MaxBatchDelay()), metabase.WithBoltDBOptions(&bbolt.Options{ Timeout: 1 * time.Second, }), metabase.WithEpochState(epochState{}), metabase.WithLogger(&logger.Logger{Logger: log}), )) } // GC { gc := sc.GC() opts = append(opts, shard.WithGCRemoverSleepInterval(gc.RemoverSleepInterval()), shard.WithRemoverBatchSize(gc.RemoverBatchSize()), shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool { pool, err := ants.NewPool(sz) if err != nil { panic(err) } return pool }), ) } shOpts = append(shOpts, opts) return nil }) return ngOpts, shOpts } // ParseOrCreateKey parses the provided key as a hex string or creates a fresh one if empty. func ParseOrCreateKey(hexKeyStr string) (*keys.PrivateKey, error) { if hexKeyStr != "" { return keys.NewPrivateKeyFromHex(hexKeyStr) } return keys.NewPrivateKey() }