[#1445] shard/gc: Remove graves of new format separately
All checks were successful
Vulncheck / Vulncheck (pull_request) Successful in 3m45s
Pre-commit hooks / Pre-commit (pull_request) Successful in 4m42s
Tests and linters / gopls check (pull_request) Successful in 5m18s
Tests and linters / Tests with -race (pull_request) Successful in 8m45s
Tests and linters / Run gofumpt (pull_request) Successful in 11m6s
DCO action / DCO (pull_request) Successful in 11m24s
Tests and linters / Staticcheck (pull_request) Successful in 14m33s
Tests and linters / Tests (pull_request) Successful in 8m33s
Build / Build Components (pull_request) Successful in 15m6s
Tests and linters / Lint (pull_request) Successful in 15m58s
All checks were successful
Vulncheck / Vulncheck (pull_request) Successful in 3m45s
Pre-commit hooks / Pre-commit (pull_request) Successful in 4m42s
Tests and linters / gopls check (pull_request) Successful in 5m18s
Tests and linters / Tests with -race (pull_request) Successful in 8m45s
Tests and linters / Run gofumpt (pull_request) Successful in 11m6s
DCO action / DCO (pull_request) Successful in 11m24s
Tests and linters / Staticcheck (pull_request) Successful in 14m33s
Tests and linters / Tests (pull_request) Successful in 8m33s
Build / Build Components (pull_request) Successful in 15m6s
Tests and linters / Lint (pull_request) Successful in 15m58s
Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
This commit is contained in:
parent
5c0d9c1c08
commit
de1ef66a21
3 changed files with 77 additions and 8 deletions
|
@ -255,6 +255,7 @@ const (
|
|||
ShardFailureToGetExpiredUnlockedObjects = "failure to get expired unlocked objects"
|
||||
ShardCouldNotMarkObjectToDeleteInMetabase = "could not mark object to delete in metabase"
|
||||
ShardUnknownObjectTypeWhileIteratingExpiredObjects = "encountered unknown object type while iterating expired objects"
|
||||
ShardFailedToRemoveExpiredGraves = "failed to remove expired graves"
|
||||
WritecacheWaitingForChannelsToFlush = "waiting for channels to flush"
|
||||
WritecacheCantRemoveObjectFromWritecache = "can't remove object from write-cache"
|
||||
BlobovniczatreeCouldNotGetObjectFromLevel = "could not get object from level"
|
||||
|
|
|
@ -317,3 +317,42 @@ func (db *DB) InhumeTombstones(ctx context.Context, tss []TombstonedObject) (Inh
|
|||
})
|
||||
return res, err
|
||||
}
|
||||
|
||||
// RemoveGraves removes graves from graveyard.
|
||||
func (db *DB) RemoveGraves(ctx context.Context, graves []TombstonedObject) error {
|
||||
var (
|
||||
startedAt = time.Now()
|
||||
success = false
|
||||
)
|
||||
defer func() {
|
||||
db.metrics.AddMethodDuration("RemoveGraves", time.Since(startedAt), success)
|
||||
}()
|
||||
|
||||
_, span := tracing.StartSpanFromContext(ctx, "metabase.RemoveGraves")
|
||||
defer span.End()
|
||||
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return ErrDegradedMode
|
||||
} else if db.mode.ReadOnly() {
|
||||
return ErrReadOnlyMode
|
||||
}
|
||||
|
||||
key := make([]byte, addressKeySize)
|
||||
|
||||
err := db.boltDB.Batch(func(tx *bbolt.Tx) error {
|
||||
graveyard := tx.Bucket(graveyardBucketName)
|
||||
if graveyard == nil {
|
||||
return errors.New("no graveyard bucket")
|
||||
}
|
||||
for i := range graves {
|
||||
if err := graveyard.Delete(addressKey(graves[i].Address(), key)); err != nil {
|
||||
return fmt.Errorf("remove grave: %w", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return metaerr.Wrap(err)
|
||||
}
|
||||
|
|
|
@ -534,9 +534,12 @@ func (s *Shard) collectExpiredGraves(ctx context.Context, e Event) {
|
|||
log.Debug(ctx, logs.ShardStartedExpiredTombstonesHandling)
|
||||
defer log.Debug(ctx, logs.ShardFinishedExpiredTombstonesHandling)
|
||||
|
||||
// TODO(@a-savchuk): Magic number. Should it be configurable? Same way
|
||||
// it's done for handling expired objects: batch size and worker count.
|
||||
const tssDeleteBatch = 50
|
||||
tss := make([]meta.TombstonedObject, 0, tssDeleteBatch)
|
||||
tssExp := make([]meta.TombstonedObject, 0, tssDeleteBatch)
|
||||
expiredGravesWithExpEpoch := make([]meta.TombstonedObject, 0, tssDeleteBatch)
|
||||
expiredGravesWithoutExpEpoch := make([]meta.TombstonedObject, 0, tssDeleteBatch)
|
||||
|
||||
var iterPrm meta.GraveyardIterationPrm
|
||||
iterPrm.SetHandler(func(deletedObject meta.TombstonedObject) error {
|
||||
|
@ -577,19 +580,45 @@ func (s *Shard) collectExpiredGraves(ctx context.Context, e Event) {
|
|||
}
|
||||
|
||||
for _, ts := range tss {
|
||||
if !s.tsSource.IsTombstoneAvailable(ctx, ts.Tombstone(), epoch) {
|
||||
tssExp = append(tssExp, ts)
|
||||
if ts.ExpirationEpoch() != meta.NoExpirationEpoch {
|
||||
expiredGravesWithExpEpoch = append(expiredGravesWithExpEpoch, ts)
|
||||
} else if !s.tsSource.IsTombstoneAvailable(ctx, ts.Tombstone(), epoch) {
|
||||
expiredGravesWithoutExpEpoch = append(expiredGravesWithoutExpEpoch, ts)
|
||||
}
|
||||
}
|
||||
|
||||
log.Debug(ctx, logs.ShardHandlingExpiredTombstonesBatch, zap.Int("number", len(tssExp)))
|
||||
if len(tssExp) > 0 {
|
||||
s.expiredGravesCallback(ctx, tssExp)
|
||||
}
|
||||
log.Debug(ctx, logs.ShardHandlingExpiredTombstonesBatch,
|
||||
zap.Int("with expiration epoch", len(expiredGravesWithExpEpoch)),
|
||||
zap.Int("without expiration epoch", len(expiredGravesWithoutExpEpoch)),
|
||||
)
|
||||
|
||||
s.handleExpiredGraves(ctx, expiredGravesWithExpEpoch, expiredGravesWithoutExpEpoch)
|
||||
|
||||
iterPrm.SetOffset(tss[tssLen-1].Address())
|
||||
tss = tss[:0]
|
||||
tssExp = tssExp[:0]
|
||||
expiredGravesWithExpEpoch = expiredGravesWithExpEpoch[:0]
|
||||
expiredGravesWithoutExpEpoch = expiredGravesWithoutExpEpoch[:0]
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Shard) handleExpiredGraves(ctx context.Context,
|
||||
expiredWithExpEpoch []meta.TombstonedObject, expiredWithoutExpEpoch []meta.TombstonedObject,
|
||||
) {
|
||||
if len(expiredWithExpEpoch) > 0 {
|
||||
s.removeExpiredGraves(ctx, expiredWithExpEpoch)
|
||||
}
|
||||
if len(expiredWithoutExpEpoch) > 0 {
|
||||
s.expiredGravesCallback(ctx, expiredWithoutExpEpoch)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Shard) removeExpiredGraves(ctx context.Context, expired []meta.TombstonedObject) {
|
||||
if s.info.Mode.NoMetabase() {
|
||||
return
|
||||
}
|
||||
|
||||
if err := s.metaBase.RemoveGraves(ctx, expired); err != nil {
|
||||
s.log.Warn(ctx, logs.ShardFailedToRemoveExpiredGraves, zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue