From 36759f843418c6728897701de99b76feda78ef76 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Wed, 6 Sep 2023 18:03:04 +0300 Subject: [PATCH] [#668] shard/test: Properly check event processing See https://git.frostfs.info/TrueCloudLab/frostfs-node/actions/runs/1594/jobs/2 Signed-off-by: Evgenii Stratonikov --- pkg/local_object_storage/shard/gc.go | 46 +++++++++++--------- pkg/local_object_storage/shard/gc_test.go | 32 ++++++++------ pkg/local_object_storage/shard/shard_test.go | 3 ++ 3 files changed, 47 insertions(+), 34 deletions(-) diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go index 38baccd4..13ab39ae 100644 --- a/pkg/local_object_storage/shard/gc.go +++ b/pkg/local_object_storage/shard/gc.go @@ -160,33 +160,37 @@ func (gc *gc) listenEvents(ctx context.Context) { return } - v, ok := gc.mEventHandler[event.typ()] - if !ok { - continue - } + gc.handleEvent(ctx, event) + } +} - v.cancelFunc() - v.prevGroup.Wait() +func (gc *gc) handleEvent(ctx context.Context, event Event) { + v, ok := gc.mEventHandler[event.typ()] + if !ok { + return + } - var runCtx context.Context - runCtx, v.cancelFunc = context.WithCancel(ctx) + v.cancelFunc() + v.prevGroup.Wait() - v.prevGroup.Add(len(v.handlers)) + var runCtx context.Context + runCtx, v.cancelFunc = context.WithCancel(ctx) - for i := range v.handlers { - h := v.handlers[i] + v.prevGroup.Add(len(v.handlers)) - err := gc.workerPool.Submit(func() { - defer v.prevGroup.Done() - h(runCtx, event) - }) - if err != nil { - gc.log.Warn(logs.ShardCouldNotSubmitGCJobToWorkerPool, - zap.String("error", err.Error()), - ) + for i := range v.handlers { + h := v.handlers[i] - v.prevGroup.Done() - } + err := gc.workerPool.Submit(func() { + defer v.prevGroup.Done() + h(runCtx, event) + }) + if err != nil { + gc.log.Warn(logs.ShardCouldNotSubmitGCJobToWorkerPool, + zap.String("error", err.Error()), + ) + + v.prevGroup.Done() } } } diff --git a/pkg/local_object_storage/shard/gc_test.go b/pkg/local_object_storage/shard/gc_test.go index 4f19db87..8b535200 100644 --- a/pkg/local_object_storage/shard/gc_test.go +++ b/pkg/local_object_storage/shard/gc_test.go @@ -4,12 +4,12 @@ import ( "context" "errors" "testing" - "time" objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" @@ -25,7 +25,12 @@ func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) { Value: 100, } - sh := newCustomShard(t, false, shardOptions{metaOptions: []meta.Option{meta.WithEpochState(epoch)}}) + sh := newCustomShard(t, false, shardOptions{ + metaOptions: []meta.Option{meta.WithEpochState(epoch)}, + additionalShardOptions: []Option{WithGCWorkerPoolInitializer(func(int) util.WorkerPool { + return util.NewPseudoWorkerPool() // synchronous event processing + })}, + }) cnr := cidtest.ID() @@ -60,14 +65,12 @@ func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) { require.NoError(t, err) epoch.Value = 105 - sh.NotificationChannel() <- EventNewEpoch(epoch.Value) + sh.gc.handleEvent(context.Background(), EventNewEpoch(epoch.Value)) var getPrm GetPrm getPrm.SetAddress(objectCore.AddressOf(obj)) - require.Eventually(t, func() bool { - _, err = sh.Get(context.Background(), getPrm) - return client.IsErrObjectNotFound(err) - }, 3*time.Second, 1*time.Second, "expired object must be deleted") + _, err = sh.Get(context.Background(), getPrm) + require.True(t, client.IsErrObjectNotFound(err), "expired object must be deleted") } func Test_GCDropsLockedExpiredComplexObject(t *testing.T) { @@ -118,7 +121,12 @@ func Test_GCDropsLockedExpiredComplexObject(t *testing.T) { linkID, _ := link.ID() - sh := newCustomShard(t, false, shardOptions{metaOptions: []meta.Option{meta.WithEpochState(epoch)}}) + sh := newCustomShard(t, false, shardOptions{ + metaOptions: []meta.Option{meta.WithEpochState(epoch)}, + additionalShardOptions: []Option{WithGCWorkerPoolInitializer(func(int) util.WorkerPool { + return util.NewPseudoWorkerPool() // synchronous event processing + })}, + }) lock := testutil.GenerateObjectWithCID(cnr) lock.SetType(objectSDK.TypeLock) @@ -152,10 +160,8 @@ func Test_GCDropsLockedExpiredComplexObject(t *testing.T) { require.True(t, errors.As(err, &splitInfoError), "split info must be provided") epoch.Value = 105 - sh.NotificationChannel() <- EventNewEpoch(epoch.Value) + sh.gc.handleEvent(context.Background(), EventNewEpoch(epoch.Value)) - require.Eventually(t, func() bool { - _, err = sh.Get(context.Background(), getPrm) - return client.IsErrObjectNotFound(err) - }, 3*time.Second, 1*time.Second, "expired complex object must be deleted on epoch after lock expires") + _, err = sh.Get(context.Background(), getPrm) + require.True(t, client.IsErrObjectNotFound(err), "expired complex object must be deleted on epoch after lock expires") } diff --git a/pkg/local_object_storage/shard/shard_test.go b/pkg/local_object_storage/shard/shard_test.go index eb2aba58..669ac287 100644 --- a/pkg/local_object_storage/shard/shard_test.go +++ b/pkg/local_object_storage/shard/shard_test.go @@ -36,6 +36,8 @@ type shardOptions struct { wcOpts writecacheconfig.Options bsOpts []blobstor.Option metaOptions []meta.Option + + additionalShardOptions []Option } func newShard(t testing.TB, enableWriteCache bool) *Shard { @@ -114,6 +116,7 @@ func newCustomShard(t testing.TB, enableWriteCache bool, o shardOptions) *Shard }), WithGCRemoverSleepInterval(100 * time.Millisecond), } + opts = append(opts, o.additionalShardOptions...) sh = New(opts...)