From 89faf927fb5ca8f3c5b2a8f3ff239b7e0ba42209 Mon Sep 17 00:00:00 2001 From: Vladimir Domnich Date: Mon, 26 Sep 2022 22:05:28 +0400 Subject: [PATCH] [#21] Improve iteration logic in obj selector 1. Implement reset method that allows to start iteration from beginning of the registry. This allows to revisit objects in scenarios like object deletion. 2. Add filter structure that allows to select objects based on age. Signed-off-by: Vladimir Domnich --- .github/workflows/go.yml | 1 + internal/registry/obj_registry.go | 58 ++++++---------------- internal/registry/obj_selector.go | 82 +++++++++++++++++++++++++++---- internal/registry/registry.go | 51 ++++++++++++++++++- scenarios/verify.js | 22 ++++++--- 5 files changed, 152 insertions(+), 62 deletions(-) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 1b1d5a9..bdeaea6 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -21,6 +21,7 @@ jobs: uses: golangci/golangci-lint-action@v2 with: version: latest + args: --timeout=2m tests: name: Tests diff --git a/internal/registry/obj_registry.go b/internal/registry/obj_registry.go index 0caed21..58543db 100644 --- a/internal/registry/obj_registry.go +++ b/internal/registry/obj_registry.go @@ -11,8 +11,7 @@ import ( ) type ObjRegistry struct { - boltDB *bbolt.DB - objSelector *ObjSelector + boltDB *bbolt.DB } const ( @@ -25,17 +24,20 @@ const bucketName = "_object" // ObjectInfo represents information about neoFS object that has been created // via gRPC/HTTP/S3 API. type ObjectInfo struct { - Id uint64 // Identifier in bolt DB - CID string // Container ID in gRPC/HTTP - OID string // Object ID in gRPC/HTTP - S3Bucket string // Bucket name in S3 - S3Key string // Object key in S3 - Status string // Status of the object - PayloadHash string // SHA256 hash of object payload that can be used for verification + Id uint64 // Identifier in bolt DB + CreatedAt time.Time // UTC date&time when the object was created + CID string // Container ID in gRPC/HTTP + OID string // Object ID in gRPC/HTTP + S3Bucket string // Bucket name in S3 + S3Key string // Object key in S3 + Status string // Status of the object + PayloadHash string // SHA256 hash of object payload that can be used for verification } -// NewModuleInstance implements the modules.Module interface and returns -// a new instance for each VU. +// NewObjRegistry creates a new instance of object registry that stores information +// about objects in the specified bolt database. As registry uses read-write +// connection to the database, there may be only one instance of object registry +// per database file at a time. func NewObjRegistry(dbFilePath string) *ObjRegistry { options := bbolt.Options{Timeout: 100 * time.Millisecond} boltDB, err := bbolt.Open(dbFilePath, os.ModePerm, &options) @@ -43,9 +45,7 @@ func NewObjRegistry(dbFilePath string) *ObjRegistry { panic(err) } - objSelector := ObjSelector{boltDB: boltDB, objStatus: statusCreated} - - objRepository := &ObjRegistry{boltDB: boltDB, objSelector: &objSelector} + objRepository := &ObjRegistry{boltDB: boltDB} return objRepository } @@ -63,6 +63,7 @@ func (o *ObjRegistry) AddObject(cid, oid, s3Bucket, s3Key, payloadHash string) e object := ObjectInfo{ Id: id, + CreatedAt: time.Now().UTC(), CID: cid, OID: oid, S3Bucket: s3Bucket, @@ -105,35 +106,6 @@ func (o *ObjRegistry) SetObjectStatus(id uint64, newStatus string) error { }) } -func (o *ObjRegistry) GetObjectCountInStatus(status string) (int, error) { - var objCount = 0 - err := o.boltDB.View(func(tx *bbolt.Tx) error { - b := tx.Bucket([]byte(bucketName)) - if b == nil { - return nil - } - - return b.ForEach(func(_, objBytes []byte) error { - if objBytes != nil { - var obj ObjectInfo - if err := json.Unmarshal(objBytes, &obj); err != nil { - // Ignore malformed objects - return nil - } - if obj.Status == status { - objCount++ - } - } - return nil - }) - }) - return objCount, err -} - -func (o *ObjRegistry) NextObjectToVerify() (*ObjectInfo, error) { - return o.objSelector.NextObject() -} - func (o *ObjRegistry) Close() error { return o.boltDB.Close() } diff --git a/internal/registry/obj_selector.go b/internal/registry/obj_selector.go index 8efe5d6..f41a634 100644 --- a/internal/registry/obj_selector.go +++ b/internal/registry/obj_selector.go @@ -2,19 +2,34 @@ package registry import ( "encoding/json" - "errors" "sync" + "time" "go.etcd.io/bbolt" ) -type ObjSelector struct { - boltDB *bbolt.DB - mu sync.Mutex - lastId uint64 - objStatus string +type ObjFilter struct { + Status string + Age int } +type ObjSelector struct { + boltDB *bbolt.DB + filter *ObjFilter + mu sync.Mutex + lastId uint64 +} + +// NewObjSelector creates a new instance of object selector that can iterate over +// objects in the specified registry. +func NewObjSelector(registry *ObjRegistry, filter *ObjFilter) *ObjSelector { + objSelector := &ObjSelector{boltDB: registry.boltDB, filter: filter} + return objSelector +} + +// NextObject returns the next object from the registry that matches filter of +// the selector. NextObject only roams forward from the current position of the +// selector. If there are no objects that match the filter, then returns nil. func (o *ObjSelector) NextObject() (*ObjectInfo, error) { var foundObj *ObjectInfo err := o.boltDB.View(func(tx *bbolt.Tx) error { @@ -41,7 +56,7 @@ func (o *ObjSelector) NextObject() (*ObjectInfo, error) { keyBytes, objBytes = c.Next() } - // Iterate over objects to find the next object in the target status + // Iterate over objects to find the next object matching the filter var obj ObjectInfo for ; keyBytes != nil; keyBytes, objBytes = c.Next() { if objBytes != nil { @@ -49,8 +64,8 @@ func (o *ObjSelector) NextObject() (*ObjectInfo, error) { // Ignore malformed objects for now. Maybe it should be panic? continue } - // If we reached an object in the target status, stop iterating - if obj.Status == o.objStatus { + // If we reached an object that matches filter, stop iterating + if o.filter.match(obj) { foundObj = &obj break } @@ -63,7 +78,54 @@ func (o *ObjSelector) NextObject() (*ObjectInfo, error) { return nil } - return errors.New("no objects are available") + return nil }) return foundObj, err } + +// Resets object selector to start scanning objects from the beginning. +func (o *ObjSelector) Reset() { + o.mu.Lock() + defer o.mu.Unlock() + + o.lastId = 0 +} + +// Count returns total number of objects that match filter of the selector. +func (o *ObjSelector) Count() (int, error) { + var count = 0 + err := o.boltDB.View(func(tx *bbolt.Tx) error { + b := tx.Bucket([]byte(bucketName)) + if b == nil { + return nil + } + + return b.ForEach(func(_, objBytes []byte) error { + if objBytes != nil { + var obj ObjectInfo + if err := json.Unmarshal(objBytes, &obj); err != nil { + // Ignore malformed objects + return nil + } + if o.filter.match(obj) { + count++ + } + } + return nil + }) + }) + return count, err +} + +func (f *ObjFilter) match(o ObjectInfo) bool { + if f.Status != "" && f.Status != o.Status { + return false + } + if f.Age != 0 { + objAge := time.Now().UTC().Sub(o.CreatedAt).Seconds() + if objAge < float64(f.Age) { + return false + } + } + return true +} diff --git a/internal/registry/registry.go b/internal/registry/registry.go index f1cb8f9..27650f8 100644 --- a/internal/registry/registry.go +++ b/internal/registry/registry.go @@ -1,6 +1,9 @@ package registry import ( + "fmt" + "reflect" + "strconv" "sync" "go.k6.io/k6/js/modules" @@ -12,7 +15,9 @@ type RootModule struct { // Stores object registry by path of database file. We should have only single instance // of registry per each file registries map[string]*ObjRegistry - // Mutex to sync access to repositories map + // Stores object selector by name. We may have multiple selectors per database file + selectors map[string]*ObjSelector + // Mutex to sync access to the maps mu sync.Mutex } @@ -29,7 +34,10 @@ var ( ) func init() { - rootModule := &RootModule{registries: make(map[string]*ObjRegistry)} + rootModule := &RootModule{ + registries: make(map[string]*ObjRegistry), + selectors: make(map[string]*ObjSelector), + } modules.Register("k6/x/neofs/registry", rootModule) } @@ -53,7 +61,11 @@ func (r *Registry) Exports() modules.Exports { func (r *Registry) Open(dbFilePath string) *ObjRegistry { r.root.mu.Lock() defer r.root.mu.Unlock() + return r.open(dbFilePath) +} +// Implementation of Open without mutex lock, so that it can be re-used in other methods. +func (r *Registry) open(dbFilePath string) *ObjRegistry { registry := r.root.registries[dbFilePath] if registry == nil { registry = NewObjRegistry(dbFilePath) @@ -61,3 +73,38 @@ func (r *Registry) Open(dbFilePath string) *ObjRegistry { } return registry } + +func (r *Registry) GetSelector(dbFilePath string, name string, filter map[string]string) *ObjSelector { + objFilter, err := parseFilter(filter) + if err != nil { + panic(err) + } + + r.root.mu.Lock() + defer r.root.mu.Unlock() + + selector := r.root.selectors[name] + if selector == nil { + registry := r.open(dbFilePath) + selector = NewObjSelector(registry, objFilter) + r.root.selectors[name] = selector + } else if !reflect.DeepEqual(selector.filter, objFilter) { + panic(fmt.Sprintf("selector %s already has been created with a different filter", name)) + } + return selector +} + +func parseFilter(filter map[string]string) (*ObjFilter, error) { + objFilter := ObjFilter{} + objFilter.Status = filter["status"] + + if ageStr := filter["age"]; ageStr != "" { + age, err := strconv.ParseInt(ageStr, 10, 64) + if err != nil { + return nil, err + } + objFilter.Age = int(age) + } + + return &objFilter, nil +} diff --git a/scenarios/verify.js b/scenarios/verify.js index 6925dc5..75ce0c0 100644 --- a/scenarios/verify.js +++ b/scenarios/verify.js @@ -37,10 +37,17 @@ if (__ENV.S3_ENDPOINTS) { } // We will attempt to verify every object in "created" status. The scenario will execute -// as many scenarios as there are objects. Each object will have 3 retries to be verified -const obj_count_to_verify = obj_registry.getObjectCountInStatus("created"); -// Execute at least one iteration (shared-iterations can't run 0 iterations) -const iterations = Math.max(1, obj_count_to_verify); +// as many iterations as there are objects. Each object will have 3 retries to be verified +const obj_to_verify_selector = registry.getSelector( + __ENV.REGISTRY_FILE, + "obj_to_verify", + { + status: "created", + } +); +const obj_to_verify_count = obj_to_verify_selector.count(); +// Execute at least one iteration (executor shared-iterations can't run 0 iterations) +const iterations = Math.max(1, obj_to_verify_count); // Executor shared-iterations requires number of iterations to be larger than number of VUs const vus = Math.min(__ENV.CLIENTS, iterations); @@ -63,7 +70,8 @@ export const options = { export function setup() { // Populate counters with initial values for (const [status, counter] of Object.entries(obj_counters)) { - counter.add(obj_registry.getObjectCountInStatus(status)); + const obj_selector = registry.getSelector(__ENV.REGISTRY_FILE, status, { status }); + counter.add(obj_selector.count()); } } @@ -72,7 +80,7 @@ export function obj_verify() { sleep(__ENV.SLEEP); } - const obj = obj_registry.nextObjectToVerify(); + const obj = obj_to_verify_selector.nextObject(); if (!obj) { console.log("All objects have been verified"); return; @@ -103,7 +111,7 @@ function verify_object_with_retries(obj, attempts) { } // Unless we explicitly saw that there was a hash mismatch, then we will retry after a delay - console.log(`Verify error on ${obj.id}: {resp.error}. Object will be re-tried`); + console.log(`Verify error on ${obj.id}: ${result.error}. Object will be re-tried`); sleep(__ENV.SLEEP); }