From f6387a9b9403f70c9c07727f59d914aca046cab9 Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Fri, 27 Nov 2020 10:59:47 +0300 Subject: [PATCH] [#199] Make exist check on all primary buckets in metabase Signed-off-by: Alex Vanin --- .../metabase/v2/exists.go | 57 +++++++++++----- .../metabase/v2/exists_test.go | 65 +++++++++++++++---- 2 files changed, 95 insertions(+), 27 deletions(-) diff --git a/pkg/local_object_storage/metabase/v2/exists.go b/pkg/local_object_storage/metabase/v2/exists.go index c1fb38324..4ccefa0ac 100644 --- a/pkg/local_object_storage/metabase/v2/exists.go +++ b/pkg/local_object_storage/metabase/v2/exists.go @@ -9,27 +9,41 @@ import ( // returns true if addr is in primary index or false if it is not. func (db *DB) Exists(addr *objectSDK.Address) (exists bool, err error) { err = db.boltDB.View(func(tx *bbolt.Tx) error { - // check graveyard first - if inGraveyard(tx, addr) { - return ErrAlreadyRemoved - } + exists, err = db.exists(tx, addr) - // if graveyard is empty, then check if object exists in primary bucket - primaryBucket := tx.Bucket(primaryBucketName(addr.ContainerID())) - if primaryBucket == nil { - return nil - } - - // using `get` as `exists`: https://github.com/boltdb/bolt/issues/321 - val := primaryBucket.Get(objectKey(addr.ObjectID())) - exists = len(val) != 0 - - return nil + return err }) return exists, err } +func (db *DB) exists(tx *bbolt.Tx, addr *objectSDK.Address) (exists bool, err error) { + // check graveyard first + if inGraveyard(tx, addr) { + return false, ErrAlreadyRemoved + } + + objKey := objectKey(addr.ObjectID()) + + // if graveyard is empty, then check if object exists in primary bucket + if inBucket(tx, primaryBucketName(addr.ContainerID()), objKey) { + return true, nil + } + + // if primary bucket is empty, then check if object exists in parent bucket + if inBucket(tx, parentBucketName(addr.ContainerID()), objKey) { + return true, nil + } + + // if parent bucket is empty, then check if object exists in tombstone bucket + if inBucket(tx, tombstoneBucketName(addr.ContainerID()), objKey) { + return true, nil + } + + // if parent bucket is empty, then check if object exists in storage group bucket + return inBucket(tx, storageGroupBucketName(addr.ContainerID()), objKey), nil +} + // inGraveyard returns true if object was marked as removed. func inGraveyard(tx *bbolt.Tx, addr *objectSDK.Address) bool { graveyard := tx.Bucket(graveyardBucketName) @@ -41,3 +55,16 @@ func inGraveyard(tx *bbolt.Tx, addr *objectSDK.Address) bool { return len(tombstone) != 0 } + +// inBucket checks if key is present in bucket . +func inBucket(tx *bbolt.Tx, name, key []byte) bool { + bkt := tx.Bucket(name) + if bkt == nil { + return false + } + + // using `get` as `exists`: https://github.com/boltdb/bolt/issues/321 + val := bkt.Get(key) + + return len(val) != 0 +} diff --git a/pkg/local_object_storage/metabase/v2/exists_test.go b/pkg/local_object_storage/metabase/v2/exists_test.go index 5ca143afa..2ebec728c 100644 --- a/pkg/local_object_storage/metabase/v2/exists_test.go +++ b/pkg/local_object_storage/metabase/v2/exists_test.go @@ -3,7 +3,7 @@ package meta_test import ( "testing" - "github.com/nspcc-dev/neofs-node/pkg/core/object" + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/stretchr/testify/require" ) @@ -11,19 +11,60 @@ func TestDB_Exists(t *testing.T) { db := newDB(t) defer releaseDB(db) - raw := generateRawObject(t) - addAttribute(raw, "foo", "bar") + t.Run("no object", func(t *testing.T) { + nonExist := generateRawObject(t) + exists, err := db.Exists(nonExist.Object().Address()) + require.NoError(t, err) + require.False(t, exists) + }) - obj := object.NewFromV2(raw.ToV2()) + t.Run("regular object", func(t *testing.T) { + regular := generateRawObject(t) + err := db.Put(regular.Object(), nil) + require.NoError(t, err) - exists, err := db.Exists(obj.Address()) - require.NoError(t, err) - require.False(t, exists) + exists, err := db.Exists(regular.Object().Address()) + require.NoError(t, err) + require.True(t, exists) + }) - err = db.Put(obj, nil) - require.NoError(t, err) + t.Run("tombstone object", func(t *testing.T) { + ts := generateRawObject(t) + ts.SetType(objectSDK.TypeTombstone) - exists, err = db.Exists(obj.Address()) - require.NoError(t, err) - require.True(t, exists) + err := db.Put(ts.Object(), nil) + require.NoError(t, err) + + exists, err := db.Exists(ts.Object().Address()) + require.NoError(t, err) + require.True(t, exists) + }) + + t.Run("storage group object", func(t *testing.T) { + sg := generateRawObject(t) + sg.SetType(objectSDK.TypeStorageGroup) + + err := db.Put(sg.Object(), nil) + require.NoError(t, err) + + exists, err := db.Exists(sg.Object().Address()) + require.NoError(t, err) + require.True(t, exists) + }) + + t.Run("virtual object", func(t *testing.T) { + cid := testCID() + parent := generateRawObjectWithCID(t, cid) + + child := generateRawObjectWithCID(t, cid) + child.SetParent(parent.Object().SDK()) + child.SetParentID(parent.Object().Address().ObjectID()) + + err := db.Put(child.Object(), nil) + require.NoError(t, err) + + exists, err := db.Exists(parent.Object().Address()) + require.NoError(t, err) + require.True(t, exists) + }) }