[#1445] shard/gc: Collect objects of all types

Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>

parent 8d6d829348
commit 6bc49eb741

3 changed files with 80 additions and 75 deletions
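What this commit does, summarized from the diff below: previously the shard GC ran two separate passes on each new epoch, one in collectExpiredObjects, which skipped tombstones and locks, and one in collectExpiredLockObjects, which handled locks only. The passes are now merged: collectExpiredObjects iterates the expired objects once and dispatches each address to a per-type handler (regular object, tombstone, lock), batching addresses per type; any other type only triggers a warning. A new handleExpiredTombstones handler inhumes expired tombstones with a GC mark, and collectExpiredLockObjects is removed together with its registration in the shard's new-epoch handler list.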
@@ -254,6 +254,7 @@ const (
 	ShardFailureToMarkLockersAsGarbage = "failure to mark lockers as garbage"
 	ShardFailureToGetExpiredUnlockedObjects = "failure to get expired unlocked objects"
 	ShardCouldNotMarkObjectToDeleteInMetabase = "could not mark object to delete in metabase"
+	ShardUnknownObjectTypeWhileIteratingExpiredObjects = "encountered unknown object type while iterating expired objects"
 	WritecacheWaitingForChannelsToFlush = "waiting for channels to flush"
 	WritecacheCantRemoveObjectFromWritecache = "can't remove object from write-cache"
 	BlobovniczatreeCouldNotGetObjectFromLevel = "could not get object from level"
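The single addition here is the ShardUnknownObjectTypeWhileIteratingExpiredObjects message; it backs the warning that the reworked collectExpiredObjects below emits when the expired-object iterator yields a type with no registered handler.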
@@ -116,7 +116,6 @@ func (s *Shard) Init(ctx context.Context) error {
 		eventNewEpoch: {
 			cancelFunc: func() {},
 			handlers: []eventHandler{
-				s.collectExpiredLockObjects,
 				s.collectExpiredObjects,
 				s.collectExpiredGraves,
 				s.collectExpiredMetrics,
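s.collectExpiredLockObjects is dropped from the new-epoch event handlers: lock objects are now picked up by s.collectExpiredObjects through its per-type handler map, so the separate pass is no longer registered.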
@@ -2,6 +2,7 @@ package shard
 
 import (
 	"context"
+	"slices"
 	"sync"
 	"time"
 
@@ -14,6 +15,7 @@ import (
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	"go.uber.org/zap"
+	"golang.org/x/exp/maps"
 	"golang.org/x/sync/errgroup"
 )
 
@@ -356,40 +358,61 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
 		s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil)
 	}()
 
-	s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsStarted, zap.Uint64("epoch", e.(newEpoch).epoch))
-	defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsCompleted, zap.Uint64("epoch", e.(newEpoch).epoch))
+	epoch := e.(newEpoch).epoch
+
+	s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsStarted, zap.Uint64("epoch", epoch))
+	defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsCompleted, zap.Uint64("epoch", epoch))
 
 	workersCount, batchSize := s.getExpiredObjectsParameters()
 
 	errGroup, egCtx := errgroup.WithContext(ctx)
 	errGroup.SetLimit(workersCount)
 
-	errGroup.Go(func() error {
-		batch := make([]oid.Address, 0, batchSize)
-		expErr := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *meta.ExpiredObject) {
-			if o.Type() != objectSDK.TypeTombstone && o.Type() != objectSDK.TypeLock {
-				batch = append(batch, o.Address())
+	handlers := map[objectSDK.Type]func(context.Context, []oid.Address){
+		objectSDK.TypeRegular:   s.handleExpiredObjects,
+		objectSDK.TypeTombstone: s.handleExpiredTombstones,
+		objectSDK.TypeLock: func(ctx context.Context, batch []oid.Address) {
+			s.expiredLockObjectsCallback(ctx, epoch, batch)
+		},
+	}
+	knownTypes := maps.Keys(handlers)
 
-				if len(batch) == batchSize {
-					expired := batch
-					errGroup.Go(func() error {
-						s.handleExpiredObjects(egCtx, expired)
-						return egCtx.Err()
-					})
-					batch = make([]oid.Address, 0, batchSize)
-				}
+	batches := make(map[objectSDK.Type][]oid.Address)
+	for _, typ := range knownTypes {
+		batches[typ] = make([]oid.Address, 0, batchSize)
+	}
+
+	errGroup.Go(func() error {
+		expErr := s.getExpiredObjects(egCtx, epoch, func(o *meta.ExpiredObject) {
+			typ := o.Type()
+
+			if !slices.Contains(knownTypes, typ) {
+				s.log.Warn(ctx, logs.ShardUnknownObjectTypeWhileIteratingExpiredObjects, zap.Stringer("type", typ))
+			}
+
+			batches[typ] = append(batches[typ], o.Address())
+
+			if len(batches[typ]) == batchSize {
+				expired := batches[typ]
+				errGroup.Go(func() error {
+					handlers[typ](egCtx, expired)
+					return egCtx.Err()
+				})
+				batches[typ] = make([]oid.Address, 0, batchSize)
 			}
 		})
 		if expErr != nil {
 			return expErr
 		}
 
-		if len(batch) > 0 {
-			expired := batch
-			errGroup.Go(func() error {
-				s.handleExpiredObjects(egCtx, expired)
-				return egCtx.Err()
-			})
+		for typ, batch := range batches {
+			if len(batch) > 0 {
+				expired := batch
+				errGroup.Go(func() error {
+					handlers[typ](egCtx, expired)
+					return egCtx.Err()
+				})
+			}
 		}
 
 		return nil
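To make the new control flow easier to follow, here is a minimal, self-contained sketch of the dispatch-and-batch pattern the hunk above introduces. All names in it are hypothetical stand-ins (objType for objectSDK.Type, integer IDs for oid.Address); the real code receives objects from s.getExpiredObjects and reuses the same errgroup for iteration and handlers:

package main

import (
	"context"
	"fmt"

	"golang.org/x/exp/maps"
	"golang.org/x/sync/errgroup"
)

// objType stands in for objectSDK.Type in this sketch.
type objType string

func main() {
	const batchSize = 2

	// One handler per known object type, mirroring the handlers map above.
	handlers := map[objType]func(ctx context.Context, batch []int){
		"regular":   func(_ context.Context, b []int) { fmt.Println("regular:", b) },
		"tombstone": func(_ context.Context, b []int) { fmt.Println("tombstone:", b) },
	}
	knownTypes := maps.Keys(handlers)

	// Pre-allocate one batch per known type.
	batches := make(map[objType][]int)
	for _, typ := range knownTypes {
		batches[typ] = make([]int, 0, batchSize)
	}

	eg, egCtx := errgroup.WithContext(context.Background())
	eg.SetLimit(4)

	// Simulated iteration over expired objects; the real code gets these
	// from the metabase via s.getExpiredObjects.
	expired := []struct {
		typ objType
		id  int
	}{{"regular", 1}, {"tombstone", 2}, {"regular", 3}, {"regular", 4}}

	for _, o := range expired {
		typ := o.typ // copy for the closure below
		batches[typ] = append(batches[typ], o.id)

		// Flush a full batch to its handler on a worker goroutine.
		if len(batches[typ]) == batchSize {
			full := batches[typ]
			eg.Go(func() error {
				handlers[typ](egCtx, full)
				return egCtx.Err()
			})
			batches[typ] = make([]int, 0, batchSize)
		}
	}

	// Flush whatever remains in each per-type batch.
	for typ, batch := range batches {
		if len(batch) > 0 {
			typ, batch := typ, batch
			eg.Go(func() error {
				handlers[typ](egCtx, batch)
				return egCtx.Err()
			})
		}
	}

	if err := eg.Wait(); err != nil {
		fmt.Println("iteration failed:", err)
	}
}

One property worth noting: maps.Keys returns keys in unspecified order, which is harmless here because knownTypes is only used to pre-allocate batches and for the slices.Contains membership check.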
@@ -400,6 +423,41 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
 	}
 }
 
+func (s *Shard) handleExpiredTombstones(ctx context.Context, expired []oid.Address) {
+	select {
+	case <-ctx.Done():
+		return
+	default:
+	}
+
+	s.m.RLock()
+	defer s.m.RUnlock()
+
+	if s.info.Mode.NoMetabase() {
+		return
+	}
+
+	var inhumePrm meta.InhumePrm
+	inhumePrm.SetAddresses(expired...)
+	inhumePrm.SetGCMark()
+
+	res, err := s.metaBase.Inhume(ctx, inhumePrm)
+	if err != nil {
+		s.log.Warn(ctx, logs.ShardCouldNotInhumeTheObjects, zap.Error(err))
+		return
+	}
+
+	s.gc.metrics.AddInhumedObjectCount(res.LogicInhumed(), objectTypeTombstone)
+	s.decObjectCounterBy(logical, res.LogicInhumed())
+	s.decObjectCounterBy(user, res.UserInhumed())
+	s.decContainerObjectCounter(res.InhumedByCnrID())
+
+	for i := range res.GetDeletionInfoLength() {
+		delInfo := res.GetDeletionInfoByIndex(i)
+		s.addToContainerSize(delInfo.CID.EncodeToString(), -int64(delInfo.Size))
+	}
+}
+
 func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address) {
 	select {
 	case <-ctx.Done():
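handleExpiredTombstones mirrors the existing handleExpiredObjects: a non-blocking select first checks for cancellation, the shard mode lock is taken, the call bails out when the shard runs without a metabase, and the expired tombstones are inhumed with a GC mark; the inhume result then drives the GC metrics, the logical/user object counters, and the per-container size accounting.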
@@ -535,59 +593,6 @@ func (s *Shard) collectExpiredGraves(ctx context.Context, e Event) {
 	}
 }
 
-func (s *Shard) collectExpiredLockObjects(ctx context.Context, e Event) {
-	var err error
-	startedAt := time.Now()
-
-	defer func() {
-		s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil)
-	}()
-
-	s.log.Debug(ctx, logs.ShardGCCollectingExpiredLocksStarted, zap.Uint64("epoch", e.(newEpoch).epoch))
-	defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredLocksCompleted, zap.Uint64("epoch", e.(newEpoch).epoch))
-
-	workersCount, batchSize := s.getExpiredObjectsParameters()
-
-	errGroup, egCtx := errgroup.WithContext(ctx)
-	errGroup.SetLimit(workersCount)
-
-	errGroup.Go(func() error {
-		batch := make([]oid.Address, 0, batchSize)
-
-		expErr := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *meta.ExpiredObject) {
-			if o.Type() == objectSDK.TypeLock {
-				batch = append(batch, o.Address())
-
-				if len(batch) == batchSize {
-					expired := batch
-					errGroup.Go(func() error {
-						s.expiredLockObjectsCallback(egCtx, e.(newEpoch).epoch, expired)
-						return egCtx.Err()
-					})
-					batch = make([]oid.Address, 0, batchSize)
-				}
-			}
-		})
-		if expErr != nil {
-			return expErr
-		}
-
-		if len(batch) > 0 {
-			expired := batch
-			errGroup.Go(func() error {
-				s.expiredLockObjectsCallback(egCtx, e.(newEpoch).epoch, expired)
-				return egCtx.Err()
-			})
-		}
-
-		return nil
-	})
-
-	if err = errGroup.Wait(); err != nil {
-		s.log.Warn(ctx, logs.ShardIteratorOverExpiredLocksFailed, zap.Error(err))
-	}
-}
-
 func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, onExpiredFound func(*meta.ExpiredObject)) error {
 	s.m.RLock()
 	defer s.m.RUnlock()
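With lock collection folded into collectExpiredObjects, the entire collectExpiredLockObjects pass is deleted, including its duplicated errgroup setup and batching scaffolding; getExpiredObjects itself is unchanged.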