From 5d836b9560ffb3ed9c13198749aeed8a6340718d Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 6 Jul 2023 18:23:52 +0300 Subject: [PATCH 1/2] [#498] policer: Explicitly Rewind() iterator after finish Previously, we can continue to return `EndOfListing` infinitely. Reflect iterator reuse via Rewind() method. Signed-off-by: Evgenii Stratonikov --- cmd/frostfs-node/keyspaceiterator.go | 4 ++ pkg/services/policer/option.go | 1 + pkg/services/policer/policer_test.go | 89 +++++++++++++++++++++++++++- pkg/services/policer/process.go | 1 + 4 files changed, 94 insertions(+), 1 deletion(-) diff --git a/cmd/frostfs-node/keyspaceiterator.go b/cmd/frostfs-node/keyspaceiterator.go index 8991964a0..e7214aacb 100644 --- a/cmd/frostfs-node/keyspaceiterator.go +++ b/cmd/frostfs-node/keyspaceiterator.go @@ -26,3 +26,7 @@ func (it *keySpaceIterator) Next(ctx context.Context, batchSize uint32) ([]objec it.cur = res.Cursor() return res.AddressList(), nil } + +func (it *keySpaceIterator) Rewind() { + it.cur = nil +} diff --git a/pkg/services/policer/option.go b/pkg/services/policer/option.go index 6f17b2947..e182c6be7 100644 --- a/pkg/services/policer/option.go +++ b/pkg/services/policer/option.go @@ -23,6 +23,7 @@ import ( // when the end of the key space is reached. type KeySpaceIterator interface { Next(context.Context, uint32) ([]objectcore.AddressWithType, error) + Rewind() } // RedundantCopyCallback is a callback to pass diff --git a/pkg/services/policer/policer_test.go b/pkg/services/policer/policer_test.go index a09957895..86ba0e1ad 100644 --- a/pkg/services/policer/policer_test.go +++ b/pkg/services/policer/policer_test.go @@ -244,11 +244,95 @@ func TestProcessObject(t *testing.T) { } } +func TestIteratorContract(t *testing.T) { + addr := oidtest.Address() + objs := []objectcore.AddressWithType{{ + Address: addr, + Type: object.TypeRegular, + }} + + containerSrc := func(id cid.ID) (*container.Container, error) { + return nil, apistatus.ContainerNotFound{} + } + buryFn := func(ctx context.Context, a oid.Address) error { + return nil + } + + pool, err := ants.NewPool(4) + require.NoError(t, err) + + it := &predefinedIterator{ + scenario: []nextResult{ + {objs, nil}, + {nil, errors.New("opaque")}, + {nil, engine.ErrEndOfListing}, + {nil, engine.ErrEndOfListing}, + {nil, errors.New("opaque")}, + {objs, engine.ErrEndOfListing}, + }, + finishCh: make(chan struct{}), + } + + p := New( + WithKeySpaceIterator(it), + WithContainerSource(containerSrcFunc(containerSrc)), + WithBuryFunc(buryFn), + WithPool(pool), + WithNodeLoader(constNodeLoader(0)), + ) + + ctx, cancel := context.WithCancel(context.Background()) + go p.Run(ctx) + + <-it.finishCh + cancel() + require.Equal(t, []string{ + "Next", + "Next", + "Next", + "Rewind", + "Next", + "Rewind", + "Next", + "Next", + "Rewind", + }, it.calls) +} + // TODO(https://git.frostfs.info/TrueCloudLab/frostfs-sdk-go/issues/101) func eqAddr(a, b oid.Address) bool { return a.Container().Equals(b.Container()) && a.Object().Equals(b.Object()) } +type nextResult struct { + objs []objectcore.AddressWithType + err error +} + +type predefinedIterator struct { + scenario []nextResult + finishCh chan struct{} + pos int + calls []string +} + +func (it *predefinedIterator) Next(ctx context.Context, size uint32) ([]objectcore.AddressWithType, error) { + if it.pos == len(it.scenario) { + close(it.finishCh) + <-ctx.Done() + return nil, nil + } + + res := it.scenario[it.pos] + it.pos += 1 + it.calls = append(it.calls, "Next") + return res.objs, res.err +} + +func (it *predefinedIterator) Rewind() { + it.calls = append(it.calls, "Rewind") +} + // sliceKeySpaceIterator is a KeySpaceIterator backed by a slice. type sliceKeySpaceIterator struct { objs []objectcore.AddressWithType @@ -257,7 +341,6 @@ type sliceKeySpaceIterator struct { func (it *sliceKeySpaceIterator) Next(_ context.Context, size uint32) ([]objectcore.AddressWithType, error) { if it.cur >= len(it.objs) { - it.cur = 0 return nil, engine.ErrEndOfListing } end := it.cur + int(size) @@ -269,6 +352,10 @@ func (it *sliceKeySpaceIterator) Next(_ context.Context, size uint32) ([]objectc return ret, nil } +func (it *sliceKeySpaceIterator) Rewind() { + it.cur = 0 +} + // containerSrcFunc is a container.Source backed by a function. type containerSrcFunc func(cid.ID) (*container.Container, error) diff --git a/pkg/services/policer/process.go b/pkg/services/policer/process.go index 39b61c8a0..cdc92ed12 100644 --- a/pkg/services/policer/process.go +++ b/pkg/services/policer/process.go @@ -27,6 +27,7 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) { addrs, err := p.keySpaceIterator.Next(ctx, p.batchSize) if err != nil { if errors.Is(err, engine.ErrEndOfListing) { + p.keySpaceIterator.Rewind() time.Sleep(time.Second) // finished whole cycle, sleep a bit continue } -- 2.45.3 From 80ac7a10d7110193f963b3c156e8c45f5dbec0d3 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 6 Jul 2023 18:29:16 +0300 Subject: [PATCH 2/2] [#498] policer: Allow to set sleep duration between iterations Speed up tests on CI. Signed-off-by: Evgenii Stratonikov --- pkg/services/policer/option.go | 3 ++- pkg/services/policer/policer_test.go | 4 ++++ pkg/services/policer/process.go | 2 +- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/pkg/services/policer/option.go b/pkg/services/policer/option.go index e182c6be7..4194353ca 100644 --- a/pkg/services/policer/option.go +++ b/pkg/services/policer/option.go @@ -76,7 +76,7 @@ type cfg struct { batchSize, cacheSize uint32 - rebalanceFreq, evictDuration time.Duration + rebalanceFreq, evictDuration, sleepDuration time.Duration } func defaultCfg() *cfg { @@ -85,6 +85,7 @@ func defaultCfg() *cfg { batchSize: 10, cacheSize: 1024, // 1024 * address size = 1024 * 64 = 64 MiB rebalanceFreq: 1 * time.Second, + sleepDuration: 1 * time.Second, evictDuration: 30 * time.Second, } } diff --git a/pkg/services/policer/policer_test.go b/pkg/services/policer/policer_test.go index 86ba0e1ad..2d6841b35 100644 --- a/pkg/services/policer/policer_test.go +++ b/pkg/services/policer/policer_test.go @@ -6,6 +6,7 @@ import ( "errors" "sort" "testing" + "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" @@ -279,6 +280,9 @@ func TestIteratorContract(t *testing.T) { WithBuryFunc(buryFn), WithPool(pool), WithNodeLoader(constNodeLoader(0)), + func(c *cfg) { + c.sleepDuration = time.Millisecond + }, ) ctx, cancel := context.WithCancel(context.Background()) diff --git a/pkg/services/policer/process.go b/pkg/services/policer/process.go index cdc92ed12..3b54bf929 100644 --- a/pkg/services/policer/process.go +++ b/pkg/services/policer/process.go @@ -28,7 +28,7 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) { if err != nil { if errors.Is(err, engine.ErrEndOfListing) { p.keySpaceIterator.Rewind() - time.Sleep(time.Second) // finished whole cycle, sleep a bit + time.Sleep(p.sleepDuration) // finished whole cycle, sleep a bit continue } p.log.Warn(logs.PolicerFailureAtObjectSelectForReplication, zap.Error(err)) -- 2.45.3