diff --git a/internal/registry/obj_selector.go b/internal/registry/obj_selector.go index b4f346b..f983f6f 100644 --- a/internal/registry/obj_selector.go +++ b/internal/registry/obj_selector.go @@ -15,24 +15,28 @@ type ObjFilter struct { } type ObjSelector struct { - ctx context.Context - objChan chan *ObjectInfo - boltDB *bbolt.DB - filter *ObjFilter + ctx context.Context + objChan chan *ObjectInfo + boltDB *bbolt.DB + filter *ObjFilter + cacheSize int } -// objectSelectCache is a maximum number of the selected objects to be -// cached for the ObjSelector.NextObject. -const objectSelectCache = 100 +// objectSelectCache is the default maximum size of a batch to select from DB. +const objectSelectCache = 1000 // NewObjSelector creates a new instance of object selector that can iterate over // objects in the specified registry. -func NewObjSelector(registry *ObjRegistry, filter *ObjFilter) *ObjSelector { +func NewObjSelector(registry *ObjRegistry, selectionSize int, filter *ObjFilter) *ObjSelector { + if selectionSize <= 0 { + selectionSize = objectSelectCache + } objSelector := &ObjSelector{ - ctx: registry.ctx, - boltDB: registry.boltDB, - filter: filter, - objChan: make(chan *ObjectInfo, objectSelectCache), + ctx: registry.ctx, + boltDB: registry.boltDB, + filter: filter, + objChan: make(chan *ObjectInfo, selectionSize*2), + cacheSize: selectionSize, } go objSelector.selectLoop() @@ -78,7 +82,7 @@ func (o *ObjSelector) Count() (int, error) { } func (o *ObjSelector) selectLoop() { - cache := make([]*ObjectInfo, 0, objectSelectCache) + cache := make([]*ObjectInfo, 0, o.cacheSize) var lastID uint64 defer close(o.objChan) @@ -113,7 +117,7 @@ func (o *ObjSelector) selectLoop() { } // Iterate over objects to find the next object matching the filter. - for ; keyBytes != nil && len(cache) != objectSelectCache; keyBytes, objBytes = c.Next() { + for ; keyBytes != nil && len(cache) != o.cacheSize; keyBytes, objBytes = c.Next() { if objBytes != nil { var obj ObjectInfo if err := json.Unmarshal(objBytes, &obj); err != nil { @@ -145,7 +149,7 @@ func (o *ObjSelector) selectLoop() { } } - if len(cache) != objectSelectCache { + if len(cache) != o.cacheSize { // no more objects, wait a little; the logic could be improved. select { case <-time.After(time.Second * time.Duration(o.filter.Age/2)): diff --git a/internal/registry/registry.go b/internal/registry/registry.go index 78155e6..96a9fdb 100644 --- a/internal/registry/registry.go +++ b/internal/registry/registry.go @@ -74,7 +74,7 @@ func (r *Registry) open(dbFilePath string) *ObjRegistry { return registry } -func (r *Registry) GetSelector(dbFilePath string, name string, filter map[string]string) *ObjSelector { +func (r *Registry) GetSelector(dbFilePath string, name string, cacheSize int, filter map[string]string) *ObjSelector { objFilter, err := parseFilter(filter) if err != nil { panic(err) @@ -86,7 +86,7 @@ func (r *Registry) GetSelector(dbFilePath string, name string, filter map[string selector := r.root.selectors[name] if selector == nil { registry := r.open(dbFilePath) - selector = NewObjSelector(registry, objFilter) + selector = NewObjSelector(registry, cacheSize, objFilter) r.root.selectors[name] = selector } else if !reflect.DeepEqual(selector.filter, objFilter) { panic(fmt.Sprintf("selector %s already has been created with a different filter", name)) diff --git a/scenarios/grpc.js b/scenarios/grpc.js index 35d309f..6d52343 100644 --- a/scenarios/grpc.js +++ b/scenarios/grpc.js @@ -30,6 +30,7 @@ if (registry_enabled && delete_age) { obj_to_delete_selector = registry.getSelector( __ENV.REGISTRY_FILE, "obj_to_delete", + __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, { status: "created", age: delete_age, diff --git a/scenarios/run_scenarios.md b/scenarios/run_scenarios.md index b044e44..83f6470 100644 --- a/scenarios/run_scenarios.md +++ b/scenarios/run_scenarios.md @@ -13,6 +13,7 @@ Scenarios `grpc.js`, `http.js` and `s3.js` support the following options: * `PREGEN_JSON` - path to json file with pre-generated containers and objects (in case of http scenario we use json pre-generated for grpc scenario). * `SLEEP_WRITE` - time interval (in seconds) between writing VU iterations. * `SLEEP_READ` - time interval (in seconds) between reading VU iterations. + * `SELECTION_SIZE` - size of batch to select for deletion (default: 1000). Examples of how to use these options are provided below for each scenario. @@ -115,3 +116,4 @@ Options: * `TIME_LIMIT` - amount of time in seconds that is sufficient to verify all objects. If this time interval ends, then verification process will be interrupted and objects that have not been checked will stay in the `created` state. * `REGISTRY_FILE` - database file from which objects for verification should be read. * `SLEEP` - time interval (in seconds) between VU iterations. + * `SELECTION_SIZE` - size of batch to select for deletion (default: 1000). diff --git a/scenarios/s3.js b/scenarios/s3.js index 3d5242b..a2f3261 100644 --- a/scenarios/s3.js +++ b/scenarios/s3.js @@ -30,6 +30,7 @@ if (registry_enabled && delete_age) { obj_to_delete_selector = registry.getSelector( __ENV.REGISTRY_FILE, "obj_to_delete", + __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, { status: "created", age: delete_age, diff --git a/scenarios/verify.js b/scenarios/verify.js index 25ebf84..2af3372 100644 --- a/scenarios/verify.js +++ b/scenarios/verify.js @@ -41,6 +41,7 @@ if (__ENV.S3_ENDPOINTS) { const obj_to_verify_selector = registry.getSelector( __ENV.REGISTRY_FILE, "obj_to_verify", + __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, { status: "created", } @@ -70,7 +71,11 @@ export const options = { export function setup() { // Populate counters with initial values for (const [status, counter] of Object.entries(obj_counters)) { - const obj_selector = registry.getSelector(__ENV.REGISTRY_FILE, status, { status }); + const obj_selector = registry.getSelector( + __ENV.REGISTRY_FILE, + status, + __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, + { status }); counter.add(obj_selector.count()); } }