From 27899598dc2617b859bdcd99d28fc7810329632a Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 1 Apr 2025 12:37:35 +0300 Subject: [PATCH] [#1700] gc: Drop Event interface There is only one event: new epoch. Change-Id: I982f3650f7bc753ff2782393625452f0f8cdcc35 Signed-off-by: Dmitrii Stepanov --- pkg/local_object_storage/engine/shards.go | 4 +- pkg/local_object_storage/shard/control.go | 24 +++-- pkg/local_object_storage/shard/gc.go | 109 +++++++--------------- pkg/local_object_storage/shard/gc_test.go | 4 +- 4 files changed, 47 insertions(+), 94 deletions(-) diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index a38c85151..dfc3b1a35 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -318,8 +318,6 @@ func (e *StorageEngine) SetShardMode(ctx context.Context, id *shard.ID, m mode.M // HandleNewEpoch notifies every shard about NewEpoch event. func (e *StorageEngine) HandleNewEpoch(ctx context.Context, epoch uint64) { - ev := shard.EventNewEpoch(epoch) - e.mtx.RLock() defer e.mtx.RUnlock() @@ -327,7 +325,7 @@ func (e *StorageEngine) HandleNewEpoch(ctx context.Context, epoch uint64) { select { case <-ctx.Done(): return - case sh.NotificationChannel() <- ev: + case sh.NotificationChannel() <- epoch: default: e.log.Debug(ctx, logs.ShardEventProcessingInProgress, zap.Uint64("epoch", epoch), zap.Stringer("shard", sh.ID())) diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index 6dee2da3f..19b13a8ab 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -108,19 +108,17 @@ func (s *Shard) Init(ctx context.Context) error { s.updateMetrics(ctx) s.gc = &gc{ - gcCfg: &s.gcCfg, - remover: s.removeGarbage, - stopChannel: make(chan struct{}), - eventChan: make(chan Event), - mEventHandler: map[eventType]*eventHandlers{ - eventNewEpoch: { - cancelFunc: func() {}, - handlers: []eventHandler{ - s.collectExpiredLocks, - s.collectExpiredObjects, - s.collectExpiredTombstones, - s.collectExpiredMetrics, - }, + gcCfg: &s.gcCfg, + remover: s.removeGarbage, + stopChannel: make(chan struct{}), + newEpochChan: make(chan uint64), + newEpochHandlers: &newEpochHandlers{ + cancelFunc: func() {}, + handlers: []newEpochHandler{ + s.collectExpiredLocks, + s.collectExpiredObjects, + s.collectExpiredTombstones, + s.collectExpiredMetrics, }, }, } diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go index 84fb6039e..82e76f1a7 100644 --- a/pkg/local_object_storage/shard/gc.go +++ b/pkg/local_object_storage/shard/gc.go @@ -33,41 +33,14 @@ type TombstoneSource interface { IsTombstoneAvailable(ctx context.Context, addr oid.Address, epoch uint64) bool } -// Event represents class of external events. -type Event interface { - typ() eventType -} +type newEpochHandler func(context.Context, uint64) -type eventType int - -const ( - _ eventType = iota - eventNewEpoch -) - -type newEpoch struct { - epoch uint64 -} - -func (e newEpoch) typ() eventType { - return eventNewEpoch -} - -// EventNewEpoch returns new epoch event. -func EventNewEpoch(e uint64) Event { - return newEpoch{ - epoch: e, - } -} - -type eventHandler func(context.Context, Event) - -type eventHandlers struct { +type newEpochHandlers struct { prevGroup sync.WaitGroup cancelFunc context.CancelFunc - handlers []eventHandler + handlers []newEpochHandler } type gcRunResult struct { @@ -109,10 +82,10 @@ type gc struct { remover func(context.Context) gcRunResult - // eventChan is used only for listening for the new epoch event. + // newEpochChan is used only for listening for the new epoch event. // It is ok to keep opened, we are listening for context done when writing in it. - eventChan chan Event - mEventHandler map[eventType]*eventHandlers + newEpochChan chan uint64 + newEpochHandlers *newEpochHandlers } type gcCfg struct { @@ -142,15 +115,7 @@ func defaultGCCfg() gcCfg { } func (gc *gc) init(ctx context.Context) { - sz := 0 - - for _, v := range gc.mEventHandler { - sz += len(v.handlers) - } - - if sz > 0 { - gc.workerPool = gc.workerPoolInit(sz) - } + gc.workerPool = gc.workerPoolInit(len(gc.newEpochHandlers.handlers)) ctx = tagging.ContextWithIOTag(ctx, qos.IOTagBackground.String()) gc.wg.Add(2) go gc.tickRemover(ctx) @@ -168,7 +133,7 @@ func (gc *gc) listenEvents(ctx context.Context) { case <-ctx.Done(): gc.log.Warn(ctx, logs.ShardStopEventListenerByContext) return - case event, ok := <-gc.eventChan: + case event, ok := <-gc.newEpochChan: if !ok { gc.log.Warn(ctx, logs.ShardStopEventListenerByClosedEventChannel) return @@ -179,38 +144,33 @@ func (gc *gc) listenEvents(ctx context.Context) { } } -func (gc *gc) handleEvent(ctx context.Context, event Event) { - v, ok := gc.mEventHandler[event.typ()] - if !ok { - return - } - - v.cancelFunc() - v.prevGroup.Wait() +func (gc *gc) handleEvent(ctx context.Context, epoch uint64) { + gc.newEpochHandlers.cancelFunc() + gc.newEpochHandlers.prevGroup.Wait() var runCtx context.Context - runCtx, v.cancelFunc = context.WithCancel(ctx) + runCtx, gc.newEpochHandlers.cancelFunc = context.WithCancel(ctx) - v.prevGroup.Add(len(v.handlers)) + gc.newEpochHandlers.prevGroup.Add(len(gc.newEpochHandlers.handlers)) - for i := range v.handlers { + for i := range gc.newEpochHandlers.handlers { select { case <-ctx.Done(): return default: } - h := v.handlers[i] + h := gc.newEpochHandlers.handlers[i] err := gc.workerPool.Submit(func() { - defer v.prevGroup.Done() - h(runCtx, event) + defer gc.newEpochHandlers.prevGroup.Done() + h(runCtx, epoch) }) if err != nil { gc.log.Warn(ctx, logs.ShardCouldNotSubmitGCJobToWorkerPool, zap.Error(err), ) - v.prevGroup.Done() + gc.newEpochHandlers.prevGroup.Done() } } } @@ -362,7 +322,7 @@ func (s *Shard) getExpiredObjectsParameters() (workerCount, batchSize int) { return } -func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) { +func (s *Shard) collectExpiredObjects(ctx context.Context, epoch uint64) { var err error startedAt := time.Now() @@ -370,8 +330,8 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) { s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil, objectTypeRegular) }() - s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsStarted, zap.Uint64("epoch", e.(newEpoch).epoch)) - defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsCompleted, zap.Uint64("epoch", e.(newEpoch).epoch)) + s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsStarted, zap.Uint64("epoch", epoch)) + defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsCompleted, zap.Uint64("epoch", epoch)) workersCount, batchSize := s.getExpiredObjectsParameters() @@ -380,7 +340,7 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) { errGroup.Go(func() error { batch := make([]oid.Address, 0, batchSize) - expErr := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *meta.ExpiredObject) { + expErr := s.getExpiredObjects(egCtx, epoch, func(o *meta.ExpiredObject) { if o.Type() != objectSDK.TypeTombstone && o.Type() != objectSDK.TypeLock { batch = append(batch, o.Address()) @@ -486,7 +446,7 @@ func (s *Shard) inhumeGC(ctx context.Context, addrs []oid.Address) (meta.InhumeR return s.metaBase.Inhume(ctx, inhumePrm) } -func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) { +func (s *Shard) collectExpiredTombstones(ctx context.Context, epoch uint64) { var err error startedAt := time.Now() @@ -494,7 +454,6 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) { s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil, objectTypeTombstone) }() - epoch := e.(newEpoch).epoch log := s.log.With(zap.Uint64("epoch", epoch)) log.Debug(ctx, logs.ShardStartedExpiredTombstonesHandling) @@ -566,7 +525,7 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) { } } -func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) { +func (s *Shard) collectExpiredLocks(ctx context.Context, epoch uint64) { var err error startedAt := time.Now() @@ -574,8 +533,8 @@ func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) { s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil, objectTypeLock) }() - s.log.Debug(ctx, logs.ShardGCCollectingExpiredLocksStarted, zap.Uint64("epoch", e.(newEpoch).epoch)) - defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredLocksCompleted, zap.Uint64("epoch", e.(newEpoch).epoch)) + s.log.Debug(ctx, logs.ShardGCCollectingExpiredLocksStarted, zap.Uint64("epoch", epoch)) + defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredLocksCompleted, zap.Uint64("epoch", epoch)) workersCount, batchSize := s.getExpiredObjectsParameters() @@ -585,14 +544,14 @@ func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) { errGroup.Go(func() error { batch := make([]oid.Address, 0, batchSize) - expErr := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *meta.ExpiredObject) { + expErr := s.getExpiredObjects(egCtx, epoch, func(o *meta.ExpiredObject) { if o.Type() == objectSDK.TypeLock { batch = append(batch, o.Address()) if len(batch) == batchSize { expired := batch errGroup.Go(func() error { - s.expiredLocksCallback(egCtx, e.(newEpoch).epoch, expired) + s.expiredLocksCallback(egCtx, epoch, expired) return egCtx.Err() }) batch = make([]oid.Address, 0, batchSize) @@ -606,7 +565,7 @@ func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) { if len(batch) > 0 { expired := batch errGroup.Go(func() error { - s.expiredLocksCallback(egCtx, e.(newEpoch).epoch, expired) + s.expiredLocksCallback(egCtx, epoch, expired) return egCtx.Err() }) } @@ -785,17 +744,15 @@ func (s *Shard) HandleDeletedLocks(ctx context.Context, lockers []oid.Address) { } } -// NotificationChannel returns channel for shard events. -func (s *Shard) NotificationChannel() chan<- Event { - return s.gc.eventChan +// NotificationChannel returns channel for new epoch events. +func (s *Shard) NotificationChannel() chan<- uint64 { + return s.gc.newEpochChan } -func (s *Shard) collectExpiredMetrics(ctx context.Context, e Event) { +func (s *Shard) collectExpiredMetrics(ctx context.Context, epoch uint64) { ctx, span := tracing.StartSpanFromContext(ctx, "shard.collectExpiredMetrics") defer span.End() - epoch := e.(newEpoch).epoch - s.log.Debug(ctx, logs.ShardGCCollectingExpiredMetricsStarted, zap.Uint64("epoch", epoch)) defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredMetricsCompleted, zap.Uint64("epoch", epoch)) diff --git a/pkg/local_object_storage/shard/gc_test.go b/pkg/local_object_storage/shard/gc_test.go index e3670b441..f512a488a 100644 --- a/pkg/local_object_storage/shard/gc_test.go +++ b/pkg/local_object_storage/shard/gc_test.go @@ -69,7 +69,7 @@ func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) { require.NoError(t, err) epoch.Value = 105 - sh.gc.handleEvent(context.Background(), EventNewEpoch(epoch.Value)) + sh.gc.handleEvent(context.Background(), epoch.Value) var getPrm GetPrm getPrm.SetAddress(objectCore.AddressOf(obj)) @@ -165,7 +165,7 @@ func Test_GCDropsLockedExpiredComplexObject(t *testing.T) { require.True(t, errors.As(err, &splitInfoError), "split info must be provided") epoch.Value = 105 - sh.gc.handleEvent(context.Background(), EventNewEpoch(epoch.Value)) + sh.gc.handleEvent(context.Background(), epoch.Value) _, err = sh.Get(context.Background(), getPrm) require.True(t, client.IsErrObjectNotFound(err) || IsErrObjectExpired(err), "expired complex object must be deleted on epoch after lock expires")