forked from TrueCloudLab/xk6-frostfs
171 lines
6.1 KiB
Go
171 lines
6.1 KiB
Go
package s3local
|
|
|
|
import (
|
|
"context"
|
|
"flag"
|
|
"fmt"
|
|
"time"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/tree"
|
|
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/local"
|
|
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/local/rawclient"
|
|
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats"
|
|
"go.k6.io/k6/js/modules"
|
|
"go.k6.io/k6/metrics"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// RootModule is the global module object type. It is instantiated once per test
|
|
// run and will be used to create k6/x/frostfs/s3local module instances for each VU.
|
|
type RootModule struct {
|
|
m *local.RootModule
|
|
}
|
|
|
|
// Local represents an instance of the module for every VU.
|
|
type Local struct {
|
|
l *local.Local
|
|
}
|
|
|
|
// Ensure the interfaces are implemented correctly.
|
|
var (
|
|
_ modules.Module = &RootModule{}
|
|
_ modules.Instance = &Local{}
|
|
|
|
internalObjPutTotal, internalObjPutFails, internalObjPutDuration *metrics.Metric
|
|
internalObjGetTotal, internalObjGetFails, internalObjGetDuration *metrics.Metric
|
|
objPutTotal, objPutFails, objPutDuration *metrics.Metric
|
|
objGetTotal, objGetFails, objGetDuration *metrics.Metric
|
|
)
|
|
|
|
func init() {
|
|
modules.Register("k6/x/frostfs/s3local", &RootModule{
|
|
m: &local.RootModule{},
|
|
})
|
|
}
|
|
|
|
// NewModuleInstance implements the modules.Module interface and returns
|
|
// a new instance for each VU.
|
|
func (r *RootModule) NewModuleInstance(vu modules.VU) modules.Instance {
|
|
return &Local{local.NewLocalModuleInstance(vu, r.m.GetOrCreateEngine)}
|
|
}
|
|
|
|
// Exports implements the modules.Instance interface and returns the exports
|
|
// of the JS module.
|
|
func (s *Local) Exports() modules.Exports {
|
|
return modules.Exports{Default: s}
|
|
}
|
|
|
|
func (s *Local) Connect(configFile string, configDir string, params map[string]string, bucketMapping map[string]string, maxSizeGB int64) (*Client, error) {
|
|
// Parse configuration flags.
|
|
fs := flag.NewFlagSet("s3local", flag.ContinueOnError)
|
|
|
|
hexKey := fs.String("hex_key", "", "Private key to use as a hexadecimal string. A random one is created if none is provided")
|
|
nodePosition := fs.Int("node_position", 0, "Position of this node in the node array if loading multiple nodes independently")
|
|
nodeCount := fs.Int("node_count", 1, "Number of nodes in the node array if loading multiple nodes independently")
|
|
debugLogger := fs.Bool("debug_logger", false, "Whether to use the development logger instead of the default one for debugging purposes")
|
|
|
|
{
|
|
args := make([]string, 0, len(params))
|
|
for k, v := range params {
|
|
args = append(args, fmt.Sprintf("-%s=%s", k, v))
|
|
}
|
|
if err := fs.Parse(args); err != nil {
|
|
return nil, fmt.Errorf("parsing parameters: %v", err)
|
|
}
|
|
}
|
|
|
|
// Validate and read configuration flags.
|
|
key, err := local.ParseOrCreateKey(*hexKey)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("parsing hex_key: %v", err)
|
|
}
|
|
if *nodeCount <= 0 {
|
|
return nil, fmt.Errorf("node_count must be positive")
|
|
}
|
|
if *nodePosition < 0 || *nodePosition >= *nodeCount {
|
|
return nil, fmt.Errorf("node_position must be in the range [0, node_count-1]")
|
|
}
|
|
|
|
// Register metrics.
|
|
registry := metrics.NewRegistry()
|
|
|
|
internalObjPutTotal, _ = registry.NewMetric("s3local_internal_obj_put_total", metrics.Counter)
|
|
internalObjPutFails, _ = registry.NewMetric("s3local_internal_obj_put_fails", metrics.Counter)
|
|
internalObjPutDuration, _ = registry.NewMetric("s3local_internal_obj_put_duration", metrics.Trend, metrics.Time)
|
|
|
|
internalObjGetTotal, _ = registry.NewMetric("s3local_internal_obj_get_total", metrics.Counter)
|
|
internalObjGetFails, _ = registry.NewMetric("s3local_internal_obj_get_fails", metrics.Counter)
|
|
internalObjGetDuration, _ = registry.NewMetric("s3local_internal_obj_get_duration", metrics.Trend, metrics.Time)
|
|
|
|
objPutTotal, _ = registry.NewMetric("s3local_obj_put_total", metrics.Counter)
|
|
objPutFails, _ = registry.NewMetric("s3local_obj_put_fails", metrics.Counter)
|
|
objPutDuration, _ = registry.NewMetric("s3local_obj_put_duration", metrics.Trend, metrics.Time)
|
|
|
|
objGetTotal, _ = registry.NewMetric("s3local_obj_get_total", metrics.Counter)
|
|
objGetFails, _ = registry.NewMetric("s3local_obj_get_fails", metrics.Counter)
|
|
objGetDuration, _ = registry.NewMetric("s3local_obj_get_duration", metrics.Trend, metrics.Time)
|
|
|
|
// Create S3 layer backed by local storage engine and tree service.
|
|
ng, limiter, err := s.l.ResolveEngine(s.l.VU().Context(), configFile, configDir, *debugLogger, maxSizeGB)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("connecting to engine for config - file %q dir %q: %v", configFile, configDir, err)
|
|
}
|
|
|
|
treeSvc := tree.NewTree(treeServiceEngineWrapper{
|
|
ng: ng,
|
|
pos: *nodePosition,
|
|
size: *nodeCount,
|
|
}, zap.L())
|
|
|
|
rc := rawclient.New(ng,
|
|
rawclient.WithKey(key.PrivateKey),
|
|
rawclient.WithPutHandler(func(sz uint64, err error, dt time.Duration) {
|
|
if err != nil {
|
|
stats.Report(s.l.VU(), internalObjPutFails, 1)
|
|
} else {
|
|
stats.Report(s.l.VU(), internalObjPutTotal, 1)
|
|
stats.Report(s.l.VU(), internalObjPutDuration, metrics.D(dt))
|
|
}
|
|
}),
|
|
rawclient.WithGetHandler(func(sz uint64, err error, dt time.Duration) {
|
|
if err != nil {
|
|
stats.Report(s.l.VU(), internalObjGetFails, 1)
|
|
} else {
|
|
stats.Report(s.l.VU(), internalObjGetTotal, 1)
|
|
stats.Report(s.l.VU(), internalObjGetDuration, metrics.D(dt))
|
|
}
|
|
}),
|
|
)
|
|
|
|
resolver, err := newFixedBucketResolver(bucketMapping)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("creating bucket resolver: %v", err)
|
|
}
|
|
|
|
cfg := &layer.Config{
|
|
Caches: layer.DefaultCachesConfigs(zap.L()),
|
|
AnonKey: layer.AnonymousKey{Key: key},
|
|
Resolver: resolver,
|
|
TreeService: treeSvc,
|
|
}
|
|
|
|
l := layer.NewLayer(zap.L(), &frostfs{rc}, cfg)
|
|
err = l.Initialize(s.l.VU().Context(), nopEventListener{})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("initialize: %w", err)
|
|
}
|
|
|
|
return &Client{
|
|
vu: s.l.VU(),
|
|
l: l,
|
|
ownerID: rc.OwnerID(),
|
|
resolver: resolver,
|
|
limiter: limiter,
|
|
}, nil
|
|
}
|
|
|
|
// nopEventListener is a stateless event listener whose methods do nothing.
// Connect passes it to the S3 layer's Initialize call.
type nopEventListener struct{}
|
|
|
|
// Subscribe ignores all of its arguments and always reports success.
func (nopEventListener) Subscribe(_ context.Context, _ string, _ layer.MsgHandler) error {
	return nil
}
|
|
// Listen returns immediately without doing anything.
func (nopEventListener) Listen(_ context.Context) {
	// Intentionally empty.
}
|