WIP: Physically delete complex objects after expiration (Support) #345

Closed
dstepanov-yadro wants to merge 1 commit from dstepanov-yadro/frostfs-node:fix/complex_lifetime_support into support/v0.36
2 changed files with 85 additions and 2 deletions

View file

@ -0,0 +1,58 @@
package meta
import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
)
// GetChildren returns parent -> children map.
// If an object has no children, then map will contain addr -> nil value.
func (db *DB) GetChildren(addresses []oid.Address) (map[oid.Address][]oid.Address, error) {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return nil, ErrDegradedMode
}
result := make(map[oid.Address][]oid.Address, len(addresses))
buffer := make([]byte, bucketKeySize)
err := db.boltDB.View(func(tx *bbolt.Tx) error {
for _, addr := range addresses {
bkt := tx.Bucket(parentBucketName(addr.Container(), buffer[:]))
if bkt == nil {
result[addr] = nil
Review

I recommend to make this line more declarative using result[addr] = []oid.Address{} and fix the comment addr -> empty slice

I recommend to make this line more declarative using `result[addr] = []oid.Address{}` and fix the comment `addr -> empty slice`
continue
}
binObjIDs, err := decodeList(bkt.Get(objectKey(addr.Object(), buffer[:])))
if err != nil {
return err
}
if len(binObjIDs) == 0 {
result[addr] = nil
continue
}
for _, binObjID := range binObjIDs {
var id oid.ID
if err = id.Decode(binObjID); err != nil {
return err
}
var resultAddress oid.Address
resultAddress.SetContainer(addr.Container())
resultAddress.SetObject(id)
result[addr] = append(result[addr], resultAddress)
}
}
return nil
})
if err != nil {
return nil, err
}
return result, nil
}

View file

@ -258,6 +258,9 @@ func (s *Shard) getExpiredObjectsParameters() (workersCount, batchSize int) {
} }
func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) { func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
s.log.Debug("staring expired objects collecting")
defer s.log.Debug("expired objects collecting completed")
workersCount, batchSize := s.getExpiredObjectsParameters() workersCount, batchSize := s.getExpiredObjectsParameters()
errGroup, egCtx := errgroup.WithContext(ctx) errGroup, egCtx := errgroup.WithContext(ctx)
@ -313,6 +316,12 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
return return
} }
expired, err := s.getPhysicalAddresses(expired)
if err != nil {
s.log.Warn("failure to get physical expired objects", zap.Error(err))
return
}
var inhumePrm meta.InhumePrm var inhumePrm meta.InhumePrm
inhumePrm.SetAddresses(expired...) inhumePrm.SetAddresses(expired...)
@ -338,11 +347,26 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
} }
} }
func (s *Shard) getPhysicalAddresses(source []oid.Address) ([]oid.Address, error) {
result := make([]oid.Address, 0, len(source))
parentToChildren, err := s.metaBase.GetChildren(source)
if err != nil {
return nil, err
}
for parent, children := range parentToChildren {
result = append(result, parent)
result = append(result, children...)
}
return result, nil
}
func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) { func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
epoch := e.(newEpoch).epoch epoch := e.(newEpoch).epoch
log := s.log.With(zap.Uint64("epoch", epoch)) log := s.log.With(zap.Uint64("epoch", epoch))
log.Debug("started expired tombstones handling") log.Debug("started expired tombstones handling")
defer log.Debug("finished expired tombstones handling")
const tssDeleteBatch = 50 const tssDeleteBatch = 50
tss := make([]meta.TombstonedObject, 0, tssDeleteBatch) tss := make([]meta.TombstonedObject, 0, tssDeleteBatch)
@ -399,11 +423,12 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
tss = tss[:0] tss = tss[:0]
tssExp = tssExp[:0] tssExp = tssExp[:0]
} }
log.Debug("finished expired tombstones handling")
} }
func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) { func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
s.log.Debug("staring expired locks collecting")
defer s.log.Debug("expired locks collecting completed")
workersCount, batchSize := s.getExpiredObjectsParameters() workersCount, batchSize := s.getExpiredObjectsParameters()
errGroup, egCtx := errgroup.WithContext(ctx) errGroup, egCtx := errgroup.WithContext(ctx)