From 1cf53545f29707cf6da410f4c99bc0c3c34dcd6b Mon Sep 17 00:00:00 2001 From: Vladimir Domnich Date: Fri, 2 Sep 2022 23:23:33 +0400 Subject: [PATCH] [#19] Implement objects registry module Registry module stores information about uploaded objects in bolt database and allows to verify their validity after a load test. Also, implemented logic to verify objects uploaded via gRPC and S3 protocols. Signed-off-by: Vladimir Domnich --- .gitignore | 1 + README.md | 2 +- go.mod | 1 + go.sum | 1 + internal/native/client.go | 183 ++++++++++++------ internal/registry/obj_selector.go | 72 +++++++ internal/registry/registry.go | 150 ++++++++++++++ internal/s3/client.go | 60 +++++- internal/s3/s3.go | 1 - neofs.go | 1 + scenarios/grpc.js | 103 +++++----- scenarios/preset/preset_grpc.py | 6 +- .../{run_scenarions.md => run_scenarios.md} | 30 ++- scenarios/s3.js | 115 ++++++----- scenarios/verify.js | 79 ++++++++ 15 files changed, 617 insertions(+), 188 deletions(-) create mode 100644 internal/registry/obj_selector.go create mode 100644 internal/registry/registry.go rename scenarios/{run_scenarions.md => run_scenarios.md} (53%) create mode 100644 scenarios/verify.js diff --git a/.gitignore b/.gitignore index 5ed2f93..c388ce3 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ k6 +*.bolt diff --git a/README.md b/README.md index a683004..c38c751 100644 --- a/README.md +++ b/README.md @@ -64,7 +64,7 @@ const neofs_cli = native.connect("s01.neofs.devenv:8080", "") flag, and `error` string. - `onsite(container_id, payload)`. Returns NeoFS object instance with prepared headers. Invoke `put(headers)` method on this object to upload it into NeoFS. - It returns dicrionary with `success` boolean flag, `object_id` string and + It returns dictionary with `success` boolean flag, `object_id` string and `error` string. ## S3 diff --git a/go.mod b/go.mod index 2d45414..c9e3833 100644 --- a/go.mod +++ b/go.mod @@ -54,6 +54,7 @@ require ( github.com/sirupsen/logrus v1.8.1 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spf13/afero v1.1.2 // indirect + go.etcd.io/bbolt v1.3.6 // indirect golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd // indirect golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 // indirect diff --git a/go.sum b/go.sum index 16ebeb4..58d2ff8 100644 --- a/go.sum +++ b/go.sum @@ -358,6 +358,7 @@ github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1 github.com/yuin/gopher-lua v0.0.0-20190514113301-1cd887cd7036/go.mod h1:gqRgreBUhTSL0GeU64rtZ3Uq3wtjOa/TB2YfrtkCbVQ= github.com/yuin/gopher-lua v0.0.0-20191128022950-c6266f4fe8d7/go.mod h1:gqRgreBUhTSL0GeU64rtZ3Uq3wtjOa/TB2YfrtkCbVQ= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= +go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU= go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4= go.k6.io/k6 v0.38.2 h1:v4Dr7KhZVf+s6V6oz/pGtQ9ejTfNgUPcd/D3RH3GVdY= go.k6.io/k6 v0.38.2/go.mod h1:1bTdDsXTT2V3in3ZgdR15MDW6SQQh5nWni59tirqNB8= diff --git a/internal/native/client.go b/internal/native/client.go index dec5912..ee5f34a 100644 --- a/internal/native/client.go +++ b/internal/native/client.go @@ -6,6 +6,7 @@ import ( "crypto/ecdsa" "crypto/sha256" "encoding/binary" + "encoding/hex" "errors" "fmt" "strconv" @@ -52,6 +53,11 @@ type ( Error string } + VerifyHashResponse struct { + Success bool + Error string + } + PutContainerResponse struct { Success bool ContainerID string @@ -82,20 +88,16 @@ func (c *Client) SetBufferSize(size int) { } } -func (c *Client) Put(inputContainerID string, headers map[string]string, payload goja.ArrayBuffer) PutResponse { - var containerID cid.ID - err := containerID.DecodeString(inputContainerID) - if err != nil { - panic(err) - } +func (c *Client) Put(containerID string, headers map[string]string, payload goja.ArrayBuffer) PutResponse { + cliContainerID := parseContainerID(containerID) var addr address.Address - addr.SetContainerID(containerID) + addr.SetContainerID(cliContainerID) tok := c.tok tok.ForVerb(session.VerbObjectPut) tok.ApplyTo(addr) - err = tok.Sign(c.key) + err := tok.Sign(c.key) if err != nil { panic(err) } @@ -112,7 +114,7 @@ func (c *Client) Put(inputContainerID string, headers map[string]string, payload } var o object.Object - o.SetContainerID(containerID) + o.SetContainerID(cliContainerID) o.SetOwnerID(&owner) o.SetAttributes(attrs...) @@ -127,32 +129,18 @@ func (c *Client) Put(inputContainerID string, headers map[string]string, payload return PutResponse{Success: true, ObjectID: id.String()} } -func (c *Client) Get(inputContainerID, inputObjectID string) GetResponse { - var ( - buf = make([]byte, c.bufsize) - sz int - ) - - var containerID cid.ID - err := containerID.DecodeString(inputContainerID) - if err != nil { - panic(err) - } - - var objectID oid.ID - err = objectID.DecodeString(inputObjectID) - if err != nil { - panic(err) - } +func (c *Client) Get(containerID, objectID string) GetResponse { + cliContainerID := parseContainerID(containerID) + cliObjectID := parseObjectID(objectID) var addr address.Address - addr.SetContainerID(containerID) - addr.SetObjectID(objectID) + addr.SetContainerID(cliContainerID) + addr.SetObjectID(cliObjectID) tok := c.tok tok.ForVerb(session.VerbObjectGet) tok.ApplyTo(addr) - err = tok.Sign(c.key) + err := tok.Sign(c.key) if err != nil { panic(err) } @@ -160,44 +148,97 @@ func (c *Client) Get(inputContainerID, inputObjectID string) GetResponse { stats.Report(c.vu, objGetTotal, 1) start := time.Now() - var prmObjectGetInit client.PrmObjectGet - prmObjectGetInit.ByID(objectID) - prmObjectGetInit.FromContainer(containerID) - prmObjectGetInit.WithinSession(tok) + var prm client.PrmObjectGet + prm.ByID(cliObjectID) + prm.FromContainer(cliContainerID) + prm.WithinSession(tok) - objectReader, err := c.cli.ObjectGetInit(c.vu.Context(), prmObjectGetInit) - if err != nil { - stats.Report(c.vu, objGetFails, 1) - return GetResponse{Success: false, Error: err.Error()} - } - - var o object.Object - if !objectReader.ReadHeader(&o) { - stats.Report(c.vu, objGetFails, 1) - var errorStr string - if _, err = objectReader.Close(); err != nil { - errorStr = err.Error() - } - return GetResponse{Success: false, Error: errorStr} - } - - n, _ := objectReader.Read(buf) - for n > 0 { - sz += n - n, _ = objectReader.Read(buf) - } - - _, err = objectReader.Close() + var objSize = 0 + err = get(c.cli, prm, c.vu.Context(), c.bufsize, func(data []byte) { + objSize += len(data) + }) if err != nil { stats.Report(c.vu, objGetFails, 1) return GetResponse{Success: false, Error: err.Error()} } stats.Report(c.vu, objGetDuration, metrics.D(time.Since(start))) - stats.ReportDataReceived(c.vu, float64(sz)) + stats.ReportDataReceived(c.vu, float64(objSize)) return GetResponse{Success: true} } +func get( + cli *client.Client, + prm client.PrmObjectGet, + ctx context.Context, + bufSize int, + onDataChunk func(chunk []byte), +) error { + var buf = make([]byte, bufSize) + + objectReader, err := cli.ObjectGetInit(ctx, prm) + if err != nil { + return err + } + + var o object.Object + if !objectReader.ReadHeader(&o) { + if _, err = objectReader.Close(); err != nil { + return err + } + return errors.New("can't read object header") + } + + n, _ := objectReader.Read(buf) + for n > 0 { + onDataChunk(buf[:n]) + n, _ = objectReader.Read(buf) + } + + _, err = objectReader.Close() + if err != nil { + return err + } + + return nil +} + +func (c *Client) VerifyHash(containerID, objectID, expectedHash string) VerifyHashResponse { + cliContainerID := parseContainerID(containerID) + cliObjectID := parseObjectID(objectID) + + var addr address.Address + addr.SetContainerID(cliContainerID) + addr.SetObjectID(cliObjectID) + + tok := c.tok + tok.ForVerb(session.VerbObjectGet) + tok.ApplyTo(addr) + err := tok.Sign(c.key) + if err != nil { + panic(err) + } + + var prm client.PrmObjectGet + prm.ByID(cliObjectID) + prm.FromContainer(cliContainerID) + prm.WithinSession(tok) + + hasher := sha256.New() + err = get(c.cli, prm, c.vu.Context(), c.bufsize, func(data []byte) { + hasher.Write(data) + }) + if err != nil { + return VerifyHashResponse{Success: false, Error: err.Error()} + } + actualHash := hex.EncodeToString(hasher.Sum(make([]byte, 0, sha256.Size))) + if actualHash != expectedHash { + return VerifyHashResponse{Success: true, Error: "hash mismatch"} + } + + return VerifyHashResponse{Success: true} +} + func (c *Client) putCnrErrorResponse(err error) PutContainerResponse { stats.Report(c.vu, cnrPutFails, 1) return PutContainerResponse{Success: false, Error: err.Error()} @@ -270,7 +311,7 @@ func (c *Client) PutContainer(params map[string]string) PutContainerResponse { return PutContainerResponse{Success: true, ContainerID: res.ID().EncodeToString()} } -func (c *Client) Onsite(inputContainerID string, payload goja.ArrayBuffer) PreparedObject { +func (c *Client) Onsite(containerID string, payload goja.ArrayBuffer) PreparedObject { maxObjectSize, epoch, hhDisabled, err := parseNetworkInfo(c.vu.Context(), c.cli) if err != nil { panic(err) @@ -286,11 +327,7 @@ func (c *Client) Onsite(inputContainerID string, payload goja.ArrayBuffer) Prepa panic(msg) } - var containerID cid.ID - err = containerID.DecodeString(inputContainerID) - if err != nil { - panic(err) - } + cliContainerID := parseContainerID(containerID) var owner user.ID user.IDFromKey(&owner, c.key.PublicKey) @@ -300,7 +337,7 @@ func (c *Client) Onsite(inputContainerID string, payload goja.ArrayBuffer) Prepa obj := object.New() obj.SetVersion(&apiVersion) obj.SetType(object.TypeRegular) - obj.SetContainerID(containerID) + obj.SetContainerID(cliContainerID) obj.SetOwnerID(&owner) obj.SetPayloadSize(uint64(ln)) obj.SetCreationEpoch(epoch) @@ -471,3 +508,21 @@ func waitFor(ctx context.Context, params *waitParams, condition func(context.Con } } } + +func parseContainerID(strContainerID string) cid.ID { + var containerID cid.ID + err := containerID.DecodeString(strContainerID) + if err != nil { + panic(err) + } + return containerID +} + +func parseObjectID(strObjectID string) oid.ID { + var cliObjectID oid.ID + err := cliObjectID.DecodeString(strObjectID) + if err != nil { + panic(err) + } + return cliObjectID +} diff --git a/internal/registry/obj_selector.go b/internal/registry/obj_selector.go new file mode 100644 index 0000000..613a75e --- /dev/null +++ b/internal/registry/obj_selector.go @@ -0,0 +1,72 @@ +package registry + +import ( + "encoding/json" + "sync" + + "go.etcd.io/bbolt" +) + +type ObjSelector struct { + boltDB *bbolt.DB + mu sync.Mutex + lastId uint64 + objStatus string +} + +func (o *ObjSelector) NextObject() (*ObjectInfo, error) { + var foundObj *ObjectInfo + err := o.boltDB.View(func(tx *bbolt.Tx) error { + b := tx.Bucket([]byte(bucketName)) + if b == nil { + return nil + } + + c := b.Cursor() + + // We use mutex so that multiple VUs won't attempt to modify lastId simultaneously + // TODO: consider singleton channel that will produce those ids on demand + o.mu.Lock() + defer o.mu.Unlock() + + // Establish the start position for searching the next object: + // If we should go from the beginning (lastId=0), then we start from the first element + // Otherwise we start from the key right after the lastId + var keyBytes, objBytes []byte + if o.lastId == 0 { + keyBytes, objBytes = c.First() + } else { + c.Seek(encodeId(o.lastId)) + keyBytes, objBytes = c.Next() + } + + // Iterate over objects to find the next object in the target status + var obj ObjectInfo + for ; keyBytes != nil; keyBytes, objBytes = c.Next() { + if objBytes != nil { + if err := json.Unmarshal(objBytes, &obj); err != nil { + // Ignore malformed objects for now. Maybe it should be panic? + continue + } + // If we reached an object in the target status, stop iterating + if obj.Status == o.objStatus { + foundObj = &obj + break + } + } + } + + // Update the last key + if keyBytes != nil { + o.lastId = decodeId(keyBytes) + } else { + // Loopback to beginning so that we can revisit objects which were taken for verification + // but their status wasn't changed + // TODO: stop looping back to beginning too quickly + o.lastId = 0 + } + + return nil + }) + return foundObj, err +} diff --git a/internal/registry/registry.go b/internal/registry/registry.go new file mode 100644 index 0000000..d6a7ffd --- /dev/null +++ b/internal/registry/registry.go @@ -0,0 +1,150 @@ +package registry + +import ( + "encoding/binary" + "encoding/json" + "fmt" + "os" + "time" + + "go.etcd.io/bbolt" + "go.k6.io/k6/js/modules" +) + +// RootModule is the global module object type. It is instantiated once per test +// run and will be used to create k6/x/neofs/registry module instances for each VU. +type RootModule struct { + boltDB *bbolt.DB + objSelector *ObjSelector +} + +// Registry represents an instance of the module for every VU. +type Registry struct { + vu modules.VU + root *RootModule +} + +const ( + // Indicates that an object was created, but it's data wasn't verified yet + statusCreated = "created" +) + +const bucketName = "_object" + +// ObjectInfo represents information about neoFS object that has been created +// via gRPC/HTTP/S3 API. +type ObjectInfo struct { + Id uint64 // Identifier in bolt DB + CID string // Container ID in gRPC/HTTP + OID string // Object ID in gRPC/HTTP + S3Bucket string // Bucket name in S3 + S3Key string // Object key in S3 + Status string // Status of the object + PayloadHash string // SHA256 hash of object payload that can be used for verification +} + +// Ensure the interfaces are implemented correctly. +var ( + _ modules.Instance = &Registry{} + _ modules.Module = &RootModule{} +) + +func init() { + // TODO: research on a way to use configurable database name + // TODO: research on a way to close DB properly (maybe in teardown) + options := bbolt.Options{Timeout: 100 * time.Millisecond} + boltDB, err := bbolt.Open("registry.bolt", os.ModePerm, &options) + if err != nil { + fmt.Println(err) + return + } + + // Selector that searches objects for verification + objSelector := ObjSelector{boltDB: boltDB, objStatus: statusCreated} + + rootModule := &RootModule{boltDB: boltDB, objSelector: &objSelector} + modules.Register("k6/x/neofs/registry", rootModule) +} + +// NewModuleInstance implements the modules.Module interface and returns +// a new instance for each VU. +func (r *RootModule) NewModuleInstance(vu modules.VU) modules.Instance { + mi := &Registry{vu: vu, root: r} + return mi +} + +// Exports implements the modules.Instance interface and returns the exports +// of the JS module. +func (r *Registry) Exports() modules.Exports { + return modules.Exports{Default: r} +} + +func (r *Registry) AddObject(cid, oid, s3Bucket, s3Key, payloadHash string) error { + return r.root.boltDB.Update(func(tx *bbolt.Tx) error { + b, err := tx.CreateBucketIfNotExists([]byte(bucketName)) + if err != nil { + return err + } + + id, err := b.NextSequence() + if err != nil { + return err + } + + object := ObjectInfo{ + Id: id, + CID: cid, + OID: oid, + S3Bucket: s3Bucket, + S3Key: s3Key, + PayloadHash: payloadHash, + Status: statusCreated, + } + objectJson, err := json.Marshal(object) + if err != nil { + return err + } + + return b.Put(encodeId(id), objectJson) + }) +} + +func (r *Registry) SetObjectStatus(id uint64, newStatus string) error { + return r.root.boltDB.Update(func(tx *bbolt.Tx) error { + b, err := tx.CreateBucketIfNotExists([]byte(bucketName)) + if err != nil { + return err + } + + objBytes := b.Get(encodeId(id)) + if objBytes == nil { + return nil + } + + obj := new(ObjectInfo) + if err := json.Unmarshal(objBytes, &obj); err != nil { + return err + } + obj.Status = newStatus + + objBytes, err = json.Marshal(obj) + if err != nil { + return err + } + return b.Put(encodeId(id), objBytes) + }) +} + +func (r *Registry) NextObjectToVerify() (*ObjectInfo, error) { + return r.root.objSelector.NextObject() +} + +func encodeId(id uint64) []byte { + idBytes := make([]byte, 8) + binary.BigEndian.PutUint64(idBytes, id) + return idBytes +} + +func decodeId(idBytes []byte) uint64 { + return binary.BigEndian.Uint64(idBytes) +} diff --git a/internal/s3/client.go b/internal/s3/client.go index 19803ab..3c1ec33 100644 --- a/internal/s3/client.go +++ b/internal/s3/client.go @@ -3,6 +3,8 @@ package s3 import ( "bytes" "context" + "crypto/sha256" + "encoding/hex" "strconv" "time" @@ -35,6 +37,11 @@ type ( Success bool Error string } + + VerifyHashResponse struct { + Success bool + Error string + } ) func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse { @@ -60,32 +67,65 @@ func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse { } func (c *Client) Get(bucket, key string) GetResponse { - var ( - buf = make([]byte, 4*1024) - sz int - ) stats.Report(c.vu, objGetTotal, 1) start := time.Now() - obj, err := c.cli.GetObject(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String(bucket), - Key: aws.String(key), + + var objSize = 0 + err := get(c.cli, bucket, key, func(chunk []byte) { + objSize += len(chunk) }) if err != nil { stats.Report(c.vu, objGetFails, 1) return GetResponse{Success: false, Error: err.Error()} } + stats.Report(c.vu, objGetDuration, metrics.D(time.Since(start))) + stats.ReportDataReceived(c.vu, float64(objSize)) + return GetResponse{Success: true} +} + +func get( + c *s3.Client, + bucket string, + key string, + onDataChunk func(chunk []byte), +) error { + var buf = make([]byte, 4*1024) + + obj, err := c.GetObject(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }) + if err != nil { + return err + } + for { n, err := obj.Body.Read(buf) if n > 0 { - sz += n + onDataChunk(buf[:n]) } if err != nil { break } } - stats.ReportDataReceived(c.vu, float64(sz)) - return GetResponse{Success: true} + return nil +} + +func (c *Client) VerifyHash(bucket, key, expectedHash string) VerifyHashResponse { + hasher := sha256.New() + err := get(c.cli, bucket, key, func(data []byte) { + hasher.Write(data) + }) + if err != nil { + return VerifyHashResponse{Success: false, Error: err.Error()} + } + actualHash := hex.EncodeToString(hasher.Sum(make([]byte, 0, sha256.Size))) + if actualHash != expectedHash { + return VerifyHashResponse{Success: true, Error: "hash mismatch"} + } + + return VerifyHashResponse{Success: true} } func (c *Client) CreateBucket(bucket string, params map[string]string) CreateBucketResponse { diff --git a/internal/s3/s3.go b/internal/s3/s3.go index 1bd36ee..fc4fefd 100644 --- a/internal/s3/s3.go +++ b/internal/s3/s3.go @@ -31,7 +31,6 @@ var ( func init() { modules.Register("k6/x/neofs/s3", new(RootModule)) - } // NewModuleInstance implements the modules.Module interface and returns diff --git a/neofs.go b/neofs.go index 59af1d3..d176fdb 100644 --- a/neofs.go +++ b/neofs.go @@ -3,6 +3,7 @@ package xk6_neofs import ( _ "github.com/nspcc-dev/xk6-neofs/internal/datagen" _ "github.com/nspcc-dev/xk6-neofs/internal/native" + _ "github.com/nspcc-dev/xk6-neofs/internal/registry" _ "github.com/nspcc-dev/xk6-neofs/internal/s3" "go.k6.io/k6/js/modules" ) diff --git a/scenarios/grpc.js b/scenarios/grpc.js index 3acb2d4..be076b1 100644 --- a/scenarios/grpc.js +++ b/scenarios/grpc.js @@ -1,54 +1,61 @@ import datagen from 'k6/x/neofs/datagen'; import native from 'k6/x/neofs/native'; +import registry from 'k6/x/neofs/registry'; import { SharedArray } from 'k6/data'; import { sleep } from 'k6'; const obj_list = new SharedArray('obj_list', function () { - return JSON.parse(open(__ENV.PREGEN_JSON)).objects; }); + return JSON.parse(open(__ENV.PREGEN_JSON)).objects; +}); const container_list = new SharedArray('container_list', function () { - return JSON.parse(open(__ENV.PREGEN_JSON)).containers; }); + return JSON.parse(open(__ENV.PREGEN_JSON)).containers; +}); const read_size = JSON.parse(open(__ENV.PREGEN_JSON)).obj_size; /* - ./k6 run -e PROFILE=0:60 -e WRITE_OBJ_SIZE=1024 -e CLIENTS=200 -e NODES=node4.data:8084 -e PREGEN_JSON=test.json scenarios/grpc.js + ./k6 run -e PROFILE=0:60 -e CLIENTS=200 -e WRITE_OBJ_SIZE=1024 \ + -e GRPC_ENDPOINTS=host1:8080,host2:8080 \ + -e PREGEN_JSON=test.json \ + scenarios/grpc.js - Parse profile from env. - Format write:obj_size: - * write - write operations in percent, relative to read operations - * duration - duration in seconds + REGISTRY - if set to "enabled", all produced objects will be stored in database for subsequent verification. */ +// Parse profile from env (format is write:duration) +// * write - percent of VUs performing write operations (the rest will be read VUs) +// * duration - duration in seconds const [ write, duration ] = __ENV.PROFILE.split(':'); -// Set VUs between write and read operations -let vus_read = Math.ceil(__ENV.CLIENTS/100*(100-parseInt(write))) -let vus_write = __ENV.CLIENTS - vus_read +// Allocate VUs between write and read operations +const read_vu_count = Math.ceil(__ENV.CLIENTS / 100 * (100 - parseInt(write))); +const write_vu_count = __ENV.CLIENTS - read_vu_count; +// Select random gRPC endpoint for current VU +const grpc_endpoints = __ENV.GRPC_ENDPOINTS.split(','); +const grpc_endpoint = grpc_endpoints[Math.floor(Math.random() * grpc_endpoints.length)]; +const grpc_client = native.connect(grpc_endpoint, ''); + +const registry_enabled = (__ENV.REGISTRY || "").toLowerCase() == "enabled" const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE)); -let nodes = __ENV.NODES.split(',') -let rand_node = nodes[Math.floor(Math.random()*nodes.length)]; +const scenarios = {}; -const neofs_cli = native.connect(rand_node, "") - -let scenarios = {} - -if (vus_write > 0){ - scenarios.write= { +if (write_vu_count > 0) { + scenarios.write = { executor: 'constant-vus', - vus: vus_write, + vus: write_vu_count, duration: `${duration}s`, exec: 'obj_write', gracefulStop: '5s', } } -if (vus_read > 0){ - scenarios.read= { +if (read_vu_count > 0) { + scenarios.read = { executor: 'constant-vus', - vus: vus_read, + vus: read_vu_count, duration: `${duration}s`, exec: 'obj_read', gracefulStop: '5s', @@ -56,47 +63,53 @@ if (vus_read > 0){ } export function setup() { - console.log("Pregenerated containers: " + container_list.length) - console.log("Pregenerated read object size: " + read_size) - console.log("Pregenerated total objects: " + obj_list.length) + console.log("Pregenerated containers: " + container_list.length); + console.log("Pregenerated read object size: " + read_size); + console.log("Pregenerated total objects: " + obj_list.length); } export const options = { - scenarios: scenarios, + scenarios, setupTimeout: '5s', }; export function obj_write() { - let headers = { - 'unique_header': uuidv4() - } - let container = container_list[Math.floor(Math.random()*container_list.length)]; - - const { payload } = generator.genPayload(false); - let resp = neofs_cli.put( container, headers, payload); - if (!resp.success) { - console.log(resp.error); - } if (__ENV.SLEEP) { sleep(__ENV.SLEEP); } - + + const headers = { + unique_header: uuidv4() + }; + const container = container_list[Math.floor(Math.random() * container_list.length)]; + + const { payload, hash } = generator.genPayload(registry_enabled); + let resp = grpc_client.put(container, headers, payload); + if (!resp.success) { + console.log(resp.error); + return; + } + + if (registry_enabled) { + registry.addObject(container, resp.object_id, "", "", hash); + } } export function obj_read() { - let random_read_obj = obj_list[Math.floor(Math.random()*obj_list.length)]; - let resp = neofs_cli.get(random_read_obj.container, random_read_obj.object) - if (!resp.success) { - console.log(resp.error); - } if (__ENV.SLEEP) { sleep(__ENV.SLEEP); } + + let obj = obj_list[Math.floor(Math.random() * obj_list.length)]; + let resp = grpc_client.get(obj.container, obj.object) + if (!resp.success) { + console.log(resp.error); + } } export function uuidv4() { return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) { - let r = Math.random() * 16 | 0, v = c === 'x' ? r : (r & 0x3 | 0x8); - return v.toString(16); + let r = Math.random() * 16 | 0, v = c === 'x' ? r : (r & 0x3 | 0x8); + return v.toString(16); }); - } +} diff --git a/scenarios/preset/preset_grpc.py b/scenarios/preset/preset_grpc.py index 6e9cbc2..bc3f119 100755 --- a/scenarios/preset/preset_grpc.py +++ b/scenarios/preset/preset_grpc.py @@ -94,15 +94,15 @@ def execute_cmd(cmd_line): def create_container(): cmd_line = f"neofs-cli --rpc-endpoint {args.endpoint} container create -g --policy 'REP 1 IN X CBF 1 SELECT 1 FROM * AS X' --basic-acl public-read-write --await" - ouptut, success = execute_cmd(cmd_line) + output, success = execute_cmd(cmd_line) if not success: print(f" > Container has not been created.") else: try: - fst_str = ouptut.split('\n')[0] + fst_str = output.split('\n')[0] except Exception: - print(f"Got empty output: {ouptut}") + print(f"Got empty output: {output}") splitted = fst_str.split(": ") if len(splitted) != 2: raise ValueError(f"no CID was parsed from command output: \t{fst_str}") diff --git a/scenarios/run_scenarions.md b/scenarios/run_scenarios.md similarity index 53% rename from scenarios/run_scenarions.md rename to scenarios/run_scenarios.md index 9eef655..3dcdd8d 100644 --- a/scenarios/run_scenarions.md +++ b/scenarios/run_scenarios.md @@ -15,20 +15,21 @@ The tests will use all pre-created containers for PUT operations and all pre-cre 2. Execute scenario with options: ```shell -$ ./k6 run -e PROFILE=50:60 -e WRITE_OBJ_SIZE=8192 -e CLIENTS=400 -e NODES=node1.data:8080,node4.data:8080 -e PREGEN_JSON=./grpc.json scenarios/grpc.js +$ ./k6 run -e PROFILE=50:60 -e WRITE_OBJ_SIZE=8192 -e CLIENTS=400 -e GRPC_ENDPOINTS=node1.data:8080,node4.data:8080 -e PREGEN_JSON=./grpc.json scenarios/grpc.js ``` Options: - * PROFILE - format write:obj_size:duration - * write - write operations in percent, relative to read operations + * PROFILE - format write:duration + * write - percent of VUs performing write operations (the rest will be read VUs) * duration - time in sec * CLIENTS - number of VUs for all operations * WRITE_OBJ_SIZE - object size in kb for write(PUT) operations * PREGEN_JSON - path to json file with pre-generated containers and objects + * REGISTRY - if set to "enabled", all produced objects will be stored in database for subsequent verification. ## S3 -1. Create s3 credential: +1. Create s3 credentials: ```shell $ neofs-s3-authmate issue-secret --wallet wallet.json --peer node1.intra:8080 --gate-public-key 03d33a2cc7b8daaa5a3df3fccf065f7cf1fc6a3279efc161fcec512dcc0c1b2277 --gate-public-key 03ff0ad212e10683234442530bfd71d0bb18c3fbd6459aba768eacf158b0c359a2 --gate-public-key 033ae03ff30ed3b6665af69955562cfc0eae18d50e798ab31f054ee22e32fee993 --gate-public-key 02127c7498de0765d2461577c9d4f13f916eefd1884896183e6de0d9a85d17f2fb --bearer-rules rules.json --container-placement-policy "REP 1 IN X CBF 1 SELECT 1 FROM * AS X" @@ -55,6 +56,25 @@ The tests will use all pre-created buckets for PUT operations and all pre-create 3. Execute scenario with options: ```shell -$ ./k6 run -e PROFILE=50:60 -e WRITE_OBJ_SIZE=8192 -e CLIENTS=400 -e NODES=node1.data:8084,node4.data:8084 -e PREGEN_JSON=s3.json scenarios/s3.js +$ ./k6 run -e PROFILE=50:60 -e WRITE_OBJ_SIZE=8192 -e CLIENTS=400 -e S3_ENDPOINTS=node1.data:8084,node4.data:8084 -e PREGEN_JSON=s3.json scenarios/s3.js +``` + +Options are identical to gRPC scenario, plus: + * OBJ_NAME - if specified, this name will be used for all write operations instead of random generation. + +## Verify + +This scenario allows to verify that objects created by a previous run are really stored in the system and their data is not corrupted. Running this scenario assumes that you've already run gRPC and/or S3 scenario with option `REGISTRY=enabled`. + +To verify stored objects execute scenario with options: ``` +./k6 run -e CLIENTS=200 -e TIME_LIMIT=120 -e GRPC_ENDPOINTS=node1.data:8080,node2.data:8080 -e S3_ENDPOINTS=node1.data:8084,node2.data:8084 scenarios/verify.js +``` + +Scenario picks up all objects in `created` status. If object is stored correctly, its' status will be changed into `verified`. If object does not exist or its' data is corrupted, then the status will be changed into `invalid`. +Scenario ends as soon as all objects are checked (return code will be [108](https://k6.io/docs/javascript-api/k6-execution/#test)). + +Options: + * CLIENTS - number of VUs for verifying objects (VU can handle both GRPC and S3 objects) + * TIME_LIMIT - amount of time in seconds that is sufficient to verify all objects. If this time interval ends, then verification process will be interrupted and objects that have not been checked will stay in the `created` state. diff --git a/scenarios/s3.js b/scenarios/s3.js index 0d15f9f..6370cd0 100644 --- a/scenarios/s3.js +++ b/scenarios/s3.js @@ -1,115 +1,112 @@ import datagen from 'k6/x/neofs/datagen'; import s3 from 'k6/x/neofs/s3'; +import registry from 'k6/x/neofs/registry'; import { SharedArray } from 'k6/data'; import { sleep } from 'k6'; const obj_list = new SharedArray('obj_list', function () { - return JSON.parse(open(__ENV.PREGEN_JSON)).objects; }); + return JSON.parse(open(__ENV.PREGEN_JSON)).objects; +}); const bucket_list = new SharedArray('bucket_list', function () { - return JSON.parse(open(__ENV.PREGEN_JSON)).buckets; }); + return JSON.parse(open(__ENV.PREGEN_JSON)).buckets; +}); const read_size = JSON.parse(open(__ENV.PREGEN_JSON)).obj_size; /* - ./k6 run -e PROFILE=0:60 -e WRITE_OBJ_SIZE=1024 -e CLIENTS=200 -e NODES=node4.data:8084 -e PREGEN_JSON=test.json scenarios/s3_t.js + ./k6 run -e PROFILE=0:60 -e CLIENTS=200 -e WRITE_OBJ_SIZE=1024 \ + -e S3_ENDPOINTS=host1:8084,host2:8084 -e PREGEN_JSON=test.json \ + scenarios/s3.js - Parse profile from env. - Format write:obj_size: - * write - write operations in percent, relative to read operations - * duration - duration in seconds - - OBJ_NAME - this name will be used for all write operations instead of randow generation in case of declared. + OBJ_NAME - if specified, this name will be used for all write operations instead of random generation. + REGISTRY - if set to "enabled", all produced objects will be stored in database for subsequent verification. */ +// Parse profile from env const [ write, duration ] = __ENV.PROFILE.split(':'); -// Set VUs between write and read operations -let vus_read = Math.ceil(__ENV.CLIENTS/100*(100-parseInt(write))) -let vus_write = __ENV.CLIENTS - vus_read +// Allocate VUs between write and read operations +let read_vu_count = Math.ceil(__ENV.CLIENTS / 100 * (100 - parseInt(write))); +let write_vu_count = __ENV.CLIENTS - read_vu_count; +// Select random S3 endpoint for current VU +const s3_endpoints = __ENV.S3_ENDPOINTS.split(','); +const s3_endpoint = s3_endpoints[Math.floor(Math.random() * s3_endpoints.length)]; +const s3_client = s3.connect(`http://${s3_endpoint}`); + +const registry_enabled = (__ENV.REGISTRY || "").toLowerCase() == "enabled" const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE)); -let nodes = __ENV.NODES.split(',') -let rand_node = nodes[Math.floor(Math.random()*nodes.length)]; +const scenarios = {}; -let s3_cli = s3.connect(`http://${rand_node}`) - -let scenarios = {} - -if (vus_write > 0){ - scenarios.write= { +if (write_vu_count > 0){ + scenarios.write = { executor: 'constant-vus', - vus: vus_write, + vus: write_vu_count, duration: `${duration}s`, exec: 'obj_write', gracefulStop: '5s', - } + }; } -if (vus_read > 0){ - scenarios.read= { +if (read_vu_count > 0){ + scenarios.read = { executor: 'constant-vus', - vus: vus_read, + vus: read_vu_count, duration: `${duration}s`, exec: 'obj_read', gracefulStop: '5s', - } + }; } export function setup() { - console.log("Pregenerated buckets: " + bucket_list.length) - console.log("Pregenerated read object size: " + read_size) - console.log("Pregenerated total objects: " + obj_list.length) + console.log("Pregenerated buckets: " + bucket_list.length); + console.log("Pregenerated read object size: " + read_size); + console.log("Pregenerated total objects: " + obj_list.length); } export const options = { - scenarios: scenarios, + scenarios, setupTimeout: '5s', }; export function obj_write() { - let key = ""; - if (__ENV.OBJ_NAME){ - key = __ENV.OBJ_NAME; - } - else{ - key = uuidv4(); - } - - - let bucket = bucket_list[Math.floor(Math.random()*bucket_list.length)]; - - const { payload } = generator.genPayload(false); - let resp = s3_cli.put(bucket, key, payload) - - if (!resp.success) { - console.log(resp.error); - } - if (__ENV.SLEEP) { sleep(__ENV.SLEEP); } - + + const key = __ENV.OBJ_NAME || uuidv4(); + const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)]; + + const { payload, hash } = generator.genPayload(registry_enabled); + const resp = s3_client.put(bucket, key, payload); + if (!resp.success) { + console.log(resp.error); + return; + } + + if (registry_enabled) { + registry.addObject("", "", bucket, key, hash); + } } export function obj_read() { - let random_read_obj = obj_list[Math.floor(Math.random()*obj_list.length)]; - - let resp = s3_cli.get(random_read_obj.bucket, random_read_obj.object) - if (!resp.success) { - console.log(resp.error); - } - if (__ENV.SLEEP) { sleep(__ENV.SLEEP); } + const obj = obj_list[Math.floor(Math.random() * obj_list.length)]; + + const resp = s3_client.get(obj.bucket, obj.object); + if (!resp.success) { + console.log(resp.error); + } } export function uuidv4() { return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) { - let r = Math.random() * 16 | 0, v = c === 'x' ? r : (r & 0x3 | 0x8); - return v.toString(16); + let r = Math.random() * 16 | 0, v = c === 'x' ? r : (r & 0x3 | 0x8); + return v.toString(16); }); - } +} diff --git a/scenarios/verify.js b/scenarios/verify.js new file mode 100644 index 0000000..d981187 --- /dev/null +++ b/scenarios/verify.js @@ -0,0 +1,79 @@ +import native from 'k6/x/neofs/native'; +import registry from 'k6/x/neofs/registry'; +import s3 from 'k6/x/neofs/s3'; +import { sleep } from 'k6'; +import exec from 'k6/execution'; + +/* + ./k6 run -e CLIENTS=200 -e TIME_LIMIT=30 -e GRPC_ENDPOINTS=node4.data:8084 scenarios/verify.js +*/ + +// Time limit (in seconds) for the run +const time_limit = __ENV.TIME_LIMIT || "60"; + +// Connect to random gRPC endpoint +let grpc_client = undefined; +if (__ENV.GRPC_ENDPOINTS) { + const grpcEndpoints = __ENV.GRPC_ENDPOINTS.split(','); + const grpcEndpoint = grpcEndpoints[Math.floor(Math.random() * grpcEndpoints.length)]; + grpc_client = native.connect(grpcEndpoint, ''); +} + +// Connect to random S3 endpoint +let s3_client = undefined; +if (__ENV.S3_ENDPOINTS) { + const s3_endpoints = __ENV.S3_ENDPOINTS.split(','); + const s3_endpoint = s3_endpoints[Math.floor(Math.random() * s3_endpoints.length)]; + s3_client = s3.connect(`http://${s3_endpoint}`); +} + +const scenarios = { + verify: { + executor: 'constant-vus', + vus: __ENV.CLIENTS, + duration: `${time_limit}s`, + exec: 'obj_verify', + gracefulStop: '5s', + } +}; + +export const options = { + scenarios: scenarios, + setupTimeout: '5s', +}; + +export function obj_verify() { + if (__ENV.SLEEP) { + sleep(__ENV.SLEEP); + } + + const obj = registry.nextObjectToVerify(); + if (!obj) { + // TODO: consider using a metric with abort condition to stop execution when + // all VUs have no objects to verify. Alternative solution could be a + // shared-iterations executor, but it might be not a good choice, as we need to + // check same object several times (if specific request fails) + + // Allow time for other VUs to complete verification + sleep(30.0); + exec.test.abort("All objects have been verified"); + } + console.log(`Verifying object ${obj.id}`); + + let result = undefined; + if (obj.c_id && obj.o_id) { + result = grpc_client.verifyHash(obj.c_id, obj.o_id, obj.payload_hash); + } else if (obj.s3_bucket && obj.s3_key) { + result = s3_client.verifyHash(obj.s3_bucket, obj.s3_key, obj.payload_hash); + } else { + console.log(`Object id=${obj.id} cannot be verified with supported protocols`); + registry.setObjectStatus(obj.id, "skipped"); + } + + if (result.success) { + registry.setObjectStatus(obj.id, "verified"); + } else { + registry.setObjectStatus(obj.id, "invalid"); + console.log(`Verify error on ${obj.c_id}/${obj.o_id}: {resp.error}`); + } +}