[#498] policer: Explicitly Rewind() iterator after finish
Previously, we can continue to return `EndOfListing` infinitely. Reflect iterator reuse via Rewind() method. Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
parent
6eefe9747e
commit
5d836b9560
4 changed files with 94 additions and 1 deletions
|
@ -26,3 +26,7 @@ func (it *keySpaceIterator) Next(ctx context.Context, batchSize uint32) ([]objec
|
|||
it.cur = res.Cursor()
|
||||
return res.AddressList(), nil
|
||||
}
|
||||
|
||||
func (it *keySpaceIterator) Rewind() {
|
||||
it.cur = nil
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
// when the end of the key space is reached.
|
||||
type KeySpaceIterator interface {
|
||||
Next(context.Context, uint32) ([]objectcore.AddressWithType, error)
|
||||
Rewind()
|
||||
}
|
||||
|
||||
// RedundantCopyCallback is a callback to pass
|
||||
|
|
|
@ -244,11 +244,95 @@ func TestProcessObject(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestIteratorContract(t *testing.T) {
|
||||
addr := oidtest.Address()
|
||||
objs := []objectcore.AddressWithType{{
|
||||
Address: addr,
|
||||
Type: object.TypeRegular,
|
||||
}}
|
||||
|
||||
containerSrc := func(id cid.ID) (*container.Container, error) {
|
||||
return nil, apistatus.ContainerNotFound{}
|
||||
}
|
||||
buryFn := func(ctx context.Context, a oid.Address) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
pool, err := ants.NewPool(4)
|
||||
require.NoError(t, err)
|
||||
|
||||
it := &predefinedIterator{
|
||||
scenario: []nextResult{
|
||||
{objs, nil},
|
||||
{nil, errors.New("opaque")},
|
||||
{nil, engine.ErrEndOfListing},
|
||||
{nil, engine.ErrEndOfListing},
|
||||
{nil, errors.New("opaque")},
|
||||
{objs, engine.ErrEndOfListing},
|
||||
},
|
||||
finishCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
p := New(
|
||||
WithKeySpaceIterator(it),
|
||||
WithContainerSource(containerSrcFunc(containerSrc)),
|
||||
WithBuryFunc(buryFn),
|
||||
WithPool(pool),
|
||||
WithNodeLoader(constNodeLoader(0)),
|
||||
)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
go p.Run(ctx)
|
||||
|
||||
<-it.finishCh
|
||||
cancel()
|
||||
require.Equal(t, []string{
|
||||
"Next",
|
||||
"Next",
|
||||
"Next",
|
||||
"Rewind",
|
||||
"Next",
|
||||
"Rewind",
|
||||
"Next",
|
||||
"Next",
|
||||
"Rewind",
|
||||
}, it.calls)
|
||||
}
|
||||
|
||||
// TODO(https://git.frostfs.info/TrueCloudLab/frostfs-sdk-go/issues/101)
|
||||
func eqAddr(a, b oid.Address) bool {
|
||||
return a.Container().Equals(b.Container()) && a.Object().Equals(b.Object())
|
||||
}
|
||||
|
||||
type nextResult struct {
|
||||
objs []objectcore.AddressWithType
|
||||
err error
|
||||
}
|
||||
|
||||
type predefinedIterator struct {
|
||||
scenario []nextResult
|
||||
finishCh chan struct{}
|
||||
pos int
|
||||
calls []string
|
||||
}
|
||||
|
||||
func (it *predefinedIterator) Next(ctx context.Context, size uint32) ([]objectcore.AddressWithType, error) {
|
||||
if it.pos == len(it.scenario) {
|
||||
close(it.finishCh)
|
||||
<-ctx.Done()
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
res := it.scenario[it.pos]
|
||||
it.pos += 1
|
||||
it.calls = append(it.calls, "Next")
|
||||
return res.objs, res.err
|
||||
}
|
||||
|
||||
func (it *predefinedIterator) Rewind() {
|
||||
it.calls = append(it.calls, "Rewind")
|
||||
}
|
||||
|
||||
// sliceKeySpaceIterator is a KeySpaceIterator backed by a slice.
|
||||
type sliceKeySpaceIterator struct {
|
||||
objs []objectcore.AddressWithType
|
||||
|
@ -257,7 +341,6 @@ type sliceKeySpaceIterator struct {
|
|||
|
||||
func (it *sliceKeySpaceIterator) Next(_ context.Context, size uint32) ([]objectcore.AddressWithType, error) {
|
||||
if it.cur >= len(it.objs) {
|
||||
it.cur = 0
|
||||
return nil, engine.ErrEndOfListing
|
||||
}
|
||||
end := it.cur + int(size)
|
||||
|
@ -269,6 +352,10 @@ func (it *sliceKeySpaceIterator) Next(_ context.Context, size uint32) ([]objectc
|
|||
return ret, nil
|
||||
}
|
||||
|
||||
func (it *sliceKeySpaceIterator) Rewind() {
|
||||
it.cur = 0
|
||||
}
|
||||
|
||||
// containerSrcFunc is a container.Source backed by a function.
|
||||
type containerSrcFunc func(cid.ID) (*container.Container, error)
|
||||
|
||||
|
|
|
@ -27,6 +27,7 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) {
|
|||
addrs, err := p.keySpaceIterator.Next(ctx, p.batchSize)
|
||||
if err != nil {
|
||||
if errors.Is(err, engine.ErrEndOfListing) {
|
||||
p.keySpaceIterator.Rewind()
|
||||
time.Sleep(time.Second) // finished whole cycle, sleep a bit
|
||||
continue
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue