[#153] selector: Add agent synchronization in Oneshot mode
Some checks failed
Tests and linters / Tests (1.22) (pull_request) Successful in 2m37s
Tests and linters / Lint (pull_request) Successful in 5m5s
DCO action / DCO (pull_request) Failing after 45s
Tests and linters / Tests with -race (pull_request) Successful in 1m48s
Tests and linters / Tests (1.21) (pull_request) Successful in 2m9s

This commit is contained in:
Alexander Chuprov 2024-07-26 14:23:10 +03:00
parent 9b9db46a07
commit 1766e88839
12 changed files with 62 additions and 50 deletions

View file

@ -78,7 +78,7 @@ func rootCmdRun(cmd *cobra.Command, args []string) error {
} }
objRegistry := registry.NewObjRegistry(cmd.Context(), args[0]) objRegistry := registry.NewObjRegistry(cmd.Context(), args[0])
objSelector := registry.NewObjSelector(objRegistry, 0, registry.SelectorAwaiting, &registry.ObjFilter{ objSelector := registry.NewObjSelector(objRegistry, 0, 1, registry.SelectorAwaiting, &registry.ObjFilter{
Status: status, Status: status,
Age: age, Age: age,
}) })

View file

@ -34,7 +34,7 @@ func TestObjectExporter(t *testing.T) {
func runExportTest(t *testing.T) { func runExportTest(t *testing.T) {
expected := getExpectedResult(t) expected := getExpectedResult(t)
objReg := getFilledRegistry(t, expected) objReg := getFilledRegistry(t, expected)
objExp := NewObjExporter(NewObjSelector(objReg, 0, SelectorOneshot, &ObjFilter{Status: statusCreated})) objExp := NewObjExporter(NewObjSelector(objReg, 0, 1, SelectorOneshot, &ObjFilter{Status: statusCreated}))
require.NoError(t, objExp.ExportJSONPreGen(expected.jsonName)) require.NoError(t, objExp.ExportJSONPreGen(expected.jsonName))
require.NoError(t, checkExported(expected.objects, expected.jsonName)) require.NoError(t, checkExported(expected.objects, expected.jsonName))
@ -49,13 +49,13 @@ func runExportChangedTest(t *testing.T) {
changedObjects := make([]ObjectInfo, num) changedObjects := make([]ObjectInfo, num)
require.Equal(t, num, copy(changedObjects[:], expected.objects[:])) require.Equal(t, num, copy(changedObjects[:], expected.objects[:]))
sel := NewObjSelector(objReg, 0, SelectorOneshot, &ObjFilter{Status: statusCreated}) sel := NewObjSelector(objReg, 0, 1, SelectorOneshot, &ObjFilter{Status: statusCreated})
for i := range changedObjects { for i := range changedObjects {
changedObjects[i].Status = newStatus changedObjects[i].Status = newStatus
require.NoError(t, objReg.SetObjectStatus(sel.NextObject().Id, statusCreated, newStatus)) require.NoError(t, objReg.SetObjectStatus(sel.NextObject().Id, statusCreated, newStatus))
} }
objExp := NewObjExporter(NewObjSelector(objReg, 0, SelectorOneshot, &ObjFilter{Status: newStatus})) objExp := NewObjExporter(NewObjSelector(objReg, 0, 1, SelectorOneshot, &ObjFilter{Status: newStatus}))
require.NoError(t, objExp.ExportJSONPreGen(expected.jsonName)) require.NoError(t, objExp.ExportJSONPreGen(expected.jsonName))
require.NoError(t, checkExported(changedObjects, expected.jsonName)) require.NoError(t, checkExported(changedObjects, expected.jsonName))
} }
@ -64,7 +64,7 @@ func runExportEmptyTest(t *testing.T) {
expected := getExpectedResult(t) expected := getExpectedResult(t)
expected.objects = make([]ObjectInfo, 0) expected.objects = make([]ObjectInfo, 0)
objReg := getFilledRegistry(t, expected) objReg := getFilledRegistry(t, expected)
objExp := NewObjExporter(NewObjSelector(objReg, 0, SelectorOneshot, &ObjFilter{Status: statusCreated})) objExp := NewObjExporter(NewObjSelector(objReg, 0, 1, SelectorOneshot, &ObjFilter{Status: statusCreated}))
require.NoError(t, objExp.ExportJSONPreGen(expected.jsonName)) require.NoError(t, objExp.ExportJSONPreGen(expected.jsonName))
require.NoError(t, checkExported(expected.objects, expected.jsonName)) require.NoError(t, checkExported(expected.objects, expected.jsonName))

View file

@ -3,6 +3,7 @@ package registry
import ( import (
"context" "context"
"fmt" "fmt"
"sync"
"time" "time"
"github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/io"
@ -17,12 +18,13 @@ type ObjFilter struct {
} }
type ObjSelector struct { type ObjSelector struct {
ctx context.Context ctx context.Context
objChan chan *ObjectInfo objChan chan *ObjectInfo
boltDB *bbolt.DB boltDB *bbolt.DB
filter *ObjFilter filter *ObjFilter
cacheSize int cacheSize int
kind SelectorKind kind SelectorKind
sincAgents sync.WaitGroup
} }
// objectSelectCache is the default maximum size of a batch to select from DB. // objectSelectCache is the default maximum size of a batch to select from DB.
@ -30,7 +32,11 @@ const objectSelectCache = 1000
// NewObjSelector creates a new instance of object selector that can iterate over // NewObjSelector creates a new instance of object selector that can iterate over
// objects in the specified registry. // objects in the specified registry.
func NewObjSelector(registry *ObjRegistry, selectionSize int, kind SelectorKind, filter *ObjFilter) *ObjSelector { // countAgents specifies the number of agents involved in data selection.
// It is utilized to synchronize operations in `Oneshot` mode once all objects are selected.
// Set countAgents to 1 to preserve existing functionality.
// See:https://git.frostfs.info/TrueCloudLab/xk6-frostfs/issues/153#issuecomment-45181
func NewObjSelector(registry *ObjRegistry, selectionSize, countAgents int, kind SelectorKind, filter *ObjFilter) *ObjSelector {
if selectionSize <= 0 { if selectionSize <= 0 {
selectionSize = objectSelectCache selectionSize = objectSelectCache
} }
@ -45,6 +51,7 @@ func NewObjSelector(registry *ObjRegistry, selectionSize int, kind SelectorKind,
cacheSize: selectionSize, cacheSize: selectionSize,
kind: kind, kind: kind,
} }
objSelector.sincAgents.Add(countAgents)
go objSelector.selectLoop() go objSelector.selectLoop()
@ -60,7 +67,12 @@ func NewObjSelector(registry *ObjRegistry, selectionSize int, kind SelectorKind,
// currently blocked and every further NextObject calls. // currently blocked and every further NextObject calls.
func (o *ObjSelector) NextObject() *ObjectInfo { func (o *ObjSelector) NextObject() *ObjectInfo {
if o.kind == SelectorOneshot { if o.kind == SelectorOneshot {
return <-o.objChan obj := <-o.objChan
if obj == nil {
o.sincAgents.Done()
o.sincAgents.Wait()
}
return obj
} }
select { select {

View file

@ -90,19 +90,19 @@ const (
SelectorOneshot SelectorOneshot
) )
func (r *Registry) GetSelector(dbFilePath string, name string, cacheSize int, filter map[string]string) *ObjSelector { func (r *Registry) GetSelector(dbFilePath string, name string, cacheSize, countAgents int, filter map[string]string) *ObjSelector {
return r.getSelectorInternal(dbFilePath, name, cacheSize, SelectorAwaiting, filter) return r.getSelectorInternal(dbFilePath, name, cacheSize, countAgents, SelectorAwaiting, filter)
} }
func (r *Registry) GetLoopedSelector(dbFilePath string, name string, cacheSize int, filter map[string]string) *ObjSelector { func (r *Registry) GetLoopedSelector(dbFilePath string, name string, cacheSize, countAgents int, filter map[string]string) *ObjSelector {
return r.getSelectorInternal(dbFilePath, name, cacheSize, SelectorLooped, filter) return r.getSelectorInternal(dbFilePath, name, cacheSize, countAgents, SelectorLooped, filter)
} }
func (r *Registry) GetOneshotSelector(dbFilePath string, name string, cacheSize int, filter map[string]string) *ObjSelector { func (r *Registry) GetOneshotSelector(dbFilePath string, name string, cacheSize, countAgents int, filter map[string]string) *ObjSelector {
return r.getSelectorInternal(dbFilePath, name, cacheSize, SelectorOneshot, filter) return r.getSelectorInternal(dbFilePath, name, cacheSize, countAgents, SelectorOneshot, filter)
} }
func (r *Registry) getSelectorInternal(dbFilePath string, name string, cacheSize int, kind SelectorKind, filter map[string]string) *ObjSelector { func (r *Registry) getSelectorInternal(dbFilePath string, name string, cacheSize, countAgents int, kind SelectorKind, filter map[string]string) *ObjSelector {
objFilter, err := parseFilter(filter) objFilter, err := parseFilter(filter)
if err != nil { if err != nil {
panic(err) panic(err)
@ -114,7 +114,7 @@ func (r *Registry) getSelectorInternal(dbFilePath string, name string, cacheSize
selector := r.root.selectors[name] selector := r.root.selectors[name]
if selector == nil { if selector == nil {
registry := r.open(dbFilePath) registry := r.open(dbFilePath)
selector = NewObjSelector(registry, cacheSize, kind, objFilter) selector = NewObjSelector(registry, cacheSize, countAgents, kind, objFilter)
r.root.selectors[name] = selector r.root.selectors[name] = selector
} else if !reflect.DeepEqual(selector.filter, objFilter) { } else if !reflect.DeepEqual(selector.filter, objFilter) {
panic(fmt.Sprintf("selector %s already has been created with a different filter", name)) panic(fmt.Sprintf("selector %s already has been created with a different filter", name))

View file

@ -50,7 +50,7 @@ let obj_to_read_selector = undefined;
if (registry_enabled) { if (registry_enabled) {
obj_to_read_selector = registry.getLoopedSelector( obj_to_read_selector = registry.getLoopedSelector(
__ENV.REGISTRY_FILE, 'obj_to_read', __ENV.REGISTRY_FILE, 'obj_to_read',
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, { __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, 1, {
status: 'created', status: 'created',
age: read_age, age: read_age,
}) })
@ -82,7 +82,7 @@ if (registry_enabled && delete_age) {
obj_to_delete_selector = obj_to_delete_selector =
constructor(__ENV.REGISTRY_FILE, 'obj_to_delete', constructor(__ENV.REGISTRY_FILE, 'obj_to_delete',
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, { __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, 1, {
status: 'created', status: 'created',
age: delete_age, age: delete_age,
}); });

View file

@ -49,7 +49,7 @@ let obj_to_delete_selector = undefined;
if (registry_enabled && delete_age) { if (registry_enabled && delete_age) {
obj_to_delete_selector = registry.getSelector( obj_to_delete_selector = registry.getSelector(
__ENV.REGISTRY_FILE, 'obj_to_delete', __ENV.REGISTRY_FILE, 'obj_to_delete',
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, { __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, 1, {
status: 'created', status: 'created',
age: delete_age, age: delete_age,
}); });
@ -60,7 +60,7 @@ let obj_to_read_selector = undefined;
if (registry_enabled) { if (registry_enabled) {
obj_to_read_selector = registry.getLoopedSelector( obj_to_read_selector = registry.getLoopedSelector(
__ENV.REGISTRY_FILE, 'obj_to_read', __ENV.REGISTRY_FILE, 'obj_to_read',
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, { __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, 1,{
status: 'created', status: 'created',
age: read_age, age: read_age,
}) })

View file

@ -48,7 +48,7 @@ let obj_to_delete_selector = undefined;
if (registry_enabled && delete_age) { if (registry_enabled && delete_age) {
obj_to_delete_selector = registry.getSelector( obj_to_delete_selector = registry.getSelector(
__ENV.REGISTRY_FILE, 'obj_to_delete', __ENV.REGISTRY_FILE, 'obj_to_delete',
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, { __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, 1,{
status: 'created', status: 'created',
age: delete_age, age: delete_age,
}); });

View file

@ -50,7 +50,7 @@ let obj_to_read_selector = undefined;
if (registry_enabled) { if (registry_enabled) {
obj_to_read_selector = registry.getLoopedSelector( obj_to_read_selector = registry.getLoopedSelector(
__ENV.REGISTRY_FILE, 'obj_to_read', __ENV.REGISTRY_FILE, 'obj_to_read',
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, { __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, 1,{
status : 'created', status : 'created',
age : read_age, age : read_age,
}) })
@ -70,6 +70,21 @@ if (write_vu_count > 0) {
}; };
} }
const delete_vu_count = parseInt(__ENV.DELETERS || '0');
if (delete_vu_count > 0) {
if (!obj_to_delete_selector) {
throw 'Positive DELETE worker number without a proper object selector';
}
scenarios.delete = {
executor : 'constant-vus',
vus : delete_vu_count,
duration : `${duration}s`,
exec : 'obj_delete',
gracefulStop : '5s',
};
}
const delete_age = __ENV.DELETE_AGE ? parseInt(__ENV.DELETE_AGE) : undefined; const delete_age = __ENV.DELETE_AGE ? parseInt(__ENV.DELETE_AGE) : undefined;
let obj_to_delete_selector = undefined; let obj_to_delete_selector = undefined;
let obj_to_delete_exit_on_null = undefined; let obj_to_delete_exit_on_null = undefined;
@ -81,7 +96,7 @@ if (registry_enabled && delete_age) {
obj_to_delete_selector = obj_to_delete_selector =
constructor(__ENV.REGISTRY_FILE, 'obj_to_delete', constructor(__ENV.REGISTRY_FILE, 'obj_to_delete',
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, { __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, delete_vu_count,{
status : 'created', status : 'created',
age : delete_age, age : delete_age,
}); });
@ -98,21 +113,6 @@ if (read_vu_count > 0) {
}; };
} }
const delete_vu_count = parseInt(__ENV.DELETERS || '0');
if (delete_vu_count > 0) {
if (!obj_to_delete_selector) {
throw 'Positive DELETE worker number without a proper object selector';
}
scenarios.delete = {
executor : 'constant-vus',
vus : delete_vu_count,
duration : `${duration}s`,
exec : 'obj_delete',
gracefulStop : '5s',
};
}
export const options = { export const options = {
scenarios, scenarios,
setupTimeout : '5s', setupTimeout : '5s',

View file

@ -49,7 +49,7 @@ let obj_to_delete_selector = undefined;
if (registry_enabled && delete_age) { if (registry_enabled && delete_age) {
obj_to_delete_selector = registry.getSelector( obj_to_delete_selector = registry.getSelector(
__ENV.REGISTRY_FILE, 'obj_to_delete', __ENV.REGISTRY_FILE, 'obj_to_delete',
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, { __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, 1, {
status: 'created', status: 'created',
age: delete_age, age: delete_age,
}); });
@ -60,7 +60,7 @@ let obj_to_read_selector = undefined;
if (registry_enabled) { if (registry_enabled) {
obj_to_read_selector = registry.getLoopedSelector( obj_to_read_selector = registry.getLoopedSelector(
__ENV.REGISTRY_FILE, 'obj_to_read', __ENV.REGISTRY_FILE, 'obj_to_read',
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, { __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, 1,{
status: 'created', status: 'created',
age: read_age, age: read_age,
}) })

View file

@ -51,7 +51,7 @@ let obj_to_read_selector = undefined;
if (registry_enabled) { if (registry_enabled) {
obj_to_read_selector = registry.getSelector( obj_to_read_selector = registry.getSelector(
__ENV.REGISTRY_FILE, 'obj_to_read', __ENV.REGISTRY_FILE, 'obj_to_read',
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, { __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, 1, {
status : 'created', status : 'created',
age : read_age, age : read_age,
}) })
@ -94,7 +94,7 @@ if (registry_enabled ) {
obj_to_delete_selector = obj_to_delete_selector =
constructor(__ENV.REGISTRY_FILE, 'obj_to_delete', constructor(__ENV.REGISTRY_FILE, 'obj_to_delete',
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, { __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, 1, {
status : 'read', status : 'read',
age : delete_age, age : delete_age,
}); });

View file

@ -62,7 +62,7 @@ let obj_to_read_selector = undefined;
if (registry_enabled) { if (registry_enabled) {
obj_to_read_selector = registry.getLoopedSelector( obj_to_read_selector = registry.getLoopedSelector(
__ENV.REGISTRY_FILE, 'obj_to_read', __ENV.REGISTRY_FILE, 'obj_to_read',
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, { __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, 1, {
status: 'created', status: 'created',
}) })
} }

View file

@ -65,7 +65,7 @@ if (__ENV.S3_ENDPOINTS) {
// retries to be verified // retries to be verified
const obj_to_verify_selector = registry.getSelector( const obj_to_verify_selector = registry.getSelector(
__ENV.REGISTRY_FILE, 'obj_to_verify', __ENV.REGISTRY_FILE, 'obj_to_verify',
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, { __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, 1, {
status: 'created', status: 'created',
}); });
const obj_to_verify_count = obj_to_verify_selector.count(); const obj_to_verify_count = obj_to_verify_selector.count();
@ -97,7 +97,7 @@ export function setup() {
for (const [status, counter] of Object.entries(obj_counters)) { for (const [status, counter] of Object.entries(obj_counters)) {
const obj_selector = registry.getSelector( const obj_selector = registry.getSelector(
__ENV.REGISTRY_FILE, status, __ENV.REGISTRY_FILE, status,
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, { status }); __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0,1, { status });
counter.add(obj_selector.count()); counter.add(obj_selector.count());
} }
} }