forked from TrueCloudLab/frostfs-node
[#128] localstorage: Implement primary object metabase
Implement bolt-based metabase that is going to be used in local object storage. Implement Put/Get/Select methods. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
d08c1c76c1
commit
85aacbbb10
7 changed files with 385 additions and 2 deletions
6
go.mod
6
go.mod
|
@ -13,7 +13,7 @@ require (
|
|||
github.com/multiformats/go-multiaddr-net v0.1.2 // v0.1.1 => v0.1.2
|
||||
github.com/multiformats/go-multihash v0.0.13 // indirect
|
||||
github.com/nspcc-dev/neo-go v0.91.1-pre.0.20200827184617-7560aa345a78
|
||||
github.com/nspcc-dev/neofs-api-go v1.3.1-0.20201020152448-c8f46f7d9762
|
||||
github.com/nspcc-dev/neofs-api-go v1.3.1-0.20201028111149-ac38d13f04ff
|
||||
github.com/nspcc-dev/neofs-crypto v0.3.0
|
||||
github.com/nspcc-dev/tzhash v1.4.0
|
||||
github.com/panjf2000/ants/v2 v2.3.0
|
||||
|
@ -25,12 +25,14 @@ require (
|
|||
github.com/spf13/viper v1.7.0
|
||||
github.com/stretchr/testify v1.6.1
|
||||
github.com/valyala/fasthttp v1.9.0
|
||||
go.etcd.io/bbolt v1.3.4
|
||||
go.etcd.io/bbolt v1.3.5
|
||||
go.uber.org/atomic v1.5.1
|
||||
go.uber.org/multierr v1.4.0 // indirect
|
||||
go.uber.org/zap v1.13.0
|
||||
golang.org/x/crypto v0.0.0-20200117160349-530e935923ad // indirect
|
||||
golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f // indirect
|
||||
golang.org/x/net v0.0.0-20191105084925-a882066a44e0 // indirect
|
||||
golang.org/x/sys v0.0.0-20201024232916-9f70ab9862d5 // indirect
|
||||
golang.org/x/tools v0.0.0-20200123022218-593de606220b // indirect
|
||||
google.golang.org/grpc v1.29.1
|
||||
google.golang.org/protobuf v1.23.0
|
||||
|
|
BIN
go.sum
BIN
go.sum
Binary file not shown.
25
pkg/local_object_storage/metabase/db.go
Normal file
25
pkg/local_object_storage/metabase/db.go
Normal file
|
@ -0,0 +1,25 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
// DB represents local metabase of storage node.
|
||||
type DB struct {
|
||||
boltDB *bbolt.DB
|
||||
|
||||
matchers map[object.SearchMatchType]func(string, string) bool
|
||||
}
|
||||
|
||||
// NewDB creates, initializes and returns DB instance.
|
||||
func NewDB(boltDB *bbolt.DB) *DB {
|
||||
return &DB{
|
||||
boltDB: boltDB,
|
||||
matchers: map[object.SearchMatchType]func(string, string) bool{
|
||||
object.MatchStringEqual: func(s string, s2 string) bool {
|
||||
return s == s2
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
102
pkg/local_object_storage/metabase/db_test.go
Normal file
102
pkg/local_object_storage/metabase/db_test.go
Normal file
|
@ -0,0 +1,102 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"crypto/sha256"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util/test"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
func testSelect(t *testing.T, db *DB, fs objectSDK.SearchFilters, exp ...*objectSDK.Address) {
|
||||
res, err := db.Select(fs)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, res, len(exp))
|
||||
|
||||
for i := range exp {
|
||||
require.Contains(t, res, exp[i])
|
||||
}
|
||||
}
|
||||
|
||||
func TestDB(t *testing.T) {
|
||||
version := pkg.NewVersion()
|
||||
version.SetMajor(2)
|
||||
version.SetMinor(1)
|
||||
|
||||
cs := [sha256.Size]byte{}
|
||||
rand.Read(cs[:])
|
||||
|
||||
cid := container.NewID()
|
||||
cid.SetSHA256(cs)
|
||||
|
||||
w, err := owner.NEO3WalletFromPublicKey(&test.DecodeKey(-1).PublicKey)
|
||||
require.NoError(t, err)
|
||||
|
||||
ownerID := owner.NewID()
|
||||
ownerID.SetNeo3Wallet(w)
|
||||
|
||||
rand.Read(cs[:])
|
||||
oid := objectSDK.NewID()
|
||||
oid.SetSHA256(cs)
|
||||
|
||||
obj := object.NewRaw()
|
||||
obj.SetID(oid)
|
||||
obj.SetOwnerID(ownerID)
|
||||
obj.SetContainerID(cid)
|
||||
obj.SetVersion(version)
|
||||
|
||||
k, v := "key", "value"
|
||||
|
||||
a := objectSDK.NewAttribute()
|
||||
a.SetKey(k)
|
||||
a.SetValue(v)
|
||||
|
||||
obj.SetAttributes(a)
|
||||
|
||||
path := "test.db"
|
||||
|
||||
bdb, err := bbolt.Open(path, 0600, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
defer func() {
|
||||
bdb.Close()
|
||||
os.Remove(path)
|
||||
}()
|
||||
|
||||
db := NewDB(bdb)
|
||||
|
||||
o := obj.Object()
|
||||
|
||||
require.NoError(t, db.Put(o))
|
||||
|
||||
o2, err := db.Get(o.Address())
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, o, o2)
|
||||
|
||||
fs := objectSDK.SearchFilters{}
|
||||
|
||||
// filter container ID
|
||||
fs.AddObjectContainerIDFilter(objectSDK.MatchStringEqual, cid)
|
||||
testSelect(t, db, fs, o.Address())
|
||||
|
||||
// filter owner ID
|
||||
fs.AddObjectOwnerIDFilter(objectSDK.MatchStringEqual, ownerID)
|
||||
testSelect(t, db, fs, o.Address())
|
||||
|
||||
// filter attribute
|
||||
fs.AddFilter(k, v, objectSDK.MatchStringEqual)
|
||||
testSelect(t, db, fs, o.Address())
|
||||
|
||||
// filter mismatch
|
||||
fs.AddFilter(k, v+"1", objectSDK.MatchStringEqual)
|
||||
testSelect(t, db, fs)
|
||||
}
|
38
pkg/local_object_storage/metabase/get.go
Normal file
38
pkg/local_object_storage/metabase/get.go
Normal file
|
@ -0,0 +1,38 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
var errNotFound = errors.New("object not found")
|
||||
|
||||
// Get returns object header for specified address.
|
||||
func (db *DB) Get(addr *objectSDK.Address) (*object.Object, error) {
|
||||
var obj *object.Object
|
||||
|
||||
if err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
primaryBucket := tx.Bucket(primaryBucket)
|
||||
if primaryBucket == nil {
|
||||
return errNotFound
|
||||
}
|
||||
|
||||
data := primaryBucket.Get(addressKey(addr))
|
||||
if data == nil {
|
||||
return errNotFound
|
||||
}
|
||||
|
||||
var err error
|
||||
|
||||
obj, err = object.FromBytes(data)
|
||||
|
||||
return err
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return obj, nil
|
||||
}
|
131
pkg/local_object_storage/metabase/put.go
Normal file
131
pkg/local_object_storage/metabase/put.go
Normal file
|
@ -0,0 +1,131 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
v2object "github.com/nspcc-dev/neofs-api-go/v2/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"github.com/pkg/errors"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
type bucketItem struct {
|
||||
key, val string
|
||||
}
|
||||
|
||||
var (
|
||||
primaryBucket = []byte("objects")
|
||||
indexBucket = []byte("index")
|
||||
)
|
||||
|
||||
// Put saves object in DB.
|
||||
//
|
||||
// Object payload expected to be cut.
|
||||
func (db *DB) Put(obj *object.Object) error {
|
||||
return db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
// create primary bucket (addr: header)
|
||||
primaryBucket, err := tx.CreateBucketIfNotExists(primaryBucket)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "(%T) could not create primary bucket", db)
|
||||
}
|
||||
|
||||
data, err := obj.ToV2().StableMarshal(nil)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "(%T) could not marshal the object", db)
|
||||
}
|
||||
|
||||
addrKey := addressKey(obj.Address())
|
||||
|
||||
// put header to primary bucket
|
||||
if err := primaryBucket.Put(addrKey, data); err != nil {
|
||||
return errors.Wrapf(err, "(%T) could not put item to primary bucket", db)
|
||||
}
|
||||
|
||||
// create bucket for indices
|
||||
indexBucket, err := tx.CreateBucketIfNotExists(indexBucket)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "(%T) could not create index bucket", db)
|
||||
}
|
||||
|
||||
// calculate indexed values for object
|
||||
indices := objectIndices(obj)
|
||||
|
||||
for i := range indices {
|
||||
// create index bucket
|
||||
keyBucket, err := indexBucket.CreateBucketIfNotExists([]byte(indices[i].key))
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "(%T) could not create bucket for header key", db)
|
||||
}
|
||||
|
||||
// FIXME: here we can get empty slice that could not be the key
|
||||
// Possible solutions:
|
||||
// 1. add prefix byte (0 if empty);
|
||||
v := []byte(indices[i].val)
|
||||
|
||||
// put value to key bucket (it is needed for iteration over all values (Select))
|
||||
if err := keyBucket.Put(keyWithPrefix(v, false), nil); err != nil {
|
||||
return errors.Wrapf(err, "(%T) could not put header value", db)
|
||||
}
|
||||
|
||||
// create address bucket for the value
|
||||
valBucket, err := keyBucket.CreateBucketIfNotExists(keyWithPrefix(v, true))
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "(%T) could not create bucket for header value", db)
|
||||
}
|
||||
|
||||
// put object address to value bucket
|
||||
if err := valBucket.Put(addrKey, nil); err != nil {
|
||||
return errors.Wrapf(err, "(%T) could not put item to header bucket", db)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func keyWithPrefix(key []byte, bucket bool) []byte {
|
||||
b := byte(0)
|
||||
if bucket {
|
||||
b = 1
|
||||
}
|
||||
|
||||
return append([]byte{b}, key...)
|
||||
}
|
||||
|
||||
func keyWithoutPrefix(key []byte) ([]byte, bool) {
|
||||
return key[1:], key[0] == 1
|
||||
}
|
||||
|
||||
func addressKey(addr *objectSDK.Address) []byte {
|
||||
return []byte(addr.String())
|
||||
}
|
||||
|
||||
func objectIndices(obj *object.Object) []bucketItem {
|
||||
as := obj.GetAttributes()
|
||||
|
||||
res := make([]bucketItem, 0, 3+len(as))
|
||||
|
||||
res = append(res,
|
||||
bucketItem{
|
||||
key: v2object.FilterHeaderVersion,
|
||||
val: obj.GetVersion().String(),
|
||||
},
|
||||
bucketItem{
|
||||
key: v2object.FilterHeaderContainerID,
|
||||
val: obj.GetContainerID().String(),
|
||||
},
|
||||
bucketItem{
|
||||
key: v2object.FilterHeaderOwnerID,
|
||||
val: obj.GetOwnerID().String(),
|
||||
},
|
||||
// TODO: add remaining fields after neofs-api#72
|
||||
)
|
||||
|
||||
for _, a := range as {
|
||||
res = append(res, bucketItem{
|
||||
key: a.GetKey(),
|
||||
val: a.GetValue(),
|
||||
})
|
||||
}
|
||||
|
||||
return res
|
||||
}
|
85
pkg/local_object_storage/metabase/select.go
Normal file
85
pkg/local_object_storage/metabase/select.go
Normal file
|
@ -0,0 +1,85 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/pkg/errors"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
// Select returns list of addresses of objects that match search filters.
|
||||
func (db *DB) Select(fs object.SearchFilters) ([]*object.Address, error) {
|
||||
res := make([]*object.Address, 0)
|
||||
|
||||
err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
// get indexed bucket
|
||||
indexBucket := tx.Bucket(indexBucket)
|
||||
if indexBucket == nil {
|
||||
// empty storage
|
||||
return nil
|
||||
}
|
||||
|
||||
// keep addresses that does not match some filter
|
||||
mAddr := make(map[string]struct{})
|
||||
|
||||
for _, f := range fs {
|
||||
matchFunc, ok := db.matchers[f.Operation()]
|
||||
if !ok {
|
||||
return errors.Errorf("no function for matcher %v", f.Operation())
|
||||
}
|
||||
|
||||
key := f.Header()
|
||||
|
||||
// get bucket with values
|
||||
keyBucket := indexBucket.Bucket([]byte(key))
|
||||
if keyBucket == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
fVal := f.Value()
|
||||
|
||||
// iterate over all existing values for the key
|
||||
if err := keyBucket.ForEach(func(k, _ []byte) error {
|
||||
if k, bucket := keyWithoutPrefix(k); !bucket {
|
||||
if !matchFunc(string(k), fVal) {
|
||||
// exclude all addresses with this value
|
||||
return keyBucket.Bucket(keyWithPrefix(k, true)).ForEach(func(k, _ []byte) error {
|
||||
mAddr[string(k)] = struct{}{}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return errors.Wrapf(err, "(%T) could not iterate bucket %s", db, key)
|
||||
}
|
||||
}
|
||||
|
||||
// get primary bucket
|
||||
primaryBucket := tx.Bucket(primaryBucket)
|
||||
if primaryBucket == nil {
|
||||
// empty storage
|
||||
return nil
|
||||
}
|
||||
|
||||
// iterate over all stored addresses
|
||||
return primaryBucket.ForEach(func(k, v []byte) error {
|
||||
if _, ok := mAddr[string(k)]; ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
addr := object.NewAddress()
|
||||
if err := addr.Parse(string(k)); err != nil {
|
||||
// TODO: storage was broken, so we need to handle it
|
||||
return err
|
||||
}
|
||||
|
||||
res = append(res, addr)
|
||||
|
||||
return nil
|
||||
})
|
||||
})
|
||||
|
||||
return res, err
|
||||
}
|
Loading…
Reference in a new issue