From e858479a746604813130aa84edb991d0861b5f62 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 6 Jul 2023 18:23:52 +0300 Subject: [PATCH] [#498] policer: Explicitly Rewind() iterator after finish Previously, we can continue to return `EndOfListing` infinitely. Reflect iterator reuse via Rewind() method. Signed-off-by: Evgenii Stratonikov --- cmd/frostfs-node/keyspaceiterator.go | 4 ++ pkg/services/policer/option.go | 1 + pkg/services/policer/policer_test.go | 89 +++++++++++++++++++++++++++- pkg/services/policer/process.go | 1 + 4 files changed, 94 insertions(+), 1 deletion(-) diff --git a/cmd/frostfs-node/keyspaceiterator.go b/cmd/frostfs-node/keyspaceiterator.go index 8991964a0..e7214aacb 100644 --- a/cmd/frostfs-node/keyspaceiterator.go +++ b/cmd/frostfs-node/keyspaceiterator.go @@ -26,3 +26,7 @@ func (it *keySpaceIterator) Next(ctx context.Context, batchSize uint32) ([]objec it.cur = res.Cursor() return res.AddressList(), nil } + +func (it *keySpaceIterator) Rewind() { + it.cur = nil +} diff --git a/pkg/services/policer/option.go b/pkg/services/policer/option.go index 6f17b2947..e182c6be7 100644 --- a/pkg/services/policer/option.go +++ b/pkg/services/policer/option.go @@ -23,6 +23,7 @@ import ( // when the end of the key space is reached. type KeySpaceIterator interface { Next(context.Context, uint32) ([]objectcore.AddressWithType, error) + Rewind() } // RedundantCopyCallback is a callback to pass diff --git a/pkg/services/policer/policer_test.go b/pkg/services/policer/policer_test.go index 0ead48ef4..ad82d2477 100644 --- a/pkg/services/policer/policer_test.go +++ b/pkg/services/policer/policer_test.go @@ -244,11 +244,95 @@ func TestProcessObject(t *testing.T) { } } +func TestIteratorContract(t *testing.T) { + addr := oidtest.Address() + objs := []objectcore.AddressWithType{{ + Address: addr, + Type: object.TypeRegular, + }} + + containerSrc := func(id cid.ID) (*container.Container, error) { + return nil, apistatus.ContainerNotFound{} + } + buryFn := func(ctx context.Context, a oid.Address) error { + return nil + } + + pool, err := ants.NewPool(4) + require.NoError(t, err) + + it := &predefinedIterator{ + scenario: []nextResult{ + {objs, nil}, + {nil, errors.New("opaque")}, + {nil, engine.ErrEndOfListing}, + {nil, engine.ErrEndOfListing}, + {nil, errors.New("opaque")}, + {objs, engine.ErrEndOfListing}, + }, + finishCh: make(chan struct{}), + } + + p := New( + WithKeySpaceIterator(it), + WithContainerSource(containerSrcFunc(containerSrc)), + WithBuryFunc(buryFn), + WithPool(pool), + WithNodeLoader(constNodeLoader(0)), + ) + + ctx, cancel := context.WithCancel(context.Background()) + go p.Run(ctx) + + <-it.finishCh + cancel() + require.Equal(t, []string{ + "Next", + "Next", + "Next", + "Rewind", + "Next", + "Rewind", + "Next", + "Next", + "Rewind", + }, it.calls) +} + // TODO(https://git.frostfs.info/TrueCloudLab/frostfs-sdk-go/issues/101) func eqAddr(a, b oid.Address) bool { return a.Container().Equals(b.Container()) && a.Object().Equals(b.Object()) } +type nextResult struct { + objs []objectcore.AddressWithType + err error +} + +type predefinedIterator struct { + scenario []nextResult + finishCh chan struct{} + pos int + calls []string +} + +func (it *predefinedIterator) Next(ctx context.Context, size uint32) ([]objectcore.AddressWithType, error) { + if it.pos == len(it.scenario) { + close(it.finishCh) + <-ctx.Done() + return nil, nil + } + + res := it.scenario[it.pos] + it.pos += 1 + it.calls = append(it.calls, "Next") + return res.objs, res.err +} + +func (it *predefinedIterator) Rewind() { + it.calls = append(it.calls, "Rewind") +} + // sliceKeySpaceIterator is a KeySpaceIterator backed by a slice. type sliceKeySpaceIterator struct { objs []objectcore.AddressWithType @@ -257,7 +341,6 @@ type sliceKeySpaceIterator struct { func (it *sliceKeySpaceIterator) Next(_ context.Context, size uint32) ([]objectcore.AddressWithType, error) { if it.cur >= len(it.objs) { - it.cur = 0 return nil, engine.ErrEndOfListing } end := it.cur + int(size) @@ -269,6 +352,10 @@ func (it *sliceKeySpaceIterator) Next(_ context.Context, size uint32) ([]objectc return ret, nil } +func (it *sliceKeySpaceIterator) Rewind() { + it.cur = 0 +} + // containerSrcFunc is a container.Source backed by a function. type containerSrcFunc func(cid.ID) (*container.Container, error) diff --git a/pkg/services/policer/process.go b/pkg/services/policer/process.go index 39b61c8a0..cdc92ed12 100644 --- a/pkg/services/policer/process.go +++ b/pkg/services/policer/process.go @@ -27,6 +27,7 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) { addrs, err := p.keySpaceIterator.Next(ctx, p.batchSize) if err != nil { if errors.Is(err, engine.ErrEndOfListing) { + p.keySpaceIterator.Rewind() time.Sleep(time.Second) // finished whole cycle, sleep a bit continue }