forked from TrueCloudLab/frostfs-node
[#199] Get virtual objects from metabase
Virtual objects are obtained differently with relative lookup in parent bucket. Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
parent
4e7d49791b
commit
700bd7de01
2 changed files with 93 additions and 21 deletions
|
@ -1,6 +1,9 @@
|
||||||
package meta
|
package meta
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
||||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
|
@ -17,8 +20,8 @@ func (db *DB) Get(addr *objectSDK.Address) (obj *object.Object, err error) {
|
||||||
return obj, err
|
return obj, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) get(tx *bbolt.Tx, addr *objectSDK.Address) (obj *object.Object, err error) {
|
func (db *DB) get(tx *bbolt.Tx, addr *objectSDK.Address) (*object.Object, error) {
|
||||||
obj = object.New()
|
obj := object.New()
|
||||||
key := objectKey(addr.ObjectID())
|
key := objectKey(addr.ObjectID())
|
||||||
cid := addr.ContainerID()
|
cid := addr.ContainerID()
|
||||||
|
|
||||||
|
@ -44,7 +47,8 @@ func (db *DB) get(tx *bbolt.Tx, addr *objectSDK.Address) (obj *object.Object, er
|
||||||
return obj, obj.Unmarshal(data)
|
return obj, obj.Unmarshal(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, ErrNotFound
|
// if not found then check if object is a virtual
|
||||||
|
return getVirtualObject(tx, cid, key)
|
||||||
}
|
}
|
||||||
|
|
||||||
func getFromBucket(tx *bbolt.Tx, name, key []byte) []byte {
|
func getFromBucket(tx *bbolt.Tx, name, key []byte) []byte {
|
||||||
|
@ -55,3 +59,38 @@ func getFromBucket(tx *bbolt.Tx, name, key []byte) []byte {
|
||||||
|
|
||||||
return bkt.Get(key)
|
return bkt.Get(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getVirtualObject(tx *bbolt.Tx, cid *container.ID, key []byte) (*object.Object, error) {
|
||||||
|
parentBucket := tx.Bucket(parentBucketName(cid))
|
||||||
|
if parentBucket == nil {
|
||||||
|
return nil, ErrNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
relativeLst, err := decodeList(parentBucket.Get(key))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(relativeLst) == 0 { // this should never happen though
|
||||||
|
return nil, ErrNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
// pick last item, for now there is not difference which address to pick
|
||||||
|
// but later list might be sorted so first or last value can be more
|
||||||
|
// prioritized to choose
|
||||||
|
virtualOID := relativeLst[len(relativeLst)-1]
|
||||||
|
data := getFromBucket(tx, primaryBucketName(cid), virtualOID)
|
||||||
|
|
||||||
|
child := object.New()
|
||||||
|
|
||||||
|
err = child.Unmarshal(data)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("can't unmarshal child with parent: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if child.GetParent() == nil { // this should never happen though
|
||||||
|
return nil, ErrNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
return child.GetParent(), nil
|
||||||
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package meta_test
|
package meta_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||||
|
@ -13,51 +14,83 @@ func TestDB_Get(t *testing.T) {
|
||||||
defer releaseDB(db)
|
defer releaseDB(db)
|
||||||
|
|
||||||
raw := generateRawObject(t)
|
raw := generateRawObject(t)
|
||||||
|
|
||||||
|
// equal fails on diff of <nil> attributes and <{}> attributes,
|
||||||
|
/* so we make non empty attribute slice in parent*/
|
||||||
addAttribute(raw, "foo", "bar")
|
addAttribute(raw, "foo", "bar")
|
||||||
|
|
||||||
t.Run("object not found", func(t *testing.T) {
|
t.Run("object not found", func(t *testing.T) {
|
||||||
obj := object.NewFromV2(raw.ToV2())
|
_, err := db.Get(raw.Object().Address())
|
||||||
|
|
||||||
_, err := db.Get(obj.Address())
|
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("put regular object", func(t *testing.T) {
|
t.Run("put regular object", func(t *testing.T) {
|
||||||
obj := object.NewFromV2(raw.ToV2())
|
err := db.Put(raw.Object(), nil)
|
||||||
|
|
||||||
err := db.Put(obj, nil)
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
newObj, err := db.Get(obj.Address())
|
newObj, err := db.Get(raw.Object().Address())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, obj, newObj)
|
require.Equal(t, raw.Object(), newObj)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("put tombstone object", func(t *testing.T) {
|
t.Run("put tombstone object", func(t *testing.T) {
|
||||||
raw.SetType(objectSDK.TypeTombstone)
|
raw.SetType(objectSDK.TypeTombstone)
|
||||||
raw.SetID(testOID())
|
raw.SetID(testOID())
|
||||||
|
|
||||||
obj := object.NewFromV2(raw.ToV2())
|
err := db.Put(raw.Object(), nil)
|
||||||
|
|
||||||
err := db.Put(obj, nil)
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
newObj, err := db.Get(obj.Address())
|
newObj, err := db.Get(raw.Object().Address())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, obj, newObj)
|
require.Equal(t, raw.Object(), newObj)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("put storage group object", func(t *testing.T) {
|
t.Run("put storage group object", func(t *testing.T) {
|
||||||
raw.SetType(objectSDK.TypeStorageGroup)
|
raw.SetType(objectSDK.TypeStorageGroup)
|
||||||
raw.SetID(testOID())
|
raw.SetID(testOID())
|
||||||
|
|
||||||
obj := object.NewFromV2(raw.ToV2())
|
err := db.Put(raw.Object(), nil)
|
||||||
|
|
||||||
err := db.Put(obj, nil)
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
newObj, err := db.Get(obj.Address())
|
newObj, err := db.Get(raw.Object().Address())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, obj, newObj)
|
require.Equal(t, raw.Object(), newObj)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("put virtual object", func(t *testing.T) {
|
||||||
|
cid := testCID()
|
||||||
|
parent := generateRawObjectWithCID(t, cid)
|
||||||
|
addAttribute(parent, "foo", "bar")
|
||||||
|
|
||||||
|
child := generateRawObjectWithCID(t, cid)
|
||||||
|
child.SetParent(parent.Object().SDK())
|
||||||
|
child.SetParentID(parent.Object().Address().ObjectID())
|
||||||
|
|
||||||
|
err := db.Put(child.Object(), nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
newParent, err := db.Get(parent.Object().Address())
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, binaryEqual(parent.Object(), newParent))
|
||||||
|
|
||||||
|
newChild, err := db.Get(child.Object().Address())
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, binaryEqual(child.Object(), newChild))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// binary equal is used when object contains empty lists in the structure and
|
||||||
|
// requre.Equal fails on comparing <nil> and []{} lists.
|
||||||
|
func binaryEqual(a, b *object.Object) bool {
|
||||||
|
binaryA, err := a.Marshal()
|
||||||
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
binaryB, err := b.Marshal()
|
||||||
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return bytes.Equal(binaryA, binaryB)
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue