[#133] registry: Implement oneshot selector

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
pull/133/head
Evgenii Stratonikov 2024-04-05 13:16:31 +03:00
parent d1ec9e4bf0
commit 87ffb551b6
3 changed files with 34 additions and 10 deletions

View File

@ -78,7 +78,7 @@ func rootCmdRun(cmd *cobra.Command, args []string) error {
}
objRegistry := registry.NewObjRegistry(cmd.Context(), args[0])
objSelector := registry.NewObjSelector(objRegistry, 0, false, &registry.ObjFilter{
objSelector := registry.NewObjSelector(objRegistry, 0, registry.SelectorAwaiting, &registry.ObjFilter{
Status: status,
Age: age,
})

View File

@ -20,7 +20,7 @@ type ObjSelector struct {
boltDB *bbolt.DB
filter *ObjFilter
cacheSize int
looped bool
kind SelectorKind
}
// objectSelectCache is the default maximum size of a batch to select from DB.
@ -28,7 +28,7 @@ const objectSelectCache = 1000
// NewObjSelector creates a new instance of object selector that can iterate over
// objects in the specified registry.
func NewObjSelector(registry *ObjRegistry, selectionSize int, looped bool, filter *ObjFilter) *ObjSelector {
func NewObjSelector(registry *ObjRegistry, selectionSize int, kind SelectorKind, filter *ObjFilter) *ObjSelector {
if selectionSize <= 0 {
selectionSize = objectSelectCache
}
@ -41,7 +41,7 @@ func NewObjSelector(registry *ObjRegistry, selectionSize int, looped bool, filte
filter: filter,
objChan: make(chan *ObjectInfo, selectionSize*2),
cacheSize: selectionSize,
looped: looped,
kind: kind,
}
go objSelector.selectLoop()
@ -162,7 +162,11 @@ func (o *ObjSelector) selectLoop() {
}
}
if !o.looped && len(cache) != o.cacheSize {
if o.kind == SelectorOneshot && len(cache) != o.cacheSize {
return
}
if o.kind != SelectorLooped && len(cache) != o.cacheSize {
// no more objects, wait a little; the logic could be improved.
select {
case <-time.After(time.Second):
@ -171,7 +175,7 @@ func (o *ObjSelector) selectLoop() {
}
}
if o.looped && len(cache) != o.cacheSize {
if o.kind == SelectorLooped && len(cache) != o.cacheSize {
lastID = 0
}

View File

@ -74,15 +74,35 @@ func (r *Registry) open(dbFilePath string) *ObjRegistry {
return registry
}
// SelectorKind represents selector behaviour when no items are available.
type SelectorKind byte
const (
// SelectorAwaiting waits for a new item to arrive.
// This selector visits each item exactly once and can be used when items
// to select are being pushed into registry concurrently.
SelectorAwaiting = iota
// SelectorLooped rewinds cursor to the start after all items have been read.
// It can encounter duplicates and should be used mostly for read scenarious.
SelectorLooped
// SelectorOneshot visits each item exactly once and exits immediately afterwards.
// It may be used to artificially abort the test after all items were processed.
SelectorOneshot
)
func (r *Registry) GetSelector(dbFilePath string, name string, cacheSize int, filter map[string]string) *ObjSelector {
return r.getSelectorInternal(dbFilePath, name, cacheSize, false, filter)
return r.getSelectorInternal(dbFilePath, name, cacheSize, SelectorAwaiting, filter)
}
func (r *Registry) GetLoopedSelector(dbFilePath string, name string, cacheSize int, filter map[string]string) *ObjSelector {
return r.getSelectorInternal(dbFilePath, name, cacheSize, true, filter)
return r.getSelectorInternal(dbFilePath, name, cacheSize, SelectorLooped, filter)
}
func (r *Registry) getSelectorInternal(dbFilePath string, name string, cacheSize int, looped bool, filter map[string]string) *ObjSelector {
func (r *Registry) GetOneshotSelector(dbFilePath string, name string, cacheSize int, filter map[string]string) *ObjSelector {
return r.getSelectorInternal(dbFilePath, name, cacheSize, SelectorOneshot, filter)
}
func (r *Registry) getSelectorInternal(dbFilePath string, name string, cacheSize int, kind SelectorKind, filter map[string]string) *ObjSelector {
objFilter, err := parseFilter(filter)
if err != nil {
panic(err)
@ -94,7 +114,7 @@ func (r *Registry) getSelectorInternal(dbFilePath string, name string, cacheSize
selector := r.root.selectors[name]
if selector == nil {
registry := r.open(dbFilePath)
selector = NewObjSelector(registry, cacheSize, looped, objFilter)
selector = NewObjSelector(registry, cacheSize, kind, objFilter)
r.root.selectors[name] = selector
} else if !reflect.DeepEqual(selector.filter, objFilter) {
panic(fmt.Sprintf("selector %s already has been created with a different filter", name))