package s3local import ( "context" "flag" "fmt" "time" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/tree" "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/local" "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/local/rawclient" "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats" "go.k6.io/k6/js/modules" "go.k6.io/k6/metrics" "go.uber.org/zap" ) // RootModule is the global module object type. It is instantiated once per test // run and will be used to create k6/x/frostfs/s3local module instances for each VU. type RootModule struct { m *local.RootModule } // Local represents an instance of the module for every VU. type Local struct { l *local.Local } // Ensure the interfaces are implemented correctly. var ( _ modules.Module = &RootModule{} _ modules.Instance = &Local{} internalObjPutSuccess, internalObjPutFails, internalObjPutDuration, internalObjPutData *metrics.Metric internalObjGetSuccess, internalObjGetFails, internalObjGetDuration, internalObjGetData *metrics.Metric objPutSuccess, objPutFails, objPutDuration, objPutData *metrics.Metric objGetSuccess, objGetFails, objGetDuration, objGetData *metrics.Metric ) func init() { modules.Register("k6/x/frostfs/s3local", &RootModule{ m: &local.RootModule{}, }) } // NewModuleInstance implements the modules.Module interface and returns // a new instance for each VU. func (r *RootModule) NewModuleInstance(vu modules.VU) modules.Instance { return &Local{local.NewLocalModuleInstance(vu, r.m.GetOrCreateEngine)} } // Exports implements the modules.Instance interface and returns the exports // of the JS module. func (s *Local) Exports() modules.Exports { return modules.Exports{Default: s} } func (s *Local) Connect(configFile string, configDir string, params map[string]string, bucketMapping map[string]string, maxSizeGB int64) (*Client, error) { // Parse configuration flags. fs := flag.NewFlagSet("s3local", flag.ContinueOnError) hexKey := fs.String("hex_key", "", "Private key to use as a hexadecimal string. A random one is created if none is provided") nodePosition := fs.Int("node_position", 0, "Position of this node in the node array if loading multiple nodes independently") nodeCount := fs.Int("node_count", 1, "Number of nodes in the node array if loading multiple nodes independently") debugLogger := fs.Bool("debug_logger", false, "Whether to use the development logger instead of the default one for debugging purposes") { args := make([]string, 0, len(params)) for k, v := range params { args = append(args, fmt.Sprintf("-%s=%s", k, v)) } if err := fs.Parse(args); err != nil { return nil, fmt.Errorf("parsing parameters: %v", err) } } // Validate and read configuration flags. key, err := local.ParseOrCreateKey(*hexKey) if err != nil { return nil, fmt.Errorf("parsing hex_key: %v", err) } if *nodeCount <= 0 { return nil, fmt.Errorf("node_count must be positive") } if *nodePosition < 0 || *nodePosition >= *nodeCount { return nil, fmt.Errorf("node_position must be in the range [0, node_count-1]") } // Register metrics. internalObjPutSuccess, _ = stats.Registry.NewMetric("s3local_internal_obj_put_success", metrics.Counter) internalObjPutFails, _ = stats.Registry.NewMetric("s3local_internal_obj_put_fails", metrics.Counter) internalObjPutDuration, _ = stats.Registry.NewMetric("s3local_internal_obj_put_duration", metrics.Trend, metrics.Time) internalObjPutData, _ = stats.Registry.NewMetric("s3local_internal_obj_put_bytes", metrics.Counter, metrics.Data) internalObjGetSuccess, _ = stats.Registry.NewMetric("s3local_internal_obj_get_success", metrics.Counter) internalObjGetFails, _ = stats.Registry.NewMetric("s3local_internal_obj_get_fails", metrics.Counter) internalObjGetDuration, _ = stats.Registry.NewMetric("s3local_internal_obj_get_duration", metrics.Trend, metrics.Time) internalObjGetData, _ = stats.Registry.NewMetric("s3local_internal_obj_get_bytes", metrics.Counter, metrics.Data) objPutSuccess, _ = stats.Registry.NewMetric("s3local_obj_put_success", metrics.Counter) objPutFails, _ = stats.Registry.NewMetric("s3local_obj_put_fails", metrics.Counter) objPutDuration, _ = stats.Registry.NewMetric("s3local_obj_put_duration", metrics.Trend, metrics.Time) objPutData, _ = stats.Registry.NewMetric("s3local_obj_put_bytes", metrics.Counter, metrics.Data) objGetSuccess, _ = stats.Registry.NewMetric("s3local_obj_get_success", metrics.Counter) objGetFails, _ = stats.Registry.NewMetric("s3local_obj_get_fails", metrics.Counter) objGetDuration, _ = stats.Registry.NewMetric("s3local_obj_get_duration", metrics.Trend, metrics.Time) objGetData, _ = stats.Registry.NewMetric("s3local_obj_get_bytes", metrics.Counter, metrics.Data) // Create S3 layer backed by local storage engine and tree service. ng, limiter, err := s.l.ResolveEngine(s.l.VU().Context(), configFile, configDir, *debugLogger, maxSizeGB) if err != nil { return nil, fmt.Errorf("connecting to engine for config - file %q dir %q: %v", configFile, configDir, err) } treeSvc := tree.NewTree(treeServiceEngineWrapper{ ng: ng, pos: *nodePosition, size: *nodeCount, }, zap.L()) rc := rawclient.New(ng, rawclient.WithKey(key.PrivateKey), rawclient.WithPutHandler(func(sz uint64, err error, dt time.Duration) { if err != nil { stats.Report(s.l.VU(), internalObjPutFails, 1) } else { stats.Report(s.l.VU(), internalObjPutSuccess, 1) stats.Report(s.l.VU(), internalObjPutDuration, metrics.D(dt)) stats.Report(s.l.VU(), internalObjPutData, float64(sz)) } }), rawclient.WithGetHandler(func(sz uint64, err error, dt time.Duration) { if err != nil { stats.Report(s.l.VU(), internalObjGetFails, 1) } else { stats.Report(s.l.VU(), internalObjGetSuccess, 1) stats.Report(s.l.VU(), internalObjGetDuration, metrics.D(dt)) stats.Report(s.l.VU(), internalObjGetData, float64(sz)) } }), ) resolver, err := newFixedBucketResolver(bucketMapping) if err != nil { return nil, fmt.Errorf("creating bucket resolver: %v", err) } cfg := &layer.Config{ Cache: layer.NewCache(layer.DefaultCachesConfigs(zap.L())), AnonKey: layer.AnonymousKey{Key: key}, Resolver: resolver, TreeService: treeSvc, } l := layer.NewLayer(zap.L(), &frostfs{rc}, cfg) err = l.Initialize(s.l.VU().Context(), nopEventListener{}) if err != nil { return nil, fmt.Errorf("initialize: %w", err) } return &Client{ vu: s.l.VU(), l: l, ownerID: rc.OwnerID(), resolver: resolver, limiter: limiter, }, nil } type nopEventListener struct{} func (nopEventListener) Subscribe(context.Context, string, layer.MsgHandler) error { return nil } func (nopEventListener) Listen(context.Context) {}