[#153] selector: Add agent synchronization in Oneshot
mode
This commit is contained in:
parent
9b9db46a07
commit
becacd4251
2 changed files with 17 additions and 1 deletions
|
@ -3,6 +3,7 @@ package registry
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/nspcc-dev/neo-go/pkg/io"
|
||||
|
@ -23,6 +24,7 @@ type ObjSelector struct {
|
|||
filter *ObjFilter
|
||||
cacheSize int
|
||||
kind SelectorKind
|
||||
sinc sync.WaitGroup
|
||||
}
|
||||
|
||||
// objectSelectCache is the default maximum size of a batch to select from DB.
|
||||
|
@ -105,6 +107,18 @@ func (o *ObjSelector) Count() (int, error) {
|
|||
return count, err
|
||||
}
|
||||
|
||||
// SetCountVU sets the number of VUs that will work with this selector
|
||||
func (o *ObjSelector) SetCountVU(count int) {
|
||||
o.sinc = sync.WaitGroup{}
|
||||
o.sinc.Add(count)
|
||||
}
|
||||
|
||||
// SincVU allows synchronizing VUs that use this selector
|
||||
func (o *ObjSelector) SincVU() {
|
||||
o.sinc.Done()
|
||||
o.sinc.Wait()
|
||||
}
|
||||
|
||||
func (o *ObjSelector) selectLoop() {
|
||||
cache := make([]*ObjectInfo, 0, o.cacheSize)
|
||||
var lastID uint64
|
||||
|
|
|
@ -70,6 +70,7 @@ if (write_vu_count > 0) {
|
|||
};
|
||||
}
|
||||
|
||||
const delete_vu_count = parseInt(__ENV.DELETERS || '0');
|
||||
const delete_age = __ENV.DELETE_AGE ? parseInt(__ENV.DELETE_AGE) : undefined;
|
||||
let obj_to_delete_selector = undefined;
|
||||
let obj_to_delete_exit_on_null = undefined;
|
||||
|
@ -85,6 +86,7 @@ if (registry_enabled && delete_age) {
|
|||
status : 'created',
|
||||
age : delete_age,
|
||||
});
|
||||
obj_to_delete_selector.setCountVU(delete_vu_count)
|
||||
}
|
||||
|
||||
const read_vu_count = parseInt(__ENV.READERS || '0');
|
||||
|
@ -98,7 +100,6 @@ if (read_vu_count > 0) {
|
|||
};
|
||||
}
|
||||
|
||||
const delete_vu_count = parseInt(__ENV.DELETERS || '0');
|
||||
if (delete_vu_count > 0) {
|
||||
if (!obj_to_delete_selector) {
|
||||
throw 'Positive DELETE worker number without a proper object selector';
|
||||
|
@ -204,6 +205,7 @@ export function obj_delete() {
|
|||
const obj = obj_to_delete_selector.nextObject();
|
||||
if (!obj) {
|
||||
if (obj_to_delete_exit_on_null) {
|
||||
obj_to_delete_selector.sincVU()
|
||||
exec.test.abort("No more objects to select");
|
||||
}
|
||||
return;
|
||||
|
|
Loading…
Reference in a new issue