[#32] registry: Allow to customize cache size for DELETE

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
Evgenii Stratonikov 2022-11-10 09:23:33 +03:00 committed by fyrchik
parent c43f73704e
commit 883c3c259a
6 changed files with 31 additions and 18 deletions

View file

@ -15,24 +15,28 @@ type ObjFilter struct {
} }
type ObjSelector struct { type ObjSelector struct {
ctx context.Context ctx context.Context
objChan chan *ObjectInfo objChan chan *ObjectInfo
boltDB *bbolt.DB boltDB *bbolt.DB
filter *ObjFilter filter *ObjFilter
cacheSize int
} }
// objectSelectCache is a maximum number of the selected objects to be // objectSelectCache is the default maximum size of a batch to select from DB.
// cached for the ObjSelector.NextObject. const objectSelectCache = 1000
const objectSelectCache = 100
// NewObjSelector creates a new instance of object selector that can iterate over // NewObjSelector creates a new instance of object selector that can iterate over
// objects in the specified registry. // objects in the specified registry.
func NewObjSelector(registry *ObjRegistry, filter *ObjFilter) *ObjSelector { func NewObjSelector(registry *ObjRegistry, selectionSize int, filter *ObjFilter) *ObjSelector {
if selectionSize <= 0 {
selectionSize = objectSelectCache
}
objSelector := &ObjSelector{ objSelector := &ObjSelector{
ctx: registry.ctx, ctx: registry.ctx,
boltDB: registry.boltDB, boltDB: registry.boltDB,
filter: filter, filter: filter,
objChan: make(chan *ObjectInfo, objectSelectCache), objChan: make(chan *ObjectInfo, selectionSize*2),
cacheSize: selectionSize,
} }
go objSelector.selectLoop() go objSelector.selectLoop()
@ -78,7 +82,7 @@ func (o *ObjSelector) Count() (int, error) {
} }
func (o *ObjSelector) selectLoop() { func (o *ObjSelector) selectLoop() {
cache := make([]*ObjectInfo, 0, objectSelectCache) cache := make([]*ObjectInfo, 0, o.cacheSize)
var lastID uint64 var lastID uint64
defer close(o.objChan) defer close(o.objChan)
@ -113,7 +117,7 @@ func (o *ObjSelector) selectLoop() {
} }
// Iterate over objects to find the next object matching the filter. // Iterate over objects to find the next object matching the filter.
for ; keyBytes != nil && len(cache) != objectSelectCache; keyBytes, objBytes = c.Next() { for ; keyBytes != nil && len(cache) != o.cacheSize; keyBytes, objBytes = c.Next() {
if objBytes != nil { if objBytes != nil {
var obj ObjectInfo var obj ObjectInfo
if err := json.Unmarshal(objBytes, &obj); err != nil { if err := json.Unmarshal(objBytes, &obj); err != nil {
@ -145,7 +149,7 @@ func (o *ObjSelector) selectLoop() {
} }
} }
if len(cache) != objectSelectCache { if len(cache) != o.cacheSize {
// no more objects, wait a little; the logic could be improved. // no more objects, wait a little; the logic could be improved.
select { select {
case <-time.After(time.Second * time.Duration(o.filter.Age/2)): case <-time.After(time.Second * time.Duration(o.filter.Age/2)):

View file

@ -74,7 +74,7 @@ func (r *Registry) open(dbFilePath string) *ObjRegistry {
return registry return registry
} }
func (r *Registry) GetSelector(dbFilePath string, name string, filter map[string]string) *ObjSelector { func (r *Registry) GetSelector(dbFilePath string, name string, cacheSize int, filter map[string]string) *ObjSelector {
objFilter, err := parseFilter(filter) objFilter, err := parseFilter(filter)
if err != nil { if err != nil {
panic(err) panic(err)
@ -86,7 +86,7 @@ func (r *Registry) GetSelector(dbFilePath string, name string, filter map[string
selector := r.root.selectors[name] selector := r.root.selectors[name]
if selector == nil { if selector == nil {
registry := r.open(dbFilePath) registry := r.open(dbFilePath)
selector = NewObjSelector(registry, objFilter) selector = NewObjSelector(registry, cacheSize, objFilter)
r.root.selectors[name] = selector r.root.selectors[name] = selector
} else if !reflect.DeepEqual(selector.filter, objFilter) { } else if !reflect.DeepEqual(selector.filter, objFilter) {
panic(fmt.Sprintf("selector %s already has been created with a different filter", name)) panic(fmt.Sprintf("selector %s already has been created with a different filter", name))

View file

@ -30,6 +30,7 @@ if (registry_enabled && delete_age) {
obj_to_delete_selector = registry.getSelector( obj_to_delete_selector = registry.getSelector(
__ENV.REGISTRY_FILE, __ENV.REGISTRY_FILE,
"obj_to_delete", "obj_to_delete",
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0,
{ {
status: "created", status: "created",
age: delete_age, age: delete_age,

View file

@ -13,6 +13,7 @@ Scenarios `grpc.js`, `http.js` and `s3.js` support the following options:
* `PREGEN_JSON` - path to json file with pre-generated containers and objects (in case of http scenario we use json pre-generated for grpc scenario). * `PREGEN_JSON` - path to json file with pre-generated containers and objects (in case of http scenario we use json pre-generated for grpc scenario).
* `SLEEP_WRITE` - time interval (in seconds) between writing VU iterations. * `SLEEP_WRITE` - time interval (in seconds) between writing VU iterations.
* `SLEEP_READ` - time interval (in seconds) between reading VU iterations. * `SLEEP_READ` - time interval (in seconds) between reading VU iterations.
* `SELECTION_SIZE` - size of batch to select for deletion (default: 1000).
Examples of how to use these options are provided below for each scenario. Examples of how to use these options are provided below for each scenario.
@ -115,3 +116,4 @@ Options:
* `TIME_LIMIT` - amount of time in seconds that is sufficient to verify all objects. If this time interval ends, then verification process will be interrupted and objects that have not been checked will stay in the `created` state. * `TIME_LIMIT` - amount of time in seconds that is sufficient to verify all objects. If this time interval ends, then verification process will be interrupted and objects that have not been checked will stay in the `created` state.
* `REGISTRY_FILE` - database file from which objects for verification should be read. * `REGISTRY_FILE` - database file from which objects for verification should be read.
* `SLEEP` - time interval (in seconds) between VU iterations. * `SLEEP` - time interval (in seconds) between VU iterations.
* `SELECTION_SIZE` - size of batch to select for deletion (default: 1000).

View file

@ -30,6 +30,7 @@ if (registry_enabled && delete_age) {
obj_to_delete_selector = registry.getSelector( obj_to_delete_selector = registry.getSelector(
__ENV.REGISTRY_FILE, __ENV.REGISTRY_FILE,
"obj_to_delete", "obj_to_delete",
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0,
{ {
status: "created", status: "created",
age: delete_age, age: delete_age,

View file

@ -41,6 +41,7 @@ if (__ENV.S3_ENDPOINTS) {
const obj_to_verify_selector = registry.getSelector( const obj_to_verify_selector = registry.getSelector(
__ENV.REGISTRY_FILE, __ENV.REGISTRY_FILE,
"obj_to_verify", "obj_to_verify",
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0,
{ {
status: "created", status: "created",
} }
@ -70,7 +71,11 @@ export const options = {
export function setup() { export function setup() {
// Populate counters with initial values // Populate counters with initial values
for (const [status, counter] of Object.entries(obj_counters)) { for (const [status, counter] of Object.entries(obj_counters)) {
const obj_selector = registry.getSelector(__ENV.REGISTRY_FILE, status, { status }); const obj_selector = registry.getSelector(
__ENV.REGISTRY_FILE,
status,
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0,
{ status });
counter.add(obj_selector.count()); counter.add(obj_selector.count());
} }
} }