WIP: Physically delete complex objects after expiration (Support) #345
2 changed files with 85 additions and 2 deletions
58
pkg/local_object_storage/metabase/children.go
Normal file
58
pkg/local_object_storage/metabase/children.go
Normal file
|
@ -0,0 +1,58 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
// GetChildren returns parent -> children map.
|
||||
// If an object has no children, then map will contain addr -> nil value.
|
||||
func (db *DB) GetChildren(addresses []oid.Address) (map[oid.Address][]oid.Address, error) {
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return nil, ErrDegradedMode
|
||||
}
|
||||
|
||||
result := make(map[oid.Address][]oid.Address, len(addresses))
|
||||
|
||||
buffer := make([]byte, bucketKeySize)
|
||||
err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
for _, addr := range addresses {
|
||||
bkt := tx.Bucket(parentBucketName(addr.Container(), buffer[:]))
|
||||
if bkt == nil {
|
||||
result[addr] = nil
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
binObjIDs, err := decodeList(bkt.Get(objectKey(addr.Object(), buffer[:])))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(binObjIDs) == 0 {
|
||||
result[addr] = nil
|
||||
continue
|
||||
}
|
||||
|
||||
for _, binObjID := range binObjIDs {
|
||||
var id oid.ID
|
||||
if err = id.Decode(binObjID); err != nil {
|
||||
return err
|
||||
}
|
||||
var resultAddress oid.Address
|
||||
resultAddress.SetContainer(addr.Container())
|
||||
resultAddress.SetObject(id)
|
||||
result[addr] = append(result[addr], resultAddress)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
|
@ -258,6 +258,9 @@ func (s *Shard) getExpiredObjectsParameters() (workersCount, batchSize int) {
|
|||
}
|
||||
|
||||
func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
|
||||
s.log.Debug("staring expired objects collecting")
|
||||
defer s.log.Debug("expired objects collecting completed")
|
||||
|
||||
workersCount, batchSize := s.getExpiredObjectsParameters()
|
||||
|
||||
errGroup, egCtx := errgroup.WithContext(ctx)
|
||||
|
@ -313,6 +316,12 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
|
|||
return
|
||||
}
|
||||
|
||||
expired, err := s.getPhysicalAddresses(expired)
|
||||
if err != nil {
|
||||
s.log.Warn("failure to get physical expired objects", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
var inhumePrm meta.InhumePrm
|
||||
|
||||
inhumePrm.SetAddresses(expired...)
|
||||
|
@ -338,11 +347,26 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Shard) getPhysicalAddresses(source []oid.Address) ([]oid.Address, error) {
|
||||
result := make([]oid.Address, 0, len(source))
|
||||
parentToChildren, err := s.metaBase.GetChildren(source)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for parent, children := range parentToChildren {
|
||||
result = append(result, parent)
|
||||
result = append(result, children...)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
|
||||
epoch := e.(newEpoch).epoch
|
||||
log := s.log.With(zap.Uint64("epoch", epoch))
|
||||
|
||||
log.Debug("started expired tombstones handling")
|
||||
defer log.Debug("finished expired tombstones handling")
|
||||
|
||||
const tssDeleteBatch = 50
|
||||
tss := make([]meta.TombstonedObject, 0, tssDeleteBatch)
|
||||
|
@ -399,11 +423,12 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
|
|||
tss = tss[:0]
|
||||
tssExp = tssExp[:0]
|
||||
}
|
||||
|
||||
log.Debug("finished expired tombstones handling")
|
||||
}
|
||||
|
||||
func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
|
||||
s.log.Debug("staring expired locks collecting")
|
||||
defer s.log.Debug("expired locks collecting completed")
|
||||
|
||||
workersCount, batchSize := s.getExpiredObjectsParameters()
|
||||
|
||||
errGroup, egCtx := errgroup.WithContext(ctx)
|
||||
|
|
Loading…
Reference in a new issue
I recommend to make this line more declarative using
result[addr] = []oid.Address{}
and fix the commentaddr -> empty slice