From 87ffb551b6ad2963ebbc6e524c0a9e8ac4cd7848 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Fri, 5 Apr 2024 13:16:31 +0300 Subject: [PATCH] [#133] registry: Implement oneshot selector Signed-off-by: Evgenii Stratonikov --- cmd/xk6-registry-exporter/root.go | 2 +- internal/registry/obj_selector.go | 14 +++++++++----- internal/registry/registry.go | 28 ++++++++++++++++++++++++---- 3 files changed, 34 insertions(+), 10 deletions(-) diff --git a/cmd/xk6-registry-exporter/root.go b/cmd/xk6-registry-exporter/root.go index b7d8227..b359df1 100644 --- a/cmd/xk6-registry-exporter/root.go +++ b/cmd/xk6-registry-exporter/root.go @@ -78,7 +78,7 @@ func rootCmdRun(cmd *cobra.Command, args []string) error { } objRegistry := registry.NewObjRegistry(cmd.Context(), args[0]) - objSelector := registry.NewObjSelector(objRegistry, 0, false, ®istry.ObjFilter{ + objSelector := registry.NewObjSelector(objRegistry, 0, registry.SelectorAwaiting, ®istry.ObjFilter{ Status: status, Age: age, }) diff --git a/internal/registry/obj_selector.go b/internal/registry/obj_selector.go index 6519a8e..ef9e0d9 100644 --- a/internal/registry/obj_selector.go +++ b/internal/registry/obj_selector.go @@ -20,7 +20,7 @@ type ObjSelector struct { boltDB *bbolt.DB filter *ObjFilter cacheSize int - looped bool + kind SelectorKind } // objectSelectCache is the default maximum size of a batch to select from DB. @@ -28,7 +28,7 @@ const objectSelectCache = 1000 // NewObjSelector creates a new instance of object selector that can iterate over // objects in the specified registry. -func NewObjSelector(registry *ObjRegistry, selectionSize int, looped bool, filter *ObjFilter) *ObjSelector { +func NewObjSelector(registry *ObjRegistry, selectionSize int, kind SelectorKind, filter *ObjFilter) *ObjSelector { if selectionSize <= 0 { selectionSize = objectSelectCache } @@ -41,7 +41,7 @@ func NewObjSelector(registry *ObjRegistry, selectionSize int, looped bool, filte filter: filter, objChan: make(chan *ObjectInfo, selectionSize*2), cacheSize: selectionSize, - looped: looped, + kind: kind, } go objSelector.selectLoop() @@ -162,7 +162,11 @@ func (o *ObjSelector) selectLoop() { } } - if !o.looped && len(cache) != o.cacheSize { + if o.kind == SelectorOneshot && len(cache) != o.cacheSize { + return + } + + if o.kind != SelectorLooped && len(cache) != o.cacheSize { // no more objects, wait a little; the logic could be improved. select { case <-time.After(time.Second): @@ -171,7 +175,7 @@ func (o *ObjSelector) selectLoop() { } } - if o.looped && len(cache) != o.cacheSize { + if o.kind == SelectorLooped && len(cache) != o.cacheSize { lastID = 0 } diff --git a/internal/registry/registry.go b/internal/registry/registry.go index a635424..0aad78a 100644 --- a/internal/registry/registry.go +++ b/internal/registry/registry.go @@ -74,15 +74,35 @@ func (r *Registry) open(dbFilePath string) *ObjRegistry { return registry } +// SelectorKind represents selector behaviour when no items are available. +type SelectorKind byte + +const ( + // SelectorAwaiting waits for a new item to arrive. + // This selector visits each item exactly once and can be used when items + // to select are being pushed into registry concurrently. + SelectorAwaiting = iota + // SelectorLooped rewinds cursor to the start after all items have been read. + // It can encounter duplicates and should be used mostly for read scenarious. + SelectorLooped + // SelectorOneshot visits each item exactly once and exits immediately afterwards. + // It may be used to artificially abort the test after all items were processed. + SelectorOneshot +) + func (r *Registry) GetSelector(dbFilePath string, name string, cacheSize int, filter map[string]string) *ObjSelector { - return r.getSelectorInternal(dbFilePath, name, cacheSize, false, filter) + return r.getSelectorInternal(dbFilePath, name, cacheSize, SelectorAwaiting, filter) } func (r *Registry) GetLoopedSelector(dbFilePath string, name string, cacheSize int, filter map[string]string) *ObjSelector { - return r.getSelectorInternal(dbFilePath, name, cacheSize, true, filter) + return r.getSelectorInternal(dbFilePath, name, cacheSize, SelectorLooped, filter) } -func (r *Registry) getSelectorInternal(dbFilePath string, name string, cacheSize int, looped bool, filter map[string]string) *ObjSelector { +func (r *Registry) GetOneshotSelector(dbFilePath string, name string, cacheSize int, filter map[string]string) *ObjSelector { + return r.getSelectorInternal(dbFilePath, name, cacheSize, SelectorOneshot, filter) +} + +func (r *Registry) getSelectorInternal(dbFilePath string, name string, cacheSize int, kind SelectorKind, filter map[string]string) *ObjSelector { objFilter, err := parseFilter(filter) if err != nil { panic(err) @@ -94,7 +114,7 @@ func (r *Registry) getSelectorInternal(dbFilePath string, name string, cacheSize selector := r.root.selectors[name] if selector == nil { registry := r.open(dbFilePath) - selector = NewObjSelector(registry, cacheSize, looped, objFilter) + selector = NewObjSelector(registry, cacheSize, kind, objFilter) r.root.selectors[name] = selector } else if !reflect.DeepEqual(selector.filter, objFilter) { panic(fmt.Sprintf("selector %s already has been created with a different filter", name))