From 1766e888395eb7cbf1cd5bd0a373105c1a5bd145 Mon Sep 17 00:00:00 2001 From: Alexander Chuprov Date: Fri, 26 Jul 2024 14:23:10 +0300 Subject: [PATCH] [#153] selector: Add agent synchronization in `Oneshot` mode --- cmd/xk6-registry-exporter/root.go | 2 +- internal/registry/obj_exporter_test.go | 8 +++--- internal/registry/obj_selector.go | 28 +++++++++++++++------ internal/registry/registry.go | 16 ++++++------ scenarios/grpc.js | 4 +-- scenarios/grpc_car.js | 4 +-- scenarios/local.js | 2 +- scenarios/s3.js | 34 +++++++++++++------------- scenarios/s3_car.js | 4 +-- scenarios/s3_dar.js | 4 +-- scenarios/s3local.js | 2 +- scenarios/verify.js | 4 +-- 12 files changed, 62 insertions(+), 50 deletions(-) diff --git a/cmd/xk6-registry-exporter/root.go b/cmd/xk6-registry-exporter/root.go index b359df1..14155cf 100644 --- a/cmd/xk6-registry-exporter/root.go +++ b/cmd/xk6-registry-exporter/root.go @@ -78,7 +78,7 @@ func rootCmdRun(cmd *cobra.Command, args []string) error { } objRegistry := registry.NewObjRegistry(cmd.Context(), args[0]) - objSelector := registry.NewObjSelector(objRegistry, 0, registry.SelectorAwaiting, ®istry.ObjFilter{ + objSelector := registry.NewObjSelector(objRegistry, 0, 1, registry.SelectorAwaiting, ®istry.ObjFilter{ Status: status, Age: age, }) diff --git a/internal/registry/obj_exporter_test.go b/internal/registry/obj_exporter_test.go index 87e8984..80aa43a 100644 --- a/internal/registry/obj_exporter_test.go +++ b/internal/registry/obj_exporter_test.go @@ -34,7 +34,7 @@ func TestObjectExporter(t *testing.T) { func runExportTest(t *testing.T) { expected := getExpectedResult(t) objReg := getFilledRegistry(t, expected) - objExp := NewObjExporter(NewObjSelector(objReg, 0, SelectorOneshot, &ObjFilter{Status: statusCreated})) + objExp := NewObjExporter(NewObjSelector(objReg, 0, 1, SelectorOneshot, &ObjFilter{Status: statusCreated})) require.NoError(t, objExp.ExportJSONPreGen(expected.jsonName)) require.NoError(t, checkExported(expected.objects, expected.jsonName)) @@ -49,13 +49,13 @@ func runExportChangedTest(t *testing.T) { changedObjects := make([]ObjectInfo, num) require.Equal(t, num, copy(changedObjects[:], expected.objects[:])) - sel := NewObjSelector(objReg, 0, SelectorOneshot, &ObjFilter{Status: statusCreated}) + sel := NewObjSelector(objReg, 0, 1, SelectorOneshot, &ObjFilter{Status: statusCreated}) for i := range changedObjects { changedObjects[i].Status = newStatus require.NoError(t, objReg.SetObjectStatus(sel.NextObject().Id, statusCreated, newStatus)) } - objExp := NewObjExporter(NewObjSelector(objReg, 0, SelectorOneshot, &ObjFilter{Status: newStatus})) + objExp := NewObjExporter(NewObjSelector(objReg, 0, 1, SelectorOneshot, &ObjFilter{Status: newStatus})) require.NoError(t, objExp.ExportJSONPreGen(expected.jsonName)) require.NoError(t, checkExported(changedObjects, expected.jsonName)) } @@ -64,7 +64,7 @@ func runExportEmptyTest(t *testing.T) { expected := getExpectedResult(t) expected.objects = make([]ObjectInfo, 0) objReg := getFilledRegistry(t, expected) - objExp := NewObjExporter(NewObjSelector(objReg, 0, SelectorOneshot, &ObjFilter{Status: statusCreated})) + objExp := NewObjExporter(NewObjSelector(objReg, 0, 1, SelectorOneshot, &ObjFilter{Status: statusCreated})) require.NoError(t, objExp.ExportJSONPreGen(expected.jsonName)) require.NoError(t, checkExported(expected.objects, expected.jsonName)) diff --git a/internal/registry/obj_selector.go b/internal/registry/obj_selector.go index 78abb37..2282564 100644 --- a/internal/registry/obj_selector.go +++ b/internal/registry/obj_selector.go @@ -3,6 +3,7 @@ package registry import ( "context" "fmt" + "sync" "time" "github.com/nspcc-dev/neo-go/pkg/io" @@ -17,12 +18,13 @@ type ObjFilter struct { } type ObjSelector struct { - ctx context.Context - objChan chan *ObjectInfo - boltDB *bbolt.DB - filter *ObjFilter - cacheSize int - kind SelectorKind + ctx context.Context + objChan chan *ObjectInfo + boltDB *bbolt.DB + filter *ObjFilter + cacheSize int + kind SelectorKind + sincAgents sync.WaitGroup } // objectSelectCache is the default maximum size of a batch to select from DB. @@ -30,7 +32,11 @@ const objectSelectCache = 1000 // NewObjSelector creates a new instance of object selector that can iterate over // objects in the specified registry. -func NewObjSelector(registry *ObjRegistry, selectionSize int, kind SelectorKind, filter *ObjFilter) *ObjSelector { +// countAgents specifies the number of agents involved in data selection. +// It is utilized to synchronize operations in `Oneshot` mode once all objects are selected. +// Set countAgents to 1 to preserve existing functionality. +// See:https://git.frostfs.info/TrueCloudLab/xk6-frostfs/issues/153#issuecomment-45181 +func NewObjSelector(registry *ObjRegistry, selectionSize, countAgents int, kind SelectorKind, filter *ObjFilter) *ObjSelector { if selectionSize <= 0 { selectionSize = objectSelectCache } @@ -45,6 +51,7 @@ func NewObjSelector(registry *ObjRegistry, selectionSize int, kind SelectorKind, cacheSize: selectionSize, kind: kind, } + objSelector.sincAgents.Add(countAgents) go objSelector.selectLoop() @@ -60,7 +67,12 @@ func NewObjSelector(registry *ObjRegistry, selectionSize int, kind SelectorKind, // currently blocked and every further NextObject calls. func (o *ObjSelector) NextObject() *ObjectInfo { if o.kind == SelectorOneshot { - return <-o.objChan + obj := <-o.objChan + if obj == nil { + o.sincAgents.Done() + o.sincAgents.Wait() + } + return obj } select { diff --git a/internal/registry/registry.go b/internal/registry/registry.go index 0aad78a..fbe517d 100644 --- a/internal/registry/registry.go +++ b/internal/registry/registry.go @@ -90,19 +90,19 @@ const ( SelectorOneshot ) -func (r *Registry) GetSelector(dbFilePath string, name string, cacheSize int, filter map[string]string) *ObjSelector { - return r.getSelectorInternal(dbFilePath, name, cacheSize, SelectorAwaiting, filter) +func (r *Registry) GetSelector(dbFilePath string, name string, cacheSize, countAgents int, filter map[string]string) *ObjSelector { + return r.getSelectorInternal(dbFilePath, name, cacheSize, countAgents, SelectorAwaiting, filter) } -func (r *Registry) GetLoopedSelector(dbFilePath string, name string, cacheSize int, filter map[string]string) *ObjSelector { - return r.getSelectorInternal(dbFilePath, name, cacheSize, SelectorLooped, filter) +func (r *Registry) GetLoopedSelector(dbFilePath string, name string, cacheSize, countAgents int, filter map[string]string) *ObjSelector { + return r.getSelectorInternal(dbFilePath, name, cacheSize, countAgents, SelectorLooped, filter) } -func (r *Registry) GetOneshotSelector(dbFilePath string, name string, cacheSize int, filter map[string]string) *ObjSelector { - return r.getSelectorInternal(dbFilePath, name, cacheSize, SelectorOneshot, filter) +func (r *Registry) GetOneshotSelector(dbFilePath string, name string, cacheSize, countAgents int, filter map[string]string) *ObjSelector { + return r.getSelectorInternal(dbFilePath, name, cacheSize, countAgents, SelectorOneshot, filter) } -func (r *Registry) getSelectorInternal(dbFilePath string, name string, cacheSize int, kind SelectorKind, filter map[string]string) *ObjSelector { +func (r *Registry) getSelectorInternal(dbFilePath string, name string, cacheSize, countAgents int, kind SelectorKind, filter map[string]string) *ObjSelector { objFilter, err := parseFilter(filter) if err != nil { panic(err) @@ -114,7 +114,7 @@ func (r *Registry) getSelectorInternal(dbFilePath string, name string, cacheSize selector := r.root.selectors[name] if selector == nil { registry := r.open(dbFilePath) - selector = NewObjSelector(registry, cacheSize, kind, objFilter) + selector = NewObjSelector(registry, cacheSize, countAgents, kind, objFilter) r.root.selectors[name] = selector } else if !reflect.DeepEqual(selector.filter, objFilter) { panic(fmt.Sprintf("selector %s already has been created with a different filter", name)) diff --git a/scenarios/grpc.js b/scenarios/grpc.js index 973cf25..2c8feaa 100644 --- a/scenarios/grpc.js +++ b/scenarios/grpc.js @@ -50,7 +50,7 @@ let obj_to_read_selector = undefined; if (registry_enabled) { obj_to_read_selector = registry.getLoopedSelector( __ENV.REGISTRY_FILE, 'obj_to_read', - __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, { + __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, 1, { status: 'created', age: read_age, }) @@ -82,7 +82,7 @@ if (registry_enabled && delete_age) { obj_to_delete_selector = constructor(__ENV.REGISTRY_FILE, 'obj_to_delete', - __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, { + __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, 1, { status: 'created', age: delete_age, }); diff --git a/scenarios/grpc_car.js b/scenarios/grpc_car.js index d866610..3960c88 100644 --- a/scenarios/grpc_car.js +++ b/scenarios/grpc_car.js @@ -49,7 +49,7 @@ let obj_to_delete_selector = undefined; if (registry_enabled && delete_age) { obj_to_delete_selector = registry.getSelector( __ENV.REGISTRY_FILE, 'obj_to_delete', - __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, { + __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, 1, { status: 'created', age: delete_age, }); @@ -60,7 +60,7 @@ let obj_to_read_selector = undefined; if (registry_enabled) { obj_to_read_selector = registry.getLoopedSelector( __ENV.REGISTRY_FILE, 'obj_to_read', - __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, { + __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, 1,{ status: 'created', age: read_age, }) diff --git a/scenarios/local.js b/scenarios/local.js index 7b3f003..bdf0fa4 100644 --- a/scenarios/local.js +++ b/scenarios/local.js @@ -48,7 +48,7 @@ let obj_to_delete_selector = undefined; if (registry_enabled && delete_age) { obj_to_delete_selector = registry.getSelector( __ENV.REGISTRY_FILE, 'obj_to_delete', - __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, { + __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, 1,{ status: 'created', age: delete_age, }); diff --git a/scenarios/s3.js b/scenarios/s3.js index 22b031a..ac68c8b 100644 --- a/scenarios/s3.js +++ b/scenarios/s3.js @@ -50,7 +50,7 @@ let obj_to_read_selector = undefined; if (registry_enabled) { obj_to_read_selector = registry.getLoopedSelector( __ENV.REGISTRY_FILE, 'obj_to_read', - __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, { + __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, 1,{ status : 'created', age : read_age, }) @@ -70,6 +70,21 @@ if (write_vu_count > 0) { }; } +const delete_vu_count = parseInt(__ENV.DELETERS || '0'); +if (delete_vu_count > 0) { + if (!obj_to_delete_selector) { + throw 'Positive DELETE worker number without a proper object selector'; + } + + scenarios.delete = { + executor : 'constant-vus', + vus : delete_vu_count, + duration : `${duration}s`, + exec : 'obj_delete', + gracefulStop : '5s', + }; +} + const delete_age = __ENV.DELETE_AGE ? parseInt(__ENV.DELETE_AGE) : undefined; let obj_to_delete_selector = undefined; let obj_to_delete_exit_on_null = undefined; @@ -81,7 +96,7 @@ if (registry_enabled && delete_age) { obj_to_delete_selector = constructor(__ENV.REGISTRY_FILE, 'obj_to_delete', - __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, { + __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, delete_vu_count,{ status : 'created', age : delete_age, }); @@ -98,21 +113,6 @@ if (read_vu_count > 0) { }; } -const delete_vu_count = parseInt(__ENV.DELETERS || '0'); -if (delete_vu_count > 0) { - if (!obj_to_delete_selector) { - throw 'Positive DELETE worker number without a proper object selector'; - } - - scenarios.delete = { - executor : 'constant-vus', - vus : delete_vu_count, - duration : `${duration}s`, - exec : 'obj_delete', - gracefulStop : '5s', - }; -} - export const options = { scenarios, setupTimeout : '5s', diff --git a/scenarios/s3_car.js b/scenarios/s3_car.js index 2aaa94e..2c77602 100644 --- a/scenarios/s3_car.js +++ b/scenarios/s3_car.js @@ -49,7 +49,7 @@ let obj_to_delete_selector = undefined; if (registry_enabled && delete_age) { obj_to_delete_selector = registry.getSelector( __ENV.REGISTRY_FILE, 'obj_to_delete', - __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, { + __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, 1, { status: 'created', age: delete_age, }); @@ -60,7 +60,7 @@ let obj_to_read_selector = undefined; if (registry_enabled) { obj_to_read_selector = registry.getLoopedSelector( __ENV.REGISTRY_FILE, 'obj_to_read', - __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, { + __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, 1,{ status: 'created', age: read_age, }) diff --git a/scenarios/s3_dar.js b/scenarios/s3_dar.js index 5f10bd0..e21bee8 100644 --- a/scenarios/s3_dar.js +++ b/scenarios/s3_dar.js @@ -51,7 +51,7 @@ let obj_to_read_selector = undefined; if (registry_enabled) { obj_to_read_selector = registry.getSelector( __ENV.REGISTRY_FILE, 'obj_to_read', - __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, { + __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, 1, { status : 'created', age : read_age, }) @@ -94,7 +94,7 @@ if (registry_enabled ) { obj_to_delete_selector = constructor(__ENV.REGISTRY_FILE, 'obj_to_delete', - __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, { + __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, 1, { status : 'read', age : delete_age, }); diff --git a/scenarios/s3local.js b/scenarios/s3local.js index 6eaf049..080e9c1 100644 --- a/scenarios/s3local.js +++ b/scenarios/s3local.js @@ -62,7 +62,7 @@ let obj_to_read_selector = undefined; if (registry_enabled) { obj_to_read_selector = registry.getLoopedSelector( __ENV.REGISTRY_FILE, 'obj_to_read', - __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, { + __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, 1, { status: 'created', }) } diff --git a/scenarios/verify.js b/scenarios/verify.js index 15a0055..848d6c9 100644 --- a/scenarios/verify.js +++ b/scenarios/verify.js @@ -65,7 +65,7 @@ if (__ENV.S3_ENDPOINTS) { // retries to be verified const obj_to_verify_selector = registry.getSelector( __ENV.REGISTRY_FILE, 'obj_to_verify', - __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, { + __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, 1, { status: 'created', }); const obj_to_verify_count = obj_to_verify_selector.count(); @@ -97,7 +97,7 @@ export function setup() { for (const [status, counter] of Object.entries(obj_counters)) { const obj_selector = registry.getSelector( __ENV.REGISTRY_FILE, status, - __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, { status }); + __ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0,1, { status }); counter.add(obj_selector.count()); } }