[#1700] gc: Drop Event interface

There is only one event: new epoch.

Change-Id: I982f3650f7bc753ff2782393625452f0f8cdcc35
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2025-04-01 12:37:35 +03:00
parent bc6cc9ae2a
commit 27899598dc
Signed by: dstepanov-yadro
GPG key ID: 237AF1A763293BC0
4 changed files with 47 additions and 94 deletions

View file

@ -318,8 +318,6 @@ func (e *StorageEngine) SetShardMode(ctx context.Context, id *shard.ID, m mode.M
// HandleNewEpoch notifies every shard about NewEpoch event. // HandleNewEpoch notifies every shard about NewEpoch event.
func (e *StorageEngine) HandleNewEpoch(ctx context.Context, epoch uint64) { func (e *StorageEngine) HandleNewEpoch(ctx context.Context, epoch uint64) {
ev := shard.EventNewEpoch(epoch)
e.mtx.RLock() e.mtx.RLock()
defer e.mtx.RUnlock() defer e.mtx.RUnlock()
@ -327,7 +325,7 @@ func (e *StorageEngine) HandleNewEpoch(ctx context.Context, epoch uint64) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case sh.NotificationChannel() <- ev: case sh.NotificationChannel() <- epoch:
default: default:
e.log.Debug(ctx, logs.ShardEventProcessingInProgress, e.log.Debug(ctx, logs.ShardEventProcessingInProgress,
zap.Uint64("epoch", epoch), zap.Stringer("shard", sh.ID())) zap.Uint64("epoch", epoch), zap.Stringer("shard", sh.ID()))

View file

@ -108,19 +108,17 @@ func (s *Shard) Init(ctx context.Context) error {
s.updateMetrics(ctx) s.updateMetrics(ctx)
s.gc = &gc{ s.gc = &gc{
gcCfg: &s.gcCfg, gcCfg: &s.gcCfg,
remover: s.removeGarbage, remover: s.removeGarbage,
stopChannel: make(chan struct{}), stopChannel: make(chan struct{}),
eventChan: make(chan Event), newEpochChan: make(chan uint64),
mEventHandler: map[eventType]*eventHandlers{ newEpochHandlers: &newEpochHandlers{
eventNewEpoch: { cancelFunc: func() {},
cancelFunc: func() {}, handlers: []newEpochHandler{
handlers: []eventHandler{ s.collectExpiredLocks,
s.collectExpiredLocks, s.collectExpiredObjects,
s.collectExpiredObjects, s.collectExpiredTombstones,
s.collectExpiredTombstones, s.collectExpiredMetrics,
s.collectExpiredMetrics,
},
}, },
}, },
} }

View file

@ -33,41 +33,14 @@ type TombstoneSource interface {
IsTombstoneAvailable(ctx context.Context, addr oid.Address, epoch uint64) bool IsTombstoneAvailable(ctx context.Context, addr oid.Address, epoch uint64) bool
} }
// Event represents class of external events. type newEpochHandler func(context.Context, uint64)
type Event interface {
typ() eventType
}
type eventType int type newEpochHandlers struct {
const (
_ eventType = iota
eventNewEpoch
)
type newEpoch struct {
epoch uint64
}
func (e newEpoch) typ() eventType {
return eventNewEpoch
}
// EventNewEpoch returns new epoch event.
func EventNewEpoch(e uint64) Event {
return newEpoch{
epoch: e,
}
}
type eventHandler func(context.Context, Event)
type eventHandlers struct {
prevGroup sync.WaitGroup prevGroup sync.WaitGroup
cancelFunc context.CancelFunc cancelFunc context.CancelFunc
handlers []eventHandler handlers []newEpochHandler
} }
type gcRunResult struct { type gcRunResult struct {
@ -109,10 +82,10 @@ type gc struct {
remover func(context.Context) gcRunResult remover func(context.Context) gcRunResult
// eventChan is used only for listening for the new epoch event. // newEpochChan is used only for listening for the new epoch event.
// It is ok to keep opened, we are listening for context done when writing in it. // It is ok to keep opened, we are listening for context done when writing in it.
eventChan chan Event newEpochChan chan uint64
mEventHandler map[eventType]*eventHandlers newEpochHandlers *newEpochHandlers
} }
type gcCfg struct { type gcCfg struct {
@ -142,15 +115,7 @@ func defaultGCCfg() gcCfg {
} }
func (gc *gc) init(ctx context.Context) { func (gc *gc) init(ctx context.Context) {
sz := 0 gc.workerPool = gc.workerPoolInit(len(gc.newEpochHandlers.handlers))
for _, v := range gc.mEventHandler {
sz += len(v.handlers)
}
if sz > 0 {
gc.workerPool = gc.workerPoolInit(sz)
}
ctx = tagging.ContextWithIOTag(ctx, qos.IOTagBackground.String()) ctx = tagging.ContextWithIOTag(ctx, qos.IOTagBackground.String())
gc.wg.Add(2) gc.wg.Add(2)
go gc.tickRemover(ctx) go gc.tickRemover(ctx)
@ -168,7 +133,7 @@ func (gc *gc) listenEvents(ctx context.Context) {
case <-ctx.Done(): case <-ctx.Done():
gc.log.Warn(ctx, logs.ShardStopEventListenerByContext) gc.log.Warn(ctx, logs.ShardStopEventListenerByContext)
return return
case event, ok := <-gc.eventChan: case event, ok := <-gc.newEpochChan:
if !ok { if !ok {
gc.log.Warn(ctx, logs.ShardStopEventListenerByClosedEventChannel) gc.log.Warn(ctx, logs.ShardStopEventListenerByClosedEventChannel)
return return
@ -179,38 +144,33 @@ func (gc *gc) listenEvents(ctx context.Context) {
} }
} }
func (gc *gc) handleEvent(ctx context.Context, event Event) { func (gc *gc) handleEvent(ctx context.Context, epoch uint64) {
v, ok := gc.mEventHandler[event.typ()] gc.newEpochHandlers.cancelFunc()
if !ok { gc.newEpochHandlers.prevGroup.Wait()
return
}
v.cancelFunc()
v.prevGroup.Wait()
var runCtx context.Context var runCtx context.Context
runCtx, v.cancelFunc = context.WithCancel(ctx) runCtx, gc.newEpochHandlers.cancelFunc = context.WithCancel(ctx)
v.prevGroup.Add(len(v.handlers)) gc.newEpochHandlers.prevGroup.Add(len(gc.newEpochHandlers.handlers))
for i := range v.handlers { for i := range gc.newEpochHandlers.handlers {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
default: default:
} }
h := v.handlers[i] h := gc.newEpochHandlers.handlers[i]
err := gc.workerPool.Submit(func() { err := gc.workerPool.Submit(func() {
defer v.prevGroup.Done() defer gc.newEpochHandlers.prevGroup.Done()
h(runCtx, event) h(runCtx, epoch)
}) })
if err != nil { if err != nil {
gc.log.Warn(ctx, logs.ShardCouldNotSubmitGCJobToWorkerPool, gc.log.Warn(ctx, logs.ShardCouldNotSubmitGCJobToWorkerPool,
zap.Error(err), zap.Error(err),
) )
v.prevGroup.Done() gc.newEpochHandlers.prevGroup.Done()
} }
} }
} }
@ -362,7 +322,7 @@ func (s *Shard) getExpiredObjectsParameters() (workerCount, batchSize int) {
return return
} }
func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) { func (s *Shard) collectExpiredObjects(ctx context.Context, epoch uint64) {
var err error var err error
startedAt := time.Now() startedAt := time.Now()
@ -370,8 +330,8 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil, objectTypeRegular) s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil, objectTypeRegular)
}() }()
s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsStarted, zap.Uint64("epoch", e.(newEpoch).epoch)) s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsStarted, zap.Uint64("epoch", epoch))
defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsCompleted, zap.Uint64("epoch", e.(newEpoch).epoch)) defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsCompleted, zap.Uint64("epoch", epoch))
workersCount, batchSize := s.getExpiredObjectsParameters() workersCount, batchSize := s.getExpiredObjectsParameters()
@ -380,7 +340,7 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
errGroup.Go(func() error { errGroup.Go(func() error {
batch := make([]oid.Address, 0, batchSize) batch := make([]oid.Address, 0, batchSize)
expErr := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *meta.ExpiredObject) { expErr := s.getExpiredObjects(egCtx, epoch, func(o *meta.ExpiredObject) {
if o.Type() != objectSDK.TypeTombstone && o.Type() != objectSDK.TypeLock { if o.Type() != objectSDK.TypeTombstone && o.Type() != objectSDK.TypeLock {
batch = append(batch, o.Address()) batch = append(batch, o.Address())
@ -486,7 +446,7 @@ func (s *Shard) inhumeGC(ctx context.Context, addrs []oid.Address) (meta.InhumeR
return s.metaBase.Inhume(ctx, inhumePrm) return s.metaBase.Inhume(ctx, inhumePrm)
} }
func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) { func (s *Shard) collectExpiredTombstones(ctx context.Context, epoch uint64) {
var err error var err error
startedAt := time.Now() startedAt := time.Now()
@ -494,7 +454,6 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil, objectTypeTombstone) s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil, objectTypeTombstone)
}() }()
epoch := e.(newEpoch).epoch
log := s.log.With(zap.Uint64("epoch", epoch)) log := s.log.With(zap.Uint64("epoch", epoch))
log.Debug(ctx, logs.ShardStartedExpiredTombstonesHandling) log.Debug(ctx, logs.ShardStartedExpiredTombstonesHandling)
@ -566,7 +525,7 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
} }
} }
func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) { func (s *Shard) collectExpiredLocks(ctx context.Context, epoch uint64) {
var err error var err error
startedAt := time.Now() startedAt := time.Now()
@ -574,8 +533,8 @@ func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil, objectTypeLock) s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil, objectTypeLock)
}() }()
s.log.Debug(ctx, logs.ShardGCCollectingExpiredLocksStarted, zap.Uint64("epoch", e.(newEpoch).epoch)) s.log.Debug(ctx, logs.ShardGCCollectingExpiredLocksStarted, zap.Uint64("epoch", epoch))
defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredLocksCompleted, zap.Uint64("epoch", e.(newEpoch).epoch)) defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredLocksCompleted, zap.Uint64("epoch", epoch))
workersCount, batchSize := s.getExpiredObjectsParameters() workersCount, batchSize := s.getExpiredObjectsParameters()
@ -585,14 +544,14 @@ func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
errGroup.Go(func() error { errGroup.Go(func() error {
batch := make([]oid.Address, 0, batchSize) batch := make([]oid.Address, 0, batchSize)
expErr := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *meta.ExpiredObject) { expErr := s.getExpiredObjects(egCtx, epoch, func(o *meta.ExpiredObject) {
if o.Type() == objectSDK.TypeLock { if o.Type() == objectSDK.TypeLock {
batch = append(batch, o.Address()) batch = append(batch, o.Address())
if len(batch) == batchSize { if len(batch) == batchSize {
expired := batch expired := batch
errGroup.Go(func() error { errGroup.Go(func() error {
s.expiredLocksCallback(egCtx, e.(newEpoch).epoch, expired) s.expiredLocksCallback(egCtx, epoch, expired)
return egCtx.Err() return egCtx.Err()
}) })
batch = make([]oid.Address, 0, batchSize) batch = make([]oid.Address, 0, batchSize)
@ -606,7 +565,7 @@ func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
if len(batch) > 0 { if len(batch) > 0 {
expired := batch expired := batch
errGroup.Go(func() error { errGroup.Go(func() error {
s.expiredLocksCallback(egCtx, e.(newEpoch).epoch, expired) s.expiredLocksCallback(egCtx, epoch, expired)
return egCtx.Err() return egCtx.Err()
}) })
} }
@ -785,17 +744,15 @@ func (s *Shard) HandleDeletedLocks(ctx context.Context, lockers []oid.Address) {
} }
} }
// NotificationChannel returns channel for shard events. // NotificationChannel returns channel for new epoch events.
func (s *Shard) NotificationChannel() chan<- Event { func (s *Shard) NotificationChannel() chan<- uint64 {
return s.gc.eventChan return s.gc.newEpochChan
} }
func (s *Shard) collectExpiredMetrics(ctx context.Context, e Event) { func (s *Shard) collectExpiredMetrics(ctx context.Context, epoch uint64) {
ctx, span := tracing.StartSpanFromContext(ctx, "shard.collectExpiredMetrics") ctx, span := tracing.StartSpanFromContext(ctx, "shard.collectExpiredMetrics")
defer span.End() defer span.End()
epoch := e.(newEpoch).epoch
s.log.Debug(ctx, logs.ShardGCCollectingExpiredMetricsStarted, zap.Uint64("epoch", epoch)) s.log.Debug(ctx, logs.ShardGCCollectingExpiredMetricsStarted, zap.Uint64("epoch", epoch))
defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredMetricsCompleted, zap.Uint64("epoch", epoch)) defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredMetricsCompleted, zap.Uint64("epoch", epoch))

View file

@ -69,7 +69,7 @@ func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
epoch.Value = 105 epoch.Value = 105
sh.gc.handleEvent(context.Background(), EventNewEpoch(epoch.Value)) sh.gc.handleEvent(context.Background(), epoch.Value)
var getPrm GetPrm var getPrm GetPrm
getPrm.SetAddress(objectCore.AddressOf(obj)) getPrm.SetAddress(objectCore.AddressOf(obj))
@ -165,7 +165,7 @@ func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
require.True(t, errors.As(err, &splitInfoError), "split info must be provided") require.True(t, errors.As(err, &splitInfoError), "split info must be provided")
epoch.Value = 105 epoch.Value = 105
sh.gc.handleEvent(context.Background(), EventNewEpoch(epoch.Value)) sh.gc.handleEvent(context.Background(), epoch.Value)
_, err = sh.Get(context.Background(), getPrm) _, err = sh.Get(context.Background(), getPrm)
require.True(t, client.IsErrObjectNotFound(err) || IsErrObjectExpired(err), "expired complex object must be deleted on epoch after lock expires") require.True(t, client.IsErrObjectNotFound(err) || IsErrObjectExpired(err), "expired complex object must be deleted on epoch after lock expires")