From bc47d663163054fd207595e003ee64c88b9570af Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Thu, 14 Dec 2023 16:22:07 +0300 Subject: [PATCH] [#106] xk6: Allow to set max total size in local scenarios Signed-off-by: Dmitrii Stepanov --- internal/local/client.go | 9 ++++ internal/local/limiter.go | 89 ++++++++++++++++++++++++++++++++++++++ internal/local/local.go | 39 +++++++++-------- internal/s3local/client.go | 10 +++++ internal/s3local/local.go | 5 ++- scenarios/local.js | 7 ++- scenarios/run_scenarios.md | 2 + scenarios/s3local.js | 7 ++- 8 files changed, 147 insertions(+), 21 deletions(-) create mode 100644 internal/local/limiter.go diff --git a/internal/local/client.go b/internal/local/client.go index 491bce5..6116bca 100644 --- a/internal/local/client.go +++ b/internal/local/client.go @@ -13,6 +13,7 @@ import ( type Client struct { vu modules.VU rc *rawclient.RawClient + l Limiter } type ( @@ -25,6 +26,7 @@ type ( Success bool ObjectID string Error string + Abort bool } GetResponse SuccessOrErrorResponse @@ -32,6 +34,13 @@ type ( ) func (c *Client) Put(containerID string, headers map[string]string, payload goja.ArrayBuffer) PutResponse { + if c.l.IsFull() { + return PutResponse{ + Success: false, + Error: "engine size limit reached", + Abort: true, + } + } id, err := c.rc.Put(c.vu.Context(), mustParseContainerID(containerID), nil, headers, payload.Bytes()) if err != nil { return PutResponse{Error: err.Error()} diff --git a/internal/local/limiter.go b/internal/local/limiter.go new file mode 100644 index 0000000..a59f85d --- /dev/null +++ b/internal/local/limiter.go @@ -0,0 +1,89 @@ +package local + +import ( + "sync/atomic" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics" +) + +var ( + _ Limiter = &noopLimiter{} + _ Limiter = &sizeLimiter{} +) + +type Limiter interface { + engine.MetricRegister + IsFull() bool +} + +func NewLimiter(maxSizeGB int64) Limiter { + if maxSizeGB < 0 { + panic("max size is negative") + } + if maxSizeGB == 0 { + return &noopLimiter{} + } + return &sizeLimiter{ + maxSize: maxSizeGB * 1024 * 1024 * 1024, + currentSize: &atomic.Int64{}, + } +} + +type sizeLimiter struct { + maxSize int64 + currentSize *atomic.Int64 +} + +func (*sizeLimiter) AddMethodDuration(method string, d time.Duration) {} +func (*sizeLimiter) AddToContainerSize(cnrID string, size int64) {} +func (*sizeLimiter) AddToObjectCounter(shardID string, objectType string, delta int) {} +func (*sizeLimiter) ClearErrorCounter(shardID string) {} +func (*sizeLimiter) DeleteShardMetrics(shardID string) {} +func (*sizeLimiter) GC() metrics.GCMetrics { return &noopGCMetrics{} } +func (*sizeLimiter) IncErrorCounter(shardID string) {} +func (*sizeLimiter) SetMode(shardID string, mode mode.Mode) {} +func (*sizeLimiter) SetObjectCounter(shardID string, objectType string, v uint64) {} +func (*sizeLimiter) WriteCache() metrics.WriteCacheMetrics { return &noopWriteCacheMetrics{} } + +func (sl *sizeLimiter) AddToPayloadCounter(shardID string, size int64) { + sl.currentSize.Add(size) +} + +func (sl *sizeLimiter) IsFull() bool { + cur := sl.currentSize.Load() + return cur > sl.maxSize +} + +type noopLimiter struct{} + +func (*noopLimiter) AddMethodDuration(method string, d time.Duration) {} +func (*noopLimiter) AddToContainerSize(cnrID string, size int64) {} +func (*noopLimiter) AddToObjectCounter(shardID string, objectType string, delta int) {} +func (*noopLimiter) AddToPayloadCounter(shardID string, size int64) {} +func (*noopLimiter) ClearErrorCounter(shardID string) {} +func (*noopLimiter) DeleteShardMetrics(shardID string) {} +func (*noopLimiter) GC() metrics.GCMetrics { return &noopGCMetrics{} } +func (*noopLimiter) IncErrorCounter(shardID string) {} +func (*noopLimiter) SetMode(shardID string, mode mode.Mode) {} +func (*noopLimiter) SetObjectCounter(shardID string, objectType string, v uint64) {} +func (*noopLimiter) WriteCache() metrics.WriteCacheMetrics { return &noopWriteCacheMetrics{} } +func (*noopLimiter) IsFull() bool { return false } + +type noopGCMetrics struct{} + +func (*noopGCMetrics) AddDeletedCount(shardID string, deleted uint64, failed uint64) {} +func (*noopGCMetrics) AddExpiredObjectCollectionDuration(string, time.Duration, bool, string) {} +func (*noopGCMetrics) AddInhumedObjectCount(shardID string, count uint64, objectType string) {} +func (*noopGCMetrics) AddRunDuration(shardID string, d time.Duration, success bool) {} + +type noopWriteCacheMetrics struct{} + +func (*noopWriteCacheMetrics) AddMethodDuration(string, string, bool, time.Duration, string) {} +func (*noopWriteCacheMetrics) Close(shardID string) {} +func (*noopWriteCacheMetrics) IncOperationCounter(string, string, metrics.NullBool, string) {} +func (*noopWriteCacheMetrics) SetActualCount(shardID string, count uint64, storageType string) {} +func (*noopWriteCacheMetrics) SetEstimateSize(shardID string, size uint64, storageType string) {} +func (*noopWriteCacheMetrics) SetMode(shardID string, mode string) {} diff --git a/internal/local/local.go b/internal/local/local.go index 08a5134..6e207d2 100644 --- a/internal/local/local.go +++ b/internal/local/local.go @@ -46,12 +46,13 @@ type RootModule struct { // ng is the engine instance used during one test, corresponding to the configFile. Each VU // gets the same engine instance. ng *engine.StorageEngine + l Limiter } // Local represents an instance of the module for every VU. type Local struct { vu modules.VU - ResolveEngine func(context.Context, string, string, bool) (*engine.StorageEngine, error) + ResolveEngine func(context.Context, string, string, bool, int64) (*engine.StorageEngine, Limiter, error) } // Ensure the interfaces are implemented correctly. @@ -74,7 +75,7 @@ func (r *RootModule) NewModuleInstance(vu modules.VU) modules.Instance { return NewLocalModuleInstance(vu, r.GetOrCreateEngine) } -func NewLocalModuleInstance(vu modules.VU, resolveEngine func(context.Context, string, string, bool) (*engine.StorageEngine, error)) *Local { +func NewLocalModuleInstance(vu modules.VU, resolveEngine func(context.Context, string, string, bool, int64) (*engine.StorageEngine, Limiter, error)) *Local { return &Local{ vu: vu, ResolveEngine: resolveEngine, @@ -106,47 +107,50 @@ func checkResourceLimits() error { // GetOrCreateEngine returns the current engine instance for the given configuration file or directory, // creating a new one if none exists. Note that the identity of configuration files is their // file name for the purposes of test runs. -func (r *RootModule) GetOrCreateEngine(ctx context.Context, configFile string, configDir string, debug bool) (*engine.StorageEngine, error) { +func (r *RootModule) GetOrCreateEngine(ctx context.Context, configFile string, configDir string, debug bool, maxSizeGB int64) (*engine.StorageEngine, Limiter, error) { r.mu.Lock() defer r.mu.Unlock() if len(configFile) == 0 && len(configDir) == 0 { - return nil, errors.New("provide configFile or configDir") + return nil, nil, errors.New("provide configFile or configDir") } + if r.l == nil { + r.l = NewLimiter(maxSizeGB) + } // Create and initialize engine for the given configFile if it doesn't exist already if r.ng == nil { r.configFile = configFile r.configDir = configDir appCfg := config.New(configFile, configDir, "") - ngOpts, shardOpts, err := storageEngineOptionsFromConfig(appCfg, debug) + ngOpts, shardOpts, err := storageEngineOptionsFromConfig(appCfg, debug, r.l) if err != nil { - return nil, fmt.Errorf("creating engine options from config: %v", err) + return nil, nil, fmt.Errorf("creating engine options from config: %v", err) } if err := checkResourceLimits(); err != nil { - return nil, err + return nil, nil, err } r.ng = engine.New(ngOpts...) for i, opts := range shardOpts { if _, err := r.ng.AddShard(ctx, opts...); err != nil { - return nil, fmt.Errorf("adding shard %d: %v", i, err) + return nil, nil, fmt.Errorf("adding shard %d: %v", i, err) } } if err := r.ng.Open(ctx); err != nil { - return nil, fmt.Errorf("opening engine: %v", err) + return nil, nil, fmt.Errorf("opening engine: %v", err) } if err := r.ng.Init(ctx); err != nil { - return nil, fmt.Errorf("initializing engine: %v", err) + return nil, nil, fmt.Errorf("initializing engine: %v", err) } } else if configFile != r.configFile { - return nil, fmt.Errorf("GetOrCreateEngine called with mismatching configFile after engine was "+ + return nil, nil, fmt.Errorf("GetOrCreateEngine called with mismatching configFile after engine was "+ "initialized: got %q, want %q", configFile, r.configFile) } else if configDir != r.configDir { - return nil, fmt.Errorf("GetOrCreateEngine called with mismatching configDir after engine was "+ + return nil, nil, fmt.Errorf("GetOrCreateEngine called with mismatching configDir after engine was "+ "initialized: got %q, want %q", configDir, r.configDir) } - return r.ng, nil + return r.ng, r.l, nil } // Exports implements the modules.Instance interface and returns the exports @@ -157,8 +161,8 @@ func (s *Local) Exports() modules.Exports { func (s *Local) VU() modules.VU { return s.vu } -func (s *Local) Connect(configFile, configDir, hexKey string, debug bool) (*Client, error) { - ng, err := s.ResolveEngine(s.VU().Context(), configFile, configDir, debug) +func (s *Local) Connect(configFile, configDir, hexKey string, debug bool, maxSizeGB int64) (*Client, error) { + ng, l, err := s.ResolveEngine(s.VU().Context(), configFile, configDir, debug, maxSizeGB) if err != nil { return nil, fmt.Errorf("connecting to engine for config - file %q dir %q: %v", configFile, configDir, err) } @@ -212,7 +216,7 @@ func (s *Local) Connect(configFile, configDir, hexKey string, debug bool) (*Clie } }), ) - return &Client{vu: s.vu, rc: rc}, nil + return &Client{vu: s.vu, rc: rc, l: l}, nil } type epochState struct{} @@ -225,7 +229,7 @@ func (epochState) CurrentEpoch() uint64 { return 0 } // preloaded the storage (if any), by using the same configuration file. // // Note that the configuration file only needs to contain the storage-specific sections. -func storageEngineOptionsFromConfig(c *config.Config, debug bool) ([]engine.Option, [][]shard.Option, error) { +func storageEngineOptionsFromConfig(c *config.Config, debug bool, l Limiter) ([]engine.Option, [][]shard.Option, error) { log := zap.L() if debug { var err error @@ -239,6 +243,7 @@ func storageEngineOptionsFromConfig(c *config.Config, debug bool) ([]engine.Opti engine.WithErrorThreshold(engineconfig.ShardErrorThreshold(c)), engine.WithShardPoolSize(engineconfig.ShardPoolSize(c)), engine.WithLogger(&logger.Logger{Logger: log}), + engine.WithMetrics(l), } var shOpts [][]shard.Option diff --git a/internal/s3local/client.go b/internal/s3local/client.go index c637d2c..93c65d5 100644 --- a/internal/s3local/client.go +++ b/internal/s3local/client.go @@ -7,6 +7,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" + "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/local" "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats" "github.com/dop251/goja" "go.k6.io/k6/js/modules" @@ -18,11 +19,13 @@ type Client struct { l layer.Client ownerID *user.ID resolver layer.BucketResolver + limiter local.Limiter } type ( SuccessOrErrorResponse struct { Success bool + Abort bool Error string } @@ -33,6 +36,13 @@ type ( ) func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse { + if c.limiter.IsFull() { + return PutResponse{ + Success: false, + Abort: true, + Error: "engine size limit reached", + } + } cid, err := c.resolver.Resolve(c.vu.Context(), bucket) if err != nil { stats.Report(c.vu, objPutFails, 1) diff --git a/internal/s3local/local.go b/internal/s3local/local.go index 531ac15..e760fe7 100644 --- a/internal/s3local/local.go +++ b/internal/s3local/local.go @@ -56,7 +56,7 @@ func (s *Local) Exports() modules.Exports { return modules.Exports{Default: s} } -func (s *Local) Connect(configFile string, configDir string, params map[string]string, bucketMapping map[string]string) (*Client, error) { +func (s *Local) Connect(configFile string, configDir string, params map[string]string, bucketMapping map[string]string, maxSizeGB int64) (*Client, error) { // Parse configuration flags. fs := flag.NewFlagSet("s3local", flag.ContinueOnError) @@ -107,7 +107,7 @@ func (s *Local) Connect(configFile string, configDir string, params map[string]s objGetDuration, _ = registry.NewMetric("s3local_obj_get_duration", metrics.Trend, metrics.Time) // Create S3 layer backed by local storage engine and tree service. - ng, err := s.l.ResolveEngine(s.l.VU().Context(), configFile, configDir, *debugLogger) + ng, limiter, err := s.l.ResolveEngine(s.l.VU().Context(), configFile, configDir, *debugLogger, maxSizeGB) if err != nil { return nil, fmt.Errorf("connecting to engine for config - file %q dir %q: %v", configFile, configDir, err) } @@ -158,6 +158,7 @@ func (s *Local) Connect(configFile string, configDir string, params map[string]s l: l, ownerID: rc.OwnerID(), resolver: resolver, + limiter: limiter, }, nil } diff --git a/scenarios/local.js b/scenarios/local.js index d706ee3..648bead 100644 --- a/scenarios/local.js +++ b/scenarios/local.js @@ -6,6 +6,7 @@ import { SharedArray } from 'k6/data'; import { textSummary } from './libs/k6-summary-0.0.2.js'; import { parseEnv } from './libs/env-parser.js'; import { uuidv4 } from './libs/k6-utils-1.4.0.js'; +import exec from 'k6/execution'; parseEnv(); @@ -23,7 +24,8 @@ const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json"; const config_file = __ENV.CONFIG_FILE; const config_dir = __ENV.CONFIG_DIR; const debug_logger = (__ENV.DEBUG_LOGGER || 'false') == 'true'; -const local_client = local.connect(config_file, config_dir, '', debug_logger); +const max_total_size_gb = __ENV.MAX_TOTAL_SIZE_GB ? parseInt(__ENV.MAX_TOTAL_SIZE_GB) : 0; +const local_client = local.connect(config_file, config_dir, '', debug_logger, max_total_size_gb); const log = logging.new().withFields({"config_file": config_file,"config_dir": config_dir}); const registry_enabled = !!__ENV.REGISTRY_FILE; @@ -130,6 +132,9 @@ export function obj_write() { const { payload, hash } = generator.genPayload(registry_enabled); const resp = local_client.put(container, headers, payload); if (!resp.success) { + if (resp.abort) { + exec.test.abort(resp.error); + } log.withField("cid", container).error(resp.error); return; } diff --git a/scenarios/run_scenarios.md b/scenarios/run_scenarios.md index 15999ea..846e075 100644 --- a/scenarios/run_scenarios.md +++ b/scenarios/run_scenarios.md @@ -76,6 +76,7 @@ Options (in addition to the common options): * `CONFIG_FILE` - path to the local configuration file used for the storage node. Only the storage configuration section is used. * `DELETERS` - number of VUs performing delete operations (using deleters requires that options `DELETE_AGE` and `REGISTRY_FILE` are specified as well). * `DELETE_AGE` - age of object in seconds before which it can not be deleted. This parameter can be used to control how many objects we have in the system under load. + * `MAX_TOTAL_SIZE_GB` - if specified, max payload size in GB of the storage engine. If the storage engine is already full, no new objects will be saved. ## HTTP @@ -182,6 +183,7 @@ Note that the `s3local` scenario currently does not support deleters. Options (in addition to the common options): * `OBJ_NAME` - if specified, this name will be used for all write operations instead of random generation. + * `MAX_TOTAL_SIZE_GB` - if specified, max payload size in GB of the storage engine. If the storage engine is already full, no new objects will be saved. ## Verify diff --git a/scenarios/s3local.js b/scenarios/s3local.js index d058b33..b04a6a3 100644 --- a/scenarios/s3local.js +++ b/scenarios/s3local.js @@ -6,6 +6,7 @@ import { SharedArray } from 'k6/data'; import { textSummary } from './libs/k6-summary-0.0.2.js'; import { parseEnv } from './libs/env-parser.js'; import { uuidv4 } from './libs/k6-utils-1.4.0.js'; +import exec from 'k6/execution'; parseEnv(); @@ -37,9 +38,10 @@ const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json"; const config_file = __ENV.CONFIG_FILE; const config_dir = __ENV.CONFIG_DIR; +const max_total_size_gb = __ENV.MAX_TOTAL_SIZE_GB ? parseInt(__ENV.MAX_TOTAL_SIZE_GB) : 0; const s3_client = s3local.connect(config_file, config_dir, { 'debug_logger': __ENV.DEBUG_LOGGER || 'false', -}, bucket_mapping()); +}, bucket_mapping(), max_total_size_gb); const log = logging.new().withFields({"config_file": config_file,"config_dir": config_dir}); const registry_enabled = !!__ENV.REGISTRY_FILE; @@ -126,6 +128,9 @@ export function obj_write() { const { payload, hash } = generator.genPayload(registry_enabled); const resp = s3_client.put(bucket, key, payload); if (!resp.success) { + if (resp.abort) { + exec.test.abort(resp.error); + } log.withFields({bucket: bucket, key: key}).error(resp.error); return; }