[#1445] shard/gc: Remove graves of new format separately
All checks were successful
Vulncheck / Vulncheck (pull_request) Successful in 2m53s
Pre-commit hooks / Pre-commit (pull_request) Successful in 4m13s
Tests and linters / Staticcheck (pull_request) Successful in 4m14s
Tests and linters / Lint (pull_request) Successful in 5m9s
Tests and linters / Run gofumpt (pull_request) Successful in 9m14s
DCO action / DCO (pull_request) Successful in 9m33s
Build / Build Components (pull_request) Successful in 10m22s
Tests and linters / Tests (pull_request) Successful in 10m42s
Tests and linters / Tests with -race (pull_request) Successful in 11m4s
Tests and linters / gopls check (pull_request) Successful in 11m36s

Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
This commit is contained in:
Aleksey Savchuk 2024-12-20 11:17:13 +03:00
parent 0c72a8de1c
commit 0092a49f8f
Signed by: a-savchuk
GPG key ID: 70C0A7FF6F9C4639
3 changed files with 77 additions and 8 deletions

View file

@ -255,6 +255,7 @@ const (
ShardFailureToGetExpiredUnlockedObjects = "failure to get expired unlocked objects"
ShardCouldNotMarkObjectToDeleteInMetabase = "could not mark object to delete in metabase"
ShardUnknownObjectTypeWhileIteratingExpiredObjects = "encountered unknown object type while iterating expired objects"
ShardFailedToRemoveExpiredGraves = "failed to remove expired graves"
WritecacheWaitingForChannelsToFlush = "waiting for channels to flush"
WritecacheCantRemoveObjectFromWritecache = "can't remove object from write-cache"
BlobovniczatreeCouldNotGetObjectFromLevel = "could not get object from level"

View file

@ -317,3 +317,42 @@ func (db *DB) InhumeTombstones(ctx context.Context, tss []TombstonedObject) (Inh
})
return res, err
}
// RemoveGraves removes graves from graveyard.
func (db *DB) RemoveGraves(ctx context.Context, graves []TombstonedObject) error {
var (
startedAt = time.Now()
success = false
)
defer func() {
db.metrics.AddMethodDuration("RemoveGraves", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "metabase.RemoveGraves")
defer span.End()
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return ErrDegradedMode
} else if db.mode.ReadOnly() {
return ErrReadOnlyMode
}
key := make([]byte, addressKeySize)
err := db.boltDB.Batch(func(tx *bbolt.Tx) error {
graveyard := tx.Bucket(graveyardBucketName)
if graveyard == nil {
return errors.New("no graveyard bucket")
}
for i := range graves {
if err := graveyard.Delete(addressKey(graves[i].Address(), key)); err != nil {
return fmt.Errorf("remove grave: %w", err)
}
}
return nil
})
return metaerr.Wrap(err)
}

View file

@ -534,9 +534,12 @@ func (s *Shard) collectExpiredGraves(ctx context.Context, e Event) {
log.Debug(ctx, logs.ShardStartedExpiredTombstonesHandling)
defer log.Debug(ctx, logs.ShardFinishedExpiredTombstonesHandling)
// TODO(@a-savchuk): Magic number. Should it be configurable? Same way
// it's done for handling expired objects: batch size and worker count.
const tssDeleteBatch = 50
tss := make([]meta.TombstonedObject, 0, tssDeleteBatch)
tssExp := make([]meta.TombstonedObject, 0, tssDeleteBatch)
expiredGravesWithExpEpoch := make([]meta.TombstonedObject, 0, tssDeleteBatch)
expiredGravesWithoutExpEpoch := make([]meta.TombstonedObject, 0, tssDeleteBatch)
var iterPrm meta.GraveyardIterationPrm
iterPrm.SetHandler(func(deletedObject meta.TombstonedObject) error {
@ -577,19 +580,45 @@ func (s *Shard) collectExpiredGraves(ctx context.Context, e Event) {
}
for _, ts := range tss {
if !s.tsSource.IsTombstoneAvailable(ctx, ts.Tombstone(), epoch) {
tssExp = append(tssExp, ts)
if ts.ExpirationEpoch() != meta.NoExpirationEpoch {
expiredGravesWithExpEpoch = append(expiredGravesWithExpEpoch, ts)
} else if !s.tsSource.IsTombstoneAvailable(ctx, ts.Tombstone(), epoch) {
expiredGravesWithoutExpEpoch = append(expiredGravesWithoutExpEpoch, ts)
}
}
log.Debug(ctx, logs.ShardHandlingExpiredTombstonesBatch, zap.Int("number", len(tssExp)))
if len(tssExp) > 0 {
s.expiredGravesCallback(ctx, tssExp)
}
log.Debug(ctx, logs.ShardHandlingExpiredTombstonesBatch,
zap.Int("with expiration epoch", len(expiredGravesWithExpEpoch)),
zap.Int("without expiration epoch", len(expiredGravesWithoutExpEpoch)),
)
s.handleExpiredGraves(ctx, expiredGravesWithExpEpoch, expiredGravesWithoutExpEpoch)
iterPrm.SetOffset(tss[tssLen-1].Address())
tss = tss[:0]
tssExp = tssExp[:0]
expiredGravesWithExpEpoch = expiredGravesWithExpEpoch[:0]
expiredGravesWithoutExpEpoch = expiredGravesWithoutExpEpoch[:0]
}
}
func (s *Shard) handleExpiredGraves(ctx context.Context,
expiredWithExpEpoch []meta.TombstonedObject, expiredWithoutExpEpoch []meta.TombstonedObject,
) {
if len(expiredWithExpEpoch) > 0 {
s.removeExpiredGraves(ctx, expiredWithExpEpoch)
}
if len(expiredWithoutExpEpoch) > 0 {
s.expiredGravesCallback(ctx, expiredWithoutExpEpoch)
}
}
func (s *Shard) removeExpiredGraves(ctx context.Context, expired []meta.TombstonedObject) {
if s.info.Mode.NoMetabase() {
return
}
if err := s.metaBase.RemoveGraves(ctx, expired); err != nil {
s.log.Warn(ctx, logs.ShardFailedToRemoveExpiredGraves, zap.Error(err))
}
}