diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 1b1d5a9..bdeaea6 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -21,6 +21,7 @@ jobs: uses: golangci/golangci-lint-action@v2 with: version: latest + args: --timeout=2m tests: name: Tests diff --git a/internal/registry/obj_registry.go b/internal/registry/obj_registry.go index 0caed21..58543db 100644 --- a/internal/registry/obj_registry.go +++ b/internal/registry/obj_registry.go @@ -11,8 +11,7 @@ import ( ) type ObjRegistry struct { - boltDB *bbolt.DB - objSelector *ObjSelector + boltDB *bbolt.DB } const ( @@ -25,17 +24,20 @@ const bucketName = "_object" // ObjectInfo represents information about neoFS object that has been created // via gRPC/HTTP/S3 API. type ObjectInfo struct { - Id uint64 // Identifier in bolt DB - CID string // Container ID in gRPC/HTTP - OID string // Object ID in gRPC/HTTP - S3Bucket string // Bucket name in S3 - S3Key string // Object key in S3 - Status string // Status of the object - PayloadHash string // SHA256 hash of object payload that can be used for verification + Id uint64 // Identifier in bolt DB + CreatedAt time.Time // UTC date&time when the object was created + CID string // Container ID in gRPC/HTTP + OID string // Object ID in gRPC/HTTP + S3Bucket string // Bucket name in S3 + S3Key string // Object key in S3 + Status string // Status of the object + PayloadHash string // SHA256 hash of object payload that can be used for verification } -// NewModuleInstance implements the modules.Module interface and returns -// a new instance for each VU. +// NewObjRegistry creates a new instance of object registry that stores information +// about objects in the specified bolt database. As registry uses read-write +// connection to the database, there may be only one instance of object registry +// per database file at a time. func NewObjRegistry(dbFilePath string) *ObjRegistry { options := bbolt.Options{Timeout: 100 * time.Millisecond} boltDB, err := bbolt.Open(dbFilePath, os.ModePerm, &options) @@ -43,9 +45,7 @@ func NewObjRegistry(dbFilePath string) *ObjRegistry { panic(err) } - objSelector := ObjSelector{boltDB: boltDB, objStatus: statusCreated} - - objRepository := &ObjRegistry{boltDB: boltDB, objSelector: &objSelector} + objRepository := &ObjRegistry{boltDB: boltDB} return objRepository } @@ -63,6 +63,7 @@ func (o *ObjRegistry) AddObject(cid, oid, s3Bucket, s3Key, payloadHash string) e object := ObjectInfo{ Id: id, + CreatedAt: time.Now().UTC(), CID: cid, OID: oid, S3Bucket: s3Bucket, @@ -105,35 +106,6 @@ func (o *ObjRegistry) SetObjectStatus(id uint64, newStatus string) error { }) } -func (o *ObjRegistry) GetObjectCountInStatus(status string) (int, error) { - var objCount = 0 - err := o.boltDB.View(func(tx *bbolt.Tx) error { - b := tx.Bucket([]byte(bucketName)) - if b == nil { - return nil - } - - return b.ForEach(func(_, objBytes []byte) error { - if objBytes != nil { - var obj ObjectInfo - if err := json.Unmarshal(objBytes, &obj); err != nil { - // Ignore malformed objects - return nil - } - if obj.Status == status { - objCount++ - } - } - return nil - }) - }) - return objCount, err -} - -func (o *ObjRegistry) NextObjectToVerify() (*ObjectInfo, error) { - return o.objSelector.NextObject() -} - func (o *ObjRegistry) Close() error { return o.boltDB.Close() } diff --git a/internal/registry/obj_selector.go b/internal/registry/obj_selector.go index 8efe5d6..f41a634 100644 --- a/internal/registry/obj_selector.go +++ b/internal/registry/obj_selector.go @@ -2,19 +2,34 @@ package registry import ( "encoding/json" - "errors" "sync" + "time" "go.etcd.io/bbolt" ) -type ObjSelector struct { - boltDB *bbolt.DB - mu sync.Mutex - lastId uint64 - objStatus string +type ObjFilter struct { + Status string + Age int } +type ObjSelector struct { + boltDB *bbolt.DB + filter *ObjFilter + mu sync.Mutex + lastId uint64 +} + +// NewObjSelector creates a new instance of object selector that can iterate over +// objects in the specified registry. +func NewObjSelector(registry *ObjRegistry, filter *ObjFilter) *ObjSelector { + objSelector := &ObjSelector{boltDB: registry.boltDB, filter: filter} + return objSelector +} + +// NextObject returns the next object from the registry that matches filter of +// the selector. NextObject only roams forward from the current position of the +// selector. If there are no objects that match the filter, then returns nil. func (o *ObjSelector) NextObject() (*ObjectInfo, error) { var foundObj *ObjectInfo err := o.boltDB.View(func(tx *bbolt.Tx) error { @@ -41,7 +56,7 @@ func (o *ObjSelector) NextObject() (*ObjectInfo, error) { keyBytes, objBytes = c.Next() } - // Iterate over objects to find the next object in the target status + // Iterate over objects to find the next object matching the filter var obj ObjectInfo for ; keyBytes != nil; keyBytes, objBytes = c.Next() { if objBytes != nil { @@ -49,8 +64,8 @@ func (o *ObjSelector) NextObject() (*ObjectInfo, error) { // Ignore malformed objects for now. Maybe it should be panic? continue } - // If we reached an object in the target status, stop iterating - if obj.Status == o.objStatus { + // If we reached an object that matches filter, stop iterating + if o.filter.match(obj) { foundObj = &obj break } @@ -63,7 +78,54 @@ func (o *ObjSelector) NextObject() (*ObjectInfo, error) { return nil } - return errors.New("no objects are available") + return nil }) return foundObj, err } + +// Resets object selector to start scanning objects from the beginning. +func (o *ObjSelector) Reset() { + o.mu.Lock() + defer o.mu.Unlock() + + o.lastId = 0 +} + +// Count returns total number of objects that match filter of the selector. +func (o *ObjSelector) Count() (int, error) { + var count = 0 + err := o.boltDB.View(func(tx *bbolt.Tx) error { + b := tx.Bucket([]byte(bucketName)) + if b == nil { + return nil + } + + return b.ForEach(func(_, objBytes []byte) error { + if objBytes != nil { + var obj ObjectInfo + if err := json.Unmarshal(objBytes, &obj); err != nil { + // Ignore malformed objects + return nil + } + if o.filter.match(obj) { + count++ + } + } + return nil + }) + }) + return count, err +} + +func (f *ObjFilter) match(o ObjectInfo) bool { + if f.Status != "" && f.Status != o.Status { + return false + } + if f.Age != 0 { + objAge := time.Now().UTC().Sub(o.CreatedAt).Seconds() + if objAge < float64(f.Age) { + return false + } + } + return true +} diff --git a/internal/registry/registry.go b/internal/registry/registry.go index f1cb8f9..27650f8 100644 --- a/internal/registry/registry.go +++ b/internal/registry/registry.go @@ -1,6 +1,9 @@ package registry import ( + "fmt" + "reflect" + "strconv" "sync" "go.k6.io/k6/js/modules" @@ -12,7 +15,9 @@ type RootModule struct { // Stores object registry by path of database file. We should have only single instance // of registry per each file registries map[string]*ObjRegistry - // Mutex to sync access to repositories map + // Stores object selector by name. We may have multiple selectors per database file + selectors map[string]*ObjSelector + // Mutex to sync access to the maps mu sync.Mutex } @@ -29,7 +34,10 @@ var ( ) func init() { - rootModule := &RootModule{registries: make(map[string]*ObjRegistry)} + rootModule := &RootModule{ + registries: make(map[string]*ObjRegistry), + selectors: make(map[string]*ObjSelector), + } modules.Register("k6/x/neofs/registry", rootModule) } @@ -53,7 +61,11 @@ func (r *Registry) Exports() modules.Exports { func (r *Registry) Open(dbFilePath string) *ObjRegistry { r.root.mu.Lock() defer r.root.mu.Unlock() + return r.open(dbFilePath) +} +// Implementation of Open without mutex lock, so that it can be re-used in other methods. +func (r *Registry) open(dbFilePath string) *ObjRegistry { registry := r.root.registries[dbFilePath] if registry == nil { registry = NewObjRegistry(dbFilePath) @@ -61,3 +73,38 @@ func (r *Registry) Open(dbFilePath string) *ObjRegistry { } return registry } + +func (r *Registry) GetSelector(dbFilePath string, name string, filter map[string]string) *ObjSelector { + objFilter, err := parseFilter(filter) + if err != nil { + panic(err) + } + + r.root.mu.Lock() + defer r.root.mu.Unlock() + + selector := r.root.selectors[name] + if selector == nil { + registry := r.open(dbFilePath) + selector = NewObjSelector(registry, objFilter) + r.root.selectors[name] = selector + } else if !reflect.DeepEqual(selector.filter, objFilter) { + panic(fmt.Sprintf("selector %s already has been created with a different filter", name)) + } + return selector +} + +func parseFilter(filter map[string]string) (*ObjFilter, error) { + objFilter := ObjFilter{} + objFilter.Status = filter["status"] + + if ageStr := filter["age"]; ageStr != "" { + age, err := strconv.ParseInt(ageStr, 10, 64) + if err != nil { + return nil, err + } + objFilter.Age = int(age) + } + + return &objFilter, nil +} diff --git a/scenarios/verify.js b/scenarios/verify.js index 6925dc5..75ce0c0 100644 --- a/scenarios/verify.js +++ b/scenarios/verify.js @@ -37,10 +37,17 @@ if (__ENV.S3_ENDPOINTS) { } // We will attempt to verify every object in "created" status. The scenario will execute -// as many scenarios as there are objects. Each object will have 3 retries to be verified -const obj_count_to_verify = obj_registry.getObjectCountInStatus("created"); -// Execute at least one iteration (shared-iterations can't run 0 iterations) -const iterations = Math.max(1, obj_count_to_verify); +// as many iterations as there are objects. Each object will have 3 retries to be verified +const obj_to_verify_selector = registry.getSelector( + __ENV.REGISTRY_FILE, + "obj_to_verify", + { + status: "created", + } +); +const obj_to_verify_count = obj_to_verify_selector.count(); +// Execute at least one iteration (executor shared-iterations can't run 0 iterations) +const iterations = Math.max(1, obj_to_verify_count); // Executor shared-iterations requires number of iterations to be larger than number of VUs const vus = Math.min(__ENV.CLIENTS, iterations); @@ -63,7 +70,8 @@ export const options = { export function setup() { // Populate counters with initial values for (const [status, counter] of Object.entries(obj_counters)) { - counter.add(obj_registry.getObjectCountInStatus(status)); + const obj_selector = registry.getSelector(__ENV.REGISTRY_FILE, status, { status }); + counter.add(obj_selector.count()); } } @@ -72,7 +80,7 @@ export function obj_verify() { sleep(__ENV.SLEEP); } - const obj = obj_registry.nextObjectToVerify(); + const obj = obj_to_verify_selector.nextObject(); if (!obj) { console.log("All objects have been verified"); return; @@ -103,7 +111,7 @@ function verify_object_with_retries(obj, attempts) { } // Unless we explicitly saw that there was a hash mismatch, then we will retry after a delay - console.log(`Verify error on ${obj.id}: {resp.error}. Object will be re-tried`); + console.log(`Verify error on ${obj.id}: ${result.error}. Object will be re-tried`); sleep(__ENV.SLEEP); }