From 34df21cc4ae3c50c00d27f9a80ce8785ea5fbe96 Mon Sep 17 00:00:00 2001 From: Aleksey Savchuk Date: Tue, 17 Dec 2024 01:16:03 +0300 Subject: [PATCH] [#1445] metabase: Add `IterateLocked` method Added a new method for iterating locked objects. It's indended to be used in the garbage collector for handling expired locks. Signed-off-by: Aleksey Savchuk --- .../metabase/iterators.go | 143 ++++++++++++++++++ .../metabase/iterators_test.go | 85 +++++++++++ 2 files changed, 228 insertions(+) diff --git a/pkg/local_object_storage/metabase/iterators.go b/pkg/local_object_storage/metabase/iterators.go index 0d438e102..bbb0fd2af 100644 --- a/pkg/local_object_storage/metabase/iterators.go +++ b/pkg/local_object_storage/metabase/iterators.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "errors" + "fmt" "strconv" "time" @@ -16,6 +17,7 @@ import ( "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" + "go.uber.org/zap" ) // ExpiredObject is a descriptor of expired object from DB. @@ -138,3 +140,144 @@ func iteratePhyObjects(tx *bbolt.Tx, f func(cid.ID, oid.ID, *objectSDK.Object) e }) }) } + +type LockedObject struct { + cntr cid.ID + obj oid.ID + locks []Lock +} + +func (lo *LockedObject) Address() oid.Address { + var addr oid.Address + addr.SetContainer(lo.cntr) + addr.SetObject(lo.obj) + return addr +} + +func (lo *LockedObject) Locks() []Lock { + return lo.locks +} + +type LockedObjectHandler func(*LockedObject) error + +func (db *DB) IterateLocked(ctx context.Context, offset *oid.Address, h LockedObjectHandler) error { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("IterateLocked", time.Since(startedAt), success) + }() + _, span := tracing.StartSpanFromContext(ctx, "metabase.IterateLocked") + defer span.End() + + db.modeMtx.RLock() + defer db.modeMtx.RUnlock() + + if db.mode.NoMetabase() { + return ErrDegradedMode + } + + err := metaerr.Wrap(db.boltDB.View(func(tx *bbolt.Tx) error { + err := db.iterateLocked(ctx, tx, h, offset) + if errors.Is(err, ErrInterruptIterator) { + return nil + } + return err + })) + success = err == nil + return err +} + +func (db *DB) iterateLocked(ctx context.Context, tx *bbolt.Tx, h LockedObjectHandler, offset *oid.Address) error { + bucketLocked := tx.Bucket(bucketNameLocked) + if bucketLocked == nil { + return nil + } + + var k, v []byte + + c := bucketLocked.Cursor() + if offset != nil { + cntrOffset := make([]byte, cidSize) + offset.Container().Encode(cntrOffset) + + k, v = c.Seek(cntrOffset) + } else { + k, v = c.First() + } + + var ( + res LockedObject + err error + seenFirstBucket bool + ) + + for ; k != nil; k, v = c.Next() { + if v != nil { + continue + } + + if err = res.cntr.Decode(k); err != nil { + return fmt.Errorf("decode CID: %w", err) + } + + bucket := bucketLocked.Bucket(k) + if bucket == nil { + db.log.Warn(ctx, "locked bucket expected", zap.Stringer("container", res.cntr)) + continue + } + + if offset == nil || seenFirstBucket { + err = db.iterateLockedContainer(bucket, h, &res, nil) + } else { + objOffset := offset.Object() + seenFirstBucket = true + err = db.iterateLockedContainer(bucket, h, &res, &objOffset) + } + if err != nil { + return err + } + } + return nil +} + +func (db *DB) iterateLockedContainer(b *bbolt.Bucket, h LockedObjectHandler, res *LockedObject, offset *oid.ID) error { + var k, v []byte + + c := b.Cursor() + if offset != nil { + objOffset := make([]byte, objectKeySize) + offset.Encode(objOffset) + + k, v = c.Seek(objOffset) + if bytes.Equal(k, objOffset) { + k, v = c.Next() + } + } else { + k, v = c.First() + } + + for ; k != nil; k, v = c.Next() { + if err := res.obj.Decode(k); err != nil { + return fmt.Errorf("decode OID: %w", err) + } + + locks, err := decodeLockWithExpEpochList(v) + if err != nil { + return fmt.Errorf("decode lock list: %w", err) + } + + res.locks = make([]Lock, len(locks)) + for i, lock := range locks { + if err := lock.decode(&res.locks[i].ID, &res.locks[i].ExpEpoch); err != nil { + return fmt.Errorf("decode lock: %w", err) + } + } + + if err := h(res); err != nil { + return err + } + } + return nil +} diff --git a/pkg/local_object_storage/metabase/iterators_test.go b/pkg/local_object_storage/metabase/iterators_test.go index 47579ec17..1a7aaccc3 100644 --- a/pkg/local_object_storage/metabase/iterators_test.go +++ b/pkg/local_object_storage/metabase/iterators_test.go @@ -2,6 +2,7 @@ package meta_test import ( "context" + "math/rand" "strconv" "testing" @@ -9,10 +10,12 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object" + cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" "github.com/stretchr/testify/require" + "golang.org/x/exp/maps" ) func TestDB_IterateExpired(t *testing.T) { @@ -67,3 +70,85 @@ func putWithExpiration(t *testing.T, db *meta.DB, typ objectSDK.Type, expiresAt return object2.AddressOf(obj) } + +func TestIterateLocked(t *testing.T) { + t.Parallel() + + const ( + numObjects = 10 + numLocksPerObject = 10 + ) + + db := newDB(t) + defer func() { require.NoError(t, db.Close(context.Background())) }() + + m := make(map[oid.Address][]meta.Lock) + + // every two objects have same container + for range numObjects / 2 { + container := cidtest.ID() + + object := testutil.GenerateObjectWithCID(container) + require.NoError(t, metaPut(db, object, nil)) + addr := object2.AddressOf(object) + m[addr] = []meta.Lock{} + + object = testutil.GenerateObjectWithCID(container) + require.NoError(t, metaPut(db, object, nil)) + addr = object2.AddressOf(object) + m[addr] = []meta.Lock{} + } + + for addr := range m { + // every object has locks of both old and new format + for range numLocksPerObject / 2 { + m[addr] = append(m[addr], + meta.Lock{ + ID: oidtest.ID(), + ExpEpoch: rand.Uint64(), + }, meta.Lock{ + ID: oidtest.ID(), + ExpEpoch: meta.NoExpirationEpoch, + }, + ) + } + + for _, lock := range m[addr] { + require.NoError(t, db.Lock( + context.Background(), + addr.Container(), + lock.ID, + []oid.ID{addr.Object()}, + lock.ExpEpoch, + )) + } + } + + t.Run("iterate locked", func(t *testing.T) { + err := db.IterateLocked(context.Background(), nil, func(lo *meta.LockedObject) error { + locks, ok := m[lo.Address()] + require.True(t, ok) + require.ElementsMatch(t, locks, lo.Locks()) + return nil + }) + require.NoError(t, err) + }) + + t.Run("iterate with offset", func(t *testing.T) { + var foundObjects []oid.Address + var offset *oid.Address + + for didStep := true; didStep; { + didStep = false + err := db.IterateLocked(context.Background(), offset, func(lo *meta.LockedObject) error { + addr := lo.Address() + foundObjects = append(foundObjects, addr) + offset = &addr + didStep = true + return meta.ErrInterruptIterator + }) + require.NoError(t, err) + } + require.ElementsMatch(t, foundObjects, maps.Keys(m)) + }) +}