forked from TrueCloudLab/xk6-frostfs
[#106] xk6: Allow to set max total size in local scenarios
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
eeededfc18
commit
bc47d66316
8 changed files with 147 additions and 21 deletions
|
@ -13,6 +13,7 @@ import (
|
|||
type Client struct {
|
||||
vu modules.VU
|
||||
rc *rawclient.RawClient
|
||||
l Limiter
|
||||
}
|
||||
|
||||
type (
|
||||
|
@ -25,6 +26,7 @@ type (
|
|||
Success bool
|
||||
ObjectID string
|
||||
Error string
|
||||
Abort bool
|
||||
}
|
||||
|
||||
GetResponse SuccessOrErrorResponse
|
||||
|
@ -32,6 +34,13 @@ type (
|
|||
)
|
||||
|
||||
func (c *Client) Put(containerID string, headers map[string]string, payload goja.ArrayBuffer) PutResponse {
|
||||
if c.l.IsFull() {
|
||||
return PutResponse{
|
||||
Success: false,
|
||||
Error: "engine size limit reached",
|
||||
Abort: true,
|
||||
}
|
||||
}
|
||||
id, err := c.rc.Put(c.vu.Context(), mustParseContainerID(containerID), nil, headers, payload.Bytes())
|
||||
if err != nil {
|
||||
return PutResponse{Error: err.Error()}
|
||||
|
|
89
internal/local/limiter.go
Normal file
89
internal/local/limiter.go
Normal file
|
@ -0,0 +1,89 @@
|
|||
package local
|
||||
|
||||
import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics"
|
||||
)
|
||||
|
||||
var (
|
||||
_ Limiter = &noopLimiter{}
|
||||
_ Limiter = &sizeLimiter{}
|
||||
)
|
||||
|
||||
type Limiter interface {
|
||||
engine.MetricRegister
|
||||
IsFull() bool
|
||||
}
|
||||
|
||||
func NewLimiter(maxSizeGB int64) Limiter {
|
||||
if maxSizeGB < 0 {
|
||||
panic("max size is negative")
|
||||
}
|
||||
if maxSizeGB == 0 {
|
||||
return &noopLimiter{}
|
||||
}
|
||||
return &sizeLimiter{
|
||||
maxSize: maxSizeGB * 1024 * 1024 * 1024,
|
||||
currentSize: &atomic.Int64{},
|
||||
}
|
||||
}
|
||||
|
||||
type sizeLimiter struct {
|
||||
maxSize int64
|
||||
currentSize *atomic.Int64
|
||||
}
|
||||
|
||||
func (*sizeLimiter) AddMethodDuration(method string, d time.Duration) {}
|
||||
func (*sizeLimiter) AddToContainerSize(cnrID string, size int64) {}
|
||||
func (*sizeLimiter) AddToObjectCounter(shardID string, objectType string, delta int) {}
|
||||
func (*sizeLimiter) ClearErrorCounter(shardID string) {}
|
||||
func (*sizeLimiter) DeleteShardMetrics(shardID string) {}
|
||||
func (*sizeLimiter) GC() metrics.GCMetrics { return &noopGCMetrics{} }
|
||||
func (*sizeLimiter) IncErrorCounter(shardID string) {}
|
||||
func (*sizeLimiter) SetMode(shardID string, mode mode.Mode) {}
|
||||
func (*sizeLimiter) SetObjectCounter(shardID string, objectType string, v uint64) {}
|
||||
func (*sizeLimiter) WriteCache() metrics.WriteCacheMetrics { return &noopWriteCacheMetrics{} }
|
||||
|
||||
func (sl *sizeLimiter) AddToPayloadCounter(shardID string, size int64) {
|
||||
sl.currentSize.Add(size)
|
||||
}
|
||||
|
||||
func (sl *sizeLimiter) IsFull() bool {
|
||||
cur := sl.currentSize.Load()
|
||||
return cur > sl.maxSize
|
||||
}
|
||||
|
||||
type noopLimiter struct{}
|
||||
|
||||
func (*noopLimiter) AddMethodDuration(method string, d time.Duration) {}
|
||||
func (*noopLimiter) AddToContainerSize(cnrID string, size int64) {}
|
||||
func (*noopLimiter) AddToObjectCounter(shardID string, objectType string, delta int) {}
|
||||
func (*noopLimiter) AddToPayloadCounter(shardID string, size int64) {}
|
||||
func (*noopLimiter) ClearErrorCounter(shardID string) {}
|
||||
func (*noopLimiter) DeleteShardMetrics(shardID string) {}
|
||||
func (*noopLimiter) GC() metrics.GCMetrics { return &noopGCMetrics{} }
|
||||
func (*noopLimiter) IncErrorCounter(shardID string) {}
|
||||
func (*noopLimiter) SetMode(shardID string, mode mode.Mode) {}
|
||||
func (*noopLimiter) SetObjectCounter(shardID string, objectType string, v uint64) {}
|
||||
func (*noopLimiter) WriteCache() metrics.WriteCacheMetrics { return &noopWriteCacheMetrics{} }
|
||||
func (*noopLimiter) IsFull() bool { return false }
|
||||
|
||||
type noopGCMetrics struct{}
|
||||
|
||||
func (*noopGCMetrics) AddDeletedCount(shardID string, deleted uint64, failed uint64) {}
|
||||
func (*noopGCMetrics) AddExpiredObjectCollectionDuration(string, time.Duration, bool, string) {}
|
||||
func (*noopGCMetrics) AddInhumedObjectCount(shardID string, count uint64, objectType string) {}
|
||||
func (*noopGCMetrics) AddRunDuration(shardID string, d time.Duration, success bool) {}
|
||||
|
||||
type noopWriteCacheMetrics struct{}
|
||||
|
||||
func (*noopWriteCacheMetrics) AddMethodDuration(string, string, bool, time.Duration, string) {}
|
||||
func (*noopWriteCacheMetrics) Close(shardID string) {}
|
||||
func (*noopWriteCacheMetrics) IncOperationCounter(string, string, metrics.NullBool, string) {}
|
||||
func (*noopWriteCacheMetrics) SetActualCount(shardID string, count uint64, storageType string) {}
|
||||
func (*noopWriteCacheMetrics) SetEstimateSize(shardID string, size uint64, storageType string) {}
|
||||
func (*noopWriteCacheMetrics) SetMode(shardID string, mode string) {}
|
|
@ -46,12 +46,13 @@ type RootModule struct {
|
|||
// ng is the engine instance used during one test, corresponding to the configFile. Each VU
|
||||
// gets the same engine instance.
|
||||
ng *engine.StorageEngine
|
||||
l Limiter
|
||||
}
|
||||
|
||||
// Local represents an instance of the module for every VU.
|
||||
type Local struct {
|
||||
vu modules.VU
|
||||
ResolveEngine func(context.Context, string, string, bool) (*engine.StorageEngine, error)
|
||||
ResolveEngine func(context.Context, string, string, bool, int64) (*engine.StorageEngine, Limiter, error)
|
||||
}
|
||||
|
||||
// Ensure the interfaces are implemented correctly.
|
||||
|
@ -74,7 +75,7 @@ func (r *RootModule) NewModuleInstance(vu modules.VU) modules.Instance {
|
|||
return NewLocalModuleInstance(vu, r.GetOrCreateEngine)
|
||||
}
|
||||
|
||||
func NewLocalModuleInstance(vu modules.VU, resolveEngine func(context.Context, string, string, bool) (*engine.StorageEngine, error)) *Local {
|
||||
func NewLocalModuleInstance(vu modules.VU, resolveEngine func(context.Context, string, string, bool, int64) (*engine.StorageEngine, Limiter, error)) *Local {
|
||||
return &Local{
|
||||
vu: vu,
|
||||
ResolveEngine: resolveEngine,
|
||||
|
@ -106,47 +107,50 @@ func checkResourceLimits() error {
|
|||
// GetOrCreateEngine returns the current engine instance for the given configuration file or directory,
|
||||
// creating a new one if none exists. Note that the identity of configuration files is their
|
||||
// file name for the purposes of test runs.
|
||||
func (r *RootModule) GetOrCreateEngine(ctx context.Context, configFile string, configDir string, debug bool) (*engine.StorageEngine, error) {
|
||||
func (r *RootModule) GetOrCreateEngine(ctx context.Context, configFile string, configDir string, debug bool, maxSizeGB int64) (*engine.StorageEngine, Limiter, error) {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
|
||||
if len(configFile) == 0 && len(configDir) == 0 {
|
||||
return nil, errors.New("provide configFile or configDir")
|
||||
return nil, nil, errors.New("provide configFile or configDir")
|
||||
}
|
||||
|
||||
if r.l == nil {
|
||||
r.l = NewLimiter(maxSizeGB)
|
||||
}
|
||||
// Create and initialize engine for the given configFile if it doesn't exist already
|
||||
if r.ng == nil {
|
||||
r.configFile = configFile
|
||||
r.configDir = configDir
|
||||
appCfg := config.New(configFile, configDir, "")
|
||||
ngOpts, shardOpts, err := storageEngineOptionsFromConfig(appCfg, debug)
|
||||
ngOpts, shardOpts, err := storageEngineOptionsFromConfig(appCfg, debug, r.l)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("creating engine options from config: %v", err)
|
||||
return nil, nil, fmt.Errorf("creating engine options from config: %v", err)
|
||||
}
|
||||
if err := checkResourceLimits(); err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
r.ng = engine.New(ngOpts...)
|
||||
for i, opts := range shardOpts {
|
||||
if _, err := r.ng.AddShard(ctx, opts...); err != nil {
|
||||
return nil, fmt.Errorf("adding shard %d: %v", i, err)
|
||||
return nil, nil, fmt.Errorf("adding shard %d: %v", i, err)
|
||||
}
|
||||
}
|
||||
if err := r.ng.Open(ctx); err != nil {
|
||||
return nil, fmt.Errorf("opening engine: %v", err)
|
||||
return nil, nil, fmt.Errorf("opening engine: %v", err)
|
||||
}
|
||||
if err := r.ng.Init(ctx); err != nil {
|
||||
return nil, fmt.Errorf("initializing engine: %v", err)
|
||||
return nil, nil, fmt.Errorf("initializing engine: %v", err)
|
||||
}
|
||||
} else if configFile != r.configFile {
|
||||
return nil, fmt.Errorf("GetOrCreateEngine called with mismatching configFile after engine was "+
|
||||
return nil, nil, fmt.Errorf("GetOrCreateEngine called with mismatching configFile after engine was "+
|
||||
"initialized: got %q, want %q", configFile, r.configFile)
|
||||
} else if configDir != r.configDir {
|
||||
return nil, fmt.Errorf("GetOrCreateEngine called with mismatching configDir after engine was "+
|
||||
return nil, nil, fmt.Errorf("GetOrCreateEngine called with mismatching configDir after engine was "+
|
||||
"initialized: got %q, want %q", configDir, r.configDir)
|
||||
}
|
||||
|
||||
return r.ng, nil
|
||||
return r.ng, r.l, nil
|
||||
}
|
||||
|
||||
// Exports implements the modules.Instance interface and returns the exports
|
||||
|
@ -157,8 +161,8 @@ func (s *Local) Exports() modules.Exports {
|
|||
|
||||
func (s *Local) VU() modules.VU { return s.vu }
|
||||
|
||||
func (s *Local) Connect(configFile, configDir, hexKey string, debug bool) (*Client, error) {
|
||||
ng, err := s.ResolveEngine(s.VU().Context(), configFile, configDir, debug)
|
||||
func (s *Local) Connect(configFile, configDir, hexKey string, debug bool, maxSizeGB int64) (*Client, error) {
|
||||
ng, l, err := s.ResolveEngine(s.VU().Context(), configFile, configDir, debug, maxSizeGB)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("connecting to engine for config - file %q dir %q: %v", configFile, configDir, err)
|
||||
}
|
||||
|
@ -212,7 +216,7 @@ func (s *Local) Connect(configFile, configDir, hexKey string, debug bool) (*Clie
|
|||
}
|
||||
}),
|
||||
)
|
||||
return &Client{vu: s.vu, rc: rc}, nil
|
||||
return &Client{vu: s.vu, rc: rc, l: l}, nil
|
||||
}
|
||||
|
||||
type epochState struct{}
|
||||
|
@ -225,7 +229,7 @@ func (epochState) CurrentEpoch() uint64 { return 0 }
|
|||
// preloaded the storage (if any), by using the same configuration file.
|
||||
//
|
||||
// Note that the configuration file only needs to contain the storage-specific sections.
|
||||
func storageEngineOptionsFromConfig(c *config.Config, debug bool) ([]engine.Option, [][]shard.Option, error) {
|
||||
func storageEngineOptionsFromConfig(c *config.Config, debug bool, l Limiter) ([]engine.Option, [][]shard.Option, error) {
|
||||
log := zap.L()
|
||||
if debug {
|
||||
var err error
|
||||
|
@ -239,6 +243,7 @@ func storageEngineOptionsFromConfig(c *config.Config, debug bool) ([]engine.Opti
|
|||
engine.WithErrorThreshold(engineconfig.ShardErrorThreshold(c)),
|
||||
engine.WithShardPoolSize(engineconfig.ShardPoolSize(c)),
|
||||
engine.WithLogger(&logger.Logger{Logger: log}),
|
||||
engine.WithMetrics(l),
|
||||
}
|
||||
|
||||
var shOpts [][]shard.Option
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/local"
|
||||
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats"
|
||||
"github.com/dop251/goja"
|
||||
"go.k6.io/k6/js/modules"
|
||||
|
@ -18,11 +19,13 @@ type Client struct {
|
|||
l layer.Client
|
||||
ownerID *user.ID
|
||||
resolver layer.BucketResolver
|
||||
limiter local.Limiter
|
||||
}
|
||||
|
||||
type (
|
||||
SuccessOrErrorResponse struct {
|
||||
Success bool
|
||||
Abort bool
|
||||
Error string
|
||||
}
|
||||
|
||||
|
@ -33,6 +36,13 @@ type (
|
|||
)
|
||||
|
||||
func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse {
|
||||
if c.limiter.IsFull() {
|
||||
return PutResponse{
|
||||
Success: false,
|
||||
Abort: true,
|
||||
Error: "engine size limit reached",
|
||||
}
|
||||
}
|
||||
cid, err := c.resolver.Resolve(c.vu.Context(), bucket)
|
||||
if err != nil {
|
||||
stats.Report(c.vu, objPutFails, 1)
|
||||
|
|
|
@ -56,7 +56,7 @@ func (s *Local) Exports() modules.Exports {
|
|||
return modules.Exports{Default: s}
|
||||
}
|
||||
|
||||
func (s *Local) Connect(configFile string, configDir string, params map[string]string, bucketMapping map[string]string) (*Client, error) {
|
||||
func (s *Local) Connect(configFile string, configDir string, params map[string]string, bucketMapping map[string]string, maxSizeGB int64) (*Client, error) {
|
||||
// Parse configuration flags.
|
||||
fs := flag.NewFlagSet("s3local", flag.ContinueOnError)
|
||||
|
||||
|
@ -107,7 +107,7 @@ func (s *Local) Connect(configFile string, configDir string, params map[string]s
|
|||
objGetDuration, _ = registry.NewMetric("s3local_obj_get_duration", metrics.Trend, metrics.Time)
|
||||
|
||||
// Create S3 layer backed by local storage engine and tree service.
|
||||
ng, err := s.l.ResolveEngine(s.l.VU().Context(), configFile, configDir, *debugLogger)
|
||||
ng, limiter, err := s.l.ResolveEngine(s.l.VU().Context(), configFile, configDir, *debugLogger, maxSizeGB)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("connecting to engine for config - file %q dir %q: %v", configFile, configDir, err)
|
||||
}
|
||||
|
@ -158,6 +158,7 @@ func (s *Local) Connect(configFile string, configDir string, params map[string]s
|
|||
l: l,
|
||||
ownerID: rc.OwnerID(),
|
||||
resolver: resolver,
|
||||
limiter: limiter,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -6,6 +6,7 @@ import { SharedArray } from 'k6/data';
|
|||
import { textSummary } from './libs/k6-summary-0.0.2.js';
|
||||
import { parseEnv } from './libs/env-parser.js';
|
||||
import { uuidv4 } from './libs/k6-utils-1.4.0.js';
|
||||
import exec from 'k6/execution';
|
||||
|
||||
parseEnv();
|
||||
|
||||
|
@ -23,7 +24,8 @@ const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json";
|
|||
const config_file = __ENV.CONFIG_FILE;
|
||||
const config_dir = __ENV.CONFIG_DIR;
|
||||
const debug_logger = (__ENV.DEBUG_LOGGER || 'false') == 'true';
|
||||
const local_client = local.connect(config_file, config_dir, '', debug_logger);
|
||||
const max_total_size_gb = __ENV.MAX_TOTAL_SIZE_GB ? parseInt(__ENV.MAX_TOTAL_SIZE_GB) : 0;
|
||||
const local_client = local.connect(config_file, config_dir, '', debug_logger, max_total_size_gb);
|
||||
const log = logging.new().withFields({"config_file": config_file,"config_dir": config_dir});
|
||||
|
||||
const registry_enabled = !!__ENV.REGISTRY_FILE;
|
||||
|
@ -130,6 +132,9 @@ export function obj_write() {
|
|||
const { payload, hash } = generator.genPayload(registry_enabled);
|
||||
const resp = local_client.put(container, headers, payload);
|
||||
if (!resp.success) {
|
||||
if (resp.abort) {
|
||||
exec.test.abort(resp.error);
|
||||
}
|
||||
log.withField("cid", container).error(resp.error);
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -76,6 +76,7 @@ Options (in addition to the common options):
|
|||
* `CONFIG_FILE` - path to the local configuration file used for the storage node. Only the storage configuration section is used.
|
||||
* `DELETERS` - number of VUs performing delete operations (using deleters requires that options `DELETE_AGE` and `REGISTRY_FILE` are specified as well).
|
||||
* `DELETE_AGE` - age of object in seconds before which it can not be deleted. This parameter can be used to control how many objects we have in the system under load.
|
||||
* `MAX_TOTAL_SIZE_GB` - if specified, max payload size in GB of the storage engine. If the storage engine is already full, no new objects will be saved.
|
||||
|
||||
## HTTP
|
||||
|
||||
|
@ -182,6 +183,7 @@ Note that the `s3local` scenario currently does not support deleters.
|
|||
|
||||
Options (in addition to the common options):
|
||||
* `OBJ_NAME` - if specified, this name will be used for all write operations instead of random generation.
|
||||
* `MAX_TOTAL_SIZE_GB` - if specified, max payload size in GB of the storage engine. If the storage engine is already full, no new objects will be saved.
|
||||
|
||||
## Verify
|
||||
|
||||
|
|
|
@ -6,6 +6,7 @@ import { SharedArray } from 'k6/data';
|
|||
import { textSummary } from './libs/k6-summary-0.0.2.js';
|
||||
import { parseEnv } from './libs/env-parser.js';
|
||||
import { uuidv4 } from './libs/k6-utils-1.4.0.js';
|
||||
import exec from 'k6/execution';
|
||||
|
||||
parseEnv();
|
||||
|
||||
|
@ -37,9 +38,10 @@ const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json";
|
|||
|
||||
const config_file = __ENV.CONFIG_FILE;
|
||||
const config_dir = __ENV.CONFIG_DIR;
|
||||
const max_total_size_gb = __ENV.MAX_TOTAL_SIZE_GB ? parseInt(__ENV.MAX_TOTAL_SIZE_GB) : 0;
|
||||
const s3_client = s3local.connect(config_file, config_dir, {
|
||||
'debug_logger': __ENV.DEBUG_LOGGER || 'false',
|
||||
}, bucket_mapping());
|
||||
}, bucket_mapping(), max_total_size_gb);
|
||||
const log = logging.new().withFields({"config_file": config_file,"config_dir": config_dir});
|
||||
|
||||
const registry_enabled = !!__ENV.REGISTRY_FILE;
|
||||
|
@ -126,6 +128,9 @@ export function obj_write() {
|
|||
const { payload, hash } = generator.genPayload(registry_enabled);
|
||||
const resp = s3_client.put(bucket, key, payload);
|
||||
if (!resp.success) {
|
||||
if (resp.abort) {
|
||||
exec.test.abort(resp.error);
|
||||
}
|
||||
log.withFields({bucket: bucket, key: key}).error(resp.error);
|
||||
return;
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue