diff --git a/internal/registry/obj_registry.go b/internal/registry/obj_registry.go index bb5e24c..53b097e 100644 --- a/internal/registry/obj_registry.go +++ b/internal/registry/obj_registry.go @@ -1,6 +1,7 @@ package registry import ( + "context" "encoding/binary" "encoding/json" "errors" @@ -11,6 +12,8 @@ import ( ) type ObjRegistry struct { + ctx context.Context + cancel context.CancelFunc boltDB *bbolt.DB } @@ -38,14 +41,20 @@ type ObjectInfo struct { // about objects in the specified bolt database. As registry uses read-write // connection to the database, there may be only one instance of object registry // per database file at a time. -func NewObjRegistry(dbFilePath string) *ObjRegistry { +func NewObjRegistry(ctx context.Context, dbFilePath string) *ObjRegistry { options := bbolt.Options{Timeout: 100 * time.Millisecond} boltDB, err := bbolt.Open(dbFilePath, os.ModePerm, &options) if err != nil { panic(err) } - objRepository := &ObjRegistry{boltDB: boltDB} + ctx, cancel := context.WithCancel(ctx) + + objRepository := &ObjRegistry{ + ctx: ctx, + cancel: cancel, + boltDB: boltDB, + } return objRepository } @@ -118,6 +127,7 @@ func (o *ObjRegistry) DeleteObject(id uint64) error { } func (o *ObjRegistry) Close() error { + o.cancel() return o.boltDB.Close() } diff --git a/internal/registry/obj_selector.go b/internal/registry/obj_selector.go index c0d1556..b4f346b 100644 --- a/internal/registry/obj_selector.go +++ b/internal/registry/obj_selector.go @@ -1,8 +1,9 @@ package registry import ( + "context" "encoding/json" - "sync" + "fmt" "time" "go.etcd.io/bbolt" @@ -14,97 +15,40 @@ type ObjFilter struct { } type ObjSelector struct { - boltDB *bbolt.DB - filter *ObjFilter - mu sync.Mutex - lastId uint64 - // UTC date&time before which selector is locked for iteration or resetting. - // This lock prevents concurrency issues when some VUs are selecting objects - // while another VU resets the selector and attempts to select the same objects - lockedUntil time.Time + ctx context.Context + objChan chan *ObjectInfo + boltDB *bbolt.DB + filter *ObjFilter } +// objectSelectCache is a maximum number of the selected objects to be +// cached for the ObjSelector.NextObject. +const objectSelectCache = 100 + // NewObjSelector creates a new instance of object selector that can iterate over // objects in the specified registry. func NewObjSelector(registry *ObjRegistry, filter *ObjFilter) *ObjSelector { - objSelector := &ObjSelector{boltDB: registry.boltDB, filter: filter} + objSelector := &ObjSelector{ + ctx: registry.ctx, + boltDB: registry.boltDB, + filter: filter, + objChan: make(chan *ObjectInfo, objectSelectCache), + } + + go objSelector.selectLoop() + return objSelector } // NextObject returns the next object from the registry that matches filter of // the selector. NextObject only roams forward from the current position of the -// selector. If there are no objects that match the filter, then returns nil. -func (o *ObjSelector) NextObject() (*ObjectInfo, error) { - var foundObj *ObjectInfo - err := o.boltDB.View(func(tx *bbolt.Tx) error { - b := tx.Bucket([]byte(bucketName)) - if b == nil { - return nil - } - - c := b.Cursor() - - // We use mutex so that multiple VUs won't attempt to modify lastId simultaneously - // TODO: consider singleton channel that will produce those ids on demand - o.mu.Lock() - defer o.mu.Unlock() - - if time.Now().UTC().Before(o.lockedUntil) { - return nil - } - - // Establish the start position for searching the next object: - // If we should go from the beginning (lastId=0), then we start from the first element - // Otherwise we start from the key right after the lastId - var keyBytes, objBytes []byte - if o.lastId == 0 { - keyBytes, objBytes = c.First() - } else { - c.Seek(encodeId(o.lastId)) - keyBytes, objBytes = c.Next() - } - - // Iterate over objects to find the next object matching the filter - var obj ObjectInfo - for ; keyBytes != nil; keyBytes, objBytes = c.Next() { - if objBytes != nil { - if err := json.Unmarshal(objBytes, &obj); err != nil { - // Ignore malformed objects for now. Maybe it should be panic? - continue - } - // If we reached an object that matches filter, stop iterating - if o.filter.match(obj) { - foundObj = &obj - break - } - } - } - - // Update the last key - if keyBytes != nil { - o.lastId = decodeId(keyBytes) - return nil - } - - return nil - }) - return foundObj, err -} - -// Resets object selector to start scanning objects from the beginning. -// After resetting the selector is locked for specified lockTime to prevent -// concurrency issues. -func (o *ObjSelector) Reset(lockTime int) bool { - o.mu.Lock() - defer o.mu.Unlock() - - if time.Now().UTC().Before(o.lockedUntil) { - return false - } - - o.lastId = 0 - o.lockedUntil = time.Now().UTC().Add(time.Duration(lockTime) * time.Second) - return true +// selector. If there are no objects that match the filter, blocks until one of +// the following happens: +// - a "new" next object is available; +// - underlying registry context is done, nil objects will be returned on the +// currently blocked and every further NextObject calls. +func (o *ObjSelector) NextObject() *ObjectInfo { + return <-o.objChan } // Count returns total number of objects that match filter of the selector. @@ -133,6 +77,88 @@ func (o *ObjSelector) Count() (int, error) { return count, err } +func (o *ObjSelector) selectLoop() { + cache := make([]*ObjectInfo, 0, objectSelectCache) + var lastID uint64 + defer close(o.objChan) + + for { + select { + case <-o.ctx.Done(): + return + default: + } + + // cache the objects + err := o.boltDB.View(func(tx *bbolt.Tx) error { + b := tx.Bucket([]byte(bucketName)) + if b == nil { + return nil + } + + c := b.Cursor() + + // Establish the start position for searching the next object: + // If we should go from the beginning (lastID=0), then we start + // from the first element. Otherwise, we start from the last + // handled ID + 1. + var keyBytes, objBytes []byte + if lastID == 0 { + keyBytes, objBytes = c.First() + } else { + keyBytes, objBytes = c.Seek(encodeId(lastID)) + if keyBytes != nil && decodeId(keyBytes) == lastID { + keyBytes, objBytes = c.Next() + } + } + + // Iterate over objects to find the next object matching the filter. + for ; keyBytes != nil && len(cache) != objectSelectCache; keyBytes, objBytes = c.Next() { + if objBytes != nil { + var obj ObjectInfo + if err := json.Unmarshal(objBytes, &obj); err != nil { + // Ignore malformed objects for now. Maybe it should be panic? + continue + } + + if o.filter.match(obj) { + cache = append(cache, &obj) + } + } + } + + if len(cache) > 0 { + lastID = cache[len(cache)-1].Id + } + + return nil + }) + if err != nil { + panic(fmt.Errorf("fetching objects failed: %w", err)) + } + + for _, obj := range cache { + select { + case <-o.ctx.Done(): + return + case o.objChan <- obj: + } + } + + if len(cache) != objectSelectCache { + // no more objects, wait a little; the logic could be improved. + select { + case <-time.After(time.Second * time.Duration(o.filter.Age/2)): + case <-o.ctx.Done(): + return + } + } + + // clean handled objects + cache = cache[:0] + } +} + func (f *ObjFilter) match(o ObjectInfo) bool { if f.Status != "" && f.Status != o.Status { return false diff --git a/internal/registry/registry.go b/internal/registry/registry.go index 27650f8..78155e6 100644 --- a/internal/registry/registry.go +++ b/internal/registry/registry.go @@ -68,7 +68,7 @@ func (r *Registry) Open(dbFilePath string) *ObjRegistry { func (r *Registry) open(dbFilePath string) *ObjRegistry { registry := r.root.registries[dbFilePath] if registry == nil { - registry = NewObjRegistry(dbFilePath) + registry = NewObjRegistry(r.vu.Context(), dbFilePath) r.root.registries[dbFilePath] = registry } return registry diff --git a/scenarios/grpc.js b/scenarios/grpc.js index 1ac66ba..35d309f 100644 --- a/scenarios/grpc.js +++ b/scenarios/grpc.js @@ -143,10 +143,6 @@ export function obj_delete() { const obj = obj_to_delete_selector.nextObject(); if (!obj) { - // If there are no objects to delete, we reset selector to start scanning from the - // beginning of registry. Then we wait for some time until suitable object might appear - obj_to_delete_selector.reset(delete_age); - sleep(delete_age / 2); return; } diff --git a/scenarios/s3.js b/scenarios/s3.js index 827c936..3d5242b 100644 --- a/scenarios/s3.js +++ b/scenarios/s3.js @@ -141,10 +141,6 @@ export function obj_delete() { const obj = obj_to_delete_selector.nextObject(); if (!obj) { - // If there are no objects to delete, we reset selector to start scanning from the - // beginning of registry. Then we wait for some time until suitable object might appear - obj_to_delete_selector.reset(delete_age); - sleep(delete_age / 2); return; }