forked from TrueCloudLab/frostfs-node
[#498] policer: Explicitly Rewind() iterator after finish
Previously, we can continue to return `EndOfListing` infinitely. Reflect iterator reuse via Rewind() method. Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
parent
a0d51090a4
commit
e858479a74
4 changed files with 94 additions and 1 deletions
|
@ -26,3 +26,7 @@ func (it *keySpaceIterator) Next(ctx context.Context, batchSize uint32) ([]objec
|
||||||
it.cur = res.Cursor()
|
it.cur = res.Cursor()
|
||||||
return res.AddressList(), nil
|
return res.AddressList(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (it *keySpaceIterator) Rewind() {
|
||||||
|
it.cur = nil
|
||||||
|
}
|
||||||
|
|
|
@ -23,6 +23,7 @@ import (
|
||||||
// when the end of the key space is reached.
|
// when the end of the key space is reached.
|
||||||
type KeySpaceIterator interface {
|
type KeySpaceIterator interface {
|
||||||
Next(context.Context, uint32) ([]objectcore.AddressWithType, error)
|
Next(context.Context, uint32) ([]objectcore.AddressWithType, error)
|
||||||
|
Rewind()
|
||||||
}
|
}
|
||||||
|
|
||||||
// RedundantCopyCallback is a callback to pass
|
// RedundantCopyCallback is a callback to pass
|
||||||
|
|
|
@ -244,11 +244,95 @@ func TestProcessObject(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestIteratorContract(t *testing.T) {
|
||||||
|
addr := oidtest.Address()
|
||||||
|
objs := []objectcore.AddressWithType{{
|
||||||
|
Address: addr,
|
||||||
|
Type: object.TypeRegular,
|
||||||
|
}}
|
||||||
|
|
||||||
|
containerSrc := func(id cid.ID) (*container.Container, error) {
|
||||||
|
return nil, apistatus.ContainerNotFound{}
|
||||||
|
}
|
||||||
|
buryFn := func(ctx context.Context, a oid.Address) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
pool, err := ants.NewPool(4)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
it := &predefinedIterator{
|
||||||
|
scenario: []nextResult{
|
||||||
|
{objs, nil},
|
||||||
|
{nil, errors.New("opaque")},
|
||||||
|
{nil, engine.ErrEndOfListing},
|
||||||
|
{nil, engine.ErrEndOfListing},
|
||||||
|
{nil, errors.New("opaque")},
|
||||||
|
{objs, engine.ErrEndOfListing},
|
||||||
|
},
|
||||||
|
finishCh: make(chan struct{}),
|
||||||
|
}
|
||||||
|
|
||||||
|
p := New(
|
||||||
|
WithKeySpaceIterator(it),
|
||||||
|
WithContainerSource(containerSrcFunc(containerSrc)),
|
||||||
|
WithBuryFunc(buryFn),
|
||||||
|
WithPool(pool),
|
||||||
|
WithNodeLoader(constNodeLoader(0)),
|
||||||
|
)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
go p.Run(ctx)
|
||||||
|
|
||||||
|
<-it.finishCh
|
||||||
|
cancel()
|
||||||
|
require.Equal(t, []string{
|
||||||
|
"Next",
|
||||||
|
"Next",
|
||||||
|
"Next",
|
||||||
|
"Rewind",
|
||||||
|
"Next",
|
||||||
|
"Rewind",
|
||||||
|
"Next",
|
||||||
|
"Next",
|
||||||
|
"Rewind",
|
||||||
|
}, it.calls)
|
||||||
|
}
|
||||||
|
|
||||||
// TODO(https://git.frostfs.info/TrueCloudLab/frostfs-sdk-go/issues/101)
|
// TODO(https://git.frostfs.info/TrueCloudLab/frostfs-sdk-go/issues/101)
|
||||||
func eqAddr(a, b oid.Address) bool {
|
func eqAddr(a, b oid.Address) bool {
|
||||||
return a.Container().Equals(b.Container()) && a.Object().Equals(b.Object())
|
return a.Container().Equals(b.Container()) && a.Object().Equals(b.Object())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type nextResult struct {
|
||||||
|
objs []objectcore.AddressWithType
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
type predefinedIterator struct {
|
||||||
|
scenario []nextResult
|
||||||
|
finishCh chan struct{}
|
||||||
|
pos int
|
||||||
|
calls []string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (it *predefinedIterator) Next(ctx context.Context, size uint32) ([]objectcore.AddressWithType, error) {
|
||||||
|
if it.pos == len(it.scenario) {
|
||||||
|
close(it.finishCh)
|
||||||
|
<-ctx.Done()
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
res := it.scenario[it.pos]
|
||||||
|
it.pos += 1
|
||||||
|
it.calls = append(it.calls, "Next")
|
||||||
|
return res.objs, res.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (it *predefinedIterator) Rewind() {
|
||||||
|
it.calls = append(it.calls, "Rewind")
|
||||||
|
}
|
||||||
|
|
||||||
// sliceKeySpaceIterator is a KeySpaceIterator backed by a slice.
|
// sliceKeySpaceIterator is a KeySpaceIterator backed by a slice.
|
||||||
type sliceKeySpaceIterator struct {
|
type sliceKeySpaceIterator struct {
|
||||||
objs []objectcore.AddressWithType
|
objs []objectcore.AddressWithType
|
||||||
|
@ -257,7 +341,6 @@ type sliceKeySpaceIterator struct {
|
||||||
|
|
||||||
func (it *sliceKeySpaceIterator) Next(_ context.Context, size uint32) ([]objectcore.AddressWithType, error) {
|
func (it *sliceKeySpaceIterator) Next(_ context.Context, size uint32) ([]objectcore.AddressWithType, error) {
|
||||||
if it.cur >= len(it.objs) {
|
if it.cur >= len(it.objs) {
|
||||||
it.cur = 0
|
|
||||||
return nil, engine.ErrEndOfListing
|
return nil, engine.ErrEndOfListing
|
||||||
}
|
}
|
||||||
end := it.cur + int(size)
|
end := it.cur + int(size)
|
||||||
|
@ -269,6 +352,10 @@ func (it *sliceKeySpaceIterator) Next(_ context.Context, size uint32) ([]objectc
|
||||||
return ret, nil
|
return ret, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (it *sliceKeySpaceIterator) Rewind() {
|
||||||
|
it.cur = 0
|
||||||
|
}
|
||||||
|
|
||||||
// containerSrcFunc is a container.Source backed by a function.
|
// containerSrcFunc is a container.Source backed by a function.
|
||||||
type containerSrcFunc func(cid.ID) (*container.Container, error)
|
type containerSrcFunc func(cid.ID) (*container.Container, error)
|
||||||
|
|
||||||
|
|
|
@ -27,6 +27,7 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) {
|
||||||
addrs, err := p.keySpaceIterator.Next(ctx, p.batchSize)
|
addrs, err := p.keySpaceIterator.Next(ctx, p.batchSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, engine.ErrEndOfListing) {
|
if errors.Is(err, engine.ErrEndOfListing) {
|
||||||
|
p.keySpaceIterator.Rewind()
|
||||||
time.Sleep(time.Second) // finished whole cycle, sleep a bit
|
time.Sleep(time.Second) // finished whole cycle, sleep a bit
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue