From 700bd7de013e6a0a1d2d8383ac77fb1cf849eeac Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Fri, 27 Nov 2020 11:31:43 +0300 Subject: [PATCH] [#199] Get virtual objects from metabase Virtual objects are obtained differently with relative lookup in parent bucket. Signed-off-by: Alex Vanin --- pkg/local_object_storage/metabase/v2/get.go | 45 +++++++++++- .../metabase/v2/get_test.go | 69 ++++++++++++++----- 2 files changed, 93 insertions(+), 21 deletions(-) diff --git a/pkg/local_object_storage/metabase/v2/get.go b/pkg/local_object_storage/metabase/v2/get.go index d171b037..34e32741 100644 --- a/pkg/local_object_storage/metabase/v2/get.go +++ b/pkg/local_object_storage/metabase/v2/get.go @@ -1,6 +1,9 @@ package meta import ( + "fmt" + + "github.com/nspcc-dev/neofs-api-go/pkg/container" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-node/pkg/core/object" "go.etcd.io/bbolt" @@ -17,8 +20,8 @@ func (db *DB) Get(addr *objectSDK.Address) (obj *object.Object, err error) { return obj, err } -func (db *DB) get(tx *bbolt.Tx, addr *objectSDK.Address) (obj *object.Object, err error) { - obj = object.New() +func (db *DB) get(tx *bbolt.Tx, addr *objectSDK.Address) (*object.Object, error) { + obj := object.New() key := objectKey(addr.ObjectID()) cid := addr.ContainerID() @@ -44,7 +47,8 @@ func (db *DB) get(tx *bbolt.Tx, addr *objectSDK.Address) (obj *object.Object, er return obj, obj.Unmarshal(data) } - return nil, ErrNotFound + // if not found then check if object is a virtual + return getVirtualObject(tx, cid, key) } func getFromBucket(tx *bbolt.Tx, name, key []byte) []byte { @@ -55,3 +59,38 @@ func getFromBucket(tx *bbolt.Tx, name, key []byte) []byte { return bkt.Get(key) } + +func getVirtualObject(tx *bbolt.Tx, cid *container.ID, key []byte) (*object.Object, error) { + parentBucket := tx.Bucket(parentBucketName(cid)) + if parentBucket == nil { + return nil, ErrNotFound + } + + relativeLst, err := decodeList(parentBucket.Get(key)) + if err != nil { + return nil, err + } + + if len(relativeLst) == 0 { // this should never happen though + return nil, ErrNotFound + } + + // pick last item, for now there is not difference which address to pick + // but later list might be sorted so first or last value can be more + // prioritized to choose + virtualOID := relativeLst[len(relativeLst)-1] + data := getFromBucket(tx, primaryBucketName(cid), virtualOID) + + child := object.New() + + err = child.Unmarshal(data) + if err != nil { + return nil, fmt.Errorf("can't unmarshal child with parent: %w", err) + } + + if child.GetParent() == nil { // this should never happen though + return nil, ErrNotFound + } + + return child.GetParent(), nil +} diff --git a/pkg/local_object_storage/metabase/v2/get_test.go b/pkg/local_object_storage/metabase/v2/get_test.go index 1154fdd0..070770c5 100644 --- a/pkg/local_object_storage/metabase/v2/get_test.go +++ b/pkg/local_object_storage/metabase/v2/get_test.go @@ -1,6 +1,7 @@ package meta_test import ( + "bytes" "testing" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" @@ -13,51 +14,83 @@ func TestDB_Get(t *testing.T) { defer releaseDB(db) raw := generateRawObject(t) + + // equal fails on diff of attributes and <{}> attributes, + /* so we make non empty attribute slice in parent*/ addAttribute(raw, "foo", "bar") t.Run("object not found", func(t *testing.T) { - obj := object.NewFromV2(raw.ToV2()) - - _, err := db.Get(obj.Address()) + _, err := db.Get(raw.Object().Address()) require.Error(t, err) }) t.Run("put regular object", func(t *testing.T) { - obj := object.NewFromV2(raw.ToV2()) - - err := db.Put(obj, nil) + err := db.Put(raw.Object(), nil) require.NoError(t, err) - newObj, err := db.Get(obj.Address()) + newObj, err := db.Get(raw.Object().Address()) require.NoError(t, err) - require.Equal(t, obj, newObj) + require.Equal(t, raw.Object(), newObj) }) t.Run("put tombstone object", func(t *testing.T) { raw.SetType(objectSDK.TypeTombstone) raw.SetID(testOID()) - obj := object.NewFromV2(raw.ToV2()) - - err := db.Put(obj, nil) + err := db.Put(raw.Object(), nil) require.NoError(t, err) - newObj, err := db.Get(obj.Address()) + newObj, err := db.Get(raw.Object().Address()) require.NoError(t, err) - require.Equal(t, obj, newObj) + require.Equal(t, raw.Object(), newObj) }) t.Run("put storage group object", func(t *testing.T) { raw.SetType(objectSDK.TypeStorageGroup) raw.SetID(testOID()) - obj := object.NewFromV2(raw.ToV2()) - - err := db.Put(obj, nil) + err := db.Put(raw.Object(), nil) require.NoError(t, err) - newObj, err := db.Get(obj.Address()) + newObj, err := db.Get(raw.Object().Address()) require.NoError(t, err) - require.Equal(t, obj, newObj) + require.Equal(t, raw.Object(), newObj) + }) + + t.Run("put virtual object", func(t *testing.T) { + cid := testCID() + parent := generateRawObjectWithCID(t, cid) + addAttribute(parent, "foo", "bar") + + child := generateRawObjectWithCID(t, cid) + child.SetParent(parent.Object().SDK()) + child.SetParentID(parent.Object().Address().ObjectID()) + + err := db.Put(child.Object(), nil) + require.NoError(t, err) + + newParent, err := db.Get(parent.Object().Address()) + require.NoError(t, err) + require.True(t, binaryEqual(parent.Object(), newParent)) + + newChild, err := db.Get(child.Object().Address()) + require.NoError(t, err) + require.True(t, binaryEqual(child.Object(), newChild)) }) } + +// binary equal is used when object contains empty lists in the structure and +// requre.Equal fails on comparing and []{} lists. +func binaryEqual(a, b *object.Object) bool { + binaryA, err := a.Marshal() + if err != nil { + return false + } + + binaryB, err := b.Marshal() + if err != nil { + return false + } + + return bytes.Equal(binaryA, binaryB) +}