WIP: Physically delete complex objects after expiration (Support) #345
2 changed files with 85 additions and 2 deletions
58
pkg/local_object_storage/metabase/children.go
Normal file
58
pkg/local_object_storage/metabase/children.go
Normal file
|
@ -0,0 +1,58 @@
|
||||||
|
package meta
|
||||||
|
|
||||||
|
import (
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"go.etcd.io/bbolt"
|
||||||
|
)
|
||||||
|
|
||||||
|
// GetChildren returns parent -> children map.
|
||||||
|
// If an object has no children, then map will contain addr -> nil value.
|
||||||
|
func (db *DB) GetChildren(addresses []oid.Address) (map[oid.Address][]oid.Address, error) {
|
||||||
|
db.modeMtx.RLock()
|
||||||
|
defer db.modeMtx.RUnlock()
|
||||||
|
|
||||||
|
if db.mode.NoMetabase() {
|
||||||
|
return nil, ErrDegradedMode
|
||||||
|
}
|
||||||
|
|
||||||
|
result := make(map[oid.Address][]oid.Address, len(addresses))
|
||||||
|
|
||||||
|
buffer := make([]byte, bucketKeySize)
|
||||||
|
err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
|
for _, addr := range addresses {
|
||||||
|
bkt := tx.Bucket(parentBucketName(addr.Container(), buffer[:]))
|
||||||
|
if bkt == nil {
|
||||||
|
result[addr] = nil
|
||||||
|
|||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
binObjIDs, err := decodeList(bkt.Get(objectKey(addr.Object(), buffer[:])))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(binObjIDs) == 0 {
|
||||||
|
result[addr] = nil
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, binObjID := range binObjIDs {
|
||||||
|
var id oid.ID
|
||||||
|
if err = id.Decode(binObjID); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
var resultAddress oid.Address
|
||||||
|
resultAddress.SetContainer(addr.Container())
|
||||||
|
resultAddress.SetObject(id)
|
||||||
|
result[addr] = append(result[addr], resultAddress)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
|
@ -258,6 +258,9 @@ func (s *Shard) getExpiredObjectsParameters() (workersCount, batchSize int) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
|
func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
|
||||||
|
s.log.Debug("staring expired objects collecting")
|
||||||
|
defer s.log.Debug("expired objects collecting completed")
|
||||||
|
|
||||||
workersCount, batchSize := s.getExpiredObjectsParameters()
|
workersCount, batchSize := s.getExpiredObjectsParameters()
|
||||||
|
|
||||||
errGroup, egCtx := errgroup.WithContext(ctx)
|
errGroup, egCtx := errgroup.WithContext(ctx)
|
||||||
|
@ -313,6 +316,12 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
expired, err := s.getPhysicalAddresses(expired)
|
||||||
|
if err != nil {
|
||||||
|
s.log.Warn("failure to get physical expired objects", zap.Error(err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
var inhumePrm meta.InhumePrm
|
var inhumePrm meta.InhumePrm
|
||||||
|
|
||||||
inhumePrm.SetAddresses(expired...)
|
inhumePrm.SetAddresses(expired...)
|
||||||
|
@ -338,11 +347,26 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Shard) getPhysicalAddresses(source []oid.Address) ([]oid.Address, error) {
|
||||||
|
result := make([]oid.Address, 0, len(source))
|
||||||
|
parentToChildren, err := s.metaBase.GetChildren(source)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
for parent, children := range parentToChildren {
|
||||||
|
result = append(result, parent)
|
||||||
|
result = append(result, children...)
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
|
func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
|
||||||
epoch := e.(newEpoch).epoch
|
epoch := e.(newEpoch).epoch
|
||||||
log := s.log.With(zap.Uint64("epoch", epoch))
|
log := s.log.With(zap.Uint64("epoch", epoch))
|
||||||
|
|
||||||
log.Debug("started expired tombstones handling")
|
log.Debug("started expired tombstones handling")
|
||||||
|
defer log.Debug("finished expired tombstones handling")
|
||||||
|
|
||||||
const tssDeleteBatch = 50
|
const tssDeleteBatch = 50
|
||||||
tss := make([]meta.TombstonedObject, 0, tssDeleteBatch)
|
tss := make([]meta.TombstonedObject, 0, tssDeleteBatch)
|
||||||
|
@ -399,11 +423,12 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
|
||||||
tss = tss[:0]
|
tss = tss[:0]
|
||||||
tssExp = tssExp[:0]
|
tssExp = tssExp[:0]
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debug("finished expired tombstones handling")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
|
func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
|
||||||
|
s.log.Debug("staring expired locks collecting")
|
||||||
|
defer s.log.Debug("expired locks collecting completed")
|
||||||
|
|
||||||
workersCount, batchSize := s.getExpiredObjectsParameters()
|
workersCount, batchSize := s.getExpiredObjectsParameters()
|
||||||
|
|
||||||
errGroup, egCtx := errgroup.WithContext(ctx)
|
errGroup, egCtx := errgroup.WithContext(ctx)
|
||||||
|
|
Loading…
Reference in a new issue
I recommend to make this line more declarative using
result[addr] = []oid.Address{}
and fix the commentaddr -> empty slice