[#153] selector: Add agent synchronization in Oneshot
mode
All checks were successful
DCO action / DCO (pull_request) Successful in 59s
Tests and linters / Tests (1.22) (pull_request) Successful in 2m14s
Tests and linters / Tests with -race (pull_request) Successful in 2m16s
Tests and linters / Tests (1.21) (pull_request) Successful in 2m37s
Tests and linters / Lint (pull_request) Successful in 3m9s
All checks were successful
DCO action / DCO (pull_request) Successful in 59s
Tests and linters / Tests (1.22) (pull_request) Successful in 2m14s
Tests and linters / Tests with -race (pull_request) Successful in 2m16s
Tests and linters / Tests (1.21) (pull_request) Successful in 2m37s
Tests and linters / Lint (pull_request) Successful in 3m9s
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
This commit is contained in:
parent
9b9db46a07
commit
62c69f4afd
12 changed files with 62 additions and 50 deletions
|
@ -78,7 +78,7 @@ func rootCmdRun(cmd *cobra.Command, args []string) error {
|
|||
}
|
||||
|
||||
objRegistry := registry.NewObjRegistry(cmd.Context(), args[0])
|
||||
objSelector := registry.NewObjSelector(objRegistry, 0, registry.SelectorAwaiting, ®istry.ObjFilter{
|
||||
objSelector := registry.NewObjSelector(objRegistry, 0, 1, registry.SelectorAwaiting, ®istry.ObjFilter{
|
||||
Status: status,
|
||||
Age: age,
|
||||
})
|
||||
|
|
|
@ -34,7 +34,7 @@ func TestObjectExporter(t *testing.T) {
|
|||
func runExportTest(t *testing.T) {
|
||||
expected := getExpectedResult(t)
|
||||
objReg := getFilledRegistry(t, expected)
|
||||
objExp := NewObjExporter(NewObjSelector(objReg, 0, SelectorOneshot, &ObjFilter{Status: statusCreated}))
|
||||
objExp := NewObjExporter(NewObjSelector(objReg, 0, 1, SelectorOneshot, &ObjFilter{Status: statusCreated}))
|
||||
|
||||
require.NoError(t, objExp.ExportJSONPreGen(expected.jsonName))
|
||||
require.NoError(t, checkExported(expected.objects, expected.jsonName))
|
||||
|
@ -49,13 +49,13 @@ func runExportChangedTest(t *testing.T) {
|
|||
changedObjects := make([]ObjectInfo, num)
|
||||
require.Equal(t, num, copy(changedObjects[:], expected.objects[:]))
|
||||
|
||||
sel := NewObjSelector(objReg, 0, SelectorOneshot, &ObjFilter{Status: statusCreated})
|
||||
sel := NewObjSelector(objReg, 0, 1, SelectorOneshot, &ObjFilter{Status: statusCreated})
|
||||
for i := range changedObjects {
|
||||
changedObjects[i].Status = newStatus
|
||||
require.NoError(t, objReg.SetObjectStatus(sel.NextObject().Id, statusCreated, newStatus))
|
||||
}
|
||||
|
||||
objExp := NewObjExporter(NewObjSelector(objReg, 0, SelectorOneshot, &ObjFilter{Status: newStatus}))
|
||||
objExp := NewObjExporter(NewObjSelector(objReg, 0, 1, SelectorOneshot, &ObjFilter{Status: newStatus}))
|
||||
require.NoError(t, objExp.ExportJSONPreGen(expected.jsonName))
|
||||
require.NoError(t, checkExported(changedObjects, expected.jsonName))
|
||||
}
|
||||
|
@ -64,7 +64,7 @@ func runExportEmptyTest(t *testing.T) {
|
|||
expected := getExpectedResult(t)
|
||||
expected.objects = make([]ObjectInfo, 0)
|
||||
objReg := getFilledRegistry(t, expected)
|
||||
objExp := NewObjExporter(NewObjSelector(objReg, 0, SelectorOneshot, &ObjFilter{Status: statusCreated}))
|
||||
objExp := NewObjExporter(NewObjSelector(objReg, 0, 1, SelectorOneshot, &ObjFilter{Status: statusCreated}))
|
||||
|
||||
require.NoError(t, objExp.ExportJSONPreGen(expected.jsonName))
|
||||
require.NoError(t, checkExported(expected.objects, expected.jsonName))
|
||||
|
|
|
@ -3,6 +3,7 @@ package registry
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/nspcc-dev/neo-go/pkg/io"
|
||||
|
@ -23,6 +24,7 @@ type ObjSelector struct {
|
|||
filter *ObjFilter
|
||||
cacheSize int
|
||||
kind SelectorKind
|
||||
sincAgents sync.WaitGroup
|
||||
}
|
||||
|
||||
// objectSelectCache is the default maximum size of a batch to select from DB.
|
||||
|
@ -30,7 +32,11 @@ const objectSelectCache = 1000
|
|||
|
||||
// NewObjSelector creates a new instance of object selector that can iterate over
|
||||
// objects in the specified registry.
|
||||
func NewObjSelector(registry *ObjRegistry, selectionSize int, kind SelectorKind, filter *ObjFilter) *ObjSelector {
|
||||
// countAgents specifies the number of agents involved in data selection.
|
||||
// It is utilized to synchronize operations in `Oneshot` mode once all objects are selected.
|
||||
// Set countAgents to 1 to preserve existing functionality.
|
||||
// See:https://git.frostfs.info/TrueCloudLab/xk6-frostfs/issues/153#issuecomment-45181
|
||||
func NewObjSelector(registry *ObjRegistry, selectionSize, countAgents int, kind SelectorKind, filter *ObjFilter) *ObjSelector {
|
||||
if selectionSize <= 0 {
|
||||
selectionSize = objectSelectCache
|
||||
}
|
||||
|
@ -45,6 +51,7 @@ func NewObjSelector(registry *ObjRegistry, selectionSize int, kind SelectorKind,
|
|||
cacheSize: selectionSize,
|
||||
kind: kind,
|
||||
}
|
||||
objSelector.sincAgents.Add(countAgents)
|
||||
|
||||
go objSelector.selectLoop()
|
||||
|
||||
|
@ -60,7 +67,12 @@ func NewObjSelector(registry *ObjRegistry, selectionSize int, kind SelectorKind,
|
|||
// currently blocked and every further NextObject calls.
|
||||
func (o *ObjSelector) NextObject() *ObjectInfo {
|
||||
if o.kind == SelectorOneshot {
|
||||
return <-o.objChan
|
||||
obj := <-o.objChan
|
||||
if obj == nil {
|
||||
o.sincAgents.Done()
|
||||
o.sincAgents.Wait()
|
||||
}
|
||||
return obj
|
||||
}
|
||||
|
||||
select {
|
||||
|
|
|
@ -90,19 +90,19 @@ const (
|
|||
SelectorOneshot
|
||||
)
|
||||
|
||||
func (r *Registry) GetSelector(dbFilePath string, name string, cacheSize int, filter map[string]string) *ObjSelector {
|
||||
return r.getSelectorInternal(dbFilePath, name, cacheSize, SelectorAwaiting, filter)
|
||||
func (r *Registry) GetSelector(dbFilePath string, name string, cacheSize, countAgents int, filter map[string]string) *ObjSelector {
|
||||
return r.getSelectorInternal(dbFilePath, name, cacheSize, countAgents, SelectorAwaiting, filter)
|
||||
}
|
||||
|
||||
func (r *Registry) GetLoopedSelector(dbFilePath string, name string, cacheSize int, filter map[string]string) *ObjSelector {
|
||||
return r.getSelectorInternal(dbFilePath, name, cacheSize, SelectorLooped, filter)
|
||||
func (r *Registry) GetLoopedSelector(dbFilePath string, name string, cacheSize, countAgents int, filter map[string]string) *ObjSelector {
|
||||
return r.getSelectorInternal(dbFilePath, name, cacheSize, countAgents, SelectorLooped, filter)
|
||||
}
|
||||
|
||||
func (r *Registry) GetOneshotSelector(dbFilePath string, name string, cacheSize int, filter map[string]string) *ObjSelector {
|
||||
return r.getSelectorInternal(dbFilePath, name, cacheSize, SelectorOneshot, filter)
|
||||
func (r *Registry) GetOneshotSelector(dbFilePath string, name string, cacheSize, countAgents int, filter map[string]string) *ObjSelector {
|
||||
return r.getSelectorInternal(dbFilePath, name, cacheSize, countAgents, SelectorOneshot, filter)
|
||||
}
|
||||
|
||||
func (r *Registry) getSelectorInternal(dbFilePath string, name string, cacheSize int, kind SelectorKind, filter map[string]string) *ObjSelector {
|
||||
func (r *Registry) getSelectorInternal(dbFilePath string, name string, cacheSize, countAgents int, kind SelectorKind, filter map[string]string) *ObjSelector {
|
||||
objFilter, err := parseFilter(filter)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
@ -114,7 +114,7 @@ func (r *Registry) getSelectorInternal(dbFilePath string, name string, cacheSize
|
|||
selector := r.root.selectors[name]
|
||||
if selector == nil {
|
||||
registry := r.open(dbFilePath)
|
||||
selector = NewObjSelector(registry, cacheSize, kind, objFilter)
|
||||
selector = NewObjSelector(registry, cacheSize, countAgents, kind, objFilter)
|
||||
r.root.selectors[name] = selector
|
||||
} else if !reflect.DeepEqual(selector.filter, objFilter) {
|
||||
panic(fmt.Sprintf("selector %s already has been created with a different filter", name))
|
||||
|
|
|
@ -50,7 +50,7 @@ let obj_to_read_selector = undefined;
|
|||
if (registry_enabled) {
|
||||
obj_to_read_selector = registry.getLoopedSelector(
|
||||
__ENV.REGISTRY_FILE, 'obj_to_read',
|
||||
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, {
|
||||
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, 1, {
|
||||
status: 'created',
|
||||
age: read_age,
|
||||
})
|
||||
|
@ -82,7 +82,7 @@ if (registry_enabled && delete_age) {
|
|||
|
||||
obj_to_delete_selector =
|
||||
constructor(__ENV.REGISTRY_FILE, 'obj_to_delete',
|
||||
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, {
|
||||
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, 1, {
|
||||
status: 'created',
|
||||
age: delete_age,
|
||||
});
|
||||
|
|
|
@ -49,7 +49,7 @@ let obj_to_delete_selector = undefined;
|
|||
if (registry_enabled && delete_age) {
|
||||
obj_to_delete_selector = registry.getSelector(
|
||||
__ENV.REGISTRY_FILE, 'obj_to_delete',
|
||||
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, {
|
||||
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, 1, {
|
||||
status: 'created',
|
||||
age: delete_age,
|
||||
});
|
||||
|
@ -60,7 +60,7 @@ let obj_to_read_selector = undefined;
|
|||
if (registry_enabled) {
|
||||
obj_to_read_selector = registry.getLoopedSelector(
|
||||
__ENV.REGISTRY_FILE, 'obj_to_read',
|
||||
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, {
|
||||
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, 1,{
|
||||
status: 'created',
|
||||
age: read_age,
|
||||
})
|
||||
|
|
|
@ -48,7 +48,7 @@ let obj_to_delete_selector = undefined;
|
|||
if (registry_enabled && delete_age) {
|
||||
obj_to_delete_selector = registry.getSelector(
|
||||
__ENV.REGISTRY_FILE, 'obj_to_delete',
|
||||
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, {
|
||||
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, 1,{
|
||||
status: 'created',
|
||||
age: delete_age,
|
||||
});
|
||||
|
|
|
@ -50,7 +50,7 @@ let obj_to_read_selector = undefined;
|
|||
if (registry_enabled) {
|
||||
obj_to_read_selector = registry.getLoopedSelector(
|
||||
__ENV.REGISTRY_FILE, 'obj_to_read',
|
||||
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, {
|
||||
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, 1,{
|
||||
status : 'created',
|
||||
age : read_age,
|
||||
})
|
||||
|
@ -70,6 +70,21 @@ if (write_vu_count > 0) {
|
|||
};
|
||||
}
|
||||
|
||||
const delete_vu_count = parseInt(__ENV.DELETERS || '0');
|
||||
if (delete_vu_count > 0) {
|
||||
if (!obj_to_delete_selector) {
|
||||
throw 'Positive DELETE worker number without a proper object selector';
|
||||
}
|
||||
|
||||
scenarios.delete = {
|
||||
executor : 'constant-vus',
|
||||
vus : delete_vu_count,
|
||||
duration : `${duration}s`,
|
||||
exec : 'obj_delete',
|
||||
gracefulStop : '5s',
|
||||
};
|
||||
}
|
||||
|
||||
const delete_age = __ENV.DELETE_AGE ? parseInt(__ENV.DELETE_AGE) : undefined;
|
||||
let obj_to_delete_selector = undefined;
|
||||
let obj_to_delete_exit_on_null = undefined;
|
||||
|
@ -81,7 +96,7 @@ if (registry_enabled && delete_age) {
|
|||
|
||||
obj_to_delete_selector =
|
||||
constructor(__ENV.REGISTRY_FILE, 'obj_to_delete',
|
||||
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, {
|
||||
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, delete_vu_count,{
|
||||
status : 'created',
|
||||
age : delete_age,
|
||||
});
|
||||
|
@ -98,21 +113,6 @@ if (read_vu_count > 0) {
|
|||
};
|
||||
}
|
||||
|
||||
const delete_vu_count = parseInt(__ENV.DELETERS || '0');
|
||||
if (delete_vu_count > 0) {
|
||||
if (!obj_to_delete_selector) {
|
||||
throw 'Positive DELETE worker number without a proper object selector';
|
||||
}
|
||||
|
||||
scenarios.delete = {
|
||||
executor : 'constant-vus',
|
||||
vus : delete_vu_count,
|
||||
duration : `${duration}s`,
|
||||
exec : 'obj_delete',
|
||||
gracefulStop : '5s',
|
||||
};
|
||||
}
|
||||
|
||||
export const options = {
|
||||
scenarios,
|
||||
setupTimeout : '5s',
|
||||
|
|
|
@ -49,7 +49,7 @@ let obj_to_delete_selector = undefined;
|
|||
if (registry_enabled && delete_age) {
|
||||
obj_to_delete_selector = registry.getSelector(
|
||||
__ENV.REGISTRY_FILE, 'obj_to_delete',
|
||||
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, {
|
||||
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, 1, {
|
||||
status: 'created',
|
||||
age: delete_age,
|
||||
});
|
||||
|
@ -60,7 +60,7 @@ let obj_to_read_selector = undefined;
|
|||
if (registry_enabled) {
|
||||
obj_to_read_selector = registry.getLoopedSelector(
|
||||
__ENV.REGISTRY_FILE, 'obj_to_read',
|
||||
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, {
|
||||
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, 1,{
|
||||
status: 'created',
|
||||
age: read_age,
|
||||
})
|
||||
|
|
|
@ -51,7 +51,7 @@ let obj_to_read_selector = undefined;
|
|||
if (registry_enabled) {
|
||||
obj_to_read_selector = registry.getSelector(
|
||||
__ENV.REGISTRY_FILE, 'obj_to_read',
|
||||
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, {
|
||||
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, 1, {
|
||||
status : 'created',
|
||||
age : read_age,
|
||||
})
|
||||
|
@ -94,7 +94,7 @@ if (registry_enabled ) {
|
|||
|
||||
obj_to_delete_selector =
|
||||
constructor(__ENV.REGISTRY_FILE, 'obj_to_delete',
|
||||
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, {
|
||||
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, 1, {
|
||||
status : 'read',
|
||||
age : delete_age,
|
||||
});
|
||||
|
|
|
@ -62,7 +62,7 @@ let obj_to_read_selector = undefined;
|
|||
if (registry_enabled) {
|
||||
obj_to_read_selector = registry.getLoopedSelector(
|
||||
__ENV.REGISTRY_FILE, 'obj_to_read',
|
||||
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, {
|
||||
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, 1, {
|
||||
status: 'created',
|
||||
})
|
||||
}
|
||||
|
|
|
@ -65,7 +65,7 @@ if (__ENV.S3_ENDPOINTS) {
|
|||
// retries to be verified
|
||||
const obj_to_verify_selector = registry.getSelector(
|
||||
__ENV.REGISTRY_FILE, 'obj_to_verify',
|
||||
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, {
|
||||
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, 1, {
|
||||
status: 'created',
|
||||
});
|
||||
const obj_to_verify_count = obj_to_verify_selector.count();
|
||||
|
@ -97,7 +97,7 @@ export function setup() {
|
|||
for (const [status, counter] of Object.entries(obj_counters)) {
|
||||
const obj_selector = registry.getSelector(
|
||||
__ENV.REGISTRY_FILE, status,
|
||||
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, { status });
|
||||
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0,1, { status });
|
||||
counter.add(obj_selector.count());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue