From 9f0bce5c15047e1f1272d2f0d8c2997a1bc76dd8 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 28 Mar 2023 11:17:15 +0300 Subject: [PATCH] [#183] gc: Fix drop expired locked simple objects Signed-off-by: Dmitrii Stepanov --- pkg/local_object_storage/engine/inhume.go | 4 +- pkg/local_object_storage/metabase/exists.go | 19 --- pkg/local_object_storage/metabase/expired.go | 145 ++++++++++++++++++ .../metabase/expired_test.go | 93 +++++++++++ pkg/local_object_storage/metabase/lock.go | 36 +++-- .../metabase/lock_test.go | 4 +- pkg/local_object_storage/shard/control.go | 3 +- pkg/local_object_storage/shard/gc.go | 45 ++++-- pkg/local_object_storage/shard/gc_test.go | 4 +- pkg/local_object_storage/shard/shard.go | 2 +- 10 files changed, 306 insertions(+), 49 deletions(-) create mode 100644 pkg/local_object_storage/metabase/expired.go create mode 100644 pkg/local_object_storage/metabase/expired_test.go diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go index 007e51d8..680e773c 100644 --- a/pkg/local_object_storage/engine/inhume.go +++ b/pkg/local_object_storage/engine/inhume.go @@ -216,9 +216,9 @@ func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []me }) } -func (e *StorageEngine) processExpiredLocks(ctx context.Context, lockers []oid.Address) { +func (e *StorageEngine) processExpiredLocks(ctx context.Context, epoch uint64, lockers []oid.Address) { e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { - sh.HandleExpiredLocks(lockers) + sh.HandleExpiredLocks(ctx, epoch, lockers) select { case <-ctx.Done(): diff --git a/pkg/local_object_storage/metabase/exists.go b/pkg/local_object_storage/metabase/exists.go index 6ad65c73..686b6588 100644 --- a/pkg/local_object_storage/metabase/exists.go +++ b/pkg/local_object_storage/metabase/exists.go @@ -2,7 +2,6 @@ package meta import ( "fmt" - "strconv" objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" @@ -125,24 +124,6 @@ func objectStatus(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) uint8 { return inGraveyardWithKey(addrKey, graveyardBkt, garbageBkt) } -func isExpiredWithAttribute(tx *bbolt.Tx, attr string, addr oid.Address, currEpoch uint64) bool { - // bucket with objects that have expiration attr - attrKey := make([]byte, bucketKeySize+len(attr)) - expirationBucket := tx.Bucket(attributeBucketName(addr.Container(), attr, attrKey)) - if expirationBucket != nil { - // bucket that contains objects that expire in the current epoch - prevEpochBkt := expirationBucket.Bucket([]byte(strconv.FormatUint(currEpoch-1, 10))) - if prevEpochBkt != nil { - rawOID := objectKey(addr.Object(), make([]byte, objectKeySize)) - if prevEpochBkt.Get(rawOID) != nil { - return true - } - } - } - - return false -} - func inGraveyardWithKey(addrKey []byte, graveyard, garbageBCK *bbolt.Bucket) uint8 { if graveyard == nil { // incorrect metabase state, does not make diff --git a/pkg/local_object_storage/metabase/expired.go b/pkg/local_object_storage/metabase/expired.go new file mode 100644 index 00000000..d20fdbfa --- /dev/null +++ b/pkg/local_object_storage/metabase/expired.go @@ -0,0 +1,145 @@ +package meta + +import ( + "context" + "errors" + "fmt" + "strconv" + + objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "go.etcd.io/bbolt" +) + +// FilterExpired return expired items from addresses. +// Address considered expired if metabase does contain information about expiration and +// expiration epoch is less than epoch. +func (db *DB) FilterExpired(ctx context.Context, epoch uint64, addresses []oid.Address) ([]oid.Address, error) { + db.modeMtx.RLock() + defer db.modeMtx.RUnlock() + + if db.mode.NoMetabase() { + return nil, ErrDegradedMode + } + + result := make([]oid.Address, 0, len(addresses)) + containerIDToObjectIDs := make(map[cid.ID][]oid.ID) + for _, addr := range addresses { + containerIDToObjectIDs[addr.Container()] = append(containerIDToObjectIDs[addr.Container()], addr.Object()) + } + + err := db.boltDB.View(func(tx *bbolt.Tx) error { + for containerID, objectIDs := range containerIDToObjectIDs { + select { + case <-ctx.Done(): + return ErrInterruptIterator + default: + } + + expiredNeoFS, err := selectExpiredObjectIDs(tx, objectV2.SysAttributeExpEpochNeoFS, epoch, containerID, objectIDs) + if err != nil { + return err + } + + expiredSys, err := selectExpiredObjectIDs(tx, objectV2.SysAttributeExpEpoch, epoch, containerID, objectIDs) + if err != nil { + return err + } + + for _, o := range expiredNeoFS { + var a oid.Address + a.SetContainer(containerID) + a.SetObject(o) + result = append(result, a) + } + + for _, o := range expiredSys { + var a oid.Address + a.SetContainer(containerID) + a.SetObject(o) + result = append(result, a) + } + } + return nil + }) + + if err != nil { + return nil, err + } + return result, nil +} + +func isExpiredWithAttribute(tx *bbolt.Tx, attr string, addr oid.Address, currEpoch uint64) bool { + // bucket with objects that have expiration attr + attrKey := make([]byte, bucketKeySize+len(attr)) + expirationBucket := tx.Bucket(attributeBucketName(addr.Container(), attr, attrKey)) + if expirationBucket != nil { + // bucket that contains objects that expire in the current epoch + prevEpochBkt := expirationBucket.Bucket([]byte(strconv.FormatUint(currEpoch-1, 10))) + if prevEpochBkt != nil { + rawOID := objectKey(addr.Object(), make([]byte, objectKeySize)) + if prevEpochBkt.Get(rawOID) != nil { + return true + } + } + } + + return false +} + +func selectExpiredObjectIDs(tx *bbolt.Tx, attr string, epoch uint64, containerID cid.ID, objectIDs []oid.ID) ([]oid.ID, error) { + result := make([]oid.ID, 0) + notResolved := make(map[oid.ID]struct{}) + for _, oid := range objectIDs { + notResolved[oid] = struct{}{} + } + + expiredBuffer := make([]oid.ID, 0) + objectKeyBuffer := make([]byte, objectKeySize) + + expirationBucketKey := make([]byte, bucketKeySize+len(attr)) + expirationBucket := tx.Bucket(attributeBucketName(containerID, attr, expirationBucketKey)) + if expirationBucket == nil { + return result, nil // all not expired + } + + err := expirationBucket.ForEach(func(epochExpBucketKey, _ []byte) error { + bucketExpiresAfter, err := strconv.ParseUint(string(epochExpBucketKey), 10, 64) + if err != nil { + return fmt.Errorf("could not parse expiration epoch: %w", err) + } else if bucketExpiresAfter >= epoch { + return nil + } + + epochExpirationBucket := expirationBucket.Bucket(epochExpBucketKey) + if epochExpirationBucket == nil { + return nil + } + + expiredBuffer = expiredBuffer[:0] + for oid := range notResolved { + key := objectKey(oid, objectKeyBuffer) + if epochExpirationBucket.Get(key) != nil { + expiredBuffer = append(expiredBuffer, oid) + } + } + + for _, oid := range expiredBuffer { + delete(notResolved, oid) + result = append(result, oid) + } + + if len(notResolved) == 0 { + return errBreakBucketForEach + } + + return nil + }) + + if err != nil && !errors.Is(err, errBreakBucketForEach) { + return nil, err + } + + return result, nil +} diff --git a/pkg/local_object_storage/metabase/expired_test.go b/pkg/local_object_storage/metabase/expired_test.go new file mode 100644 index 00000000..5755b5cd --- /dev/null +++ b/pkg/local_object_storage/metabase/expired_test.go @@ -0,0 +1,93 @@ +package meta_test + +import ( + "context" + "testing" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" + cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "github.com/stretchr/testify/require" +) + +func TestDB_SelectExpired(t *testing.T) { + db := newDB(t) + + containerID1 := cidtest.ID() + + expiredObj11 := testutil.GenerateObject() + expiredObj11.SetContainerID(containerID1) + setExpiration(expiredObj11, 10) + err := putBig(db, expiredObj11) + require.NoError(t, err) + + expiredObj12 := testutil.GenerateObject() + expiredObj12.SetContainerID(containerID1) + setExpiration(expiredObj12, 12) + err = putBig(db, expiredObj12) + require.NoError(t, err) + + notExpiredObj11 := testutil.GenerateObject() + notExpiredObj11.SetContainerID(containerID1) + setExpiration(notExpiredObj11, 20) + err = putBig(db, notExpiredObj11) + require.NoError(t, err) + + regularObj11 := testutil.GenerateObject() + regularObj11.SetContainerID(containerID1) + err = putBig(db, regularObj11) + require.NoError(t, err) + + containerID2 := cidtest.ID() + + expiredObj21 := testutil.GenerateObject() + expiredObj21.SetContainerID(containerID2) + setExpiration(expiredObj21, 10) + err = putBig(db, expiredObj21) + require.NoError(t, err) + + expiredObj22 := testutil.GenerateObject() + expiredObj22.SetContainerID(containerID2) + setExpiration(expiredObj22, 12) + err = putBig(db, expiredObj22) + require.NoError(t, err) + + notExpiredObj21 := testutil.GenerateObject() + notExpiredObj21.SetContainerID(containerID2) + setExpiration(notExpiredObj21, 20) + err = putBig(db, notExpiredObj21) + require.NoError(t, err) + + regularObj21 := testutil.GenerateObject() + regularObj21.SetContainerID(containerID2) + err = putBig(db, regularObj21) + require.NoError(t, err) + + expired, err := db.FilterExpired(context.Background(), 15, + []oid.Address{ + getAddressSafe(t, expiredObj11), getAddressSafe(t, expiredObj12), getAddressSafe(t, notExpiredObj11), getAddressSafe(t, regularObj11), + getAddressSafe(t, expiredObj21), getAddressSafe(t, expiredObj22), getAddressSafe(t, notExpiredObj21), getAddressSafe(t, regularObj21), + }) + require.NoError(t, err) + require.Equal(t, 4, len(expired), "invalid expired count") + require.Contains(t, expired, getAddressSafe(t, expiredObj11)) + require.Contains(t, expired, getAddressSafe(t, expiredObj12)) + require.Contains(t, expired, getAddressSafe(t, expiredObj21)) + require.Contains(t, expired, getAddressSafe(t, expiredObj22)) +} + +func getAddressSafe(t *testing.T, o *object.Object) oid.Address { + cid, set := o.ContainerID() + if !set { + t.Fatalf("container id required") + } + id, set := o.ID() + if !set { + t.Fatalf("object id required") + } + var addr oid.Address + addr.SetContainer(cid) + addr.SetObject(id) + return addr +} diff --git a/pkg/local_object_storage/metabase/lock.go b/pkg/local_object_storage/metabase/lock.go index 2e6bed93..6850cf61 100644 --- a/pkg/local_object_storage/metabase/lock.go +++ b/pkg/local_object_storage/metabase/lock.go @@ -96,25 +96,27 @@ func (db *DB) Lock(cnr cid.ID, locker oid.ID, locked []oid.ID) error { } // FreeLockedBy unlocks all objects in DB which are locked by lockers. -func (db *DB) FreeLockedBy(lockers []oid.Address) error { +// Returns slice of unlocked object ID's or an error. +func (db *DB) FreeLockedBy(lockers []oid.Address) ([]oid.Address, error) { db.modeMtx.RLock() defer db.modeMtx.RUnlock() if db.mode.NoMetabase() { - return ErrDegradedMode + return nil, ErrDegradedMode } - return db.boltDB.Update(func(tx *bbolt.Tx) error { - var err error + var unlockedObjects []oid.Address + return unlockedObjects, db.boltDB.Update(func(tx *bbolt.Tx) error { for i := range lockers { - err = freePotentialLocks(tx, lockers[i].Container(), lockers[i].Object()) + unlocked, err := freePotentialLocks(tx, lockers[i].Container(), lockers[i].Object()) if err != nil { return err } + unlockedObjects = append(unlockedObjects, unlocked...) } - return err + return nil }) } @@ -134,14 +136,16 @@ func objectLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) bool { } // releases all records about the objects locked by the locker. +// Returns slice of unlocked object ID's or an error. // // Operation is very resource-intensive, which is caused by the admissibility // of multiple locks. Also, if we knew what objects are locked, it would be // possible to speed up the execution. -func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) error { +func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) ([]oid.Address, error) { + var unlockedObjects []oid.Address bucketLocked := tx.Bucket(bucketNameLocked) if bucketLocked == nil { - return nil + return unlockedObjects, nil } key := make([]byte, cidSize) @@ -149,11 +153,11 @@ func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) error { bucketLockedContainer := bucketLocked.Bucket(key) if bucketLockedContainer == nil { - return nil + return unlockedObjects, nil } keyLocker := objectKey(locker, key) - return bucketLockedContainer.ForEach(func(k, v []byte) error { + return unlockedObjects, bucketLockedContainer.ForEach(func(k, v []byte) error { keyLockers, err := decodeList(v) if err != nil { return fmt.Errorf("decode list of lockers in locked bucket: %w", err) @@ -167,6 +171,18 @@ func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) error { if err != nil { return fmt.Errorf("delete locked object record from locked bucket: %w", err) } + + var id oid.ID + err = id.Decode(k) + if err != nil { + return fmt.Errorf("decode unlocked object id error: %w", err) + } + + var addr oid.Address + addr.SetContainer(idCnr) + addr.SetObject(id) + + unlockedObjects = append(unlockedObjects, addr) } else { // exclude locker keyLockers = append(keyLockers[:i], keyLockers[i+1:]...) diff --git a/pkg/local_object_storage/metabase/lock_test.go b/pkg/local_object_storage/metabase/lock_test.go index d815b711..efa9fba0 100644 --- a/pkg/local_object_storage/metabase/lock_test.go +++ b/pkg/local_object_storage/metabase/lock_test.go @@ -110,7 +110,7 @@ func TestDB_Lock(t *testing.T) { require.Len(t, res.DeletedLockObjects(), 1) require.Equal(t, objectcore.AddressOf(lockObj), res.DeletedLockObjects()[0]) - err = db.FreeLockedBy([]oid.Address{lockAddr}) + _, err = db.FreeLockedBy([]oid.Address{lockAddr}) require.NoError(t, err) inhumePrm.SetAddresses(objAddr) @@ -141,7 +141,7 @@ func TestDB_Lock(t *testing.T) { // unlock just objects that were locked by // just removed locker - err = db.FreeLockedBy([]oid.Address{res.DeletedLockObjects()[0]}) + _, err = db.FreeLockedBy([]oid.Address{res.DeletedLockObjects()[0]}) require.NoError(t, err) // removing objects after unlock diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index 5aa7fbd3..61553ac1 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -143,7 +143,8 @@ func (s *Shard) Init() error { eventNewEpoch: { cancelFunc: func() {}, handlers: []eventHandler{ - s.collectExpiredLocksAndObjects, + s.collectExpiredLocks, + s.collectExpiredObjects, s.collectExpiredTombstones, }, }, diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go index b5874711..6335145e 100644 --- a/pkg/local_object_storage/shard/gc.go +++ b/pkg/local_object_storage/shard/gc.go @@ -243,11 +243,6 @@ func (s *Shard) removeGarbage() { } } -func (s *Shard) collectExpiredLocksAndObjects(ctx context.Context, e Event) { - s.collectExpiredLocks(ctx, e) - s.collectExpiredObjects(ctx, e) -} - func (s *Shard) getExpiredObjectsParameters() (workersCount, batchSize int) { workersCount = minExpiredWorkers batchSize = minExpiredBatchSize @@ -424,7 +419,7 @@ func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) { if len(batch) == batchSize { expired := batch errGroup.Go(func() error { - s.expiredLocksCallback(egCtx, expired) + s.expiredLocksCallback(egCtx, e.(newEpoch).epoch, expired) return egCtx.Err() }) batch = make([]oid.Address, 0, batchSize) @@ -438,7 +433,7 @@ func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) { if len(batch) > 0 { expired := batch errGroup.Go(func() error { - s.expiredLocksCallback(egCtx, expired) + s.expiredLocksCallback(egCtx, e.(newEpoch).epoch, expired) return egCtx.Err() }) } @@ -474,6 +469,17 @@ func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, onExpiredFo return ctx.Err() } +func (s *Shard) selectExpired(ctx context.Context, epoch uint64, addresses []oid.Address) ([]oid.Address, error) { + s.m.RLock() + defer s.m.RUnlock() + + if s.info.Mode.NoMetabase() { + return nil, ErrDegradedMode + } + + return s.metaBase.FilterExpired(ctx, epoch, addresses) +} + // HandleExpiredTombstones marks tombstones themselves as garbage // and clears up corresponding graveyard records. // @@ -523,12 +529,11 @@ func (s *Shard) HandleExpiredTombstones(tss []meta.TombstonedObject) { // HandleExpiredLocks unlocks all objects which were locked by lockers. // If successful, marks lockers themselves as garbage. -func (s *Shard) HandleExpiredLocks(lockers []oid.Address) { +func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers []oid.Address) { if s.GetMode().NoMetabase() { return } - - err := s.metaBase.FreeLockedBy(lockers) + unlocked, err := s.metaBase.FreeLockedBy(lockers) if err != nil { s.log.Warn("failure to unlock objects", zap.String("error", err.Error()), @@ -539,7 +544,7 @@ func (s *Shard) HandleExpiredLocks(lockers []oid.Address) { var pInhume meta.InhumePrm pInhume.SetAddresses(lockers...) - pInhume.SetGCMark() + pInhume.SetForceGCMark() res, err := s.metaBase.Inhume(pInhume) if err != nil { @@ -558,6 +563,22 @@ func (s *Shard) HandleExpiredLocks(lockers []oid.Address) { s.addToContainerSize(delInfo.CID.EncodeToString(), -int64(delInfo.Size)) i++ } + + s.inhumeUnlockedIfExpired(ctx, epoch, unlocked) +} + +func (s *Shard) inhumeUnlockedIfExpired(ctx context.Context, epoch uint64, unlocked []oid.Address) { + expiredUnlocked, err := s.selectExpired(ctx, epoch, unlocked) + if err != nil { + s.log.Warn("failure to get expired unlocked objects", zap.Error(err)) + return + } + + if len(expiredUnlocked) == 0 { + return + } + + s.handleExpiredObjects(ctx, expiredUnlocked) } // HandleDeletedLocks unlocks all objects which were locked by lockers. @@ -566,7 +587,7 @@ func (s *Shard) HandleDeletedLocks(lockers []oid.Address) { return } - err := s.metaBase.FreeLockedBy(lockers) + _, err := s.metaBase.FreeLockedBy(lockers) if err != nil { s.log.Warn("failure to unlock objects", zap.String("error", err.Error()), diff --git a/pkg/local_object_storage/shard/gc_test.go b/pkg/local_object_storage/shard/gc_test.go index 1e266f51..a4bbefde 100644 --- a/pkg/local_object_storage/shard/gc_test.go +++ b/pkg/local_object_storage/shard/gc_test.go @@ -59,8 +59,8 @@ func Test_GCDropsLockedExpiredObject(t *testing.T) { shard.WithDeletedLockCallback(func(_ context.Context, addresses []oid.Address) { sh.HandleDeletedLocks(addresses) }), - shard.WithExpiredLocksCallback(func(_ context.Context, a []oid.Address) { - sh.HandleExpiredLocks(a) + shard.WithExpiredLocksCallback(func(ctx context.Context, epoch uint64, a []oid.Address) { + sh.HandleExpiredLocks(ctx, epoch, a) }), shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool { pool, err := ants.NewPool(sz) diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index dd74dad3..6d1fba14 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -40,7 +40,7 @@ type Option func(*cfg) type ExpiredTombstonesCallback func(context.Context, []meta.TombstonedObject) // ExpiredObjectsCallback is a callback handling list of expired objects. -type ExpiredObjectsCallback func(context.Context, []oid.Address) +type ExpiredObjectsCallback func(context.Context, uint64, []oid.Address) // DeletedLockCallback is a callback handling list of deleted LOCK objects. type DeletedLockCallback func(context.Context, []oid.Address)