forked from TrueCloudLab/frostfs-node
[#199] Refactor metabase internal structure
Accoring to MetaBase-Plan-B storage engine specification. Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
parent
1c81d507fd
commit
e478c0d024
18 changed files with 1686 additions and 0 deletions
106
pkg/local_object_storage/metabase/v2/db.go
Normal file
106
pkg/local_object_storage/metabase/v2/db.go
Normal file
|
@ -0,0 +1,106 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"strconv"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
v2object "github.com/nspcc-dev/neofs-api-go/v2/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// DB represents local metabase of storage node.
|
||||
type DB struct {
|
||||
info Info
|
||||
|
||||
*cfg
|
||||
|
||||
matchers map[object.SearchMatchType]func(string, []byte, string) bool
|
||||
}
|
||||
|
||||
// Option is an option of DB constructor.
|
||||
type Option func(*cfg)
|
||||
|
||||
type cfg struct {
|
||||
boltDB *bbolt.DB
|
||||
|
||||
log *logger.Logger
|
||||
}
|
||||
|
||||
func defaultCfg() *cfg {
|
||||
return &cfg{
|
||||
log: zap.L(),
|
||||
}
|
||||
}
|
||||
|
||||
// NewDB creates, initializes and returns DB instance.
|
||||
func NewDB(opts ...Option) *DB {
|
||||
c := defaultCfg()
|
||||
|
||||
for i := range opts {
|
||||
opts[i](c)
|
||||
}
|
||||
|
||||
return &DB{
|
||||
info: Info{
|
||||
Path: c.boltDB.Path(),
|
||||
},
|
||||
cfg: c,
|
||||
matchers: map[object.SearchMatchType]func(string, []byte, string) bool{
|
||||
object.MatchUnknown: unknownMatcher,
|
||||
object.MatchStringEqual: stringEqualMatcher,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (db *DB) Close() error {
|
||||
return db.boltDB.Close()
|
||||
}
|
||||
|
||||
func stringEqualMatcher(key string, objVal []byte, filterVal string) bool {
|
||||
switch key {
|
||||
default:
|
||||
return string(objVal) == filterVal
|
||||
case v2object.FilterHeaderPayloadHash, v2object.FilterHeaderHomomorphicHash:
|
||||
return hex.EncodeToString(objVal) == filterVal
|
||||
case v2object.FilterHeaderCreationEpoch, v2object.FilterHeaderPayloadLength:
|
||||
return strconv.FormatUint(binary.LittleEndian.Uint64(objVal), 10) == filterVal
|
||||
}
|
||||
}
|
||||
|
||||
func unknownMatcher(_ string, _ []byte, _ string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// bucketKeyHelper returns byte representation of val that is used as a key
|
||||
// in boltDB. Useful for getting filter values from unique and list indexes.
|
||||
func bucketKeyHelper(hdr string, val string) []byte {
|
||||
switch hdr {
|
||||
case v2object.FilterHeaderPayloadHash:
|
||||
v, err := hex.DecodeString(val)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return v
|
||||
default:
|
||||
return []byte(val)
|
||||
}
|
||||
}
|
||||
|
||||
// FromBoltDB returns option to construct DB from BoltDB instance.
|
||||
func FromBoltDB(db *bbolt.DB) Option {
|
||||
return func(c *cfg) {
|
||||
c.boltDB = db
|
||||
}
|
||||
}
|
||||
|
||||
// WithLogger returns option to set logger of DB.
|
||||
func WithLogger(l *logger.Logger) Option {
|
||||
return func(c *cfg) {
|
||||
c.log = l
|
||||
}
|
||||
}
|
113
pkg/local_object_storage/metabase/v2/db_test.go
Normal file
113
pkg/local_object_storage/metabase/v2/db_test.go
Normal file
|
@ -0,0 +1,113 @@
|
|||
package meta_test
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"crypto/sha256"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase/v2"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util/test"
|
||||
"github.com/nspcc-dev/tzhash/tz"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
func testSelect(t *testing.T, db *meta.DB, fs objectSDK.SearchFilters, exp ...*objectSDK.Address) {
|
||||
res, err := db.Select(fs)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, res, len(exp))
|
||||
|
||||
for i := range exp {
|
||||
require.Contains(t, res, exp[i])
|
||||
}
|
||||
}
|
||||
|
||||
func testCID() *container.ID {
|
||||
cs := [sha256.Size]byte{}
|
||||
_, _ = rand.Read(cs[:])
|
||||
|
||||
id := container.NewID()
|
||||
id.SetSHA256(cs)
|
||||
|
||||
return id
|
||||
}
|
||||
|
||||
func testOID() *objectSDK.ID {
|
||||
cs := [sha256.Size]byte{}
|
||||
_, _ = rand.Read(cs[:])
|
||||
|
||||
id := objectSDK.NewID()
|
||||
id.SetSHA256(cs)
|
||||
|
||||
return id
|
||||
}
|
||||
|
||||
func newDB(t testing.TB) *meta.DB {
|
||||
path := t.Name()
|
||||
|
||||
bdb, err := bbolt.Open(path, 0600, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
return meta.NewDB(meta.FromBoltDB(bdb))
|
||||
}
|
||||
|
||||
func releaseDB(db *meta.DB) {
|
||||
db.Close()
|
||||
os.Remove(db.DumpInfo().Path)
|
||||
}
|
||||
|
||||
func generateRawObject(t *testing.T) *object.RawObject {
|
||||
return generateRawObjectWithCID(t, testCID())
|
||||
}
|
||||
|
||||
func generateRawObjectWithCID(t *testing.T, cid *container.ID) *object.RawObject {
|
||||
version := pkg.NewVersion()
|
||||
version.SetMajor(2)
|
||||
version.SetMinor(1)
|
||||
|
||||
w, err := owner.NEO3WalletFromPublicKey(&test.DecodeKey(-1).PublicKey)
|
||||
require.NoError(t, err)
|
||||
|
||||
ownerID := owner.NewID()
|
||||
ownerID.SetNeo3Wallet(w)
|
||||
|
||||
csum := new(pkg.Checksum)
|
||||
csum.SetSHA256(sha256.Sum256(w.Bytes()))
|
||||
|
||||
csumTZ := new(pkg.Checksum)
|
||||
csumTZ.SetTillichZemor(tz.Sum(csum.Sum()))
|
||||
|
||||
obj := object.NewRaw()
|
||||
obj.SetID(testOID())
|
||||
obj.SetOwnerID(ownerID)
|
||||
obj.SetContainerID(cid)
|
||||
obj.SetVersion(version)
|
||||
obj.SetPayloadChecksum(csum)
|
||||
obj.SetPayloadHomomorphicHash(csumTZ)
|
||||
|
||||
return obj
|
||||
}
|
||||
|
||||
func generateAddress() *objectSDK.Address {
|
||||
addr := objectSDK.NewAddress()
|
||||
addr.SetContainerID(testCID())
|
||||
addr.SetObjectID(testOID())
|
||||
|
||||
return addr
|
||||
}
|
||||
|
||||
func addAttribute(obj *object.RawObject, key, val string) {
|
||||
attr := objectSDK.NewAttribute()
|
||||
attr.SetKey(key)
|
||||
attr.SetValue(val)
|
||||
|
||||
attrs := obj.Attributes()
|
||||
attrs = append(attrs, attr)
|
||||
obj.SetAttributes(attrs...)
|
||||
}
|
14
pkg/local_object_storage/metabase/v2/errors.go
Normal file
14
pkg/local_object_storage/metabase/v2/errors.go
Normal file
|
@ -0,0 +1,14 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"errors"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrNotFound returned when object header should exist in primary index but
|
||||
// it is not present there.
|
||||
ErrNotFound = errors.New("address not found")
|
||||
|
||||
// ErrAlreadyRemoved returned when object has tombstone in graveyard.
|
||||
ErrAlreadyRemoved = errors.New("object already removed")
|
||||
)
|
43
pkg/local_object_storage/metabase/v2/exists.go
Normal file
43
pkg/local_object_storage/metabase/v2/exists.go
Normal file
|
@ -0,0 +1,43 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
// Exists returns ErrAlreadyRemoved if addr was marked as removed. Otherwise it
|
||||
// returns true if addr is in primary index or false if it is not.
|
||||
func (db *DB) Exists(addr *objectSDK.Address) (exists bool, err error) {
|
||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
// check graveyard first
|
||||
if inGraveyard(tx, addr) {
|
||||
return ErrAlreadyRemoved
|
||||
}
|
||||
|
||||
// if graveyard is empty, then check if object exists in primary bucket
|
||||
primaryBucket := tx.Bucket(primaryBucketName(addr.ContainerID()))
|
||||
if primaryBucket == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// using `get` as `exists`: https://github.com/boltdb/bolt/issues/321
|
||||
val := primaryBucket.Get(objectKey(addr.ObjectID()))
|
||||
exists = len(val) != 0
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return exists, err
|
||||
}
|
||||
|
||||
// inGraveyard returns true if object was marked as removed.
|
||||
func inGraveyard(tx *bbolt.Tx, addr *objectSDK.Address) bool {
|
||||
graveyard := tx.Bucket(graveyardBucketName)
|
||||
if graveyard == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
tombstone := graveyard.Get(addressKey(addr))
|
||||
|
||||
return len(tombstone) != 0
|
||||
}
|
29
pkg/local_object_storage/metabase/v2/exists_test.go
Normal file
29
pkg/local_object_storage/metabase/v2/exists_test.go
Normal file
|
@ -0,0 +1,29 @@
|
|||
package meta_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestDB_Exists(t *testing.T) {
|
||||
db := newDB(t)
|
||||
defer releaseDB(db)
|
||||
|
||||
raw := generateRawObject(t)
|
||||
addAttribute(raw, "foo", "bar")
|
||||
|
||||
obj := object.NewFromV2(raw.ToV2())
|
||||
|
||||
exists, err := db.Exists(obj.Address())
|
||||
require.NoError(t, err)
|
||||
require.False(t, exists)
|
||||
|
||||
err = db.Put(obj, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
exists, err = db.Exists(obj.Address())
|
||||
require.NoError(t, err)
|
||||
require.True(t, exists)
|
||||
}
|
57
pkg/local_object_storage/metabase/v2/get.go
Normal file
57
pkg/local_object_storage/metabase/v2/get.go
Normal file
|
@ -0,0 +1,57 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
// Get returns object header for specified address.
|
||||
func (db *DB) Get(addr *objectSDK.Address) (obj *object.Object, err error) {
|
||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
obj, err = db.get(tx, addr)
|
||||
|
||||
return err
|
||||
})
|
||||
|
||||
return obj, err
|
||||
}
|
||||
|
||||
func (db *DB) get(tx *bbolt.Tx, addr *objectSDK.Address) (obj *object.Object, err error) {
|
||||
obj = object.New()
|
||||
key := objectKey(addr.ObjectID())
|
||||
cid := addr.ContainerID()
|
||||
|
||||
if inGraveyard(tx, addr) {
|
||||
return nil, ErrAlreadyRemoved
|
||||
}
|
||||
|
||||
// check in primary index
|
||||
data := getFromBucket(tx, primaryBucketName(cid), key)
|
||||
if len(data) != 0 {
|
||||
return obj, obj.Unmarshal(data)
|
||||
}
|
||||
|
||||
// if not found then check in tombstone index
|
||||
data = getFromBucket(tx, tombstoneBucketName(cid), key)
|
||||
if len(data) != 0 {
|
||||
return obj, obj.Unmarshal(data)
|
||||
}
|
||||
|
||||
// if not found then check in storage group index
|
||||
data = getFromBucket(tx, storageGroupBucketName(cid), key)
|
||||
if len(data) != 0 {
|
||||
return obj, obj.Unmarshal(data)
|
||||
}
|
||||
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
|
||||
func getFromBucket(tx *bbolt.Tx, name, key []byte) []byte {
|
||||
bkt := tx.Bucket(name)
|
||||
if bkt == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return bkt.Get(key)
|
||||
}
|
63
pkg/local_object_storage/metabase/v2/get_test.go
Normal file
63
pkg/local_object_storage/metabase/v2/get_test.go
Normal file
|
@ -0,0 +1,63 @@
|
|||
package meta_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestDB_Get(t *testing.T) {
|
||||
db := newDB(t)
|
||||
defer releaseDB(db)
|
||||
|
||||
raw := generateRawObject(t)
|
||||
addAttribute(raw, "foo", "bar")
|
||||
|
||||
t.Run("object not found", func(t *testing.T) {
|
||||
obj := object.NewFromV2(raw.ToV2())
|
||||
|
||||
_, err := db.Get(obj.Address())
|
||||
require.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("put regular object", func(t *testing.T) {
|
||||
obj := object.NewFromV2(raw.ToV2())
|
||||
|
||||
err := db.Put(obj, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
newObj, err := db.Get(obj.Address())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, obj, newObj)
|
||||
})
|
||||
|
||||
t.Run("put tombstone object", func(t *testing.T) {
|
||||
raw.SetType(objectSDK.TypeTombstone)
|
||||
raw.SetID(testOID())
|
||||
|
||||
obj := object.NewFromV2(raw.ToV2())
|
||||
|
||||
err := db.Put(obj, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
newObj, err := db.Get(obj.Address())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, obj, newObj)
|
||||
})
|
||||
|
||||
t.Run("put storage group object", func(t *testing.T) {
|
||||
raw.SetType(objectSDK.TypeStorageGroup)
|
||||
raw.SetID(testOID())
|
||||
|
||||
obj := object.NewFromV2(raw.ToV2())
|
||||
|
||||
err := db.Put(obj, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
newObj, err := db.Get(obj.Address())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, obj, newObj)
|
||||
})
|
||||
}
|
12
pkg/local_object_storage/metabase/v2/info.go
Normal file
12
pkg/local_object_storage/metabase/v2/info.go
Normal file
|
@ -0,0 +1,12 @@
|
|||
package meta
|
||||
|
||||
// Info groups the information about DB.
|
||||
type Info struct {
|
||||
// Full path to the metabase.
|
||||
Path string
|
||||
}
|
||||
|
||||
// DumpInfo returns information about the DB.
|
||||
func (db *DB) DumpInfo() Info {
|
||||
return db.info
|
||||
}
|
19
pkg/local_object_storage/metabase/v2/inhume.go
Normal file
19
pkg/local_object_storage/metabase/v2/inhume.go
Normal file
|
@ -0,0 +1,19 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
// Inhume marks objects as removed but not removes it from metabase.
|
||||
func (db *DB) Inhume(target, tombstone *objectSDK.Address) error {
|
||||
return db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
graveyard, err := tx.CreateBucketIfNotExists(graveyardBucketName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// consider checking if target is already in graveyard?
|
||||
return graveyard.Put(addressKey(target), addressKey(tombstone))
|
||||
})
|
||||
}
|
32
pkg/local_object_storage/metabase/v2/inhume_test.go
Normal file
32
pkg/local_object_storage/metabase/v2/inhume_test.go
Normal file
|
@ -0,0 +1,32 @@
|
|||
package meta_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase/v2"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestDB_Inhume(t *testing.T) {
|
||||
db := newDB(t)
|
||||
defer releaseDB(db)
|
||||
|
||||
raw := generateRawObject(t)
|
||||
addAttribute(raw, "foo", "bar")
|
||||
|
||||
obj := object.NewFromV2(raw.ToV2())
|
||||
tombstoneID := generateAddress()
|
||||
|
||||
err := db.Put(obj, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = db.Inhume(obj.Address(), tombstoneID)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = db.Exists(obj.Address())
|
||||
require.EqualError(t, err, meta.ErrAlreadyRemoved.Error())
|
||||
|
||||
_, err = db.Get(obj.Address())
|
||||
require.EqualError(t, err, meta.ErrAlreadyRemoved.Error())
|
||||
}
|
73
pkg/local_object_storage/metabase/v2/movable.go
Normal file
73
pkg/local_object_storage/metabase/v2/movable.go
Normal file
|
@ -0,0 +1,73 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
// ToMoveIt marks objects to move it into another shard. This useful for
|
||||
// faster HRW fetching.
|
||||
func (db *DB) ToMoveIt(addr *objectSDK.Address) error {
|
||||
return db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
toMoveIt, err := tx.CreateBucketIfNotExists(toMoveItBucketName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return toMoveIt.Put(addressKey(addr), zeroValue)
|
||||
})
|
||||
}
|
||||
|
||||
// DoNotMove removes `MoveIt` mark from the object.
|
||||
func (db *DB) DoNotMove(addr *objectSDK.Address) error {
|
||||
return db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
toMoveIt := tx.Bucket(toMoveItBucketName)
|
||||
if toMoveIt == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return toMoveIt.Delete(addressKey(addr))
|
||||
})
|
||||
}
|
||||
|
||||
// Movable returns list of marked objects to move into other shard.
|
||||
func (db *DB) Movable() ([]*objectSDK.Address, error) {
|
||||
var strAddrs []string
|
||||
|
||||
err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
toMoveIt := tx.Bucket(toMoveItBucketName)
|
||||
if toMoveIt == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return toMoveIt.ForEach(func(k, v []byte) error {
|
||||
strAddrs = append(strAddrs, string(k))
|
||||
|
||||
return nil
|
||||
})
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// we can parse strings to structures in-place, but probably it seems
|
||||
// more efficient to keep bolt db TX code smaller because it might be
|
||||
// bottleneck.
|
||||
addrs := make([]*objectSDK.Address, 0, len(strAddrs))
|
||||
|
||||
for i := range strAddrs {
|
||||
addr := objectSDK.NewAddress()
|
||||
|
||||
err = addr.Parse(strAddrs[i])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't parse object address %v: %w",
|
||||
strAddrs[i], err)
|
||||
}
|
||||
|
||||
addrs = append(addrs, addr)
|
||||
}
|
||||
|
||||
return addrs, nil
|
||||
}
|
59
pkg/local_object_storage/metabase/v2/movable_test.go
Normal file
59
pkg/local_object_storage/metabase/v2/movable_test.go
Normal file
|
@ -0,0 +1,59 @@
|
|||
package meta_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestDB_Movable(t *testing.T) {
|
||||
db := newDB(t)
|
||||
defer releaseDB(db)
|
||||
|
||||
raw1 := generateRawObject(t)
|
||||
raw2 := generateRawObject(t)
|
||||
|
||||
obj1 := object.NewFromV2(raw1.ToV2())
|
||||
obj2 := object.NewFromV2(raw2.ToV2())
|
||||
|
||||
// put two objects in metabase
|
||||
err := db.Put(obj1, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = db.Put(obj2, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
// check if toMoveIt index empty
|
||||
toMoveList, err := db.Movable()
|
||||
require.NoError(t, err)
|
||||
require.Len(t, toMoveList, 0)
|
||||
|
||||
// mark to move object2
|
||||
err = db.ToMoveIt(obj2.Address())
|
||||
require.NoError(t, err)
|
||||
|
||||
// check if toMoveIt index contains address of object 2
|
||||
toMoveList, err = db.Movable()
|
||||
require.NoError(t, err)
|
||||
require.Len(t, toMoveList, 1)
|
||||
require.Contains(t, toMoveList, obj2.Address())
|
||||
|
||||
// remove from toMoveIt index non existing address
|
||||
err = db.DoNotMove(obj1.Address())
|
||||
require.NoError(t, err)
|
||||
|
||||
// check if toMoveIt index hasn't changed
|
||||
toMoveList, err = db.Movable()
|
||||
require.NoError(t, err)
|
||||
require.Len(t, toMoveList, 1)
|
||||
|
||||
// remove from toMoveIt index existing address
|
||||
err = db.DoNotMove(obj2.Address())
|
||||
require.NoError(t, err)
|
||||
|
||||
// check if toMoveIt index is empty now
|
||||
toMoveList, err = db.Movable()
|
||||
require.NoError(t, err)
|
||||
require.Len(t, toMoveList, 0)
|
||||
}
|
268
pkg/local_object_storage/metabase/v2/put.go
Normal file
268
pkg/local_object_storage/metabase/v2/put.go
Normal file
|
@ -0,0 +1,268 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/gob"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
type (
|
||||
namedBucketItem struct {
|
||||
name, key, val []byte
|
||||
}
|
||||
)
|
||||
|
||||
var ErrUnknownObjectType = errors.New("unknown object type")
|
||||
|
||||
// Put saves object header in metabase. Object payload expected to be cut.
|
||||
// Big objects have nil blobovniczaID.
|
||||
func (db *DB) Put(obj *object.Object, id *blobovnicza.ID) error {
|
||||
var isParent bool // true when object header obtained from `split.Parent`
|
||||
|
||||
for ; obj != nil; obj, isParent = obj.GetParent(), true {
|
||||
exists, err := db.Exists(obj.Address())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// most right child and split header overlap parent so we have to
|
||||
// check if object exists to not overwrite it twice
|
||||
if exists {
|
||||
continue
|
||||
}
|
||||
|
||||
uniqueIndexes, err := uniqueIndexes(obj, isParent, id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can' build unique indexes: %w", err)
|
||||
}
|
||||
|
||||
// build list indexes
|
||||
listIndexes, err := listIndexes(obj)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can' build list indexes: %w", err)
|
||||
}
|
||||
|
||||
fkbtIndexes, err := fkbtIndexes(obj)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can' build fake bucket tree indexes: %w", err)
|
||||
}
|
||||
|
||||
// consider making one TX for both target object and parent
|
||||
err = db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
// put unique indexes
|
||||
for i := range uniqueIndexes {
|
||||
err := putUniqueIndexItem(tx, uniqueIndexes[i])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// put list indexes
|
||||
for i := range listIndexes {
|
||||
err := putListIndexItem(tx, listIndexes[i])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// put fake bucket tree indexes
|
||||
for i := range fkbtIndexes {
|
||||
err := putFKBTIndexItem(tx, fkbtIndexes[i])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil { // if tx failed then return error
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// builds list of <unique> indexes from the object.
|
||||
func uniqueIndexes(obj *object.Object, isParent bool, id *blobovnicza.ID) ([]namedBucketItem, error) {
|
||||
addr := obj.Address()
|
||||
objKey := objectKey(addr.ObjectID())
|
||||
result := make([]namedBucketItem, 0, 2)
|
||||
|
||||
// add value to primary unique bucket
|
||||
if !isParent {
|
||||
var bucketName []byte
|
||||
|
||||
switch obj.Type() {
|
||||
case objectSDK.TypeRegular:
|
||||
bucketName = primaryBucketName(addr.ContainerID())
|
||||
case objectSDK.TypeTombstone:
|
||||
bucketName = tombstoneBucketName(addr.ContainerID())
|
||||
case objectSDK.TypeStorageGroup:
|
||||
bucketName = storageGroupBucketName(addr.ContainerID())
|
||||
default:
|
||||
return nil, ErrUnknownObjectType
|
||||
}
|
||||
|
||||
rawObject, err := obj.Marshal()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't marshal object header: %w", err)
|
||||
}
|
||||
|
||||
result = append(result, namedBucketItem{
|
||||
name: bucketName,
|
||||
key: objKey,
|
||||
val: rawObject,
|
||||
})
|
||||
|
||||
// index blobovniczaID if it is present
|
||||
if id != nil {
|
||||
result = append(result, namedBucketItem{
|
||||
name: smallBucketName(addr.ContainerID()),
|
||||
key: objKey,
|
||||
val: *id,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// index root object
|
||||
if obj.Type() == objectSDK.TypeRegular && !obj.HasParent() {
|
||||
result = append(result, namedBucketItem{
|
||||
name: rootBucketName(addr.ContainerID()),
|
||||
key: objKey,
|
||||
val: zeroValue, // todo: store split.Info when it will be ready
|
||||
})
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// builds list of <list> indexes from the object.
|
||||
func listIndexes(obj *object.Object) ([]namedBucketItem, error) {
|
||||
result := make([]namedBucketItem, 0, 1)
|
||||
addr := obj.Address()
|
||||
objKey := objectKey(addr.ObjectID())
|
||||
|
||||
// index payload hashes
|
||||
result = append(result, namedBucketItem{
|
||||
name: payloadHashBucketName(addr.ContainerID()),
|
||||
key: obj.PayloadChecksum().Sum(),
|
||||
val: objKey,
|
||||
})
|
||||
|
||||
if obj.ParentID() != nil {
|
||||
result = append(result, namedBucketItem{
|
||||
name: parentBucketName(addr.ContainerID()),
|
||||
key: objectKey(obj.ParentID()),
|
||||
val: objKey,
|
||||
})
|
||||
}
|
||||
|
||||
// todo: index splitID
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// builds list of <fake bucket tree> indexes from the object.
|
||||
func fkbtIndexes(obj *object.Object) ([]namedBucketItem, error) {
|
||||
addr := obj.Address()
|
||||
objKey := []byte(addr.ObjectID().String())
|
||||
|
||||
attrs := obj.Attributes()
|
||||
result := make([]namedBucketItem, 0, 1+len(attrs))
|
||||
|
||||
// owner
|
||||
result = append(result, namedBucketItem{
|
||||
name: ownerBucketName(addr.ContainerID()),
|
||||
key: []byte(obj.OwnerID().String()),
|
||||
val: objKey,
|
||||
})
|
||||
|
||||
// user specified attributes
|
||||
for i := range attrs {
|
||||
result = append(result, namedBucketItem{
|
||||
name: attributeBucketName(addr.ContainerID(), attrs[i].Key()),
|
||||
key: []byte(attrs[i].Value()),
|
||||
val: objKey,
|
||||
})
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func putUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
||||
bkt, err := tx.CreateBucketIfNotExists(item.name)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't create index %v: %w", item.name, err)
|
||||
}
|
||||
|
||||
return bkt.Put(item.key, item.val)
|
||||
}
|
||||
|
||||
func putFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
||||
bkt, err := tx.CreateBucketIfNotExists(item.name)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't create index %v: %w", item.name, err)
|
||||
}
|
||||
|
||||
fkbtRoot, err := bkt.CreateBucketIfNotExists(item.key)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't create fake bucket tree index %v: %w", item.key, err)
|
||||
}
|
||||
|
||||
return fkbtRoot.Put(item.val, zeroValue)
|
||||
}
|
||||
|
||||
func putListIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
||||
bkt, err := tx.CreateBucketIfNotExists(item.name)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't create index %v: %w", item.name, err)
|
||||
}
|
||||
|
||||
lst, err := decodeList(bkt.Get(item.key))
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't decode leaf list %v: %w", item.key, err)
|
||||
}
|
||||
|
||||
lst = append(lst, item.val)
|
||||
|
||||
encodedLst, err := encodeList(lst)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't encode leaf list %v: %w", item.key, err)
|
||||
}
|
||||
|
||||
return bkt.Put(item.key, encodedLst)
|
||||
}
|
||||
|
||||
// encodeList decodes list of bytes into a single blog for list bucket indexes.
|
||||
func encodeList(lst [][]byte) ([]byte, error) {
|
||||
buf := bytes.NewBuffer(nil)
|
||||
encoder := gob.NewEncoder(buf)
|
||||
|
||||
// consider using protobuf encoding instead of glob
|
||||
if err := encoder.Encode(lst); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return buf.Bytes(), nil
|
||||
}
|
||||
|
||||
// decodeList decodes blob into the list of bytes from list bucket index.
|
||||
func decodeList(data []byte) (lst [][]byte, err error) {
|
||||
if len(data) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
decoder := gob.NewDecoder(bytes.NewReader(data))
|
||||
if err := decoder.Decode(&lst); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return lst, nil
|
||||
}
|
331
pkg/local_object_storage/metabase/v2/select.go
Normal file
331
pkg/local_object_storage/metabase/v2/select.go
Normal file
|
@ -0,0 +1,331 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
v2object "github.com/nspcc-dev/neofs-api-go/v2/object"
|
||||
"github.com/pkg/errors"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type (
|
||||
// filterGroup is a structure that have search filters grouped by access
|
||||
// method. We have fast filters that looks for indexes and do not unmarshal
|
||||
// objects, and slow filters, that applied after fast filters created
|
||||
// smaller set of objects to check.
|
||||
filterGroup struct {
|
||||
cid *container.ID
|
||||
fastFilters object.SearchFilters
|
||||
slowFilters object.SearchFilters
|
||||
}
|
||||
)
|
||||
|
||||
var ErrContainerNotInQuery = errors.New("search query does not contain container id filter")
|
||||
|
||||
// Select returns list of addresses of objects that match search filters.
|
||||
func (db *DB) Select(fs object.SearchFilters) (res []*object.Address, err error) {
|
||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
res, err = db.selectObjects(tx, fs)
|
||||
|
||||
return err
|
||||
})
|
||||
|
||||
return res, err
|
||||
}
|
||||
|
||||
func (db *DB) selectObjects(tx *bbolt.Tx, fs object.SearchFilters) ([]*object.Address, error) {
|
||||
group, err := groupFilters(fs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if group.cid == nil {
|
||||
return nil, ErrContainerNotInQuery
|
||||
}
|
||||
|
||||
// keep matched addresses in this cache
|
||||
// value equal to number (index+1) of latest matched filter
|
||||
mAddr := make(map[string]int)
|
||||
|
||||
expLen := len(group.fastFilters) // expected value of matched filters in mAddr
|
||||
|
||||
if len(group.fastFilters) == 0 {
|
||||
expLen = 1
|
||||
|
||||
db.selectAll(tx, group.cid, mAddr)
|
||||
} else {
|
||||
for i := range group.fastFilters {
|
||||
db.selectFastFilter(tx, group.cid, group.fastFilters[i], mAddr, i)
|
||||
}
|
||||
}
|
||||
|
||||
res := make([]*object.Address, 0, len(mAddr))
|
||||
|
||||
for a, ind := range mAddr {
|
||||
if ind != expLen {
|
||||
continue // ignore objects with unmatched fast filters
|
||||
}
|
||||
|
||||
addr := object.NewAddress()
|
||||
if err := addr.Parse(a); err != nil {
|
||||
// TODO: storage was broken, so we need to handle it
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if inGraveyard(tx, addr) {
|
||||
continue // ignore removed objects
|
||||
}
|
||||
|
||||
if !db.matchSlowFilters(tx, addr, group.slowFilters) {
|
||||
continue // ignore objects with unmatched slow filters
|
||||
}
|
||||
|
||||
res = append(res, addr)
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// selectAll adds to resulting cache all available objects in metabase.
|
||||
func (db *DB) selectAll(tx *bbolt.Tx, cid *container.ID, to map[string]int) {
|
||||
prefix := cid.String() + "/"
|
||||
|
||||
selectAllFromBucket(tx, primaryBucketName(cid), prefix, to, 0)
|
||||
selectAllFromBucket(tx, tombstoneBucketName(cid), prefix, to, 0)
|
||||
selectAllFromBucket(tx, storageGroupBucketName(cid), prefix, to, 0)
|
||||
selectAllFromBucket(tx, parentBucketName(cid), prefix, to, 0)
|
||||
}
|
||||
|
||||
// selectAllFromBucket goes through all keys in bucket and adds them in a
|
||||
// resulting cache. Keys should be stringed object ids.
|
||||
func selectAllFromBucket(tx *bbolt.Tx, name []byte, prefix string, to map[string]int, fNum int) {
|
||||
bkt := tx.Bucket(name)
|
||||
if bkt == nil {
|
||||
return
|
||||
}
|
||||
|
||||
_ = bkt.ForEach(func(k, v []byte) error {
|
||||
key := prefix + string(k) // consider using string builders from sync.Pool
|
||||
markAddressInCache(to, fNum, key)
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// selectFastFilter makes fast optimized checks for well known buckets or
|
||||
// looking through user attribute buckets otherwise.
|
||||
func (db *DB) selectFastFilter(
|
||||
tx *bbolt.Tx,
|
||||
cid *container.ID, // container we search on
|
||||
f object.SearchFilter, // fast filter
|
||||
to map[string]int, // resulting cache
|
||||
fNum int, // index of filter
|
||||
) {
|
||||
prefix := cid.String() + "/"
|
||||
|
||||
// todo: add splitID
|
||||
|
||||
switch f.Header() {
|
||||
case v2object.FilterHeaderObjectID:
|
||||
// todo: implement me
|
||||
case v2object.FilterHeaderOwnerID:
|
||||
bucketName := ownerBucketName(cid)
|
||||
db.selectFromFKBT(tx, bucketName, f, prefix, to, fNum)
|
||||
case v2object.FilterHeaderPayloadHash:
|
||||
bucketName := payloadHashBucketName(cid)
|
||||
db.selectFromList(tx, bucketName, f, prefix, to, fNum)
|
||||
case v2object.FilterHeaderObjectType:
|
||||
var bucketName []byte
|
||||
|
||||
switch f.Value() { // do it better after https://github.com/nspcc-dev/neofs-api/issues/84
|
||||
case "Regular":
|
||||
bucketName = primaryBucketName(cid)
|
||||
|
||||
selectAllFromBucket(tx, bucketName, prefix, to, fNum)
|
||||
|
||||
bucketName = parentBucketName(cid)
|
||||
case "Tombstone":
|
||||
bucketName = tombstoneBucketName(cid)
|
||||
case "StorageGroup":
|
||||
bucketName = storageGroupBucketName(cid)
|
||||
default:
|
||||
db.log.Debug("unknown object type", zap.String("type", f.Value()))
|
||||
}
|
||||
|
||||
selectAllFromBucket(tx, bucketName, prefix, to, fNum)
|
||||
case v2object.FilterHeaderParent:
|
||||
bucketName := parentBucketName(cid)
|
||||
db.selectFromList(tx, bucketName, f, prefix, to, fNum)
|
||||
case v2object.FilterPropertyRoot:
|
||||
selectAllFromBucket(tx, rootBucketName(cid), prefix, to, fNum)
|
||||
case v2object.FilterPropertyPhy:
|
||||
selectAllFromBucket(tx, primaryBucketName(cid), prefix, to, fNum)
|
||||
selectAllFromBucket(tx, tombstoneBucketName(cid), prefix, to, fNum)
|
||||
selectAllFromBucket(tx, storageGroupBucketName(cid), prefix, to, fNum)
|
||||
default: // user attribute
|
||||
bucketName := attributeBucketName(cid, f.Header())
|
||||
db.selectFromFKBT(tx, bucketName, f, prefix, to, fNum)
|
||||
}
|
||||
}
|
||||
|
||||
// selectFromList looks into <fkbt> index to find list of addresses to add in
|
||||
// resulting cache.
|
||||
func (db *DB) selectFromFKBT(
|
||||
tx *bbolt.Tx,
|
||||
name []byte, // fkbt root bucket name
|
||||
f object.SearchFilter, // filter for operation and value
|
||||
prefix string, // prefix to create addr from oid in index
|
||||
to map[string]int, // resulting cache
|
||||
fNum int, // index of filter
|
||||
) { //
|
||||
matchFunc, ok := db.matchers[f.Operation()]
|
||||
if !ok {
|
||||
db.log.Debug("missing matcher", zap.Uint32("operation", uint32(f.Operation())))
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
fkbtRoot := tx.Bucket(name)
|
||||
if fkbtRoot == nil {
|
||||
return
|
||||
}
|
||||
|
||||
err := fkbtRoot.ForEach(func(k, _ []byte) error {
|
||||
if matchFunc(f.Header(), k, f.Value()) {
|
||||
fkbtLeaf := fkbtRoot.Bucket(k)
|
||||
if fkbtLeaf == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return fkbtLeaf.ForEach(func(k, _ []byte) error {
|
||||
addr := prefix + string(k)
|
||||
markAddressInCache(to, fNum, addr)
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
db.log.Debug("error in FKBT selection", zap.String("error", err.Error()))
|
||||
}
|
||||
}
|
||||
|
||||
// selectFromList looks into <list> index to find list of addresses to add in
|
||||
// resulting cache.
|
||||
func (db *DB) selectFromList(
|
||||
tx *bbolt.Tx,
|
||||
name []byte, // list root bucket name
|
||||
f object.SearchFilter, // filter for operation and value
|
||||
prefix string, // prefix to create addr from oid in index
|
||||
to map[string]int, // resulting cache
|
||||
fNum int, // index of filter
|
||||
) { //
|
||||
bkt := tx.Bucket(name)
|
||||
if bkt == nil {
|
||||
return
|
||||
}
|
||||
|
||||
switch f.Operation() {
|
||||
case object.MatchStringEqual:
|
||||
default:
|
||||
db.log.Debug("unknown operation", zap.Uint32("operation", uint32(f.Operation())))
|
||||
}
|
||||
|
||||
// warning: it works only for MatchStringEQ, for NotEQ you should iterate over
|
||||
// bkt and apply matchFunc, don't forget to implement this when it will be
|
||||
// needed. Right now it is not efficient to iterate over bucket
|
||||
// when there is only MatchStringEQ.
|
||||
lst, err := decodeList(bkt.Get(bucketKeyHelper(f.Header(), f.Value())))
|
||||
if err != nil {
|
||||
db.log.Debug("can't decode list bucket leaf", zap.String("error", err.Error()))
|
||||
}
|
||||
|
||||
for i := range lst {
|
||||
addr := prefix + string(lst[i])
|
||||
markAddressInCache(to, fNum, addr)
|
||||
}
|
||||
}
|
||||
|
||||
// matchSlowFilters return true if object header is matched by all slow filters.
|
||||
func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr *object.Address, f object.SearchFilters) bool {
|
||||
if len(f) == 0 {
|
||||
return true
|
||||
}
|
||||
|
||||
obj, err := db.get(tx, addr)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
for i := range f {
|
||||
matchFunc, ok := db.matchers[f[i].Operation()]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
var data []byte
|
||||
|
||||
switch f[i].Header() {
|
||||
case v2object.FilterHeaderVersion:
|
||||
data = []byte(obj.Version().String())
|
||||
case v2object.FilterHeaderHomomorphicHash:
|
||||
data = obj.PayloadHomomorphicHash().Sum()
|
||||
case v2object.FilterHeaderCreationEpoch:
|
||||
data = make([]byte, 8)
|
||||
binary.LittleEndian.PutUint64(data, obj.CreationEpoch())
|
||||
case v2object.FilterHeaderPayloadLength:
|
||||
data = make([]byte, 8)
|
||||
binary.LittleEndian.PutUint64(data, obj.PayloadSize())
|
||||
}
|
||||
|
||||
if !matchFunc(f[i].Header(), data, f[i].Value()) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// groupFilters divides filters in two groups: fast and slow. Fast filters
|
||||
// processed by indexes and slow filters processed after by unmarshaling
|
||||
// object headers.
|
||||
func groupFilters(filters object.SearchFilters) (*filterGroup, error) {
|
||||
res := &filterGroup{
|
||||
fastFilters: make(object.SearchFilters, 0, len(filters)),
|
||||
slowFilters: make(object.SearchFilters, 0, len(filters)),
|
||||
}
|
||||
|
||||
for i := range filters {
|
||||
switch filters[i].Header() {
|
||||
case v2object.FilterHeaderContainerID:
|
||||
res.cid = container.NewID()
|
||||
|
||||
err := res.cid.Parse(filters[i].Value())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't parse container id: %w", err)
|
||||
}
|
||||
case // slow filters
|
||||
v2object.FilterHeaderVersion,
|
||||
v2object.FilterHeaderCreationEpoch,
|
||||
v2object.FilterHeaderPayloadLength,
|
||||
v2object.FilterHeaderHomomorphicHash:
|
||||
res.slowFilters = append(res.slowFilters, filters[i])
|
||||
default: // fast filters or user attributes if unknown
|
||||
res.fastFilters = append(res.fastFilters, filters[i])
|
||||
}
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func markAddressInCache(cache map[string]int, fNum int, addr string) {
|
||||
if num := cache[addr]; num == fNum {
|
||||
cache[addr] = num + 1
|
||||
}
|
||||
}
|
302
pkg/local_object_storage/metabase/v2/select_test.go
Normal file
302
pkg/local_object_storage/metabase/v2/select_test.go
Normal file
|
@ -0,0 +1,302 @@
|
|||
package meta_test
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"testing"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
v2object "github.com/nspcc-dev/neofs-api-go/v2/object"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestDB_SelectUserAttributes(t *testing.T) {
|
||||
db := newDB(t)
|
||||
defer releaseDB(db)
|
||||
|
||||
cid := testCID()
|
||||
|
||||
raw1 := generateRawObjectWithCID(t, cid)
|
||||
addAttribute(raw1, "foo", "bar")
|
||||
addAttribute(raw1, "x", "y")
|
||||
|
||||
err := db.Put(raw1.Object(), nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
raw2 := generateRawObjectWithCID(t, cid)
|
||||
addAttribute(raw2, "foo", "bar")
|
||||
addAttribute(raw2, "x", "z")
|
||||
|
||||
err = db.Put(raw2.Object(), nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
raw3 := generateRawObjectWithCID(t, cid)
|
||||
addAttribute(raw3, "a", "b")
|
||||
|
||||
err = db.Put(raw3.Object(), nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
fs := generateSearchFilter(cid)
|
||||
fs.AddFilter("foo", "bar", objectSDK.MatchStringEqual)
|
||||
testSelect(t, db, fs,
|
||||
raw1.Object().Address(),
|
||||
raw2.Object().Address(),
|
||||
)
|
||||
|
||||
fs = generateSearchFilter(cid)
|
||||
fs.AddFilter("x", "y", objectSDK.MatchStringEqual)
|
||||
testSelect(t, db, fs, raw1.Object().Address())
|
||||
|
||||
fs = generateSearchFilter(cid)
|
||||
fs.AddFilter("a", "b", objectSDK.MatchStringEqual)
|
||||
testSelect(t, db, fs, raw3.Object().Address())
|
||||
|
||||
fs = generateSearchFilter(cid)
|
||||
fs.AddFilter("c", "d", objectSDK.MatchStringEqual)
|
||||
testSelect(t, db, fs)
|
||||
|
||||
fs = generateSearchFilter(cid)
|
||||
testSelect(t, db, fs,
|
||||
raw1.Object().Address(),
|
||||
raw2.Object().Address(),
|
||||
raw3.Object().Address(),
|
||||
)
|
||||
}
|
||||
|
||||
func TestDB_SelectRootPhyParent(t *testing.T) {
|
||||
db := newDB(t)
|
||||
defer releaseDB(db)
|
||||
|
||||
cid := testCID()
|
||||
|
||||
// prepare
|
||||
|
||||
small := generateRawObjectWithCID(t, cid)
|
||||
err := db.Put(small.Object(), nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
ts := generateRawObjectWithCID(t, cid)
|
||||
ts.SetType(objectSDK.TypeTombstone)
|
||||
err = db.Put(ts.Object(), nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
sg := generateRawObjectWithCID(t, cid)
|
||||
sg.SetType(objectSDK.TypeStorageGroup)
|
||||
err = db.Put(sg.Object(), nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
leftChild := generateRawObjectWithCID(t, cid)
|
||||
leftChild.InitRelations()
|
||||
err = db.Put(leftChild.Object(), nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
parent := generateRawObjectWithCID(t, cid)
|
||||
|
||||
rightChild := generateRawObjectWithCID(t, cid)
|
||||
rightChild.SetParent(parent.Object().SDK())
|
||||
rightChild.SetParentID(parent.Object().Address().ObjectID())
|
||||
err = db.Put(rightChild.Object(), nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
link := generateRawObjectWithCID(t, cid)
|
||||
link.SetParent(parent.Object().SDK())
|
||||
link.SetParentID(parent.Object().Address().ObjectID())
|
||||
link.SetChildren(leftChild.Object().Address().ObjectID(), rightChild.Object().Address().ObjectID())
|
||||
|
||||
err = db.Put(link.Object(), nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
// printDB(meta.ExtractDB(db))
|
||||
// return
|
||||
|
||||
t.Run("root objects", func(t *testing.T) {
|
||||
fs := generateSearchFilter(cid)
|
||||
fs.AddRootFilter()
|
||||
testSelect(t, db, fs,
|
||||
small.Object().Address(),
|
||||
parent.Object().Address(),
|
||||
)
|
||||
})
|
||||
|
||||
t.Run("phy objects", func(t *testing.T) {
|
||||
fs := generateSearchFilter(cid)
|
||||
fs.AddPhyFilter()
|
||||
testSelect(t, db, fs,
|
||||
small.Object().Address(),
|
||||
ts.Object().Address(),
|
||||
sg.Object().Address(),
|
||||
leftChild.Object().Address(),
|
||||
rightChild.Object().Address(),
|
||||
link.Object().Address(),
|
||||
)
|
||||
})
|
||||
|
||||
t.Run("regular objects", func(t *testing.T) {
|
||||
fs := generateSearchFilter(cid)
|
||||
fs.AddFilter(v2object.FilterHeaderObjectType, "Regular", objectSDK.MatchStringEqual)
|
||||
testSelect(t, db, fs,
|
||||
small.Object().Address(),
|
||||
leftChild.Object().Address(),
|
||||
rightChild.Object().Address(),
|
||||
link.Object().Address(),
|
||||
parent.Object().Address(),
|
||||
)
|
||||
})
|
||||
|
||||
t.Run("tombstone objects", func(t *testing.T) {
|
||||
fs := generateSearchFilter(cid)
|
||||
fs.AddFilter(v2object.FilterHeaderObjectType, "Tombstone", objectSDK.MatchStringEqual)
|
||||
testSelect(t, db, fs, ts.Object().Address())
|
||||
})
|
||||
|
||||
t.Run("storage group objects", func(t *testing.T) {
|
||||
fs := generateSearchFilter(cid)
|
||||
fs.AddFilter(v2object.FilterHeaderObjectType, "StorageGroup", objectSDK.MatchStringEqual)
|
||||
testSelect(t, db, fs, sg.Object().Address())
|
||||
})
|
||||
|
||||
t.Run("objects with parent", func(t *testing.T) {
|
||||
fs := generateSearchFilter(cid)
|
||||
fs.AddFilter(v2object.FilterHeaderParent,
|
||||
parent.Object().Address().ObjectID().String(),
|
||||
objectSDK.MatchStringEqual)
|
||||
|
||||
testSelect(t, db, fs,
|
||||
rightChild.Object().Address(),
|
||||
link.Object().Address(),
|
||||
)
|
||||
})
|
||||
|
||||
t.Run("all objects", func(t *testing.T) {
|
||||
fs := generateSearchFilter(cid)
|
||||
testSelect(t, db, fs,
|
||||
small.Object().Address(),
|
||||
ts.Object().Address(),
|
||||
sg.Object().Address(),
|
||||
leftChild.Object().Address(),
|
||||
rightChild.Object().Address(),
|
||||
link.Object().Address(),
|
||||
parent.Object().Address(),
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
func TestDB_SelectInhume(t *testing.T) {
|
||||
db := newDB(t)
|
||||
defer releaseDB(db)
|
||||
|
||||
cid := testCID()
|
||||
|
||||
raw1 := generateRawObjectWithCID(t, cid)
|
||||
err := db.Put(raw1.Object(), nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
raw2 := generateRawObjectWithCID(t, cid)
|
||||
err = db.Put(raw2.Object(), nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
fs := generateSearchFilter(cid)
|
||||
testSelect(t, db, fs,
|
||||
raw1.Object().Address(),
|
||||
raw2.Object().Address(),
|
||||
)
|
||||
|
||||
tombstone := objectSDK.NewAddress()
|
||||
tombstone.SetContainerID(cid)
|
||||
tombstone.SetObjectID(testOID())
|
||||
|
||||
err = db.Inhume(raw2.Object().Address(), tombstone)
|
||||
require.NoError(t, err)
|
||||
|
||||
fs = generateSearchFilter(cid)
|
||||
testSelect(t, db, fs,
|
||||
raw1.Object().Address(),
|
||||
)
|
||||
}
|
||||
|
||||
func TestDB_SelectPayloadHash(t *testing.T) {
|
||||
db := newDB(t)
|
||||
defer releaseDB(db)
|
||||
|
||||
cid := testCID()
|
||||
|
||||
raw1 := generateRawObjectWithCID(t, cid)
|
||||
err := db.Put(raw1.Object(), nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
raw2 := generateRawObjectWithCID(t, cid)
|
||||
err = db.Put(raw2.Object(), nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
fs := generateSearchFilter(cid)
|
||||
fs.AddFilter(v2object.FilterHeaderPayloadHash,
|
||||
hex.EncodeToString(raw1.PayloadChecksum().Sum()),
|
||||
objectSDK.MatchStringEqual)
|
||||
|
||||
testSelect(t, db, fs, raw1.Object().Address())
|
||||
}
|
||||
|
||||
func TestDB_SelectWithSlowFilters(t *testing.T) {
|
||||
db := newDB(t)
|
||||
defer releaseDB(db)
|
||||
|
||||
cid := testCID()
|
||||
|
||||
v20 := new(pkg.Version)
|
||||
v20.SetMajor(2)
|
||||
|
||||
v21 := new(pkg.Version)
|
||||
v21.SetMajor(2)
|
||||
v21.SetMinor(1)
|
||||
|
||||
raw1 := generateRawObjectWithCID(t, cid)
|
||||
raw1.SetPayloadSize(10)
|
||||
raw1.SetCreationEpoch(11)
|
||||
raw1.SetVersion(v20)
|
||||
err := db.Put(raw1.Object(), nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
raw2 := generateRawObjectWithCID(t, cid)
|
||||
raw2.SetPayloadSize(20)
|
||||
raw2.SetCreationEpoch(21)
|
||||
raw2.SetVersion(v21)
|
||||
err = db.Put(raw2.Object(), nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Run("object with TZHash", func(t *testing.T) {
|
||||
fs := generateSearchFilter(cid)
|
||||
fs.AddFilter(v2object.FilterHeaderHomomorphicHash,
|
||||
hex.EncodeToString(raw1.PayloadHomomorphicHash().Sum()),
|
||||
objectSDK.MatchStringEqual)
|
||||
|
||||
testSelect(t, db, fs, raw1.Object().Address())
|
||||
})
|
||||
|
||||
t.Run("object with payload length", func(t *testing.T) {
|
||||
fs := generateSearchFilter(cid)
|
||||
fs.AddFilter(v2object.FilterHeaderPayloadLength, "20", objectSDK.MatchStringEqual)
|
||||
|
||||
testSelect(t, db, fs, raw2.Object().Address())
|
||||
})
|
||||
|
||||
t.Run("object with creation epoch", func(t *testing.T) {
|
||||
fs := generateSearchFilter(cid)
|
||||
fs.AddFilter(v2object.FilterHeaderCreationEpoch, "11", objectSDK.MatchStringEqual)
|
||||
|
||||
testSelect(t, db, fs, raw1.Object().Address())
|
||||
})
|
||||
|
||||
t.Run("object with version", func(t *testing.T) {
|
||||
fs := generateSearchFilter(cid)
|
||||
fs.AddObjectVersionFilter(objectSDK.MatchStringEqual, v21)
|
||||
testSelect(t, db, fs, raw2.Object().Address())
|
||||
})
|
||||
}
|
||||
|
||||
func generateSearchFilter(cid *container.ID) objectSDK.SearchFilters {
|
||||
fs := objectSDK.SearchFilters{}
|
||||
fs.AddObjectContainerIDFilter(objectSDK.MatchStringEqual, cid)
|
||||
|
||||
return fs
|
||||
}
|
28
pkg/local_object_storage/metabase/v2/small.go
Normal file
28
pkg/local_object_storage/metabase/v2/small.go
Normal file
|
@ -0,0 +1,28 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
// IsSmall returns blobovniczaID for small objects and nil for big objects.
|
||||
// Small objects stored in blobovnicza instances. Big objects stored in FS by
|
||||
// shallow path which is calculated from address and therefore it is not
|
||||
// indexed in metabase.
|
||||
func (db *DB) IsSmall(addr *objectSDK.Address) (id *blobovnicza.ID, err error) {
|
||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
// if graveyard is empty, then check if object exists in primary bucket
|
||||
smallBucket := tx.Bucket(smallBucketName(addr.ContainerID()))
|
||||
if smallBucket == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
blobovniczaID := smallBucket.Get(objectKey(addr.ObjectID()))
|
||||
id = blobovnicza.NewIDFromBytes(blobovniczaID)
|
||||
|
||||
return err
|
||||
})
|
||||
|
||||
return id, err
|
||||
}
|
45
pkg/local_object_storage/metabase/v2/small_test.go
Normal file
45
pkg/local_object_storage/metabase/v2/small_test.go
Normal file
|
@ -0,0 +1,45 @@
|
|||
package meta_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestDB_IsSmall(t *testing.T) {
|
||||
db := newDB(t)
|
||||
defer releaseDB(db)
|
||||
|
||||
raw1 := generateRawObject(t)
|
||||
raw2 := generateRawObject(t)
|
||||
|
||||
blobovniczaID := blobovnicza.ID{1, 2, 3, 4}
|
||||
|
||||
obj1 := object.NewFromV2(raw1.ToV2())
|
||||
obj2 := object.NewFromV2(raw2.ToV2())
|
||||
|
||||
// check IsSmall from empty database
|
||||
fetchedBlobovniczaID, err := db.IsSmall(obj1.Address())
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, fetchedBlobovniczaID)
|
||||
|
||||
// put one object with blobovniczaID
|
||||
err = db.Put(obj1, &blobovniczaID)
|
||||
require.NoError(t, err)
|
||||
|
||||
// put one object without blobovniczaID
|
||||
err = db.Put(obj2, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
// check IsSmall for object without blobovniczaID
|
||||
fetchedBlobovniczaID, err = db.IsSmall(obj2.Address())
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, fetchedBlobovniczaID)
|
||||
|
||||
// check IsSmall for object with blobovniczaID
|
||||
fetchedBlobovniczaID, err = db.IsSmall(obj1.Address())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, blobovniczaID, *fetchedBlobovniczaID)
|
||||
}
|
92
pkg/local_object_storage/metabase/v2/util.go
Normal file
92
pkg/local_object_storage/metabase/v2/util.go
Normal file
|
@ -0,0 +1,92 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
)
|
||||
|
||||
/*
|
||||
We might increase performance by not using string representation of
|
||||
identities and addresses. String representation require base58 encoding that
|
||||
slows execution. Instead we can try to marshal these structures directly into
|
||||
bytes. Check it later.
|
||||
*/
|
||||
|
||||
var (
|
||||
graveyardBucketName = []byte("Graveyard")
|
||||
toMoveItBucketName = []byte("ToMoveIt")
|
||||
|
||||
zeroValue = []byte{0xFF}
|
||||
|
||||
smallPostfix = "_small"
|
||||
storageGroupPostfix = "_SG"
|
||||
tombstonePostfix = "_TS"
|
||||
ownerPostfix = "_ownerid"
|
||||
payloadHashPostfix = "_payloadhash"
|
||||
rootPostfix = "_root"
|
||||
parentPostfix = "_parent"
|
||||
|
||||
userAttributePostfix = "_attr_"
|
||||
)
|
||||
|
||||
// primaryBucketName returns <CID>.
|
||||
func primaryBucketName(cid *container.ID) []byte {
|
||||
return []byte(cid.String())
|
||||
}
|
||||
|
||||
// tombstoneBucketName returns <CID>_TS.
|
||||
func tombstoneBucketName(cid *container.ID) []byte {
|
||||
return []byte(cid.String() + tombstonePostfix)
|
||||
}
|
||||
|
||||
// storageGroupBucketName returns <CID>_SG.
|
||||
func storageGroupBucketName(cid *container.ID) []byte {
|
||||
return []byte(cid.String() + storageGroupPostfix)
|
||||
}
|
||||
|
||||
// smallBucketName returns <CID>_small.
|
||||
func smallBucketName(cid *container.ID) []byte {
|
||||
return []byte(cid.String() + smallPostfix) // consider caching output values
|
||||
}
|
||||
|
||||
// attributeBucketName returns <CID>_attr_<attributeKey>.
|
||||
func attributeBucketName(cid *container.ID, attributeKey string) []byte {
|
||||
sb := strings.Builder{} // consider getting string builders from sync.Pool
|
||||
sb.WriteString(cid.String())
|
||||
sb.WriteString(userAttributePostfix)
|
||||
sb.WriteString(attributeKey)
|
||||
|
||||
return []byte(sb.String())
|
||||
}
|
||||
|
||||
// payloadHashBucketName returns <CID>_payloadhash.
|
||||
func payloadHashBucketName(cid *container.ID) []byte {
|
||||
return []byte(cid.String() + payloadHashPostfix)
|
||||
}
|
||||
|
||||
// rootBucketName returns <CID>_root.
|
||||
func rootBucketName(cid *container.ID) []byte {
|
||||
return []byte(cid.String() + rootPostfix)
|
||||
}
|
||||
|
||||
// ownerBucketName returns <CID>_ownerid.
|
||||
func ownerBucketName(cid *container.ID) []byte {
|
||||
return []byte(cid.String() + ownerPostfix)
|
||||
}
|
||||
|
||||
// parentBucketNAme returns <CID>_parent.
|
||||
func parentBucketName(cid *container.ID) []byte {
|
||||
return []byte(cid.String() + parentPostfix)
|
||||
}
|
||||
|
||||
// addressKey returns key for K-V tables when key is a whole address.
|
||||
func addressKey(addr *object.Address) []byte {
|
||||
return []byte(addr.String())
|
||||
}
|
||||
|
||||
// objectKey returns key for K-V tables when key is an object id.
|
||||
func objectKey(oid *object.ID) []byte {
|
||||
return []byte(oid.String())
|
||||
}
|
Loading…
Reference in a new issue