[#153] selector: Add agent synchronization in Oneshot mode
All checks were successful
DCO action / DCO (pull_request) Successful in 55s
Tests and linters / Tests (1.21) (pull_request) Successful in 1m44s
Tests and linters / Lint (pull_request) Successful in 1m58s
Tests and linters / Tests (1.22) (pull_request) Successful in 2m10s
Tests and linters / Tests with -race (pull_request) Successful in 2m12s

Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
This commit is contained in:
Alexander Chuprov 2024-07-29 17:08:48 +03:00
parent 9b9db46a07
commit d15f6550b2
2 changed files with 17 additions and 1 deletions

View file

@ -3,6 +3,7 @@ package registry
import ( import (
"context" "context"
"fmt" "fmt"
"sync"
"time" "time"
"github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/io"
@ -23,6 +24,7 @@ type ObjSelector struct {
filter *ObjFilter filter *ObjFilter
cacheSize int cacheSize int
kind SelectorKind kind SelectorKind
sinc sync.WaitGroup
} }
// objectSelectCache is the default maximum size of a batch to select from DB. // objectSelectCache is the default maximum size of a batch to select from DB.
@ -105,6 +107,18 @@ func (o *ObjSelector) Count() (int, error) {
return count, err return count, err
} }
// SetCountVU sets the number of VUs that will work with this selector
func (o *ObjSelector) SetCountVU(count int) {
o.sinc = sync.WaitGroup{}
o.sinc.Add(count)
}
// SincVU allows synchronizing VUs that use this selector
func (o *ObjSelector) SincVU() {
o.sinc.Done()
o.sinc.Wait()
}
func (o *ObjSelector) selectLoop() { func (o *ObjSelector) selectLoop() {
cache := make([]*ObjectInfo, 0, o.cacheSize) cache := make([]*ObjectInfo, 0, o.cacheSize)
var lastID uint64 var lastID uint64

View file

@ -70,6 +70,7 @@ if (write_vu_count > 0) {
}; };
} }
const delete_vu_count = parseInt(__ENV.DELETERS || '0');
const delete_age = __ENV.DELETE_AGE ? parseInt(__ENV.DELETE_AGE) : undefined; const delete_age = __ENV.DELETE_AGE ? parseInt(__ENV.DELETE_AGE) : undefined;
let obj_to_delete_selector = undefined; let obj_to_delete_selector = undefined;
let obj_to_delete_exit_on_null = undefined; let obj_to_delete_exit_on_null = undefined;
@ -85,6 +86,7 @@ if (registry_enabled && delete_age) {
status : 'created', status : 'created',
age : delete_age, age : delete_age,
}); });
obj_to_delete_selector.setCountVU(delete_vu_count)
} }
const read_vu_count = parseInt(__ENV.READERS || '0'); const read_vu_count = parseInt(__ENV.READERS || '0');
@ -98,7 +100,6 @@ if (read_vu_count > 0) {
}; };
} }
const delete_vu_count = parseInt(__ENV.DELETERS || '0');
if (delete_vu_count > 0) { if (delete_vu_count > 0) {
if (!obj_to_delete_selector) { if (!obj_to_delete_selector) {
throw 'Positive DELETE worker number without a proper object selector'; throw 'Positive DELETE worker number without a proper object selector';
@ -204,6 +205,7 @@ export function obj_delete() {
const obj = obj_to_delete_selector.nextObject(); const obj = obj_to_delete_selector.nextObject();
if (!obj) { if (!obj) {
if (obj_to_delete_exit_on_null) { if (obj_to_delete_exit_on_null) {
obj_to_delete_selector.sincVU()
exec.test.abort("No more objects to select"); exec.test.abort("No more objects to select");
} }
return; return;