[#1445] shard/gc: Remove graves
Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
This commit is contained in:
parent
6bc49eb741
commit
42681223d0
1 changed files with 30 additions and 8 deletions
|
@ -534,9 +534,12 @@ func (s *Shard) collectExpiredGraves(ctx context.Context, e Event) {
|
|||
log.Debug(ctx, logs.ShardStartedExpiredTombstonesHandling)
|
||||
defer log.Debug(ctx, logs.ShardFinishedExpiredTombstonesHandling)
|
||||
|
||||
// TODO(@a-savchuk): Magic number. Should it be configurable? Same way
|
||||
// it's done for handling expired objects: batch size and worker count.
|
||||
const tssDeleteBatch = 50
|
||||
tss := make([]meta.TombstonedObject, 0, tssDeleteBatch)
|
||||
tssExp := make([]meta.TombstonedObject, 0, tssDeleteBatch)
|
||||
expiredGravesWithExpEpoch := make([]meta.TombstonedObject, 0, tssDeleteBatch)
|
||||
expiredGravesWithoutExpEpoch := make([]meta.TombstonedObject, 0, tssDeleteBatch)
|
||||
|
||||
var iterPrm meta.GraveyardIterationPrm
|
||||
iterPrm.SetHandler(func(deletedObject meta.TombstonedObject) error {
|
||||
|
@ -577,22 +580,41 @@ func (s *Shard) collectExpiredGraves(ctx context.Context, e Event) {
|
|||
}
|
||||
|
||||
for _, ts := range tss {
|
||||
if !s.tsSource.IsTombstoneAvailable(ctx, ts.Tombstone(), epoch) {
|
||||
tssExp = append(tssExp, ts)
|
||||
if ts.ExpirationEpoch() != meta.NoExpirationEpoch {
|
||||
expiredGravesWithExpEpoch = append(expiredGravesWithExpEpoch, ts)
|
||||
} else if !s.tsSource.IsTombstoneAvailable(ctx, ts.Tombstone(), epoch) {
|
||||
expiredGravesWithoutExpEpoch = append(expiredGravesWithoutExpEpoch, ts)
|
||||
}
|
||||
}
|
||||
|
||||
log.Debug(ctx, logs.ShardHandlingExpiredTombstonesBatch, zap.Int("number", len(tssExp)))
|
||||
if len(tssExp) > 0 {
|
||||
s.expiredGravesCallback(ctx, tssExp)
|
||||
}
|
||||
log.Debug(ctx, logs.ShardHandlingExpiredTombstonesBatch,
|
||||
zap.Int("with exp. epoch", len(expiredGravesWithExpEpoch)),
|
||||
zap.Int("without exp. epoch", len(expiredGravesWithoutExpEpoch)),
|
||||
)
|
||||
|
||||
s.handleExpiredGraves(ctx, expiredGravesWithExpEpoch, expiredGravesWithoutExpEpoch)
|
||||
|
||||
iterPrm.SetOffset(tss[tssLen-1].Address())
|
||||
tss = tss[:0]
|
||||
tssExp = tssExp[:0]
|
||||
expiredGravesWithExpEpoch = expiredGravesWithExpEpoch[:0]
|
||||
expiredGravesWithoutExpEpoch = expiredGravesWithoutExpEpoch[:0]
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Shard) handleExpiredGraves(ctx context.Context,
|
||||
expiredWithExpEpoch []meta.TombstonedObject, expiredWithoutExpEpoch []meta.TombstonedObject,
|
||||
) {
|
||||
if len(expiredWithExpEpoch) > 0 {
|
||||
s.removeGraves(ctx, expiredWithExpEpoch)
|
||||
}
|
||||
if len(expiredWithoutExpEpoch) > 0 {
|
||||
s.expiredGravesCallback(ctx, expiredWithoutExpEpoch)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Shard) removeGraves(_ context.Context, _ []meta.TombstonedObject) {
|
||||
}
|
||||
|
||||
func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, onExpiredFound func(*meta.ExpiredObject)) error {
|
||||
s.m.RLock()
|
||||
defer s.m.RUnlock()
|
||||
|
|
Loading…
Reference in a new issue