[#19] Implement objects registry module

Registry module stores information about uploaded objects in bolt database and
allows to verify their validity after a load test.
Also, implemented logic to verify objects uploaded via gRPC and S3 protocols.

Signed-off-by: Vladimir Domnich <v.domnich@yadro.com>
This commit is contained in:
Vladimir Domnich 2022-09-02 23:23:33 +04:00 committed by Alex Vanin
parent 9a78d83f01
commit 1cf53545f2
15 changed files with 617 additions and 188 deletions

1
.gitignore vendored
View file

@ -1 +1,2 @@
k6 k6
*.bolt

View file

@ -64,7 +64,7 @@ const neofs_cli = native.connect("s01.neofs.devenv:8080", "")
flag, and `error` string. flag, and `error` string.
- `onsite(container_id, payload)`. Returns NeoFS object instance with prepared - `onsite(container_id, payload)`. Returns NeoFS object instance with prepared
headers. Invoke `put(headers)` method on this object to upload it into NeoFS. headers. Invoke `put(headers)` method on this object to upload it into NeoFS.
It returns dicrionary with `success` boolean flag, `object_id` string and It returns dictionary with `success` boolean flag, `object_id` string and
`error` string. `error` string.
## S3 ## S3

1
go.mod
View file

@ -54,6 +54,7 @@ require (
github.com/sirupsen/logrus v1.8.1 // indirect github.com/sirupsen/logrus v1.8.1 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/spf13/afero v1.1.2 // indirect github.com/spf13/afero v1.1.2 // indirect
go.etcd.io/bbolt v1.3.6 // indirect
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd // indirect golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd // indirect
golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 // indirect golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 // indirect

1
go.sum
View file

@ -358,6 +358,7 @@ github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1
github.com/yuin/gopher-lua v0.0.0-20190514113301-1cd887cd7036/go.mod h1:gqRgreBUhTSL0GeU64rtZ3Uq3wtjOa/TB2YfrtkCbVQ= github.com/yuin/gopher-lua v0.0.0-20190514113301-1cd887cd7036/go.mod h1:gqRgreBUhTSL0GeU64rtZ3Uq3wtjOa/TB2YfrtkCbVQ=
github.com/yuin/gopher-lua v0.0.0-20191128022950-c6266f4fe8d7/go.mod h1:gqRgreBUhTSL0GeU64rtZ3Uq3wtjOa/TB2YfrtkCbVQ= github.com/yuin/gopher-lua v0.0.0-20191128022950-c6266f4fe8d7/go.mod h1:gqRgreBUhTSL0GeU64rtZ3Uq3wtjOa/TB2YfrtkCbVQ=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU=
go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4= go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4=
go.k6.io/k6 v0.38.2 h1:v4Dr7KhZVf+s6V6oz/pGtQ9ejTfNgUPcd/D3RH3GVdY= go.k6.io/k6 v0.38.2 h1:v4Dr7KhZVf+s6V6oz/pGtQ9ejTfNgUPcd/D3RH3GVdY=
go.k6.io/k6 v0.38.2/go.mod h1:1bTdDsXTT2V3in3ZgdR15MDW6SQQh5nWni59tirqNB8= go.k6.io/k6 v0.38.2/go.mod h1:1bTdDsXTT2V3in3ZgdR15MDW6SQQh5nWni59tirqNB8=

View file

@ -6,6 +6,7 @@ import (
"crypto/ecdsa" "crypto/ecdsa"
"crypto/sha256" "crypto/sha256"
"encoding/binary" "encoding/binary"
"encoding/hex"
"errors" "errors"
"fmt" "fmt"
"strconv" "strconv"
@ -52,6 +53,11 @@ type (
Error string Error string
} }
VerifyHashResponse struct {
Success bool
Error string
}
PutContainerResponse struct { PutContainerResponse struct {
Success bool Success bool
ContainerID string ContainerID string
@ -82,20 +88,16 @@ func (c *Client) SetBufferSize(size int) {
} }
} }
func (c *Client) Put(inputContainerID string, headers map[string]string, payload goja.ArrayBuffer) PutResponse { func (c *Client) Put(containerID string, headers map[string]string, payload goja.ArrayBuffer) PutResponse {
var containerID cid.ID cliContainerID := parseContainerID(containerID)
err := containerID.DecodeString(inputContainerID)
if err != nil {
panic(err)
}
var addr address.Address var addr address.Address
addr.SetContainerID(containerID) addr.SetContainerID(cliContainerID)
tok := c.tok tok := c.tok
tok.ForVerb(session.VerbObjectPut) tok.ForVerb(session.VerbObjectPut)
tok.ApplyTo(addr) tok.ApplyTo(addr)
err = tok.Sign(c.key) err := tok.Sign(c.key)
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -112,7 +114,7 @@ func (c *Client) Put(inputContainerID string, headers map[string]string, payload
} }
var o object.Object var o object.Object
o.SetContainerID(containerID) o.SetContainerID(cliContainerID)
o.SetOwnerID(&owner) o.SetOwnerID(&owner)
o.SetAttributes(attrs...) o.SetAttributes(attrs...)
@ -127,32 +129,18 @@ func (c *Client) Put(inputContainerID string, headers map[string]string, payload
return PutResponse{Success: true, ObjectID: id.String()} return PutResponse{Success: true, ObjectID: id.String()}
} }
func (c *Client) Get(inputContainerID, inputObjectID string) GetResponse { func (c *Client) Get(containerID, objectID string) GetResponse {
var ( cliContainerID := parseContainerID(containerID)
buf = make([]byte, c.bufsize) cliObjectID := parseObjectID(objectID)
sz int
)
var containerID cid.ID
err := containerID.DecodeString(inputContainerID)
if err != nil {
panic(err)
}
var objectID oid.ID
err = objectID.DecodeString(inputObjectID)
if err != nil {
panic(err)
}
var addr address.Address var addr address.Address
addr.SetContainerID(containerID) addr.SetContainerID(cliContainerID)
addr.SetObjectID(objectID) addr.SetObjectID(cliObjectID)
tok := c.tok tok := c.tok
tok.ForVerb(session.VerbObjectGet) tok.ForVerb(session.VerbObjectGet)
tok.ApplyTo(addr) tok.ApplyTo(addr)
err = tok.Sign(c.key) err := tok.Sign(c.key)
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -160,44 +148,97 @@ func (c *Client) Get(inputContainerID, inputObjectID string) GetResponse {
stats.Report(c.vu, objGetTotal, 1) stats.Report(c.vu, objGetTotal, 1)
start := time.Now() start := time.Now()
var prmObjectGetInit client.PrmObjectGet var prm client.PrmObjectGet
prmObjectGetInit.ByID(objectID) prm.ByID(cliObjectID)
prmObjectGetInit.FromContainer(containerID) prm.FromContainer(cliContainerID)
prmObjectGetInit.WithinSession(tok) prm.WithinSession(tok)
objectReader, err := c.cli.ObjectGetInit(c.vu.Context(), prmObjectGetInit) var objSize = 0
if err != nil { err = get(c.cli, prm, c.vu.Context(), c.bufsize, func(data []byte) {
stats.Report(c.vu, objGetFails, 1) objSize += len(data)
return GetResponse{Success: false, Error: err.Error()} })
}
var o object.Object
if !objectReader.ReadHeader(&o) {
stats.Report(c.vu, objGetFails, 1)
var errorStr string
if _, err = objectReader.Close(); err != nil {
errorStr = err.Error()
}
return GetResponse{Success: false, Error: errorStr}
}
n, _ := objectReader.Read(buf)
for n > 0 {
sz += n
n, _ = objectReader.Read(buf)
}
_, err = objectReader.Close()
if err != nil { if err != nil {
stats.Report(c.vu, objGetFails, 1) stats.Report(c.vu, objGetFails, 1)
return GetResponse{Success: false, Error: err.Error()} return GetResponse{Success: false, Error: err.Error()}
} }
stats.Report(c.vu, objGetDuration, metrics.D(time.Since(start))) stats.Report(c.vu, objGetDuration, metrics.D(time.Since(start)))
stats.ReportDataReceived(c.vu, float64(sz)) stats.ReportDataReceived(c.vu, float64(objSize))
return GetResponse{Success: true} return GetResponse{Success: true}
} }
func get(
cli *client.Client,
prm client.PrmObjectGet,
ctx context.Context,
bufSize int,
onDataChunk func(chunk []byte),
) error {
var buf = make([]byte, bufSize)
objectReader, err := cli.ObjectGetInit(ctx, prm)
if err != nil {
return err
}
var o object.Object
if !objectReader.ReadHeader(&o) {
if _, err = objectReader.Close(); err != nil {
return err
}
return errors.New("can't read object header")
}
n, _ := objectReader.Read(buf)
for n > 0 {
onDataChunk(buf[:n])
n, _ = objectReader.Read(buf)
}
_, err = objectReader.Close()
if err != nil {
return err
}
return nil
}
func (c *Client) VerifyHash(containerID, objectID, expectedHash string) VerifyHashResponse {
cliContainerID := parseContainerID(containerID)
cliObjectID := parseObjectID(objectID)
var addr address.Address
addr.SetContainerID(cliContainerID)
addr.SetObjectID(cliObjectID)
tok := c.tok
tok.ForVerb(session.VerbObjectGet)
tok.ApplyTo(addr)
err := tok.Sign(c.key)
if err != nil {
panic(err)
}
var prm client.PrmObjectGet
prm.ByID(cliObjectID)
prm.FromContainer(cliContainerID)
prm.WithinSession(tok)
hasher := sha256.New()
err = get(c.cli, prm, c.vu.Context(), c.bufsize, func(data []byte) {
hasher.Write(data)
})
if err != nil {
return VerifyHashResponse{Success: false, Error: err.Error()}
}
actualHash := hex.EncodeToString(hasher.Sum(make([]byte, 0, sha256.Size)))
if actualHash != expectedHash {
return VerifyHashResponse{Success: true, Error: "hash mismatch"}
}
return VerifyHashResponse{Success: true}
}
func (c *Client) putCnrErrorResponse(err error) PutContainerResponse { func (c *Client) putCnrErrorResponse(err error) PutContainerResponse {
stats.Report(c.vu, cnrPutFails, 1) stats.Report(c.vu, cnrPutFails, 1)
return PutContainerResponse{Success: false, Error: err.Error()} return PutContainerResponse{Success: false, Error: err.Error()}
@ -270,7 +311,7 @@ func (c *Client) PutContainer(params map[string]string) PutContainerResponse {
return PutContainerResponse{Success: true, ContainerID: res.ID().EncodeToString()} return PutContainerResponse{Success: true, ContainerID: res.ID().EncodeToString()}
} }
func (c *Client) Onsite(inputContainerID string, payload goja.ArrayBuffer) PreparedObject { func (c *Client) Onsite(containerID string, payload goja.ArrayBuffer) PreparedObject {
maxObjectSize, epoch, hhDisabled, err := parseNetworkInfo(c.vu.Context(), c.cli) maxObjectSize, epoch, hhDisabled, err := parseNetworkInfo(c.vu.Context(), c.cli)
if err != nil { if err != nil {
panic(err) panic(err)
@ -286,11 +327,7 @@ func (c *Client) Onsite(inputContainerID string, payload goja.ArrayBuffer) Prepa
panic(msg) panic(msg)
} }
var containerID cid.ID cliContainerID := parseContainerID(containerID)
err = containerID.DecodeString(inputContainerID)
if err != nil {
panic(err)
}
var owner user.ID var owner user.ID
user.IDFromKey(&owner, c.key.PublicKey) user.IDFromKey(&owner, c.key.PublicKey)
@ -300,7 +337,7 @@ func (c *Client) Onsite(inputContainerID string, payload goja.ArrayBuffer) Prepa
obj := object.New() obj := object.New()
obj.SetVersion(&apiVersion) obj.SetVersion(&apiVersion)
obj.SetType(object.TypeRegular) obj.SetType(object.TypeRegular)
obj.SetContainerID(containerID) obj.SetContainerID(cliContainerID)
obj.SetOwnerID(&owner) obj.SetOwnerID(&owner)
obj.SetPayloadSize(uint64(ln)) obj.SetPayloadSize(uint64(ln))
obj.SetCreationEpoch(epoch) obj.SetCreationEpoch(epoch)
@ -471,3 +508,21 @@ func waitFor(ctx context.Context, params *waitParams, condition func(context.Con
} }
} }
} }
func parseContainerID(strContainerID string) cid.ID {
var containerID cid.ID
err := containerID.DecodeString(strContainerID)
if err != nil {
panic(err)
}
return containerID
}
func parseObjectID(strObjectID string) oid.ID {
var cliObjectID oid.ID
err := cliObjectID.DecodeString(strObjectID)
if err != nil {
panic(err)
}
return cliObjectID
}

View file

@ -0,0 +1,72 @@
package registry
import (
"encoding/json"
"sync"
"go.etcd.io/bbolt"
)
type ObjSelector struct {
boltDB *bbolt.DB
mu sync.Mutex
lastId uint64
objStatus string
}
func (o *ObjSelector) NextObject() (*ObjectInfo, error) {
var foundObj *ObjectInfo
err := o.boltDB.View(func(tx *bbolt.Tx) error {
b := tx.Bucket([]byte(bucketName))
if b == nil {
return nil
}
c := b.Cursor()
// We use mutex so that multiple VUs won't attempt to modify lastId simultaneously
// TODO: consider singleton channel that will produce those ids on demand
o.mu.Lock()
defer o.mu.Unlock()
// Establish the start position for searching the next object:
// If we should go from the beginning (lastId=0), then we start from the first element
// Otherwise we start from the key right after the lastId
var keyBytes, objBytes []byte
if o.lastId == 0 {
keyBytes, objBytes = c.First()
} else {
c.Seek(encodeId(o.lastId))
keyBytes, objBytes = c.Next()
}
// Iterate over objects to find the next object in the target status
var obj ObjectInfo
for ; keyBytes != nil; keyBytes, objBytes = c.Next() {
if objBytes != nil {
if err := json.Unmarshal(objBytes, &obj); err != nil {
// Ignore malformed objects for now. Maybe it should be panic?
continue
}
// If we reached an object in the target status, stop iterating
if obj.Status == o.objStatus {
foundObj = &obj
break
}
}
}
// Update the last key
if keyBytes != nil {
o.lastId = decodeId(keyBytes)
} else {
// Loopback to beginning so that we can revisit objects which were taken for verification
// but their status wasn't changed
// TODO: stop looping back to beginning too quickly
o.lastId = 0
}
return nil
})
return foundObj, err
}

View file

@ -0,0 +1,150 @@
package registry
import (
"encoding/binary"
"encoding/json"
"fmt"
"os"
"time"
"go.etcd.io/bbolt"
"go.k6.io/k6/js/modules"
)
// RootModule is the global module object type. It is instantiated once per test
// run and will be used to create k6/x/neofs/registry module instances for each VU.
type RootModule struct {
boltDB *bbolt.DB
objSelector *ObjSelector
}
// Registry represents an instance of the module for every VU.
type Registry struct {
vu modules.VU
root *RootModule
}
const (
// Indicates that an object was created, but it's data wasn't verified yet
statusCreated = "created"
)
const bucketName = "_object"
// ObjectInfo represents information about neoFS object that has been created
// via gRPC/HTTP/S3 API.
type ObjectInfo struct {
Id uint64 // Identifier in bolt DB
CID string // Container ID in gRPC/HTTP
OID string // Object ID in gRPC/HTTP
S3Bucket string // Bucket name in S3
S3Key string // Object key in S3
Status string // Status of the object
PayloadHash string // SHA256 hash of object payload that can be used for verification
}
// Ensure the interfaces are implemented correctly.
var (
_ modules.Instance = &Registry{}
_ modules.Module = &RootModule{}
)
func init() {
// TODO: research on a way to use configurable database name
// TODO: research on a way to close DB properly (maybe in teardown)
options := bbolt.Options{Timeout: 100 * time.Millisecond}
boltDB, err := bbolt.Open("registry.bolt", os.ModePerm, &options)
if err != nil {
fmt.Println(err)
return
}
// Selector that searches objects for verification
objSelector := ObjSelector{boltDB: boltDB, objStatus: statusCreated}
rootModule := &RootModule{boltDB: boltDB, objSelector: &objSelector}
modules.Register("k6/x/neofs/registry", rootModule)
}
// NewModuleInstance implements the modules.Module interface and returns
// a new instance for each VU.
func (r *RootModule) NewModuleInstance(vu modules.VU) modules.Instance {
mi := &Registry{vu: vu, root: r}
return mi
}
// Exports implements the modules.Instance interface and returns the exports
// of the JS module.
func (r *Registry) Exports() modules.Exports {
return modules.Exports{Default: r}
}
func (r *Registry) AddObject(cid, oid, s3Bucket, s3Key, payloadHash string) error {
return r.root.boltDB.Update(func(tx *bbolt.Tx) error {
b, err := tx.CreateBucketIfNotExists([]byte(bucketName))
if err != nil {
return err
}
id, err := b.NextSequence()
if err != nil {
return err
}
object := ObjectInfo{
Id: id,
CID: cid,
OID: oid,
S3Bucket: s3Bucket,
S3Key: s3Key,
PayloadHash: payloadHash,
Status: statusCreated,
}
objectJson, err := json.Marshal(object)
if err != nil {
return err
}
return b.Put(encodeId(id), objectJson)
})
}
func (r *Registry) SetObjectStatus(id uint64, newStatus string) error {
return r.root.boltDB.Update(func(tx *bbolt.Tx) error {
b, err := tx.CreateBucketIfNotExists([]byte(bucketName))
if err != nil {
return err
}
objBytes := b.Get(encodeId(id))
if objBytes == nil {
return nil
}
obj := new(ObjectInfo)
if err := json.Unmarshal(objBytes, &obj); err != nil {
return err
}
obj.Status = newStatus
objBytes, err = json.Marshal(obj)
if err != nil {
return err
}
return b.Put(encodeId(id), objBytes)
})
}
func (r *Registry) NextObjectToVerify() (*ObjectInfo, error) {
return r.root.objSelector.NextObject()
}
func encodeId(id uint64) []byte {
idBytes := make([]byte, 8)
binary.BigEndian.PutUint64(idBytes, id)
return idBytes
}
func decodeId(idBytes []byte) uint64 {
return binary.BigEndian.Uint64(idBytes)
}

View file

@ -3,6 +3,8 @@ package s3
import ( import (
"bytes" "bytes"
"context" "context"
"crypto/sha256"
"encoding/hex"
"strconv" "strconv"
"time" "time"
@ -35,6 +37,11 @@ type (
Success bool Success bool
Error string Error string
} }
VerifyHashResponse struct {
Success bool
Error string
}
) )
func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse { func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse {
@ -60,32 +67,65 @@ func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse {
} }
func (c *Client) Get(bucket, key string) GetResponse { func (c *Client) Get(bucket, key string) GetResponse {
var (
buf = make([]byte, 4*1024)
sz int
)
stats.Report(c.vu, objGetTotal, 1) stats.Report(c.vu, objGetTotal, 1)
start := time.Now() start := time.Now()
obj, err := c.cli.GetObject(context.Background(), &s3.GetObjectInput{
Bucket: aws.String(bucket), var objSize = 0
Key: aws.String(key), err := get(c.cli, bucket, key, func(chunk []byte) {
objSize += len(chunk)
}) })
if err != nil { if err != nil {
stats.Report(c.vu, objGetFails, 1) stats.Report(c.vu, objGetFails, 1)
return GetResponse{Success: false, Error: err.Error()} return GetResponse{Success: false, Error: err.Error()}
} }
stats.Report(c.vu, objGetDuration, metrics.D(time.Since(start))) stats.Report(c.vu, objGetDuration, metrics.D(time.Since(start)))
stats.ReportDataReceived(c.vu, float64(objSize))
return GetResponse{Success: true}
}
func get(
c *s3.Client,
bucket string,
key string,
onDataChunk func(chunk []byte),
) error {
var buf = make([]byte, 4*1024)
obj, err := c.GetObject(context.Background(), &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
})
if err != nil {
return err
}
for { for {
n, err := obj.Body.Read(buf) n, err := obj.Body.Read(buf)
if n > 0 { if n > 0 {
sz += n onDataChunk(buf[:n])
} }
if err != nil { if err != nil {
break break
} }
} }
stats.ReportDataReceived(c.vu, float64(sz)) return nil
return GetResponse{Success: true} }
func (c *Client) VerifyHash(bucket, key, expectedHash string) VerifyHashResponse {
hasher := sha256.New()
err := get(c.cli, bucket, key, func(data []byte) {
hasher.Write(data)
})
if err != nil {
return VerifyHashResponse{Success: false, Error: err.Error()}
}
actualHash := hex.EncodeToString(hasher.Sum(make([]byte, 0, sha256.Size)))
if actualHash != expectedHash {
return VerifyHashResponse{Success: true, Error: "hash mismatch"}
}
return VerifyHashResponse{Success: true}
} }
func (c *Client) CreateBucket(bucket string, params map[string]string) CreateBucketResponse { func (c *Client) CreateBucket(bucket string, params map[string]string) CreateBucketResponse {

View file

@ -31,7 +31,6 @@ var (
func init() { func init() {
modules.Register("k6/x/neofs/s3", new(RootModule)) modules.Register("k6/x/neofs/s3", new(RootModule))
} }
// NewModuleInstance implements the modules.Module interface and returns // NewModuleInstance implements the modules.Module interface and returns

View file

@ -3,6 +3,7 @@ package xk6_neofs
import ( import (
_ "github.com/nspcc-dev/xk6-neofs/internal/datagen" _ "github.com/nspcc-dev/xk6-neofs/internal/datagen"
_ "github.com/nspcc-dev/xk6-neofs/internal/native" _ "github.com/nspcc-dev/xk6-neofs/internal/native"
_ "github.com/nspcc-dev/xk6-neofs/internal/registry"
_ "github.com/nspcc-dev/xk6-neofs/internal/s3" _ "github.com/nspcc-dev/xk6-neofs/internal/s3"
"go.k6.io/k6/js/modules" "go.k6.io/k6/js/modules"
) )

View file

@ -1,54 +1,61 @@
import datagen from 'k6/x/neofs/datagen'; import datagen from 'k6/x/neofs/datagen';
import native from 'k6/x/neofs/native'; import native from 'k6/x/neofs/native';
import registry from 'k6/x/neofs/registry';
import { SharedArray } from 'k6/data'; import { SharedArray } from 'k6/data';
import { sleep } from 'k6'; import { sleep } from 'k6';
const obj_list = new SharedArray('obj_list', function () { const obj_list = new SharedArray('obj_list', function () {
return JSON.parse(open(__ENV.PREGEN_JSON)).objects; }); return JSON.parse(open(__ENV.PREGEN_JSON)).objects;
});
const container_list = new SharedArray('container_list', function () { const container_list = new SharedArray('container_list', function () {
return JSON.parse(open(__ENV.PREGEN_JSON)).containers; }); return JSON.parse(open(__ENV.PREGEN_JSON)).containers;
});
const read_size = JSON.parse(open(__ENV.PREGEN_JSON)).obj_size; const read_size = JSON.parse(open(__ENV.PREGEN_JSON)).obj_size;
/* /*
./k6 run -e PROFILE=0:60 -e WRITE_OBJ_SIZE=1024 -e CLIENTS=200 -e NODES=node4.data:8084 -e PREGEN_JSON=test.json scenarios/grpc.js ./k6 run -e PROFILE=0:60 -e CLIENTS=200 -e WRITE_OBJ_SIZE=1024 \
-e GRPC_ENDPOINTS=host1:8080,host2:8080 \
-e PREGEN_JSON=test.json \
scenarios/grpc.js
Parse profile from env. REGISTRY - if set to "enabled", all produced objects will be stored in database for subsequent verification.
Format write:obj_size:
* write - write operations in percent, relative to read operations
* duration - duration in seconds
*/ */
// Parse profile from env (format is write:duration)
// * write - percent of VUs performing write operations (the rest will be read VUs)
// * duration - duration in seconds
const [ write, duration ] = __ENV.PROFILE.split(':'); const [ write, duration ] = __ENV.PROFILE.split(':');
// Set VUs between write and read operations // Allocate VUs between write and read operations
let vus_read = Math.ceil(__ENV.CLIENTS/100*(100-parseInt(write))) const read_vu_count = Math.ceil(__ENV.CLIENTS / 100 * (100 - parseInt(write)));
let vus_write = __ENV.CLIENTS - vus_read const write_vu_count = __ENV.CLIENTS - read_vu_count;
// Select random gRPC endpoint for current VU
const grpc_endpoints = __ENV.GRPC_ENDPOINTS.split(',');
const grpc_endpoint = grpc_endpoints[Math.floor(Math.random() * grpc_endpoints.length)];
const grpc_client = native.connect(grpc_endpoint, '');
const registry_enabled = (__ENV.REGISTRY || "").toLowerCase() == "enabled"
const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE)); const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE));
let nodes = __ENV.NODES.split(',') const scenarios = {};
let rand_node = nodes[Math.floor(Math.random()*nodes.length)];
const neofs_cli = native.connect(rand_node, "") if (write_vu_count > 0) {
let scenarios = {}
if (vus_write > 0){
scenarios.write = { scenarios.write = {
executor: 'constant-vus', executor: 'constant-vus',
vus: vus_write, vus: write_vu_count,
duration: `${duration}s`, duration: `${duration}s`,
exec: 'obj_write', exec: 'obj_write',
gracefulStop: '5s', gracefulStop: '5s',
} }
} }
if (vus_read > 0){ if (read_vu_count > 0) {
scenarios.read = { scenarios.read = {
executor: 'constant-vus', executor: 'constant-vus',
vus: vus_read, vus: read_vu_count,
duration: `${duration}s`, duration: `${duration}s`,
exec: 'obj_read', exec: 'obj_read',
gracefulStop: '5s', gracefulStop: '5s',
@ -56,42 +63,48 @@ if (vus_read > 0){
} }
export function setup() { export function setup() {
console.log("Pregenerated containers: " + container_list.length) console.log("Pregenerated containers: " + container_list.length);
console.log("Pregenerated read object size: " + read_size) console.log("Pregenerated read object size: " + read_size);
console.log("Pregenerated total objects: " + obj_list.length) console.log("Pregenerated total objects: " + obj_list.length);
} }
export const options = { export const options = {
scenarios: scenarios, scenarios,
setupTimeout: '5s', setupTimeout: '5s',
}; };
export function obj_write() { export function obj_write() {
let headers = {
'unique_header': uuidv4()
}
let container = container_list[Math.floor(Math.random()*container_list.length)];
const { payload } = generator.genPayload(false);
let resp = neofs_cli.put( container, headers, payload);
if (!resp.success) {
console.log(resp.error);
}
if (__ENV.SLEEP) { if (__ENV.SLEEP) {
sleep(__ENV.SLEEP); sleep(__ENV.SLEEP);
} }
const headers = {
unique_header: uuidv4()
};
const container = container_list[Math.floor(Math.random() * container_list.length)];
const { payload, hash } = generator.genPayload(registry_enabled);
let resp = grpc_client.put(container, headers, payload);
if (!resp.success) {
console.log(resp.error);
return;
}
if (registry_enabled) {
registry.addObject(container, resp.object_id, "", "", hash);
}
} }
export function obj_read() { export function obj_read() {
let random_read_obj = obj_list[Math.floor(Math.random()*obj_list.length)];
let resp = neofs_cli.get(random_read_obj.container, random_read_obj.object)
if (!resp.success) {
console.log(resp.error);
}
if (__ENV.SLEEP) { if (__ENV.SLEEP) {
sleep(__ENV.SLEEP); sleep(__ENV.SLEEP);
} }
let obj = obj_list[Math.floor(Math.random() * obj_list.length)];
let resp = grpc_client.get(obj.container, obj.object)
if (!resp.success) {
console.log(resp.error);
}
} }
export function uuidv4() { export function uuidv4() {

View file

@ -94,15 +94,15 @@ def execute_cmd(cmd_line):
def create_container(): def create_container():
cmd_line = f"neofs-cli --rpc-endpoint {args.endpoint} container create -g --policy 'REP 1 IN X CBF 1 SELECT 1 FROM * AS X' --basic-acl public-read-write --await" cmd_line = f"neofs-cli --rpc-endpoint {args.endpoint} container create -g --policy 'REP 1 IN X CBF 1 SELECT 1 FROM * AS X' --basic-acl public-read-write --await"
ouptut, success = execute_cmd(cmd_line) output, success = execute_cmd(cmd_line)
if not success: if not success:
print(f" > Container has not been created.") print(f" > Container has not been created.")
else: else:
try: try:
fst_str = ouptut.split('\n')[0] fst_str = output.split('\n')[0]
except Exception: except Exception:
print(f"Got empty output: {ouptut}") print(f"Got empty output: {output}")
splitted = fst_str.split(": ") splitted = fst_str.split(": ")
if len(splitted) != 2: if len(splitted) != 2:
raise ValueError(f"no CID was parsed from command output: \t{fst_str}") raise ValueError(f"no CID was parsed from command output: \t{fst_str}")

View file

@ -15,20 +15,21 @@ The tests will use all pre-created containers for PUT operations and all pre-cre
2. Execute scenario with options: 2. Execute scenario with options:
```shell ```shell
$ ./k6 run -e PROFILE=50:60 -e WRITE_OBJ_SIZE=8192 -e CLIENTS=400 -e NODES=node1.data:8080,node4.data:8080 -e PREGEN_JSON=./grpc.json scenarios/grpc.js $ ./k6 run -e PROFILE=50:60 -e WRITE_OBJ_SIZE=8192 -e CLIENTS=400 -e GRPC_ENDPOINTS=node1.data:8080,node4.data:8080 -e PREGEN_JSON=./grpc.json scenarios/grpc.js
``` ```
Options: Options:
* PROFILE - format write:obj_size:duration * PROFILE - format write:duration
* write - write operations in percent, relative to read operations * write - percent of VUs performing write operations (the rest will be read VUs)
* duration - time in sec * duration - time in sec
* CLIENTS - number of VUs for all operations * CLIENTS - number of VUs for all operations
* WRITE_OBJ_SIZE - object size in kb for write(PUT) operations * WRITE_OBJ_SIZE - object size in kb for write(PUT) operations
* PREGEN_JSON - path to json file with pre-generated containers and objects * PREGEN_JSON - path to json file with pre-generated containers and objects
* REGISTRY - if set to "enabled", all produced objects will be stored in database for subsequent verification.
## S3 ## S3
1. Create s3 credential: 1. Create s3 credentials:
```shell ```shell
$ neofs-s3-authmate issue-secret --wallet wallet.json --peer node1.intra:8080 --gate-public-key 03d33a2cc7b8daaa5a3df3fccf065f7cf1fc6a3279efc161fcec512dcc0c1b2277 --gate-public-key 03ff0ad212e10683234442530bfd71d0bb18c3fbd6459aba768eacf158b0c359a2 --gate-public-key 033ae03ff30ed3b6665af69955562cfc0eae18d50e798ab31f054ee22e32fee993 --gate-public-key 02127c7498de0765d2461577c9d4f13f916eefd1884896183e6de0d9a85d17f2fb --bearer-rules rules.json --container-placement-policy "REP 1 IN X CBF 1 SELECT 1 FROM * AS X" $ neofs-s3-authmate issue-secret --wallet wallet.json --peer node1.intra:8080 --gate-public-key 03d33a2cc7b8daaa5a3df3fccf065f7cf1fc6a3279efc161fcec512dcc0c1b2277 --gate-public-key 03ff0ad212e10683234442530bfd71d0bb18c3fbd6459aba768eacf158b0c359a2 --gate-public-key 033ae03ff30ed3b6665af69955562cfc0eae18d50e798ab31f054ee22e32fee993 --gate-public-key 02127c7498de0765d2461577c9d4f13f916eefd1884896183e6de0d9a85d17f2fb --bearer-rules rules.json --container-placement-policy "REP 1 IN X CBF 1 SELECT 1 FROM * AS X"
@ -55,6 +56,25 @@ The tests will use all pre-created buckets for PUT operations and all pre-create
3. Execute scenario with options: 3. Execute scenario with options:
```shell ```shell
$ ./k6 run -e PROFILE=50:60 -e WRITE_OBJ_SIZE=8192 -e CLIENTS=400 -e NODES=node1.data:8084,node4.data:8084 -e PREGEN_JSON=s3.json scenarios/s3.js $ ./k6 run -e PROFILE=50:60 -e WRITE_OBJ_SIZE=8192 -e CLIENTS=400 -e S3_ENDPOINTS=node1.data:8084,node4.data:8084 -e PREGEN_JSON=s3.json scenarios/s3.js
```
Options are identical to gRPC scenario, plus:
* OBJ_NAME - if specified, this name will be used for all write operations instead of random generation.
## Verify
This scenario allows to verify that objects created by a previous run are really stored in the system and their data is not corrupted. Running this scenario assumes that you've already run gRPC and/or S3 scenario with option `REGISTRY=enabled`.
To verify stored objects execute scenario with options:
``` ```
./k6 run -e CLIENTS=200 -e TIME_LIMIT=120 -e GRPC_ENDPOINTS=node1.data:8080,node2.data:8080 -e S3_ENDPOINTS=node1.data:8084,node2.data:8084 scenarios/verify.js
```
Scenario picks up all objects in `created` status. If object is stored correctly, its' status will be changed into `verified`. If object does not exist or its' data is corrupted, then the status will be changed into `invalid`.
Scenario ends as soon as all objects are checked (return code will be [108](https://k6.io/docs/javascript-api/k6-execution/#test)).
Options:
* CLIENTS - number of VUs for verifying objects (VU can handle both GRPC and S3 objects)
* TIME_LIMIT - amount of time in seconds that is sufficient to verify all objects. If this time interval ends, then verification process will be interrupted and objects that have not been checked will stay in the `created` state.

View file

@ -1,110 +1,107 @@
import datagen from 'k6/x/neofs/datagen'; import datagen from 'k6/x/neofs/datagen';
import s3 from 'k6/x/neofs/s3'; import s3 from 'k6/x/neofs/s3';
import registry from 'k6/x/neofs/registry';
import { SharedArray } from 'k6/data'; import { SharedArray } from 'k6/data';
import { sleep } from 'k6'; import { sleep } from 'k6';
const obj_list = new SharedArray('obj_list', function () { const obj_list = new SharedArray('obj_list', function () {
return JSON.parse(open(__ENV.PREGEN_JSON)).objects; }); return JSON.parse(open(__ENV.PREGEN_JSON)).objects;
});
const bucket_list = new SharedArray('bucket_list', function () { const bucket_list = new SharedArray('bucket_list', function () {
return JSON.parse(open(__ENV.PREGEN_JSON)).buckets; }); return JSON.parse(open(__ENV.PREGEN_JSON)).buckets;
});
const read_size = JSON.parse(open(__ENV.PREGEN_JSON)).obj_size; const read_size = JSON.parse(open(__ENV.PREGEN_JSON)).obj_size;
/* /*
./k6 run -e PROFILE=0:60 -e WRITE_OBJ_SIZE=1024 -e CLIENTS=200 -e NODES=node4.data:8084 -e PREGEN_JSON=test.json scenarios/s3_t.js ./k6 run -e PROFILE=0:60 -e CLIENTS=200 -e WRITE_OBJ_SIZE=1024 \
-e S3_ENDPOINTS=host1:8084,host2:8084 -e PREGEN_JSON=test.json \
scenarios/s3.js
Parse profile from env. OBJ_NAME - if specified, this name will be used for all write operations instead of random generation.
Format write:obj_size: REGISTRY - if set to "enabled", all produced objects will be stored in database for subsequent verification.
* write - write operations in percent, relative to read operations
* duration - duration in seconds
OBJ_NAME - this name will be used for all write operations instead of randow generation in case of declared.
*/ */
// Parse profile from env
const [ write, duration ] = __ENV.PROFILE.split(':'); const [ write, duration ] = __ENV.PROFILE.split(':');
// Set VUs between write and read operations // Allocate VUs between write and read operations
let vus_read = Math.ceil(__ENV.CLIENTS/100*(100-parseInt(write))) let read_vu_count = Math.ceil(__ENV.CLIENTS / 100 * (100 - parseInt(write)));
let vus_write = __ENV.CLIENTS - vus_read let write_vu_count = __ENV.CLIENTS - read_vu_count;
// Select random S3 endpoint for current VU
const s3_endpoints = __ENV.S3_ENDPOINTS.split(',');
const s3_endpoint = s3_endpoints[Math.floor(Math.random() * s3_endpoints.length)];
const s3_client = s3.connect(`http://${s3_endpoint}`);
const registry_enabled = (__ENV.REGISTRY || "").toLowerCase() == "enabled"
const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE)); const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE));
let nodes = __ENV.NODES.split(',') const scenarios = {};
let rand_node = nodes[Math.floor(Math.random()*nodes.length)];
let s3_cli = s3.connect(`http://${rand_node}`) if (write_vu_count > 0){
let scenarios = {}
if (vus_write > 0){
scenarios.write = { scenarios.write = {
executor: 'constant-vus', executor: 'constant-vus',
vus: vus_write, vus: write_vu_count,
duration: `${duration}s`, duration: `${duration}s`,
exec: 'obj_write', exec: 'obj_write',
gracefulStop: '5s', gracefulStop: '5s',
} };
} }
if (vus_read > 0){ if (read_vu_count > 0){
scenarios.read = { scenarios.read = {
executor: 'constant-vus', executor: 'constant-vus',
vus: vus_read, vus: read_vu_count,
duration: `${duration}s`, duration: `${duration}s`,
exec: 'obj_read', exec: 'obj_read',
gracefulStop: '5s', gracefulStop: '5s',
} };
} }
export function setup() { export function setup() {
console.log("Pregenerated buckets: " + bucket_list.length) console.log("Pregenerated buckets: " + bucket_list.length);
console.log("Pregenerated read object size: " + read_size) console.log("Pregenerated read object size: " + read_size);
console.log("Pregenerated total objects: " + obj_list.length) console.log("Pregenerated total objects: " + obj_list.length);
} }
export const options = { export const options = {
scenarios: scenarios, scenarios,
setupTimeout: '5s', setupTimeout: '5s',
}; };
export function obj_write() { export function obj_write() {
let key = "";
if (__ENV.OBJ_NAME){
key = __ENV.OBJ_NAME;
}
else{
key = uuidv4();
}
let bucket = bucket_list[Math.floor(Math.random()*bucket_list.length)];
const { payload } = generator.genPayload(false);
let resp = s3_cli.put(bucket, key, payload)
if (!resp.success) {
console.log(resp.error);
}
if (__ENV.SLEEP) { if (__ENV.SLEEP) {
sleep(__ENV.SLEEP); sleep(__ENV.SLEEP);
} }
const key = __ENV.OBJ_NAME || uuidv4();
const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)];
const { payload, hash } = generator.genPayload(registry_enabled);
const resp = s3_client.put(bucket, key, payload);
if (!resp.success) {
console.log(resp.error);
return;
}
if (registry_enabled) {
registry.addObject("", "", bucket, key, hash);
}
} }
export function obj_read() { export function obj_read() {
let random_read_obj = obj_list[Math.floor(Math.random()*obj_list.length)];
let resp = s3_cli.get(random_read_obj.bucket, random_read_obj.object)
if (!resp.success) {
console.log(resp.error);
}
if (__ENV.SLEEP) { if (__ENV.SLEEP) {
sleep(__ENV.SLEEP); sleep(__ENV.SLEEP);
} }
const obj = obj_list[Math.floor(Math.random() * obj_list.length)];
const resp = s3_client.get(obj.bucket, obj.object);
if (!resp.success) {
console.log(resp.error);
}
} }
export function uuidv4() { export function uuidv4() {

79
scenarios/verify.js Normal file
View file

@ -0,0 +1,79 @@
import native from 'k6/x/neofs/native';
import registry from 'k6/x/neofs/registry';
import s3 from 'k6/x/neofs/s3';
import { sleep } from 'k6';
import exec from 'k6/execution';
/*
./k6 run -e CLIENTS=200 -e TIME_LIMIT=30 -e GRPC_ENDPOINTS=node4.data:8084 scenarios/verify.js
*/
// Time limit (in seconds) for the run
const time_limit = __ENV.TIME_LIMIT || "60";
// Connect to random gRPC endpoint
let grpc_client = undefined;
if (__ENV.GRPC_ENDPOINTS) {
const grpcEndpoints = __ENV.GRPC_ENDPOINTS.split(',');
const grpcEndpoint = grpcEndpoints[Math.floor(Math.random() * grpcEndpoints.length)];
grpc_client = native.connect(grpcEndpoint, '');
}
// Connect to random S3 endpoint
let s3_client = undefined;
if (__ENV.S3_ENDPOINTS) {
const s3_endpoints = __ENV.S3_ENDPOINTS.split(',');
const s3_endpoint = s3_endpoints[Math.floor(Math.random() * s3_endpoints.length)];
s3_client = s3.connect(`http://${s3_endpoint}`);
}
const scenarios = {
verify: {
executor: 'constant-vus',
vus: __ENV.CLIENTS,
duration: `${time_limit}s`,
exec: 'obj_verify',
gracefulStop: '5s',
}
};
export const options = {
scenarios: scenarios,
setupTimeout: '5s',
};
export function obj_verify() {
if (__ENV.SLEEP) {
sleep(__ENV.SLEEP);
}
const obj = registry.nextObjectToVerify();
if (!obj) {
// TODO: consider using a metric with abort condition to stop execution when
// all VUs have no objects to verify. Alternative solution could be a
// shared-iterations executor, but it might be not a good choice, as we need to
// check same object several times (if specific request fails)
// Allow time for other VUs to complete verification
sleep(30.0);
exec.test.abort("All objects have been verified");
}
console.log(`Verifying object ${obj.id}`);
let result = undefined;
if (obj.c_id && obj.o_id) {
result = grpc_client.verifyHash(obj.c_id, obj.o_id, obj.payload_hash);
} else if (obj.s3_bucket && obj.s3_key) {
result = s3_client.verifyHash(obj.s3_bucket, obj.s3_key, obj.payload_hash);
} else {
console.log(`Object id=${obj.id} cannot be verified with supported protocols`);
registry.setObjectStatus(obj.id, "skipped");
}
if (result.success) {
registry.setObjectStatus(obj.id, "verified");
} else {
registry.setObjectStatus(obj.id, "invalid");
console.log(`Verify error on ${obj.c_id}/${obj.o_id}: {resp.error}`);
}
}