registry: Try to continue iteration #131
8 changed files with 22 additions and 9 deletions
|
@ -78,7 +78,7 @@ func rootCmdRun(cmd *cobra.Command, args []string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
objRegistry := registry.NewObjRegistry(cmd.Context(), args[0])
|
objRegistry := registry.NewObjRegistry(cmd.Context(), args[0])
|
||||||
objSelector := registry.NewObjSelector(objRegistry, 0, ®istry.ObjFilter{
|
objSelector := registry.NewObjSelector(objRegistry, 0, false, ®istry.ObjFilter{
|
||||||
Status: status,
|
Status: status,
|
||||||
Age: age,
|
Age: age,
|
||||||
})
|
})
|
||||||
|
|
|
@ -20,6 +20,7 @@ type ObjSelector struct {
|
||||||
boltDB *bbolt.DB
|
boltDB *bbolt.DB
|
||||||
filter *ObjFilter
|
filter *ObjFilter
|
||||||
cacheSize int
|
cacheSize int
|
||||||
|
looped bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// objectSelectCache is the default maximum size of a batch to select from DB.
|
// objectSelectCache is the default maximum size of a batch to select from DB.
|
||||||
|
@ -27,7 +28,7 @@ const objectSelectCache = 1000
|
||||||
|
|
||||||
// NewObjSelector creates a new instance of object selector that can iterate over
|
// NewObjSelector creates a new instance of object selector that can iterate over
|
||||||
// objects in the specified registry.
|
// objects in the specified registry.
|
||||||
func NewObjSelector(registry *ObjRegistry, selectionSize int, filter *ObjFilter) *ObjSelector {
|
func NewObjSelector(registry *ObjRegistry, selectionSize int, looped bool, filter *ObjFilter) *ObjSelector {
|
||||||
if selectionSize <= 0 {
|
if selectionSize <= 0 {
|
||||||
selectionSize = objectSelectCache
|
selectionSize = objectSelectCache
|
||||||
}
|
}
|
||||||
|
@ -40,6 +41,7 @@ func NewObjSelector(registry *ObjRegistry, selectionSize int, filter *ObjFilter)
|
||||||
filter: filter,
|
filter: filter,
|
||||||
objChan: make(chan *ObjectInfo, selectionSize*2),
|
objChan: make(chan *ObjectInfo, selectionSize*2),
|
||||||
cacheSize: selectionSize,
|
cacheSize: selectionSize,
|
||||||
|
looped: looped,
|
||||||
}
|
}
|
||||||
|
|
||||||
go objSelector.selectLoop()
|
go objSelector.selectLoop()
|
||||||
|
@ -160,13 +162,16 @@ func (o *ObjSelector) selectLoop() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(cache) != o.cacheSize {
|
if !o.looped && len(cache) != o.cacheSize {
|
||||||
// no more objects, wait a little; the logic could be improved.
|
// no more objects, wait a little; the logic could be improved.
|
||||||
select {
|
select {
|
||||||
case <-time.After(time.Second):
|
case <-time.After(time.Second):
|
||||||
case <-o.ctx.Done():
|
case <-o.ctx.Done():
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if o.looped && len(cache) != o.cacheSize {
|
||||||
lastID = 0
|
lastID = 0
|
||||||
fyrchik marked this conversation as resolved
Outdated
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -75,6 +75,14 @@ func (r *Registry) open(dbFilePath string) *ObjRegistry {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Registry) GetSelector(dbFilePath string, name string, cacheSize int, filter map[string]string) *ObjSelector {
|
func (r *Registry) GetSelector(dbFilePath string, name string, cacheSize int, filter map[string]string) *ObjSelector {
|
||||||
|
return r.getSelectorInternal(dbFilePath, name, cacheSize, false, filter)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Registry) GetLoopedSelector(dbFilePath string, name string, cacheSize int, filter map[string]string) *ObjSelector {
|
||||||
|
return r.getSelectorInternal(dbFilePath, name, cacheSize, true, filter)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Registry) getSelectorInternal(dbFilePath string, name string, cacheSize int, looped bool, filter map[string]string) *ObjSelector {
|
||||||
objFilter, err := parseFilter(filter)
|
objFilter, err := parseFilter(filter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
|
@ -86,7 +94,7 @@ func (r *Registry) GetSelector(dbFilePath string, name string, cacheSize int, fi
|
||||||
selector := r.root.selectors[name]
|
selector := r.root.selectors[name]
|
||||||
if selector == nil {
|
if selector == nil {
|
||||||
registry := r.open(dbFilePath)
|
registry := r.open(dbFilePath)
|
||||||
selector = NewObjSelector(registry, cacheSize, objFilter)
|
selector = NewObjSelector(registry, cacheSize, looped, objFilter)
|
||||||
r.root.selectors[name] = selector
|
r.root.selectors[name] = selector
|
||||||
} else if !reflect.DeepEqual(selector.filter, objFilter) {
|
} else if !reflect.DeepEqual(selector.filter, objFilter) {
|
||||||
panic(fmt.Sprintf("selector %s already has been created with a different filter", name))
|
panic(fmt.Sprintf("selector %s already has been created with a different filter", name))
|
||||||
|
|
|
@ -57,7 +57,7 @@ if (registry_enabled && delete_age) {
|
||||||
const read_age = __ENV.READ_AGE ? parseInt(__ENV.READ_AGE) : 10;
|
const read_age = __ENV.READ_AGE ? parseInt(__ENV.READ_AGE) : 10;
|
||||||
let obj_to_read_selector = undefined;
|
let obj_to_read_selector = undefined;
|
||||||
if (registry_enabled) {
|
if (registry_enabled) {
|
||||||
obj_to_read_selector = registry.getSelector(
|
obj_to_read_selector = registry.getLoopedSelector(
|
||||||
__ENV.REGISTRY_FILE,
|
__ENV.REGISTRY_FILE,
|
||||||
"obj_to_read",
|
"obj_to_read",
|
||||||
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0,
|
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0,
|
||||||
|
|
|
@ -57,7 +57,7 @@ if (registry_enabled && delete_age) {
|
||||||
const read_age = __ENV.READ_AGE ? parseInt(__ENV.READ_AGE) : 10;
|
const read_age = __ENV.READ_AGE ? parseInt(__ENV.READ_AGE) : 10;
|
||||||
let obj_to_read_selector = undefined;
|
let obj_to_read_selector = undefined;
|
||||||
if (registry_enabled) {
|
if (registry_enabled) {
|
||||||
obj_to_read_selector = registry.getSelector(
|
obj_to_read_selector = registry.getLoopedSelector(
|
||||||
__ENV.REGISTRY_FILE,
|
__ENV.REGISTRY_FILE,
|
||||||
"obj_to_read",
|
"obj_to_read",
|
||||||
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0,
|
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0,
|
||||||
|
|
|
@ -56,7 +56,7 @@ if (registry_enabled && delete_age) {
|
||||||
const read_age = __ENV.READ_AGE ? parseInt(__ENV.READ_AGE) : 10;
|
const read_age = __ENV.READ_AGE ? parseInt(__ENV.READ_AGE) : 10;
|
||||||
let obj_to_read_selector = undefined;
|
let obj_to_read_selector = undefined;
|
||||||
if (registry_enabled) {
|
if (registry_enabled) {
|
||||||
obj_to_read_selector = registry.getSelector(
|
obj_to_read_selector = registry.getLoopedSelector(
|
||||||
__ENV.REGISTRY_FILE,
|
__ENV.REGISTRY_FILE,
|
||||||
"obj_to_read",
|
"obj_to_read",
|
||||||
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0,
|
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0,
|
||||||
|
|
|
@ -56,7 +56,7 @@ if (registry_enabled && delete_age) {
|
||||||
const read_age = __ENV.READ_AGE ? parseInt(__ENV.READ_AGE) : 10;
|
const read_age = __ENV.READ_AGE ? parseInt(__ENV.READ_AGE) : 10;
|
||||||
let obj_to_read_selector = undefined;
|
let obj_to_read_selector = undefined;
|
||||||
if (registry_enabled) {
|
if (registry_enabled) {
|
||||||
obj_to_read_selector = registry.getSelector(
|
obj_to_read_selector = registry.getLoopedSelector(
|
||||||
__ENV.REGISTRY_FILE,
|
__ENV.REGISTRY_FILE,
|
||||||
"obj_to_read",
|
"obj_to_read",
|
||||||
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0,
|
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0,
|
||||||
|
|
|
@ -54,7 +54,7 @@ const obj_registry = registry_enabled ? registry.open(__ENV.REGISTRY_FILE) : und
|
||||||
|
|
||||||
let obj_to_read_selector = undefined;
|
let obj_to_read_selector = undefined;
|
||||||
if (registry_enabled) {
|
if (registry_enabled) {
|
||||||
obj_to_read_selector = registry.getSelector(
|
obj_to_read_selector = registry.getLoopedSelector(
|
||||||
__ENV.REGISTRY_FILE,
|
__ENV.REGISTRY_FILE,
|
||||||
"obj_to_read",
|
"obj_to_read",
|
||||||
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0,
|
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0,
|
||||||
|
|
Loading…
Reference in a new issue
With this change alone, do we still have duplicates? It seems the culprit
It was introduced in
3c26e7c917
without justification in the commit message (which seemed to be a part of #86, but I cannot see the commit there).With this change alone selector will read registry file only once. This affects read scenarios.
Can we made an explicit opt-in/opt-out for this behaviour? It seems that in each scenario we want a specific semantics (for read -- read many times, for delete -- read once).
To me "read-once" by default seems expected.
But for write + update + delete
read once
doesn't work too.There have been no other comments on this behavior in 8 months. That's why I think it's not necessary to complicate things.
For write + update + delete we use a different selector for each step.
I don't see how it complicates things, other than making them more explicit.
The default is up to you, but each time you create a selector you probably want one specific behaviour, not a random combination of two.
It also doesn't need to be a parameter, maybe 2 different constructors.
Now selector is simple: it just reads objects in a circle. This works fine with the most common scenarios: write+read and read.
I have not received any
read once
-behavior requests from QA. So I don't think it is really required.The bug is exactly about the cycling behaviour: we do not need it if we want to remove each object exactly once.
The scenario is
write+update+delete
- 10 workers put objects, 10 workers update those objects, 10 workers delete updated objects. So withread-once
update and delete workers willhangstop after first iteration.No, because registry groups objects by status.
So we have
update
useread once object with status "created", then put them in registry with status "updated"
delete
willread once object with status "updated", then put then in registry with status "deleted"
.The behaviour was there before the
3c26e7c917
, where a bug was introduced to support read scenarios. A proper solution would've been to do exactly what we need: introduce cycling behaviour for selectors only for read scenarios.Then I don't understand what you mean by
read-once
:(Current implementation always tries to continue iteration first. If there is no new items, it starts from the beginning. This allows to work with update and read scenarios without any additional flags.
done