[#19] Implement configurable database name for registry
Signed-off-by: Vladimir Domnich <v.domnich@yadro.com>
This commit is contained in:
parent
1cf53545f2
commit
2d4e619992
7 changed files with 187 additions and 131 deletions
123
internal/registry/obj_registry.go
Normal file
123
internal/registry/obj_registry.go
Normal file
|
@ -0,0 +1,123 @@
|
|||
package registry
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
type ObjRegistry struct {
|
||||
boltDB *bbolt.DB
|
||||
objSelector *ObjSelector
|
||||
}
|
||||
|
||||
const (
|
||||
// Indicates that an object was created, but its data wasn't verified yet
|
||||
statusCreated = "created"
|
||||
)
|
||||
|
||||
const bucketName = "_object"
|
||||
|
||||
// ObjectInfo represents information about neoFS object that has been created
|
||||
// via gRPC/HTTP/S3 API.
|
||||
type ObjectInfo struct {
|
||||
Id uint64 // Identifier in bolt DB
|
||||
CID string // Container ID in gRPC/HTTP
|
||||
OID string // Object ID in gRPC/HTTP
|
||||
S3Bucket string // Bucket name in S3
|
||||
S3Key string // Object key in S3
|
||||
Status string // Status of the object
|
||||
PayloadHash string // SHA256 hash of object payload that can be used for verification
|
||||
}
|
||||
|
||||
// NewModuleInstance implements the modules.Module interface and returns
|
||||
// a new instance for each VU.
|
||||
func NewObjRegistry(dbFilePath string) *ObjRegistry {
|
||||
options := bbolt.Options{Timeout: 100 * time.Millisecond}
|
||||
boltDB, err := bbolt.Open(dbFilePath, os.ModePerm, &options)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
objSelector := ObjSelector{boltDB: boltDB, objStatus: statusCreated}
|
||||
|
||||
objRepository := &ObjRegistry{boltDB: boltDB, objSelector: &objSelector}
|
||||
return objRepository
|
||||
}
|
||||
|
||||
func (o *ObjRegistry) AddObject(cid, oid, s3Bucket, s3Key, payloadHash string) error {
|
||||
return o.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
b, err := tx.CreateBucketIfNotExists([]byte(bucketName))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
id, err := b.NextSequence()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
object := ObjectInfo{
|
||||
Id: id,
|
||||
CID: cid,
|
||||
OID: oid,
|
||||
S3Bucket: s3Bucket,
|
||||
S3Key: s3Key,
|
||||
PayloadHash: payloadHash,
|
||||
Status: statusCreated,
|
||||
}
|
||||
objectJson, err := json.Marshal(object)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return b.Put(encodeId(id), objectJson)
|
||||
})
|
||||
}
|
||||
|
||||
func (o *ObjRegistry) SetObjectStatus(id uint64, newStatus string) error {
|
||||
return o.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
b, err := tx.CreateBucketIfNotExists([]byte(bucketName))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
objBytes := b.Get(encodeId(id))
|
||||
if objBytes == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
obj := new(ObjectInfo)
|
||||
if err := json.Unmarshal(objBytes, &obj); err != nil {
|
||||
return err
|
||||
}
|
||||
obj.Status = newStatus
|
||||
|
||||
objBytes, err = json.Marshal(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return b.Put(encodeId(id), objBytes)
|
||||
})
|
||||
}
|
||||
|
||||
func (o *ObjRegistry) NextObjectToVerify() (*ObjectInfo, error) {
|
||||
return o.objSelector.NextObject()
|
||||
}
|
||||
|
||||
func (o *ObjRegistry) Close() error {
|
||||
return o.boltDB.Close()
|
||||
}
|
||||
|
||||
func encodeId(id uint64) []byte {
|
||||
idBytes := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(idBytes, id)
|
||||
return idBytes
|
||||
}
|
||||
|
||||
func decodeId(idBytes []byte) uint64 {
|
||||
return binary.BigEndian.Uint64(idBytes)
|
||||
}
|
|
@ -1,21 +1,19 @@
|
|||
package registry
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
"sync"
|
||||
|
||||
"go.etcd.io/bbolt"
|
||||
"go.k6.io/k6/js/modules"
|
||||
)
|
||||
|
||||
// RootModule is the global module object type. It is instantiated once per test
|
||||
// run and will be used to create k6/x/neofs/registry module instances for each VU.
|
||||
type RootModule struct {
|
||||
boltDB *bbolt.DB
|
||||
objSelector *ObjSelector
|
||||
// Stores object registry by path of database file. We should have only single instance
|
||||
// of registry per each file
|
||||
registries map[string]*ObjRegistry
|
||||
// Mutex to sync access to repositories map
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// Registry represents an instance of the module for every VU.
|
||||
|
@ -24,25 +22,6 @@ type Registry struct {
|
|||
root *RootModule
|
||||
}
|
||||
|
||||
const (
|
||||
// Indicates that an object was created, but it's data wasn't verified yet
|
||||
statusCreated = "created"
|
||||
)
|
||||
|
||||
const bucketName = "_object"
|
||||
|
||||
// ObjectInfo represents information about neoFS object that has been created
|
||||
// via gRPC/HTTP/S3 API.
|
||||
type ObjectInfo struct {
|
||||
Id uint64 // Identifier in bolt DB
|
||||
CID string // Container ID in gRPC/HTTP
|
||||
OID string // Object ID in gRPC/HTTP
|
||||
S3Bucket string // Bucket name in S3
|
||||
S3Key string // Object key in S3
|
||||
Status string // Status of the object
|
||||
PayloadHash string // SHA256 hash of object payload that can be used for verification
|
||||
}
|
||||
|
||||
// Ensure the interfaces are implemented correctly.
|
||||
var (
|
||||
_ modules.Instance = &Registry{}
|
||||
|
@ -50,19 +29,7 @@ var (
|
|||
)
|
||||
|
||||
func init() {
|
||||
// TODO: research on a way to use configurable database name
|
||||
// TODO: research on a way to close DB properly (maybe in teardown)
|
||||
options := bbolt.Options{Timeout: 100 * time.Millisecond}
|
||||
boltDB, err := bbolt.Open("registry.bolt", os.ModePerm, &options)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
return
|
||||
}
|
||||
|
||||
// Selector that searches objects for verification
|
||||
objSelector := ObjSelector{boltDB: boltDB, objStatus: statusCreated}
|
||||
|
||||
rootModule := &RootModule{boltDB: boltDB, objSelector: &objSelector}
|
||||
rootModule := &RootModule{registries: make(map[string]*ObjRegistry)}
|
||||
modules.Register("k6/x/neofs/registry", rootModule)
|
||||
}
|
||||
|
||||
|
@ -79,72 +46,18 @@ func (r *Registry) Exports() modules.Exports {
|
|||
return modules.Exports{Default: r}
|
||||
}
|
||||
|
||||
func (r *Registry) AddObject(cid, oid, s3Bucket, s3Key, payloadHash string) error {
|
||||
return r.root.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
b, err := tx.CreateBucketIfNotExists([]byte(bucketName))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Open creates a new instance of object registry that will store information about objects
|
||||
// in the specified file. If repository instance for the file was previously created, then
|
||||
// Open will return the existing instance of repository, because bolt database allows only
|
||||
// one write connection at a time
|
||||
func (r *Registry) Open(dbFilePath string) *ObjRegistry {
|
||||
r.root.mu.Lock()
|
||||
defer r.root.mu.Unlock()
|
||||
|
||||
id, err := b.NextSequence()
|
||||
if err != nil {
|
||||
return err
|
||||
registry := r.root.registries[dbFilePath]
|
||||
if registry == nil {
|
||||
registry = NewObjRegistry(dbFilePath)
|
||||
r.root.registries[dbFilePath] = registry
|
||||
}
|
||||
|
||||
object := ObjectInfo{
|
||||
Id: id,
|
||||
CID: cid,
|
||||
OID: oid,
|
||||
S3Bucket: s3Bucket,
|
||||
S3Key: s3Key,
|
||||
PayloadHash: payloadHash,
|
||||
Status: statusCreated,
|
||||
}
|
||||
objectJson, err := json.Marshal(object)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return b.Put(encodeId(id), objectJson)
|
||||
})
|
||||
}
|
||||
|
||||
func (r *Registry) SetObjectStatus(id uint64, newStatus string) error {
|
||||
return r.root.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
b, err := tx.CreateBucketIfNotExists([]byte(bucketName))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
objBytes := b.Get(encodeId(id))
|
||||
if objBytes == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
obj := new(ObjectInfo)
|
||||
if err := json.Unmarshal(objBytes, &obj); err != nil {
|
||||
return err
|
||||
}
|
||||
obj.Status = newStatus
|
||||
|
||||
objBytes, err = json.Marshal(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return b.Put(encodeId(id), objBytes)
|
||||
})
|
||||
}
|
||||
|
||||
func (r *Registry) NextObjectToVerify() (*ObjectInfo, error) {
|
||||
return r.root.objSelector.NextObject()
|
||||
}
|
||||
|
||||
func encodeId(id uint64) []byte {
|
||||
idBytes := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(idBytes, id)
|
||||
return idBytes
|
||||
}
|
||||
|
||||
func decodeId(idBytes []byte) uint64 {
|
||||
return binary.BigEndian.Uint64(idBytes)
|
||||
return registry
|
||||
}
|
||||
|
|
|
@ -20,7 +20,7 @@ const read_size = JSON.parse(open(__ENV.PREGEN_JSON)).obj_size;
|
|||
-e PREGEN_JSON=test.json \
|
||||
scenarios/grpc.js
|
||||
|
||||
REGISTRY - if set to "enabled", all produced objects will be stored in database for subsequent verification.
|
||||
REGISTRY_FILE - if set, all produced objects will be stored in database for subsequent verification.
|
||||
*/
|
||||
|
||||
// Parse profile from env (format is write:duration)
|
||||
|
@ -37,7 +37,9 @@ const grpc_endpoints = __ENV.GRPC_ENDPOINTS.split(',');
|
|||
const grpc_endpoint = grpc_endpoints[Math.floor(Math.random() * grpc_endpoints.length)];
|
||||
const grpc_client = native.connect(grpc_endpoint, '');
|
||||
|
||||
const registry_enabled = (__ENV.REGISTRY || "").toLowerCase() == "enabled"
|
||||
const registry_enabled = !!__ENV.REGISTRY_FILE;
|
||||
const obj_registry = registry_enabled ? registry.open(__ENV.REGISTRY_FILE) : undefined;
|
||||
|
||||
const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE));
|
||||
|
||||
const scenarios = {};
|
||||
|
@ -68,6 +70,12 @@ export function setup() {
|
|||
console.log("Pregenerated total objects: " + obj_list.length);
|
||||
}
|
||||
|
||||
export function teardown(data) {
|
||||
if (obj_registry) {
|
||||
obj_registry.close();
|
||||
}
|
||||
}
|
||||
|
||||
export const options = {
|
||||
scenarios,
|
||||
setupTimeout: '5s',
|
||||
|
@ -84,14 +92,14 @@ export function obj_write() {
|
|||
const container = container_list[Math.floor(Math.random() * container_list.length)];
|
||||
|
||||
const { payload, hash } = generator.genPayload(registry_enabled);
|
||||
let resp = grpc_client.put(container, headers, payload);
|
||||
const resp = grpc_client.put(container, headers, payload);
|
||||
if (!resp.success) {
|
||||
console.log(resp.error);
|
||||
return;
|
||||
}
|
||||
|
||||
if (registry_enabled) {
|
||||
registry.addObject(container, resp.object_id, "", "", hash);
|
||||
if (obj_registry) {
|
||||
obj_registry.addObject(container, resp.object_id, "", "", hash);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -100,8 +108,8 @@ export function obj_read() {
|
|||
sleep(__ENV.SLEEP);
|
||||
}
|
||||
|
||||
let obj = obj_list[Math.floor(Math.random() * obj_list.length)];
|
||||
let resp = grpc_client.get(obj.container, obj.object)
|
||||
const obj = obj_list[Math.floor(Math.random() * obj_list.length)];
|
||||
const resp = grpc_client.get(obj.container, obj.object)
|
||||
if (!resp.success) {
|
||||
console.log(resp.error);
|
||||
}
|
||||
|
|
|
@ -95,4 +95,4 @@ export function uuidv4() {
|
|||
let r = Math.random() * 16 | 0, v = c === 'x' ? r : (r & 0x3 | 0x8);
|
||||
return v.toString(16);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,11 +21,11 @@ $ ./k6 run -e PROFILE=50:60 -e WRITE_OBJ_SIZE=8192 -e CLIENTS=400 -e GRPC_ENDPOI
|
|||
Options:
|
||||
* PROFILE - format write:duration
|
||||
* write - percent of VUs performing write operations (the rest will be read VUs)
|
||||
* duration - time in sec
|
||||
* duration - time in seconds
|
||||
* CLIENTS - number of VUs for all operations
|
||||
* WRITE_OBJ_SIZE - object size in kb for write(PUT) operations
|
||||
* PREGEN_JSON - path to json file with pre-generated containers and objects
|
||||
* REGISTRY - if set to "enabled", all produced objects will be stored in database for subsequent verification.
|
||||
* REGISTRY_FILE - if set, all produced objects will be stored in database for subsequent verification. Database file name will be set to value of REGISTRY_FILE.
|
||||
|
||||
## S3
|
||||
|
||||
|
@ -64,12 +64,12 @@ Options are identical to gRPC scenario, plus:
|
|||
|
||||
## Verify
|
||||
|
||||
This scenario allows to verify that objects created by a previous run are really stored in the system and their data is not corrupted. Running this scenario assumes that you've already run gRPC and/or S3 scenario with option `REGISTRY=enabled`.
|
||||
This scenario allows to verify that objects created by a previous run are really stored in the system and their data is not corrupted. Running this scenario assumes that you've already run gRPC and/or S3 scenario with option `REGISTRY_FILE`.
|
||||
|
||||
To verify stored objects execute scenario with options:
|
||||
|
||||
```
|
||||
./k6 run -e CLIENTS=200 -e TIME_LIMIT=120 -e GRPC_ENDPOINTS=node1.data:8080,node2.data:8080 -e S3_ENDPOINTS=node1.data:8084,node2.data:8084 scenarios/verify.js
|
||||
./k6 run -e CLIENTS=200 -e TIME_LIMIT=120 -e GRPC_ENDPOINTS=node1.data:8080,node2.data:8080 -e S3_ENDPOINTS=node1.data:8084,node2.data:8084 -e REGISTRY_FILE=registry.bolt scenarios/verify.js
|
||||
```
|
||||
|
||||
Scenario picks up all objects in `created` status. If object is stored correctly, its' status will be changed into `verified`. If object does not exist or its' data is corrupted, then the status will be changed into `invalid`.
|
||||
|
@ -78,3 +78,4 @@ Scenario ends as soon as all objects are checked (return code will be [108](http
|
|||
Options:
|
||||
* CLIENTS - number of VUs for verifying objects (VU can handle both GRPC and S3 objects)
|
||||
* TIME_LIMIT - amount of time in seconds that is sufficient to verify all objects. If this time interval ends, then verification process will be interrupted and objects that have not been checked will stay in the `created` state.
|
||||
* REGISTRY_FILE - database file from which objects for verification should be read.
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import datagen from 'k6/x/neofs/datagen';
|
||||
import s3 from 'k6/x/neofs/s3';
|
||||
import registry from 'k6/x/neofs/registry';
|
||||
import s3 from 'k6/x/neofs/s3';
|
||||
import { SharedArray } from 'k6/data';
|
||||
import { sleep } from 'k6';
|
||||
|
||||
|
@ -20,7 +20,7 @@ const read_size = JSON.parse(open(__ENV.PREGEN_JSON)).obj_size;
|
|||
scenarios/s3.js
|
||||
|
||||
OBJ_NAME - if specified, this name will be used for all write operations instead of random generation.
|
||||
REGISTRY - if set to "enabled", all produced objects will be stored in database for subsequent verification.
|
||||
REGISTRY_FILE - if set, all produced objects will be stored in database for subsequent verification.
|
||||
*/
|
||||
|
||||
// Parse profile from env
|
||||
|
@ -35,7 +35,9 @@ const s3_endpoints = __ENV.S3_ENDPOINTS.split(',');
|
|||
const s3_endpoint = s3_endpoints[Math.floor(Math.random() * s3_endpoints.length)];
|
||||
const s3_client = s3.connect(`http://${s3_endpoint}`);
|
||||
|
||||
const registry_enabled = (__ENV.REGISTRY || "").toLowerCase() == "enabled"
|
||||
const registry_enabled = !!__ENV.REGISTRY_FILE;
|
||||
const obj_registry = registry_enabled ? registry.open(__ENV.REGISTRY_FILE) : undefined;
|
||||
|
||||
const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE));
|
||||
|
||||
const scenarios = {};
|
||||
|
@ -66,6 +68,12 @@ export function setup() {
|
|||
console.log("Pregenerated total objects: " + obj_list.length);
|
||||
}
|
||||
|
||||
export function teardown(data) {
|
||||
if (obj_registry) {
|
||||
obj_registry.close();
|
||||
}
|
||||
}
|
||||
|
||||
export const options = {
|
||||
scenarios,
|
||||
setupTimeout: '5s',
|
||||
|
@ -86,8 +94,8 @@ export function obj_write() {
|
|||
return;
|
||||
}
|
||||
|
||||
if (registry_enabled) {
|
||||
registry.addObject("", "", bucket, key, hash);
|
||||
if (obj_registry) {
|
||||
obj_registry.addObject("", "", bucket, key, hash);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -5,9 +5,12 @@ import { sleep } from 'k6';
|
|||
import exec from 'k6/execution';
|
||||
|
||||
/*
|
||||
./k6 run -e CLIENTS=200 -e TIME_LIMIT=30 -e GRPC_ENDPOINTS=node4.data:8084 scenarios/verify.js
|
||||
./k6 run -e CLIENTS=200 -e TIME_LIMIT=30 -e GRPC_ENDPOINTS=node4.data:8084
|
||||
-e REGISTRY_FILE=registry.bolt scenarios/verify.js
|
||||
*/
|
||||
|
||||
const obj_registry = registry.open(__ENV.REGISTRY_FILE);
|
||||
|
||||
// Time limit (in seconds) for the run
|
||||
const time_limit = __ENV.TIME_LIMIT || "60";
|
||||
|
||||
|
@ -47,7 +50,7 @@ export function obj_verify() {
|
|||
sleep(__ENV.SLEEP);
|
||||
}
|
||||
|
||||
const obj = registry.nextObjectToVerify();
|
||||
const obj = obj_registry.nextObjectToVerify();
|
||||
if (!obj) {
|
||||
// TODO: consider using a metric with abort condition to stop execution when
|
||||
// all VUs have no objects to verify. Alternative solution could be a
|
||||
|
@ -67,13 +70,13 @@ export function obj_verify() {
|
|||
result = s3_client.verifyHash(obj.s3_bucket, obj.s3_key, obj.payload_hash);
|
||||
} else {
|
||||
console.log(`Object id=${obj.id} cannot be verified with supported protocols`);
|
||||
registry.setObjectStatus(obj.id, "skipped");
|
||||
obj_registry.setObjectStatus(obj.id, "skipped");
|
||||
}
|
||||
|
||||
if (result.success) {
|
||||
registry.setObjectStatus(obj.id, "verified");
|
||||
obj_registry.setObjectStatus(obj.id, "verified");
|
||||
} else {
|
||||
registry.setObjectStatus(obj.id, "invalid");
|
||||
obj_registry.setObjectStatus(obj.id, "invalid");
|
||||
console.log(`Verify error on ${obj.c_id}/${obj.o_id}: {resp.error}`);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue