From 2d4e6199928e2cf30df822ca11916310b08c3fc9 Mon Sep 17 00:00:00 2001
From: Vladimir Domnich
Date: Wed, 21 Sep 2022 09:45:23 +0400
Subject: [PATCH] [#19] Implement configurable database name for registry

Signed-off-by: Vladimir Domnich
---
 internal/registry/obj_registry.go | 123 +++++++++++++++++++++++++++++
 internal/registry/registry.go     | 127 +++++------------------------
 scenarios/grpc.js                 |  22 ++++--
 scenarios/http.js                 |   6 +-
 scenarios/run_scenarios.md        |   9 ++-
 scenarios/s3.js                   |  18 +++--
 scenarios/verify.js               |  13 +--
 7 files changed, 187 insertions(+), 131 deletions(-)
 create mode 100644 internal/registry/obj_registry.go

diff --git a/internal/registry/obj_registry.go b/internal/registry/obj_registry.go
new file mode 100644
index 0000000..1007dfc
--- /dev/null
+++ b/internal/registry/obj_registry.go
@@ -0,0 +1,123 @@
+package registry
+
+import (
+	"encoding/binary"
+	"encoding/json"
+	"os"
+	"time"
+
+	"go.etcd.io/bbolt"
+)
+
+type ObjRegistry struct {
+	boltDB      *bbolt.DB
+	objSelector *ObjSelector
+}
+
+const (
+	// Indicates that an object was created, but its data wasn't verified yet
+	statusCreated = "created"
+)
+
+const bucketName = "_object"
+
+// ObjectInfo represents information about neoFS object that has been created
+// via gRPC/HTTP/S3 API.
+type ObjectInfo struct {
+	Id          uint64 // Identifier in bolt DB
+	CID         string // Container ID in gRPC/HTTP
+	OID         string // Object ID in gRPC/HTTP
+	S3Bucket    string // Bucket name in S3
+	S3Key       string // Object key in S3
+	Status      string // Status of the object
+	PayloadHash string // SHA256 hash of object payload that can be used for verification
+}
+
+// NewObjRegistry creates a new object registry that stores information
+// about objects in the specified bolt database file.
+func NewObjRegistry(dbFilePath string) *ObjRegistry { + options := bbolt.Options{Timeout: 100 * time.Millisecond} + boltDB, err := bbolt.Open(dbFilePath, os.ModePerm, &options) + if err != nil { + panic(err) + } + + objSelector := ObjSelector{boltDB: boltDB, objStatus: statusCreated} + + objRepository := &ObjRegistry{boltDB: boltDB, objSelector: &objSelector} + return objRepository +} + +func (o *ObjRegistry) AddObject(cid, oid, s3Bucket, s3Key, payloadHash string) error { + return o.boltDB.Update(func(tx *bbolt.Tx) error { + b, err := tx.CreateBucketIfNotExists([]byte(bucketName)) + if err != nil { + return err + } + + id, err := b.NextSequence() + if err != nil { + return err + } + + object := ObjectInfo{ + Id: id, + CID: cid, + OID: oid, + S3Bucket: s3Bucket, + S3Key: s3Key, + PayloadHash: payloadHash, + Status: statusCreated, + } + objectJson, err := json.Marshal(object) + if err != nil { + return err + } + + return b.Put(encodeId(id), objectJson) + }) +} + +func (o *ObjRegistry) SetObjectStatus(id uint64, newStatus string) error { + return o.boltDB.Update(func(tx *bbolt.Tx) error { + b, err := tx.CreateBucketIfNotExists([]byte(bucketName)) + if err != nil { + return err + } + + objBytes := b.Get(encodeId(id)) + if objBytes == nil { + return nil + } + + obj := new(ObjectInfo) + if err := json.Unmarshal(objBytes, &obj); err != nil { + return err + } + obj.Status = newStatus + + objBytes, err = json.Marshal(obj) + if err != nil { + return err + } + return b.Put(encodeId(id), objBytes) + }) +} + +func (o *ObjRegistry) NextObjectToVerify() (*ObjectInfo, error) { + return o.objSelector.NextObject() +} + +func (o *ObjRegistry) Close() error { + return o.boltDB.Close() +} + +func encodeId(id uint64) []byte { + idBytes := make([]byte, 8) + binary.BigEndian.PutUint64(idBytes, id) + return idBytes +} + +func decodeId(idBytes []byte) uint64 { + return binary.BigEndian.Uint64(idBytes) +} diff --git a/internal/registry/registry.go b/internal/registry/registry.go index 
d6a7ffd..67359ee 100644 --- a/internal/registry/registry.go +++ b/internal/registry/registry.go @@ -1,21 +1,19 @@ package registry import ( - "encoding/binary" - "encoding/json" - "fmt" - "os" - "time" + "sync" - "go.etcd.io/bbolt" "go.k6.io/k6/js/modules" ) // RootModule is the global module object type. It is instantiated once per test // run and will be used to create k6/x/neofs/registry module instances for each VU. type RootModule struct { - boltDB *bbolt.DB - objSelector *ObjSelector + // Stores object registry by path of database file. We should have only single instance + // of registry per each file + registries map[string]*ObjRegistry + // Mutex to sync access to repositories map + mu sync.Mutex } // Registry represents an instance of the module for every VU. @@ -24,25 +22,6 @@ type Registry struct { root *RootModule } -const ( - // Indicates that an object was created, but it's data wasn't verified yet - statusCreated = "created" -) - -const bucketName = "_object" - -// ObjectInfo represents information about neoFS object that has been created -// via gRPC/HTTP/S3 API. -type ObjectInfo struct { - Id uint64 // Identifier in bolt DB - CID string // Container ID in gRPC/HTTP - OID string // Object ID in gRPC/HTTP - S3Bucket string // Bucket name in S3 - S3Key string // Object key in S3 - Status string // Status of the object - PayloadHash string // SHA256 hash of object payload that can be used for verification -} - // Ensure the interfaces are implemented correctly. 
var ( _ modules.Instance = &Registry{} @@ -50,19 +29,7 @@ var ( ) func init() { - // TODO: research on a way to use configurable database name - // TODO: research on a way to close DB properly (maybe in teardown) - options := bbolt.Options{Timeout: 100 * time.Millisecond} - boltDB, err := bbolt.Open("registry.bolt", os.ModePerm, &options) - if err != nil { - fmt.Println(err) - return - } - - // Selector that searches objects for verification - objSelector := ObjSelector{boltDB: boltDB, objStatus: statusCreated} - - rootModule := &RootModule{boltDB: boltDB, objSelector: &objSelector} + rootModule := &RootModule{registries: make(map[string]*ObjRegistry)} modules.Register("k6/x/neofs/registry", rootModule) } @@ -79,72 +46,18 @@ func (r *Registry) Exports() modules.Exports { return modules.Exports{Default: r} } -func (r *Registry) AddObject(cid, oid, s3Bucket, s3Key, payloadHash string) error { - return r.root.boltDB.Update(func(tx *bbolt.Tx) error { - b, err := tx.CreateBucketIfNotExists([]byte(bucketName)) - if err != nil { - return err - } +// Open creates a new instance of object registry that will store information about objects +// in the specified file. 
If repository instance for the file was previously created, then +// Open will return the existing instance of repository, because bolt database allows only +// one write connection at a time +func (r *Registry) Open(dbFilePath string) *ObjRegistry { + r.root.mu.Lock() + defer r.root.mu.Unlock() - id, err := b.NextSequence() - if err != nil { - return err - } - - object := ObjectInfo{ - Id: id, - CID: cid, - OID: oid, - S3Bucket: s3Bucket, - S3Key: s3Key, - PayloadHash: payloadHash, - Status: statusCreated, - } - objectJson, err := json.Marshal(object) - if err != nil { - return err - } - - return b.Put(encodeId(id), objectJson) - }) -} - -func (r *Registry) SetObjectStatus(id uint64, newStatus string) error { - return r.root.boltDB.Update(func(tx *bbolt.Tx) error { - b, err := tx.CreateBucketIfNotExists([]byte(bucketName)) - if err != nil { - return err - } - - objBytes := b.Get(encodeId(id)) - if objBytes == nil { - return nil - } - - obj := new(ObjectInfo) - if err := json.Unmarshal(objBytes, &obj); err != nil { - return err - } - obj.Status = newStatus - - objBytes, err = json.Marshal(obj) - if err != nil { - return err - } - return b.Put(encodeId(id), objBytes) - }) -} - -func (r *Registry) NextObjectToVerify() (*ObjectInfo, error) { - return r.root.objSelector.NextObject() -} - -func encodeId(id uint64) []byte { - idBytes := make([]byte, 8) - binary.BigEndian.PutUint64(idBytes, id) - return idBytes -} - -func decodeId(idBytes []byte) uint64 { - return binary.BigEndian.Uint64(idBytes) + registry := r.root.registries[dbFilePath] + if registry == nil { + registry = NewObjRegistry(dbFilePath) + r.root.registries[dbFilePath] = registry + } + return registry } diff --git a/scenarios/grpc.js b/scenarios/grpc.js index be076b1..7ae5c52 100644 --- a/scenarios/grpc.js +++ b/scenarios/grpc.js @@ -20,7 +20,7 @@ const read_size = JSON.parse(open(__ENV.PREGEN_JSON)).obj_size; -e PREGEN_JSON=test.json \ scenarios/grpc.js - REGISTRY - if set to "enabled", all produced objects 
will be stored in database for subsequent verification. + REGISTRY_FILE - if set, all produced objects will be stored in database for subsequent verification. */ // Parse profile from env (format is write:duration) @@ -37,7 +37,9 @@ const grpc_endpoints = __ENV.GRPC_ENDPOINTS.split(','); const grpc_endpoint = grpc_endpoints[Math.floor(Math.random() * grpc_endpoints.length)]; const grpc_client = native.connect(grpc_endpoint, ''); -const registry_enabled = (__ENV.REGISTRY || "").toLowerCase() == "enabled" +const registry_enabled = !!__ENV.REGISTRY_FILE; +const obj_registry = registry_enabled ? registry.open(__ENV.REGISTRY_FILE) : undefined; + const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE)); const scenarios = {}; @@ -68,6 +70,12 @@ export function setup() { console.log("Pregenerated total objects: " + obj_list.length); } +export function teardown(data) { + if (obj_registry) { + obj_registry.close(); + } +} + export const options = { scenarios, setupTimeout: '5s', @@ -84,14 +92,14 @@ export function obj_write() { const container = container_list[Math.floor(Math.random() * container_list.length)]; const { payload, hash } = generator.genPayload(registry_enabled); - let resp = grpc_client.put(container, headers, payload); + const resp = grpc_client.put(container, headers, payload); if (!resp.success) { console.log(resp.error); return; } - if (registry_enabled) { - registry.addObject(container, resp.object_id, "", "", hash); + if (obj_registry) { + obj_registry.addObject(container, resp.object_id, "", "", hash); } } @@ -100,8 +108,8 @@ export function obj_read() { sleep(__ENV.SLEEP); } - let obj = obj_list[Math.floor(Math.random() * obj_list.length)]; - let resp = grpc_client.get(obj.container, obj.object) + const obj = obj_list[Math.floor(Math.random() * obj_list.length)]; + const resp = grpc_client.get(obj.container, obj.object) if (!resp.success) { console.log(resp.error); } diff --git a/scenarios/http.js b/scenarios/http.js index 
42d2d47..8ef07fe 100644 --- a/scenarios/http.js +++ b/scenarios/http.js @@ -92,7 +92,7 @@ export function obj_read() { export function uuidv4() { return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) { - let r = Math.random() * 16 | 0, v = c === 'x' ? r : (r & 0x3 | 0x8); - return v.toString(16); + let r = Math.random() * 16 | 0, v = c === 'x' ? r : (r & 0x3 | 0x8); + return v.toString(16); }); - } +} diff --git a/scenarios/run_scenarios.md b/scenarios/run_scenarios.md index 3dcdd8d..c0bc537 100644 --- a/scenarios/run_scenarios.md +++ b/scenarios/run_scenarios.md @@ -21,11 +21,11 @@ $ ./k6 run -e PROFILE=50:60 -e WRITE_OBJ_SIZE=8192 -e CLIENTS=400 -e GRPC_ENDPOI Options: * PROFILE - format write:duration * write - percent of VUs performing write operations (the rest will be read VUs) - * duration - time in sec + * duration - time in seconds * CLIENTS - number of VUs for all operations * WRITE_OBJ_SIZE - object size in kb for write(PUT) operations * PREGEN_JSON - path to json file with pre-generated containers and objects - * REGISTRY - if set to "enabled", all produced objects will be stored in database for subsequent verification. + * REGISTRY_FILE - if set, all produced objects will be stored in database for subsequent verification. Database file name will be set to value of REGISTRY_FILE. ## S3 @@ -64,12 +64,12 @@ Options are identical to gRPC scenario, plus: ## Verify -This scenario allows to verify that objects created by a previous run are really stored in the system and their data is not corrupted. Running this scenario assumes that you've already run gRPC and/or S3 scenario with option `REGISTRY=enabled`. +This scenario allows to verify that objects created by a previous run are really stored in the system and their data is not corrupted. Running this scenario assumes that you've already run gRPC and/or S3 scenario with option `REGISTRY_FILE`. 
To verify stored objects execute scenario with options: ``` -./k6 run -e CLIENTS=200 -e TIME_LIMIT=120 -e GRPC_ENDPOINTS=node1.data:8080,node2.data:8080 -e S3_ENDPOINTS=node1.data:8084,node2.data:8084 scenarios/verify.js +./k6 run -e CLIENTS=200 -e TIME_LIMIT=120 -e GRPC_ENDPOINTS=node1.data:8080,node2.data:8080 -e S3_ENDPOINTS=node1.data:8084,node2.data:8084 -e REGISTRY_FILE=registry.bolt scenarios/verify.js ``` Scenario picks up all objects in `created` status. If object is stored correctly, its' status will be changed into `verified`. If object does not exist or its' data is corrupted, then the status will be changed into `invalid`. @@ -78,3 +78,4 @@ Scenario ends as soon as all objects are checked (return code will be [108](http Options: * CLIENTS - number of VUs for verifying objects (VU can handle both GRPC and S3 objects) * TIME_LIMIT - amount of time in seconds that is sufficient to verify all objects. If this time interval ends, then verification process will be interrupted and objects that have not been checked will stay in the `created` state. + * REGISTRY_FILE - database file from which objects for verification should be read. diff --git a/scenarios/s3.js b/scenarios/s3.js index 6370cd0..b626209 100644 --- a/scenarios/s3.js +++ b/scenarios/s3.js @@ -1,6 +1,6 @@ import datagen from 'k6/x/neofs/datagen'; -import s3 from 'k6/x/neofs/s3'; import registry from 'k6/x/neofs/registry'; +import s3 from 'k6/x/neofs/s3'; import { SharedArray } from 'k6/data'; import { sleep } from 'k6'; @@ -20,7 +20,7 @@ const read_size = JSON.parse(open(__ENV.PREGEN_JSON)).obj_size; scenarios/s3.js OBJ_NAME - if specified, this name will be used for all write operations instead of random generation. - REGISTRY - if set to "enabled", all produced objects will be stored in database for subsequent verification. + REGISTRY_FILE - if set, all produced objects will be stored in database for subsequent verification. 
*/ // Parse profile from env @@ -35,7 +35,9 @@ const s3_endpoints = __ENV.S3_ENDPOINTS.split(','); const s3_endpoint = s3_endpoints[Math.floor(Math.random() * s3_endpoints.length)]; const s3_client = s3.connect(`http://${s3_endpoint}`); -const registry_enabled = (__ENV.REGISTRY || "").toLowerCase() == "enabled" +const registry_enabled = !!__ENV.REGISTRY_FILE; +const obj_registry = registry_enabled ? registry.open(__ENV.REGISTRY_FILE) : undefined; + const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE)); const scenarios = {}; @@ -66,6 +68,12 @@ export function setup() { console.log("Pregenerated total objects: " + obj_list.length); } +export function teardown(data) { + if (obj_registry) { + obj_registry.close(); + } +} + export const options = { scenarios, setupTimeout: '5s', @@ -86,8 +94,8 @@ export function obj_write() { return; } - if (registry_enabled) { - registry.addObject("", "", bucket, key, hash); + if (obj_registry) { + obj_registry.addObject("", "", bucket, key, hash); } } diff --git a/scenarios/verify.js b/scenarios/verify.js index d981187..2dee6ee 100644 --- a/scenarios/verify.js +++ b/scenarios/verify.js @@ -5,9 +5,12 @@ import { sleep } from 'k6'; import exec from 'k6/execution'; /* - ./k6 run -e CLIENTS=200 -e TIME_LIMIT=30 -e GRPC_ENDPOINTS=node4.data:8084 scenarios/verify.js + ./k6 run -e CLIENTS=200 -e TIME_LIMIT=30 -e GRPC_ENDPOINTS=node4.data:8084 + -e REGISTRY_FILE=registry.bolt scenarios/verify.js */ +const obj_registry = registry.open(__ENV.REGISTRY_FILE); + // Time limit (in seconds) for the run const time_limit = __ENV.TIME_LIMIT || "60"; @@ -47,7 +50,7 @@ export function obj_verify() { sleep(__ENV.SLEEP); } - const obj = registry.nextObjectToVerify(); + const obj = obj_registry.nextObjectToVerify(); if (!obj) { // TODO: consider using a metric with abort condition to stop execution when // all VUs have no objects to verify. 
Alternative solution could be a @@ -67,13 +70,13 @@ export function obj_verify() { result = s3_client.verifyHash(obj.s3_bucket, obj.s3_key, obj.payload_hash); } else { console.log(`Object id=${obj.id} cannot be verified with supported protocols`); - registry.setObjectStatus(obj.id, "skipped"); + obj_registry.setObjectStatus(obj.id, "skipped"); } if (result.success) { - registry.setObjectStatus(obj.id, "verified"); + obj_registry.setObjectStatus(obj.id, "verified"); } else { - registry.setObjectStatus(obj.id, "invalid"); + obj_registry.setObjectStatus(obj.id, "invalid"); console.log(`Verify error on ${obj.c_id}/${obj.o_id}: {resp.error}`); } }