From e478c0d0240425276c74a9a54ad9372663e888d4 Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Mon, 23 Nov 2020 16:30:56 +0300 Subject: [PATCH] [#199] Refactor metabase internal structure Accoring to MetaBase-Plan-B storage engine specification. Signed-off-by: Alex Vanin --- pkg/local_object_storage/metabase/v2/db.go | 106 ++++++ .../metabase/v2/db_test.go | 113 ++++++ .../metabase/v2/errors.go | 14 + .../metabase/v2/exists.go | 43 +++ .../metabase/v2/exists_test.go | 29 ++ pkg/local_object_storage/metabase/v2/get.go | 57 +++ .../metabase/v2/get_test.go | 63 ++++ pkg/local_object_storage/metabase/v2/info.go | 12 + .../metabase/v2/inhume.go | 19 + .../metabase/v2/inhume_test.go | 32 ++ .../metabase/v2/movable.go | 73 ++++ .../metabase/v2/movable_test.go | 59 ++++ pkg/local_object_storage/metabase/v2/put.go | 268 ++++++++++++++ .../metabase/v2/select.go | 331 ++++++++++++++++++ .../metabase/v2/select_test.go | 302 ++++++++++++++++ pkg/local_object_storage/metabase/v2/small.go | 28 ++ .../metabase/v2/small_test.go | 45 +++ pkg/local_object_storage/metabase/v2/util.go | 92 +++++ 18 files changed, 1686 insertions(+) create mode 100644 pkg/local_object_storage/metabase/v2/db.go create mode 100644 pkg/local_object_storage/metabase/v2/db_test.go create mode 100644 pkg/local_object_storage/metabase/v2/errors.go create mode 100644 pkg/local_object_storage/metabase/v2/exists.go create mode 100644 pkg/local_object_storage/metabase/v2/exists_test.go create mode 100644 pkg/local_object_storage/metabase/v2/get.go create mode 100644 pkg/local_object_storage/metabase/v2/get_test.go create mode 100644 pkg/local_object_storage/metabase/v2/info.go create mode 100644 pkg/local_object_storage/metabase/v2/inhume.go create mode 100644 pkg/local_object_storage/metabase/v2/inhume_test.go create mode 100644 pkg/local_object_storage/metabase/v2/movable.go create mode 100644 pkg/local_object_storage/metabase/v2/movable_test.go create mode 100644 pkg/local_object_storage/metabase/v2/put.go create mode 100644 pkg/local_object_storage/metabase/v2/select.go create mode 100644 pkg/local_object_storage/metabase/v2/select_test.go create mode 100644 pkg/local_object_storage/metabase/v2/small.go create mode 100644 pkg/local_object_storage/metabase/v2/small_test.go create mode 100644 pkg/local_object_storage/metabase/v2/util.go diff --git a/pkg/local_object_storage/metabase/v2/db.go b/pkg/local_object_storage/metabase/v2/db.go new file mode 100644 index 00000000..faa3a615 --- /dev/null +++ b/pkg/local_object_storage/metabase/v2/db.go @@ -0,0 +1,106 @@ +package meta + +import ( + "encoding/binary" + "encoding/hex" + "strconv" + + "github.com/nspcc-dev/neofs-api-go/pkg/object" + v2object "github.com/nspcc-dev/neofs-api-go/v2/object" + "github.com/nspcc-dev/neofs-node/pkg/util/logger" + "go.etcd.io/bbolt" + "go.uber.org/zap" +) + +// DB represents local metabase of storage node. +type DB struct { + info Info + + *cfg + + matchers map[object.SearchMatchType]func(string, []byte, string) bool +} + +// Option is an option of DB constructor. +type Option func(*cfg) + +type cfg struct { + boltDB *bbolt.DB + + log *logger.Logger +} + +func defaultCfg() *cfg { + return &cfg{ + log: zap.L(), + } +} + +// NewDB creates, initializes and returns DB instance. +func NewDB(opts ...Option) *DB { + c := defaultCfg() + + for i := range opts { + opts[i](c) + } + + return &DB{ + info: Info{ + Path: c.boltDB.Path(), + }, + cfg: c, + matchers: map[object.SearchMatchType]func(string, []byte, string) bool{ + object.MatchUnknown: unknownMatcher, + object.MatchStringEqual: stringEqualMatcher, + }, + } +} + +func (db *DB) Close() error { + return db.boltDB.Close() +} + +func stringEqualMatcher(key string, objVal []byte, filterVal string) bool { + switch key { + default: + return string(objVal) == filterVal + case v2object.FilterHeaderPayloadHash, v2object.FilterHeaderHomomorphicHash: + return hex.EncodeToString(objVal) == filterVal + case v2object.FilterHeaderCreationEpoch, v2object.FilterHeaderPayloadLength: + return strconv.FormatUint(binary.LittleEndian.Uint64(objVal), 10) == filterVal + } +} + +func unknownMatcher(_ string, _ []byte, _ string) bool { + return false +} + +// bucketKeyHelper returns byte representation of val that is used as a key +// in boltDB. Useful for getting filter values from unique and list indexes. +func bucketKeyHelper(hdr string, val string) []byte { + switch hdr { + case v2object.FilterHeaderPayloadHash: + v, err := hex.DecodeString(val) + if err != nil { + return nil + } + + return v + default: + return []byte(val) + } +} + +// FromBoltDB returns option to construct DB from BoltDB instance. +func FromBoltDB(db *bbolt.DB) Option { + return func(c *cfg) { + c.boltDB = db + } +} + +// WithLogger returns option to set logger of DB. +func WithLogger(l *logger.Logger) Option { + return func(c *cfg) { + c.log = l + } +} diff --git a/pkg/local_object_storage/metabase/v2/db_test.go b/pkg/local_object_storage/metabase/v2/db_test.go new file mode 100644 index 00000000..341c5253 --- /dev/null +++ b/pkg/local_object_storage/metabase/v2/db_test.go @@ -0,0 +1,113 @@ +package meta_test + +import ( + "crypto/rand" + "crypto/sha256" + "os" + "testing" + + "github.com/nspcc-dev/neofs-api-go/pkg" + "github.com/nspcc-dev/neofs-api-go/pkg/container" + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-api-go/pkg/owner" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase/v2" + "github.com/nspcc-dev/neofs-node/pkg/util/test" + "github.com/nspcc-dev/tzhash/tz" + "github.com/stretchr/testify/require" + "go.etcd.io/bbolt" +) + +func testSelect(t *testing.T, db *meta.DB, fs objectSDK.SearchFilters, exp ...*objectSDK.Address) { + res, err := db.Select(fs) + require.NoError(t, err) + require.Len(t, res, len(exp)) + + for i := range exp { + require.Contains(t, res, exp[i]) + } +} + +func testCID() *container.ID { + cs := [sha256.Size]byte{} + _, _ = rand.Read(cs[:]) + + id := container.NewID() + id.SetSHA256(cs) + + return id +} + +func testOID() *objectSDK.ID { + cs := [sha256.Size]byte{} + _, _ = rand.Read(cs[:]) + + id := objectSDK.NewID() + id.SetSHA256(cs) + + return id +} + +func newDB(t testing.TB) *meta.DB { + path := t.Name() + + bdb, err := bbolt.Open(path, 0600, nil) + require.NoError(t, err) + + return meta.NewDB(meta.FromBoltDB(bdb)) +} + +func releaseDB(db *meta.DB) { + db.Close() + os.Remove(db.DumpInfo().Path) +} + +func generateRawObject(t *testing.T) *object.RawObject { + return generateRawObjectWithCID(t, testCID()) +} + +func generateRawObjectWithCID(t *testing.T, cid *container.ID) *object.RawObject { + version := pkg.NewVersion() + version.SetMajor(2) + version.SetMinor(1) + + w, err := owner.NEO3WalletFromPublicKey(&test.DecodeKey(-1).PublicKey) + require.NoError(t, err) + + ownerID := owner.NewID() + ownerID.SetNeo3Wallet(w) + + csum := new(pkg.Checksum) + csum.SetSHA256(sha256.Sum256(w.Bytes())) + + csumTZ := new(pkg.Checksum) + csumTZ.SetTillichZemor(tz.Sum(csum.Sum())) + + obj := object.NewRaw() + obj.SetID(testOID()) + obj.SetOwnerID(ownerID) + obj.SetContainerID(cid) + obj.SetVersion(version) + obj.SetPayloadChecksum(csum) + obj.SetPayloadHomomorphicHash(csumTZ) + + return obj +} + +func generateAddress() *objectSDK.Address { + addr := objectSDK.NewAddress() + addr.SetContainerID(testCID()) + addr.SetObjectID(testOID()) + + return addr +} + +func addAttribute(obj *object.RawObject, key, val string) { + attr := objectSDK.NewAttribute() + attr.SetKey(key) + attr.SetValue(val) + + attrs := obj.Attributes() + attrs = append(attrs, attr) + obj.SetAttributes(attrs...) +} diff --git a/pkg/local_object_storage/metabase/v2/errors.go b/pkg/local_object_storage/metabase/v2/errors.go new file mode 100644 index 00000000..33ba78d5 --- /dev/null +++ b/pkg/local_object_storage/metabase/v2/errors.go @@ -0,0 +1,14 @@ +package meta + +import ( + "errors" +) + +var ( + // ErrNotFound returned when object header should exist in primary index but + // it is not present there. + ErrNotFound = errors.New("address not found") + + // ErrAlreadyRemoved returned when object has tombstone in graveyard. + ErrAlreadyRemoved = errors.New("object already removed") +) diff --git a/pkg/local_object_storage/metabase/v2/exists.go b/pkg/local_object_storage/metabase/v2/exists.go new file mode 100644 index 00000000..c1fb3832 --- /dev/null +++ b/pkg/local_object_storage/metabase/v2/exists.go @@ -0,0 +1,43 @@ +package meta + +import ( + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "go.etcd.io/bbolt" +) + +// Exists returns ErrAlreadyRemoved if addr was marked as removed. Otherwise it +// returns true if addr is in primary index or false if it is not. +func (db *DB) Exists(addr *objectSDK.Address) (exists bool, err error) { + err = db.boltDB.View(func(tx *bbolt.Tx) error { + // check graveyard first + if inGraveyard(tx, addr) { + return ErrAlreadyRemoved + } + + // if graveyard is empty, then check if object exists in primary bucket + primaryBucket := tx.Bucket(primaryBucketName(addr.ContainerID())) + if primaryBucket == nil { + return nil + } + + // using `get` as `exists`: https://github.com/boltdb/bolt/issues/321 + val := primaryBucket.Get(objectKey(addr.ObjectID())) + exists = len(val) != 0 + + return nil + }) + + return exists, err +} + +// inGraveyard returns true if object was marked as removed. +func inGraveyard(tx *bbolt.Tx, addr *objectSDK.Address) bool { + graveyard := tx.Bucket(graveyardBucketName) + if graveyard == nil { + return false + } + + tombstone := graveyard.Get(addressKey(addr)) + + return len(tombstone) != 0 +} diff --git a/pkg/local_object_storage/metabase/v2/exists_test.go b/pkg/local_object_storage/metabase/v2/exists_test.go new file mode 100644 index 00000000..5ca143af --- /dev/null +++ b/pkg/local_object_storage/metabase/v2/exists_test.go @@ -0,0 +1,29 @@ +package meta_test + +import ( + "testing" + + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/stretchr/testify/require" +) + +func TestDB_Exists(t *testing.T) { + db := newDB(t) + defer releaseDB(db) + + raw := generateRawObject(t) + addAttribute(raw, "foo", "bar") + + obj := object.NewFromV2(raw.ToV2()) + + exists, err := db.Exists(obj.Address()) + require.NoError(t, err) + require.False(t, exists) + + err = db.Put(obj, nil) + require.NoError(t, err) + + exists, err = db.Exists(obj.Address()) + require.NoError(t, err) + require.True(t, exists) +} diff --git a/pkg/local_object_storage/metabase/v2/get.go b/pkg/local_object_storage/metabase/v2/get.go new file mode 100644 index 00000000..d171b037 --- /dev/null +++ b/pkg/local_object_storage/metabase/v2/get.go @@ -0,0 +1,57 @@ +package meta + +import ( + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "go.etcd.io/bbolt" +) + +// Get returns object header for specified address. +func (db *DB) Get(addr *objectSDK.Address) (obj *object.Object, err error) { + err = db.boltDB.View(func(tx *bbolt.Tx) error { + obj, err = db.get(tx, addr) + + return err + }) + + return obj, err +} + +func (db *DB) get(tx *bbolt.Tx, addr *objectSDK.Address) (obj *object.Object, err error) { + obj = object.New() + key := objectKey(addr.ObjectID()) + cid := addr.ContainerID() + + if inGraveyard(tx, addr) { + return nil, ErrAlreadyRemoved + } + + // check in primary index + data := getFromBucket(tx, primaryBucketName(cid), key) + if len(data) != 0 { + return obj, obj.Unmarshal(data) + } + + // if not found then check in tombstone index + data = getFromBucket(tx, tombstoneBucketName(cid), key) + if len(data) != 0 { + return obj, obj.Unmarshal(data) + } + + // if not found then check in storage group index + data = getFromBucket(tx, storageGroupBucketName(cid), key) + if len(data) != 0 { + return obj, obj.Unmarshal(data) + } + + return nil, ErrNotFound +} + +func getFromBucket(tx *bbolt.Tx, name, key []byte) []byte { + bkt := tx.Bucket(name) + if bkt == nil { + return nil + } + + return bkt.Get(key) +} diff --git a/pkg/local_object_storage/metabase/v2/get_test.go b/pkg/local_object_storage/metabase/v2/get_test.go new file mode 100644 index 00000000..1154fdd0 --- /dev/null +++ b/pkg/local_object_storage/metabase/v2/get_test.go @@ -0,0 +1,63 @@ +package meta_test + +import ( + "testing" + + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/stretchr/testify/require" +) + +func TestDB_Get(t *testing.T) { + db := newDB(t) + defer releaseDB(db) + + raw := generateRawObject(t) + addAttribute(raw, "foo", "bar") + + t.Run("object not found", func(t *testing.T) { + obj := object.NewFromV2(raw.ToV2()) + + _, err := db.Get(obj.Address()) + require.Error(t, err) + }) + + t.Run("put regular object", func(t *testing.T) { + obj := object.NewFromV2(raw.ToV2()) + + err := db.Put(obj, nil) + require.NoError(t, err) + + newObj, err := db.Get(obj.Address()) + require.NoError(t, err) + require.Equal(t, obj, newObj) + }) + + t.Run("put tombstone object", func(t *testing.T) { + raw.SetType(objectSDK.TypeTombstone) + raw.SetID(testOID()) + + obj := object.NewFromV2(raw.ToV2()) + + err := db.Put(obj, nil) + require.NoError(t, err) + + newObj, err := db.Get(obj.Address()) + require.NoError(t, err) + require.Equal(t, obj, newObj) + }) + + t.Run("put storage group object", func(t *testing.T) { + raw.SetType(objectSDK.TypeStorageGroup) + raw.SetID(testOID()) + + obj := object.NewFromV2(raw.ToV2()) + + err := db.Put(obj, nil) + require.NoError(t, err) + + newObj, err := db.Get(obj.Address()) + require.NoError(t, err) + require.Equal(t, obj, newObj) + }) +} diff --git a/pkg/local_object_storage/metabase/v2/info.go b/pkg/local_object_storage/metabase/v2/info.go new file mode 100644 index 00000000..906e2120 --- /dev/null +++ b/pkg/local_object_storage/metabase/v2/info.go @@ -0,0 +1,12 @@ +package meta + +// Info groups the information about DB. +type Info struct { + // Full path to the metabase. + Path string +} + +// DumpInfo returns information about the DB. +func (db *DB) DumpInfo() Info { + return db.info +} diff --git a/pkg/local_object_storage/metabase/v2/inhume.go b/pkg/local_object_storage/metabase/v2/inhume.go new file mode 100644 index 00000000..7d00df6a --- /dev/null +++ b/pkg/local_object_storage/metabase/v2/inhume.go @@ -0,0 +1,19 @@ +package meta + +import ( + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "go.etcd.io/bbolt" +) + +// Inhume marks objects as removed but not removes it from metabase. +func (db *DB) Inhume(target, tombstone *objectSDK.Address) error { + return db.boltDB.Update(func(tx *bbolt.Tx) error { + graveyard, err := tx.CreateBucketIfNotExists(graveyardBucketName) + if err != nil { + return err + } + + // consider checking if target is already in graveyard? + return graveyard.Put(addressKey(target), addressKey(tombstone)) + }) +} diff --git a/pkg/local_object_storage/metabase/v2/inhume_test.go b/pkg/local_object_storage/metabase/v2/inhume_test.go new file mode 100644 index 00000000..101a3fd0 --- /dev/null +++ b/pkg/local_object_storage/metabase/v2/inhume_test.go @@ -0,0 +1,32 @@ +package meta_test + +import ( + "testing" + + "github.com/nspcc-dev/neofs-node/pkg/core/object" + meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase/v2" + "github.com/stretchr/testify/require" +) + +func TestDB_Inhume(t *testing.T) { + db := newDB(t) + defer releaseDB(db) + + raw := generateRawObject(t) + addAttribute(raw, "foo", "bar") + + obj := object.NewFromV2(raw.ToV2()) + tombstoneID := generateAddress() + + err := db.Put(obj, nil) + require.NoError(t, err) + + err = db.Inhume(obj.Address(), tombstoneID) + require.NoError(t, err) + + _, err = db.Exists(obj.Address()) + require.EqualError(t, err, meta.ErrAlreadyRemoved.Error()) + + _, err = db.Get(obj.Address()) + require.EqualError(t, err, meta.ErrAlreadyRemoved.Error()) +} diff --git a/pkg/local_object_storage/metabase/v2/movable.go b/pkg/local_object_storage/metabase/v2/movable.go new file mode 100644 index 00000000..0cecb094 --- /dev/null +++ b/pkg/local_object_storage/metabase/v2/movable.go @@ -0,0 +1,73 @@ +package meta + +import ( + "fmt" + + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "go.etcd.io/bbolt" +) + +// ToMoveIt marks objects to move it into another shard. This useful for +// faster HRW fetching. +func (db *DB) ToMoveIt(addr *objectSDK.Address) error { + return db.boltDB.Update(func(tx *bbolt.Tx) error { + toMoveIt, err := tx.CreateBucketIfNotExists(toMoveItBucketName) + if err != nil { + return err + } + + return toMoveIt.Put(addressKey(addr), zeroValue) + }) +} + +// DoNotMove removes `MoveIt` mark from the object. +func (db *DB) DoNotMove(addr *objectSDK.Address) error { + return db.boltDB.Update(func(tx *bbolt.Tx) error { + toMoveIt := tx.Bucket(toMoveItBucketName) + if toMoveIt == nil { + return nil + } + + return toMoveIt.Delete(addressKey(addr)) + }) +} + +// Movable returns list of marked objects to move into other shard. +func (db *DB) Movable() ([]*objectSDK.Address, error) { + var strAddrs []string + + err := db.boltDB.View(func(tx *bbolt.Tx) error { + toMoveIt := tx.Bucket(toMoveItBucketName) + if toMoveIt == nil { + return nil + } + + return toMoveIt.ForEach(func(k, v []byte) error { + strAddrs = append(strAddrs, string(k)) + + return nil + }) + }) + if err != nil { + return nil, err + } + + // we can parse strings to structures in-place, but probably it seems + // more efficient to keep bolt db TX code smaller because it might be + // bottleneck. + addrs := make([]*objectSDK.Address, 0, len(strAddrs)) + + for i := range strAddrs { + addr := objectSDK.NewAddress() + + err = addr.Parse(strAddrs[i]) + if err != nil { + return nil, fmt.Errorf("can't parse object address %v: %w", + strAddrs[i], err) + } + + addrs = append(addrs, addr) + } + + return addrs, nil +} diff --git a/pkg/local_object_storage/metabase/v2/movable_test.go b/pkg/local_object_storage/metabase/v2/movable_test.go new file mode 100644 index 00000000..70a09c51 --- /dev/null +++ b/pkg/local_object_storage/metabase/v2/movable_test.go @@ -0,0 +1,59 @@ +package meta_test + +import ( + "testing" + + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/stretchr/testify/require" +) + +func TestDB_Movable(t *testing.T) { + db := newDB(t) + defer releaseDB(db) + + raw1 := generateRawObject(t) + raw2 := generateRawObject(t) + + obj1 := object.NewFromV2(raw1.ToV2()) + obj2 := object.NewFromV2(raw2.ToV2()) + + // put two objects in metabase + err := db.Put(obj1, nil) + require.NoError(t, err) + + err = db.Put(obj2, nil) + require.NoError(t, err) + + // check if toMoveIt index empty + toMoveList, err := db.Movable() + require.NoError(t, err) + require.Len(t, toMoveList, 0) + + // mark to move object2 + err = db.ToMoveIt(obj2.Address()) + require.NoError(t, err) + + // check if toMoveIt index contains address of object 2 + toMoveList, err = db.Movable() + require.NoError(t, err) + require.Len(t, toMoveList, 1) + require.Contains(t, toMoveList, obj2.Address()) + + // remove from toMoveIt index non existing address + err = db.DoNotMove(obj1.Address()) + require.NoError(t, err) + + // check if toMoveIt index hasn't changed + toMoveList, err = db.Movable() + require.NoError(t, err) + require.Len(t, toMoveList, 1) + + // remove from toMoveIt index existing address + err = db.DoNotMove(obj2.Address()) + require.NoError(t, err) + + // check if toMoveIt index is empty now + toMoveList, err = db.Movable() + require.NoError(t, err) + require.Len(t, toMoveList, 0) +} diff --git a/pkg/local_object_storage/metabase/v2/put.go b/pkg/local_object_storage/metabase/v2/put.go new file mode 100644 index 00000000..8718b47c --- /dev/null +++ b/pkg/local_object_storage/metabase/v2/put.go @@ -0,0 +1,268 @@ +package meta + +import ( + "bytes" + "encoding/gob" + "errors" + "fmt" + + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza" + "go.etcd.io/bbolt" +) + +type ( + namedBucketItem struct { + name, key, val []byte + } +) + +var ErrUnknownObjectType = errors.New("unknown object type") + +// Put saves object header in metabase. Object payload expected to be cut. +// Big objects have nil blobovniczaID. +func (db *DB) Put(obj *object.Object, id *blobovnicza.ID) error { + var isParent bool // true when object header obtained from `split.Parent` + + for ; obj != nil; obj, isParent = obj.GetParent(), true { + exists, err := db.Exists(obj.Address()) + if err != nil { + return err + } + + // most right child and split header overlap parent so we have to + // check if object exists to not overwrite it twice + if exists { + continue + } + + uniqueIndexes, err := uniqueIndexes(obj, isParent, id) + if err != nil { + return fmt.Errorf("can' build unique indexes: %w", err) + } + + // build list indexes + listIndexes, err := listIndexes(obj) + if err != nil { + return fmt.Errorf("can' build list indexes: %w", err) + } + + fkbtIndexes, err := fkbtIndexes(obj) + if err != nil { + return fmt.Errorf("can' build fake bucket tree indexes: %w", err) + } + + // consider making one TX for both target object and parent + err = db.boltDB.Update(func(tx *bbolt.Tx) error { + // put unique indexes + for i := range uniqueIndexes { + err := putUniqueIndexItem(tx, uniqueIndexes[i]) + if err != nil { + return err + } + } + + // put list indexes + for i := range listIndexes { + err := putListIndexItem(tx, listIndexes[i]) + if err != nil { + return err + } + } + + // put fake bucket tree indexes + for i := range fkbtIndexes { + err := putFKBTIndexItem(tx, fkbtIndexes[i]) + if err != nil { + return err + } + } + + return nil + }) + if err != nil { // if tx failed then return error + return err + } + } + + return nil +} + +// builds list of indexes from the object. +func uniqueIndexes(obj *object.Object, isParent bool, id *blobovnicza.ID) ([]namedBucketItem, error) { + addr := obj.Address() + objKey := objectKey(addr.ObjectID()) + result := make([]namedBucketItem, 0, 2) + + // add value to primary unique bucket + if !isParent { + var bucketName []byte + + switch obj.Type() { + case objectSDK.TypeRegular: + bucketName = primaryBucketName(addr.ContainerID()) + case objectSDK.TypeTombstone: + bucketName = tombstoneBucketName(addr.ContainerID()) + case objectSDK.TypeStorageGroup: + bucketName = storageGroupBucketName(addr.ContainerID()) + default: + return nil, ErrUnknownObjectType + } + + rawObject, err := obj.Marshal() + if err != nil { + return nil, fmt.Errorf("can't marshal object header: %w", err) + } + + result = append(result, namedBucketItem{ + name: bucketName, + key: objKey, + val: rawObject, + }) + + // index blobovniczaID if it is present + if id != nil { + result = append(result, namedBucketItem{ + name: smallBucketName(addr.ContainerID()), + key: objKey, + val: *id, + }) + } + } + + // index root object + if obj.Type() == objectSDK.TypeRegular && !obj.HasParent() { + result = append(result, namedBucketItem{ + name: rootBucketName(addr.ContainerID()), + key: objKey, + val: zeroValue, // todo: store split.Info when it will be ready + }) + } + + return result, nil +} + +// builds list of indexes from the object. +func listIndexes(obj *object.Object) ([]namedBucketItem, error) { + result := make([]namedBucketItem, 0, 1) + addr := obj.Address() + objKey := objectKey(addr.ObjectID()) + + // index payload hashes + result = append(result, namedBucketItem{ + name: payloadHashBucketName(addr.ContainerID()), + key: obj.PayloadChecksum().Sum(), + val: objKey, + }) + + if obj.ParentID() != nil { + result = append(result, namedBucketItem{ + name: parentBucketName(addr.ContainerID()), + key: objectKey(obj.ParentID()), + val: objKey, + }) + } + + // todo: index splitID + + return result, nil +} + +// builds list of indexes from the object. +func fkbtIndexes(obj *object.Object) ([]namedBucketItem, error) { + addr := obj.Address() + objKey := []byte(addr.ObjectID().String()) + + attrs := obj.Attributes() + result := make([]namedBucketItem, 0, 1+len(attrs)) + + // owner + result = append(result, namedBucketItem{ + name: ownerBucketName(addr.ContainerID()), + key: []byte(obj.OwnerID().String()), + val: objKey, + }) + + // user specified attributes + for i := range attrs { + result = append(result, namedBucketItem{ + name: attributeBucketName(addr.ContainerID(), attrs[i].Key()), + key: []byte(attrs[i].Value()), + val: objKey, + }) + } + + return result, nil +} + +func putUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) error { + bkt, err := tx.CreateBucketIfNotExists(item.name) + if err != nil { + return fmt.Errorf("can't create index %v: %w", item.name, err) + } + + return bkt.Put(item.key, item.val) +} + +func putFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error { + bkt, err := tx.CreateBucketIfNotExists(item.name) + if err != nil { + return fmt.Errorf("can't create index %v: %w", item.name, err) + } + + fkbtRoot, err := bkt.CreateBucketIfNotExists(item.key) + if err != nil { + return fmt.Errorf("can't create fake bucket tree index %v: %w", item.key, err) + } + + return fkbtRoot.Put(item.val, zeroValue) +} + +func putListIndexItem(tx *bbolt.Tx, item namedBucketItem) error { + bkt, err := tx.CreateBucketIfNotExists(item.name) + if err != nil { + return fmt.Errorf("can't create index %v: %w", item.name, err) + } + + lst, err := decodeList(bkt.Get(item.key)) + if err != nil { + return fmt.Errorf("can't decode leaf list %v: %w", item.key, err) + } + + lst = append(lst, item.val) + + encodedLst, err := encodeList(lst) + if err != nil { + return fmt.Errorf("can't encode leaf list %v: %w", item.key, err) + } + + return bkt.Put(item.key, encodedLst) +} + +// encodeList decodes list of bytes into a single blog for list bucket indexes. +func encodeList(lst [][]byte) ([]byte, error) { + buf := bytes.NewBuffer(nil) + encoder := gob.NewEncoder(buf) + + // consider using protobuf encoding instead of glob + if err := encoder.Encode(lst); err != nil { + return nil, err + } + + return buf.Bytes(), nil +} + +// decodeList decodes blob into the list of bytes from list bucket index. +func decodeList(data []byte) (lst [][]byte, err error) { + if len(data) == 0 { + return nil, nil + } + + decoder := gob.NewDecoder(bytes.NewReader(data)) + if err := decoder.Decode(&lst); err != nil { + return nil, err + } + + return lst, nil +} diff --git a/pkg/local_object_storage/metabase/v2/select.go b/pkg/local_object_storage/metabase/v2/select.go new file mode 100644 index 00000000..d549addb --- /dev/null +++ b/pkg/local_object_storage/metabase/v2/select.go @@ -0,0 +1,331 @@ +package meta + +import ( + "encoding/binary" + "fmt" + + "github.com/nspcc-dev/neofs-api-go/pkg/container" + "github.com/nspcc-dev/neofs-api-go/pkg/object" + v2object "github.com/nspcc-dev/neofs-api-go/v2/object" + "github.com/pkg/errors" + "go.etcd.io/bbolt" + "go.uber.org/zap" +) + +type ( + // filterGroup is a structure that have search filters grouped by access + // method. We have fast filters that looks for indexes and do not unmarshal + // objects, and slow filters, that applied after fast filters created + // smaller set of objects to check. + filterGroup struct { + cid *container.ID + fastFilters object.SearchFilters + slowFilters object.SearchFilters + } +) + +var ErrContainerNotInQuery = errors.New("search query does not contain container id filter") + +// Select returns list of addresses of objects that match search filters. +func (db *DB) Select(fs object.SearchFilters) (res []*object.Address, err error) { + err = db.boltDB.View(func(tx *bbolt.Tx) error { + res, err = db.selectObjects(tx, fs) + + return err + }) + + return res, err +} + +func (db *DB) selectObjects(tx *bbolt.Tx, fs object.SearchFilters) ([]*object.Address, error) { + group, err := groupFilters(fs) + if err != nil { + return nil, err + } + + if group.cid == nil { + return nil, ErrContainerNotInQuery + } + + // keep matched addresses in this cache + // value equal to number (index+1) of latest matched filter + mAddr := make(map[string]int) + + expLen := len(group.fastFilters) // expected value of matched filters in mAddr + + if len(group.fastFilters) == 0 { + expLen = 1 + + db.selectAll(tx, group.cid, mAddr) + } else { + for i := range group.fastFilters { + db.selectFastFilter(tx, group.cid, group.fastFilters[i], mAddr, i) + } + } + + res := make([]*object.Address, 0, len(mAddr)) + + for a, ind := range mAddr { + if ind != expLen { + continue // ignore objects with unmatched fast filters + } + + addr := object.NewAddress() + if err := addr.Parse(a); err != nil { + // TODO: storage was broken, so we need to handle it + return nil, err + } + + if inGraveyard(tx, addr) { + continue // ignore removed objects + } + + if !db.matchSlowFilters(tx, addr, group.slowFilters) { + continue // ignore objects with unmatched slow filters + } + + res = append(res, addr) + } + + return res, nil +} + +// selectAll adds to resulting cache all available objects in metabase. +func (db *DB) selectAll(tx *bbolt.Tx, cid *container.ID, to map[string]int) { + prefix := cid.String() + "/" + + selectAllFromBucket(tx, primaryBucketName(cid), prefix, to, 0) + selectAllFromBucket(tx, tombstoneBucketName(cid), prefix, to, 0) + selectAllFromBucket(tx, storageGroupBucketName(cid), prefix, to, 0) + selectAllFromBucket(tx, parentBucketName(cid), prefix, to, 0) +} + +// selectAllFromBucket goes through all keys in bucket and adds them in a +// resulting cache. Keys should be stringed object ids. +func selectAllFromBucket(tx *bbolt.Tx, name []byte, prefix string, to map[string]int, fNum int) { + bkt := tx.Bucket(name) + if bkt == nil { + return + } + + _ = bkt.ForEach(func(k, v []byte) error { + key := prefix + string(k) // consider using string builders from sync.Pool + markAddressInCache(to, fNum, key) + + return nil + }) +} + +// selectFastFilter makes fast optimized checks for well known buckets or +// looking through user attribute buckets otherwise. +func (db *DB) selectFastFilter( + tx *bbolt.Tx, + cid *container.ID, // container we search on + f object.SearchFilter, // fast filter + to map[string]int, // resulting cache + fNum int, // index of filter +) { + prefix := cid.String() + "/" + + // todo: add splitID + + switch f.Header() { + case v2object.FilterHeaderObjectID: + // todo: implement me + case v2object.FilterHeaderOwnerID: + bucketName := ownerBucketName(cid) + db.selectFromFKBT(tx, bucketName, f, prefix, to, fNum) + case v2object.FilterHeaderPayloadHash: + bucketName := payloadHashBucketName(cid) + db.selectFromList(tx, bucketName, f, prefix, to, fNum) + case v2object.FilterHeaderObjectType: + var bucketName []byte + + switch f.Value() { // do it better after https://github.com/nspcc-dev/neofs-api/issues/84 + case "Regular": + bucketName = primaryBucketName(cid) + + selectAllFromBucket(tx, bucketName, prefix, to, fNum) + + bucketName = parentBucketName(cid) + case "Tombstone": + bucketName = tombstoneBucketName(cid) + case "StorageGroup": + bucketName = storageGroupBucketName(cid) + default: + db.log.Debug("unknown object type", zap.String("type", f.Value())) + } + + selectAllFromBucket(tx, bucketName, prefix, to, fNum) + case v2object.FilterHeaderParent: + bucketName := parentBucketName(cid) + db.selectFromList(tx, bucketName, f, prefix, to, fNum) + case v2object.FilterPropertyRoot: + selectAllFromBucket(tx, rootBucketName(cid), prefix, to, fNum) + case v2object.FilterPropertyPhy: + selectAllFromBucket(tx, primaryBucketName(cid), prefix, to, fNum) + selectAllFromBucket(tx, tombstoneBucketName(cid), prefix, to, fNum) + selectAllFromBucket(tx, storageGroupBucketName(cid), prefix, to, fNum) + default: // user attribute + bucketName := attributeBucketName(cid, f.Header()) + db.selectFromFKBT(tx, bucketName, f, prefix, to, fNum) + } +} + +// selectFromList looks into index to find list of addresses to add in +// resulting cache. +func (db *DB) selectFromFKBT( + tx *bbolt.Tx, + name []byte, // fkbt root bucket name + f object.SearchFilter, // filter for operation and value + prefix string, // prefix to create addr from oid in index + to map[string]int, // resulting cache + fNum int, // index of filter +) { // + matchFunc, ok := db.matchers[f.Operation()] + if !ok { + db.log.Debug("missing matcher", zap.Uint32("operation", uint32(f.Operation()))) + + return + } + + fkbtRoot := tx.Bucket(name) + if fkbtRoot == nil { + return + } + + err := fkbtRoot.ForEach(func(k, _ []byte) error { + if matchFunc(f.Header(), k, f.Value()) { + fkbtLeaf := fkbtRoot.Bucket(k) + if fkbtLeaf == nil { + return nil + } + + return fkbtLeaf.ForEach(func(k, _ []byte) error { + addr := prefix + string(k) + markAddressInCache(to, fNum, addr) + + return nil + }) + } + + return nil + }) + if err != nil { + db.log.Debug("error in FKBT selection", zap.String("error", err.Error())) + } +} + +// selectFromList looks into index to find list of addresses to add in +// resulting cache. +func (db *DB) selectFromList( + tx *bbolt.Tx, + name []byte, // list root bucket name + f object.SearchFilter, // filter for operation and value + prefix string, // prefix to create addr from oid in index + to map[string]int, // resulting cache + fNum int, // index of filter +) { // + bkt := tx.Bucket(name) + if bkt == nil { + return + } + + switch f.Operation() { + case object.MatchStringEqual: + default: + db.log.Debug("unknown operation", zap.Uint32("operation", uint32(f.Operation()))) + } + + // warning: it works only for MatchStringEQ, for NotEQ you should iterate over + // bkt and apply matchFunc, don't forget to implement this when it will be + // needed. Right now it is not efficient to iterate over bucket + // when there is only MatchStringEQ. + lst, err := decodeList(bkt.Get(bucketKeyHelper(f.Header(), f.Value()))) + if err != nil { + db.log.Debug("can't decode list bucket leaf", zap.String("error", err.Error())) + } + + for i := range lst { + addr := prefix + string(lst[i]) + markAddressInCache(to, fNum, addr) + } +} + +// matchSlowFilters return true if object header is matched by all slow filters. +func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr *object.Address, f object.SearchFilters) bool { + if len(f) == 0 { + return true + } + + obj, err := db.get(tx, addr) + if err != nil { + return false + } + + for i := range f { + matchFunc, ok := db.matchers[f[i].Operation()] + if !ok { + return false + } + + var data []byte + + switch f[i].Header() { + case v2object.FilterHeaderVersion: + data = []byte(obj.Version().String()) + case v2object.FilterHeaderHomomorphicHash: + data = obj.PayloadHomomorphicHash().Sum() + case v2object.FilterHeaderCreationEpoch: + data = make([]byte, 8) + binary.LittleEndian.PutUint64(data, obj.CreationEpoch()) + case v2object.FilterHeaderPayloadLength: + data = make([]byte, 8) + binary.LittleEndian.PutUint64(data, obj.PayloadSize()) + } + + if !matchFunc(f[i].Header(), data, f[i].Value()) { + return false + } + } + + return true +} + +// groupFilters divides filters in two groups: fast and slow. Fast filters +// processed by indexes and slow filters processed after by unmarshaling +// object headers. +func groupFilters(filters object.SearchFilters) (*filterGroup, error) { + res := &filterGroup{ + fastFilters: make(object.SearchFilters, 0, len(filters)), + slowFilters: make(object.SearchFilters, 0, len(filters)), + } + + for i := range filters { + switch filters[i].Header() { + case v2object.FilterHeaderContainerID: + res.cid = container.NewID() + + err := res.cid.Parse(filters[i].Value()) + if err != nil { + return nil, fmt.Errorf("can't parse container id: %w", err) + } + case // slow filters + v2object.FilterHeaderVersion, + v2object.FilterHeaderCreationEpoch, + v2object.FilterHeaderPayloadLength, + v2object.FilterHeaderHomomorphicHash: + res.slowFilters = append(res.slowFilters, filters[i]) + default: // fast filters or user attributes if unknown + res.fastFilters = append(res.fastFilters, filters[i]) + } + } + + return res, nil +} + +func markAddressInCache(cache map[string]int, fNum int, addr string) { + if num := cache[addr]; num == fNum { + cache[addr] = num + 1 + } +} diff --git a/pkg/local_object_storage/metabase/v2/select_test.go b/pkg/local_object_storage/metabase/v2/select_test.go new file mode 100644 index 00000000..6526d351 --- /dev/null +++ b/pkg/local_object_storage/metabase/v2/select_test.go @@ -0,0 +1,302 @@ +package meta_test + +import ( + "encoding/hex" + "testing" + + "github.com/nspcc-dev/neofs-api-go/pkg" + "github.com/nspcc-dev/neofs-api-go/pkg/container" + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + v2object "github.com/nspcc-dev/neofs-api-go/v2/object" + "github.com/stretchr/testify/require" +) + +func TestDB_SelectUserAttributes(t *testing.T) { + db := newDB(t) + defer releaseDB(db) + + cid := testCID() + + raw1 := generateRawObjectWithCID(t, cid) + addAttribute(raw1, "foo", "bar") + addAttribute(raw1, "x", "y") + + err := db.Put(raw1.Object(), nil) + require.NoError(t, err) + + raw2 := generateRawObjectWithCID(t, cid) + addAttribute(raw2, "foo", "bar") + addAttribute(raw2, "x", "z") + + err = db.Put(raw2.Object(), nil) + require.NoError(t, err) + + raw3 := generateRawObjectWithCID(t, cid) + addAttribute(raw3, "a", "b") + + err = db.Put(raw3.Object(), nil) + require.NoError(t, err) + + fs := generateSearchFilter(cid) + fs.AddFilter("foo", "bar", objectSDK.MatchStringEqual) + testSelect(t, db, fs, + raw1.Object().Address(), + raw2.Object().Address(), + ) + + fs = generateSearchFilter(cid) + fs.AddFilter("x", "y", objectSDK.MatchStringEqual) + testSelect(t, db, fs, raw1.Object().Address()) + + fs = generateSearchFilter(cid) + fs.AddFilter("a", "b", objectSDK.MatchStringEqual) + testSelect(t, db, fs, raw3.Object().Address()) + + fs = generateSearchFilter(cid) + fs.AddFilter("c", "d", objectSDK.MatchStringEqual) + testSelect(t, db, fs) + + fs = generateSearchFilter(cid) + testSelect(t, db, fs, + raw1.Object().Address(), + raw2.Object().Address(), + raw3.Object().Address(), + ) +} + +func TestDB_SelectRootPhyParent(t *testing.T) { + db := newDB(t) + defer releaseDB(db) + + cid := testCID() + + // prepare + + small := generateRawObjectWithCID(t, cid) + err := db.Put(small.Object(), nil) + require.NoError(t, err) + + ts := generateRawObjectWithCID(t, cid) + ts.SetType(objectSDK.TypeTombstone) + err = db.Put(ts.Object(), nil) + require.NoError(t, err) + + sg := generateRawObjectWithCID(t, cid) + sg.SetType(objectSDK.TypeStorageGroup) + err = db.Put(sg.Object(), nil) + require.NoError(t, err) + + leftChild := generateRawObjectWithCID(t, cid) + leftChild.InitRelations() + err = db.Put(leftChild.Object(), nil) + require.NoError(t, err) + + parent := generateRawObjectWithCID(t, cid) + + rightChild := generateRawObjectWithCID(t, cid) + rightChild.SetParent(parent.Object().SDK()) + rightChild.SetParentID(parent.Object().Address().ObjectID()) + err = db.Put(rightChild.Object(), nil) + require.NoError(t, err) + + link := generateRawObjectWithCID(t, cid) + link.SetParent(parent.Object().SDK()) + link.SetParentID(parent.Object().Address().ObjectID()) + link.SetChildren(leftChild.Object().Address().ObjectID(), rightChild.Object().Address().ObjectID()) + + err = db.Put(link.Object(), nil) + require.NoError(t, err) + + // printDB(meta.ExtractDB(db)) + // return + + t.Run("root objects", func(t *testing.T) { + fs := generateSearchFilter(cid) + fs.AddRootFilter() + testSelect(t, db, fs, + small.Object().Address(), + parent.Object().Address(), + ) + }) + + t.Run("phy objects", func(t *testing.T) { + fs := generateSearchFilter(cid) + fs.AddPhyFilter() + testSelect(t, db, fs, + small.Object().Address(), + ts.Object().Address(), + sg.Object().Address(), + leftChild.Object().Address(), + rightChild.Object().Address(), + link.Object().Address(), + ) + }) + + t.Run("regular objects", func(t *testing.T) { + fs := generateSearchFilter(cid) + fs.AddFilter(v2object.FilterHeaderObjectType, "Regular", objectSDK.MatchStringEqual) + testSelect(t, db, fs, + small.Object().Address(), + leftChild.Object().Address(), + rightChild.Object().Address(), + link.Object().Address(), + parent.Object().Address(), + ) + }) + + t.Run("tombstone objects", func(t *testing.T) { + fs := generateSearchFilter(cid) + fs.AddFilter(v2object.FilterHeaderObjectType, "Tombstone", objectSDK.MatchStringEqual) + testSelect(t, db, fs, ts.Object().Address()) + }) + + t.Run("storage group objects", func(t *testing.T) { + fs := generateSearchFilter(cid) + fs.AddFilter(v2object.FilterHeaderObjectType, "StorageGroup", objectSDK.MatchStringEqual) + testSelect(t, db, fs, sg.Object().Address()) + }) + + t.Run("objects with parent", func(t *testing.T) { + fs := generateSearchFilter(cid) + fs.AddFilter(v2object.FilterHeaderParent, + parent.Object().Address().ObjectID().String(), + objectSDK.MatchStringEqual) + + testSelect(t, db, fs, + rightChild.Object().Address(), + link.Object().Address(), + ) + }) + + t.Run("all objects", func(t *testing.T) { + fs := generateSearchFilter(cid) + testSelect(t, db, fs, + small.Object().Address(), + ts.Object().Address(), + sg.Object().Address(), + leftChild.Object().Address(), + rightChild.Object().Address(), + link.Object().Address(), + parent.Object().Address(), + ) + }) +} + +func TestDB_SelectInhume(t *testing.T) { + db := newDB(t) + defer releaseDB(db) + + cid := testCID() + + raw1 := generateRawObjectWithCID(t, cid) + err := db.Put(raw1.Object(), nil) + require.NoError(t, err) + + raw2 := generateRawObjectWithCID(t, cid) + err = db.Put(raw2.Object(), nil) + require.NoError(t, err) + + fs := generateSearchFilter(cid) + testSelect(t, db, fs, + raw1.Object().Address(), + raw2.Object().Address(), + ) + + tombstone := objectSDK.NewAddress() + tombstone.SetContainerID(cid) + tombstone.SetObjectID(testOID()) + + err = db.Inhume(raw2.Object().Address(), tombstone) + require.NoError(t, err) + + fs = generateSearchFilter(cid) + testSelect(t, db, fs, + raw1.Object().Address(), + ) +} + +func TestDB_SelectPayloadHash(t *testing.T) { + db := newDB(t) + defer releaseDB(db) + + cid := testCID() + + raw1 := generateRawObjectWithCID(t, cid) + err := db.Put(raw1.Object(), nil) + require.NoError(t, err) + + raw2 := generateRawObjectWithCID(t, cid) + err = db.Put(raw2.Object(), nil) + require.NoError(t, err) + + fs := generateSearchFilter(cid) + fs.AddFilter(v2object.FilterHeaderPayloadHash, + hex.EncodeToString(raw1.PayloadChecksum().Sum()), + objectSDK.MatchStringEqual) + + testSelect(t, db, fs, raw1.Object().Address()) +} + +func TestDB_SelectWithSlowFilters(t *testing.T) { + db := newDB(t) + defer releaseDB(db) + + cid := testCID() + + v20 := new(pkg.Version) + v20.SetMajor(2) + + v21 := new(pkg.Version) + v21.SetMajor(2) + v21.SetMinor(1) + + raw1 := generateRawObjectWithCID(t, cid) + raw1.SetPayloadSize(10) + raw1.SetCreationEpoch(11) + raw1.SetVersion(v20) + err := db.Put(raw1.Object(), nil) + require.NoError(t, err) + + raw2 := generateRawObjectWithCID(t, cid) + raw2.SetPayloadSize(20) + raw2.SetCreationEpoch(21) + raw2.SetVersion(v21) + err = db.Put(raw2.Object(), nil) + require.NoError(t, err) + + t.Run("object with TZHash", func(t *testing.T) { + fs := generateSearchFilter(cid) + fs.AddFilter(v2object.FilterHeaderHomomorphicHash, + hex.EncodeToString(raw1.PayloadHomomorphicHash().Sum()), + objectSDK.MatchStringEqual) + + testSelect(t, db, fs, raw1.Object().Address()) + }) + + t.Run("object with payload length", func(t *testing.T) { + fs := generateSearchFilter(cid) + fs.AddFilter(v2object.FilterHeaderPayloadLength, "20", objectSDK.MatchStringEqual) + + testSelect(t, db, fs, raw2.Object().Address()) + }) + + t.Run("object with creation epoch", func(t *testing.T) { + fs := generateSearchFilter(cid) + fs.AddFilter(v2object.FilterHeaderCreationEpoch, "11", objectSDK.MatchStringEqual) + + testSelect(t, db, fs, raw1.Object().Address()) + }) + + t.Run("object with version", func(t *testing.T) { + fs := generateSearchFilter(cid) + fs.AddObjectVersionFilter(objectSDK.MatchStringEqual, v21) + testSelect(t, db, fs, raw2.Object().Address()) + }) +} + +func generateSearchFilter(cid *container.ID) objectSDK.SearchFilters { + fs := objectSDK.SearchFilters{} + fs.AddObjectContainerIDFilter(objectSDK.MatchStringEqual, cid) + + return fs +} diff --git a/pkg/local_object_storage/metabase/v2/small.go b/pkg/local_object_storage/metabase/v2/small.go new file mode 100644 index 00000000..0414138d --- /dev/null +++ b/pkg/local_object_storage/metabase/v2/small.go @@ -0,0 +1,28 @@ +package meta + +import ( + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza" + "go.etcd.io/bbolt" +) + +// IsSmall returns blobovniczaID for small objects and nil for big objects. +// Small objects stored in blobovnicza instances. Big objects stored in FS by +// shallow path which is calculated from address and therefore it is not +// indexed in metabase. +func (db *DB) IsSmall(addr *objectSDK.Address) (id *blobovnicza.ID, err error) { + err = db.boltDB.View(func(tx *bbolt.Tx) error { + // if graveyard is empty, then check if object exists in primary bucket + smallBucket := tx.Bucket(smallBucketName(addr.ContainerID())) + if smallBucket == nil { + return nil + } + + blobovniczaID := smallBucket.Get(objectKey(addr.ObjectID())) + id = blobovnicza.NewIDFromBytes(blobovniczaID) + + return err + }) + + return id, err +} diff --git a/pkg/local_object_storage/metabase/v2/small_test.go b/pkg/local_object_storage/metabase/v2/small_test.go new file mode 100644 index 00000000..4f92a5c9 --- /dev/null +++ b/pkg/local_object_storage/metabase/v2/small_test.go @@ -0,0 +1,45 @@ +package meta_test + +import ( + "testing" + + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza" + "github.com/stretchr/testify/require" +) + +func TestDB_IsSmall(t *testing.T) { + db := newDB(t) + defer releaseDB(db) + + raw1 := generateRawObject(t) + raw2 := generateRawObject(t) + + blobovniczaID := blobovnicza.ID{1, 2, 3, 4} + + obj1 := object.NewFromV2(raw1.ToV2()) + obj2 := object.NewFromV2(raw2.ToV2()) + + // check IsSmall from empty database + fetchedBlobovniczaID, err := db.IsSmall(obj1.Address()) + require.NoError(t, err) + require.Nil(t, fetchedBlobovniczaID) + + // put one object with blobovniczaID + err = db.Put(obj1, &blobovniczaID) + require.NoError(t, err) + + // put one object without blobovniczaID + err = db.Put(obj2, nil) + require.NoError(t, err) + + // check IsSmall for object without blobovniczaID + fetchedBlobovniczaID, err = db.IsSmall(obj2.Address()) + require.NoError(t, err) + require.Nil(t, fetchedBlobovniczaID) + + // check IsSmall for object with blobovniczaID + fetchedBlobovniczaID, err = db.IsSmall(obj1.Address()) + require.NoError(t, err) + require.Equal(t, blobovniczaID, *fetchedBlobovniczaID) +} diff --git a/pkg/local_object_storage/metabase/v2/util.go b/pkg/local_object_storage/metabase/v2/util.go new file mode 100644 index 00000000..57169600 --- /dev/null +++ b/pkg/local_object_storage/metabase/v2/util.go @@ -0,0 +1,92 @@ +package meta + +import ( + "strings" + + "github.com/nspcc-dev/neofs-api-go/pkg/container" + "github.com/nspcc-dev/neofs-api-go/pkg/object" +) + +/* +We might increase performance by not using string representation of +identities and addresses. String representation require base58 encoding that +slows execution. Instead we can try to marshal these structures directly into +bytes. Check it later. +*/ + +var ( + graveyardBucketName = []byte("Graveyard") + toMoveItBucketName = []byte("ToMoveIt") + + zeroValue = []byte{0xFF} + + smallPostfix = "_small" + storageGroupPostfix = "_SG" + tombstonePostfix = "_TS" + ownerPostfix = "_ownerid" + payloadHashPostfix = "_payloadhash" + rootPostfix = "_root" + parentPostfix = "_parent" + + userAttributePostfix = "_attr_" +) + +// primaryBucketName returns . +func primaryBucketName(cid *container.ID) []byte { + return []byte(cid.String()) +} + +// tombstoneBucketName returns _TS. +func tombstoneBucketName(cid *container.ID) []byte { + return []byte(cid.String() + tombstonePostfix) +} + +// storageGroupBucketName returns _SG. +func storageGroupBucketName(cid *container.ID) []byte { + return []byte(cid.String() + storageGroupPostfix) +} + +// smallBucketName returns _small. +func smallBucketName(cid *container.ID) []byte { + return []byte(cid.String() + smallPostfix) // consider caching output values +} + +// attributeBucketName returns _attr_. +func attributeBucketName(cid *container.ID, attributeKey string) []byte { + sb := strings.Builder{} // consider getting string builders from sync.Pool + sb.WriteString(cid.String()) + sb.WriteString(userAttributePostfix) + sb.WriteString(attributeKey) + + return []byte(sb.String()) +} + +// payloadHashBucketName returns _payloadhash. +func payloadHashBucketName(cid *container.ID) []byte { + return []byte(cid.String() + payloadHashPostfix) +} + +// rootBucketName returns _root. +func rootBucketName(cid *container.ID) []byte { + return []byte(cid.String() + rootPostfix) +} + +// ownerBucketName returns _ownerid. +func ownerBucketName(cid *container.ID) []byte { + return []byte(cid.String() + ownerPostfix) +} + +// parentBucketNAme returns _parent. +func parentBucketName(cid *container.ID) []byte { + return []byte(cid.String() + parentPostfix) +} + +// addressKey returns key for K-V tables when key is a whole address. +func addressKey(addr *object.Address) []byte { + return []byte(addr.String()) +} + +// objectKey returns key for K-V tables when key is an object id. +func objectKey(oid *object.ID) []byte { + return []byte(oid.String()) +}