From 85aacbbb1000f293f2123afc3e845b0f14a410a7 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Wed, 28 Oct 2020 17:49:30 +0300 Subject: [PATCH] [#128] localstorage: Implement primary object metabase Implement bolt-based metabase that is going to be used in local object storage. Implement Put/Get/Select methods. Signed-off-by: Leonard Lyubich --- go.mod | 6 +- go.sum | Bin 58620 -> 59264 bytes pkg/local_object_storage/metabase/db.go | 25 ++++ pkg/local_object_storage/metabase/db_test.go | 102 +++++++++++++++ pkg/local_object_storage/metabase/get.go | 38 ++++++ pkg/local_object_storage/metabase/put.go | 131 +++++++++++++++++++ pkg/local_object_storage/metabase/select.go | 85 ++++++++++++ 7 files changed, 385 insertions(+), 2 deletions(-) create mode 100644 pkg/local_object_storage/metabase/db.go create mode 100644 pkg/local_object_storage/metabase/db_test.go create mode 100644 pkg/local_object_storage/metabase/get.go create mode 100644 pkg/local_object_storage/metabase/put.go create mode 100644 pkg/local_object_storage/metabase/select.go diff --git a/go.mod b/go.mod index 40025443..e7a5484b 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/multiformats/go-multiaddr-net v0.1.2 // v0.1.1 => v0.1.2 github.com/multiformats/go-multihash v0.0.13 // indirect github.com/nspcc-dev/neo-go v0.91.1-pre.0.20200827184617-7560aa345a78 - github.com/nspcc-dev/neofs-api-go v1.3.1-0.20201020152448-c8f46f7d9762 + github.com/nspcc-dev/neofs-api-go v1.3.1-0.20201028111149-ac38d13f04ff github.com/nspcc-dev/neofs-crypto v0.3.0 github.com/nspcc-dev/tzhash v1.4.0 github.com/panjf2000/ants/v2 v2.3.0 @@ -25,12 +25,14 @@ require ( github.com/spf13/viper v1.7.0 github.com/stretchr/testify v1.6.1 github.com/valyala/fasthttp v1.9.0 - go.etcd.io/bbolt v1.3.4 + go.etcd.io/bbolt v1.3.5 go.uber.org/atomic v1.5.1 go.uber.org/multierr v1.4.0 // indirect go.uber.org/zap v1.13.0 golang.org/x/crypto v0.0.0-20200117160349-530e935923ad // indirect golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f // indirect + golang.org/x/net v0.0.0-20191105084925-a882066a44e0 // indirect + golang.org/x/sys v0.0.0-20201024232916-9f70ab9862d5 // indirect golang.org/x/tools v0.0.0-20200123022218-593de606220b // indirect google.golang.org/grpc v1.29.1 google.golang.org/protobuf v1.23.0 diff --git a/go.sum b/go.sum index 0e7779dcb227e61303d7c5ba86faf0541fed0a2a..ac16c78ea52e41b6ed96b3f8f8134d8f2dcad1e2 100644 GIT binary patch delta 465 zcmZ|KyH49s0Dxg3HY5ZUNFazJCYC75ImgG2-31)m#DsthahhNj?D!TNa1zHj_CWg# zoJXkCiA#_XTMH{<=+p=3-bd)rh8g|`_@w{u^YoXW)8pG*Y0ZTnV=eB=azZiyu!x3(P9sqB=pJMeQxO;jBI|dlFyGb*~<^O{tprAmlOa1 delta 135 zcmZoT&-~{k^M>^@oBzoedpjE#ni`pySm-8Oq?wqdnWtEqn;9u&7+RSYIHwmoIa@>o zdPG=+YDYQw7Dt$CTbQSno96}l1~`>_6s4AzdHY1a;gs Xobo|x^Mno0_$T`u;@a$g&`=Ek2x%{6 diff --git a/pkg/local_object_storage/metabase/db.go b/pkg/local_object_storage/metabase/db.go new file mode 100644 index 00000000..732fc5f3 --- /dev/null +++ b/pkg/local_object_storage/metabase/db.go @@ -0,0 +1,25 @@ +package meta + +import ( + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "go.etcd.io/bbolt" +) + +// DB represents local metabase of storage node. +type DB struct { + boltDB *bbolt.DB + + matchers map[object.SearchMatchType]func(string, string) bool +} + +// NewDB creates, initializes and returns DB instance. +func NewDB(boltDB *bbolt.DB) *DB { + return &DB{ + boltDB: boltDB, + matchers: map[object.SearchMatchType]func(string, string) bool{ + object.MatchStringEqual: func(s string, s2 string) bool { + return s == s2 + }, + }, + } +} diff --git a/pkg/local_object_storage/metabase/db_test.go b/pkg/local_object_storage/metabase/db_test.go new file mode 100644 index 00000000..eb318663 --- /dev/null +++ b/pkg/local_object_storage/metabase/db_test.go @@ -0,0 +1,102 @@ +package meta + +import ( + "crypto/rand" + "crypto/sha256" + "os" + "testing" + + "github.com/nspcc-dev/neofs-api-go/pkg" + "github.com/nspcc-dev/neofs-api-go/pkg/container" + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-api-go/pkg/owner" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/util/test" + "github.com/stretchr/testify/require" + "go.etcd.io/bbolt" +) + +func testSelect(t *testing.T, db *DB, fs objectSDK.SearchFilters, exp ...*objectSDK.Address) { + res, err := db.Select(fs) + require.NoError(t, err) + require.Len(t, res, len(exp)) + + for i := range exp { + require.Contains(t, res, exp[i]) + } +} + +func TestDB(t *testing.T) { + version := pkg.NewVersion() + version.SetMajor(2) + version.SetMinor(1) + + cs := [sha256.Size]byte{} + rand.Read(cs[:]) + + cid := container.NewID() + cid.SetSHA256(cs) + + w, err := owner.NEO3WalletFromPublicKey(&test.DecodeKey(-1).PublicKey) + require.NoError(t, err) + + ownerID := owner.NewID() + ownerID.SetNeo3Wallet(w) + + rand.Read(cs[:]) + oid := objectSDK.NewID() + oid.SetSHA256(cs) + + obj := object.NewRaw() + obj.SetID(oid) + obj.SetOwnerID(ownerID) + obj.SetContainerID(cid) + obj.SetVersion(version) + + k, v := "key", "value" + + a := objectSDK.NewAttribute() + a.SetKey(k) + a.SetValue(v) + + obj.SetAttributes(a) + + path := "test.db" + + bdb, err := bbolt.Open(path, 0600, nil) + require.NoError(t, err) + + defer func() { + bdb.Close() + os.Remove(path) + }() + + db := NewDB(bdb) + + o := obj.Object() + + require.NoError(t, db.Put(o)) + + o2, err := db.Get(o.Address()) + require.NoError(t, err) + + require.Equal(t, o, o2) + + fs := objectSDK.SearchFilters{} + + // filter container ID + fs.AddObjectContainerIDFilter(objectSDK.MatchStringEqual, cid) + testSelect(t, db, fs, o.Address()) + + // filter owner ID + fs.AddObjectOwnerIDFilter(objectSDK.MatchStringEqual, ownerID) + testSelect(t, db, fs, o.Address()) + + // filter attribute + fs.AddFilter(k, v, objectSDK.MatchStringEqual) + testSelect(t, db, fs, o.Address()) + + // filter mismatch + fs.AddFilter(k, v+"1", objectSDK.MatchStringEqual) + testSelect(t, db, fs) +} diff --git a/pkg/local_object_storage/metabase/get.go b/pkg/local_object_storage/metabase/get.go new file mode 100644 index 00000000..445c553b --- /dev/null +++ b/pkg/local_object_storage/metabase/get.go @@ -0,0 +1,38 @@ +package meta + +import ( + "errors" + + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "go.etcd.io/bbolt" +) + +var errNotFound = errors.New("object not found") + +// Get returns object header for specified address. +func (db *DB) Get(addr *objectSDK.Address) (*object.Object, error) { + var obj *object.Object + + if err := db.boltDB.View(func(tx *bbolt.Tx) error { + primaryBucket := tx.Bucket(primaryBucket) + if primaryBucket == nil { + return errNotFound + } + + data := primaryBucket.Get(addressKey(addr)) + if data == nil { + return errNotFound + } + + var err error + + obj, err = object.FromBytes(data) + + return err + }); err != nil { + return nil, err + } + + return obj, nil +} diff --git a/pkg/local_object_storage/metabase/put.go b/pkg/local_object_storage/metabase/put.go new file mode 100644 index 00000000..d067b936 --- /dev/null +++ b/pkg/local_object_storage/metabase/put.go @@ -0,0 +1,131 @@ +package meta + +import ( + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + v2object "github.com/nspcc-dev/neofs-api-go/v2/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/pkg/errors" + "go.etcd.io/bbolt" +) + +type bucketItem struct { + key, val string +} + +var ( + primaryBucket = []byte("objects") + indexBucket = []byte("index") +) + +// Put saves object in DB. +// +// Object payload expected to be cut. +func (db *DB) Put(obj *object.Object) error { + return db.boltDB.Update(func(tx *bbolt.Tx) error { + // create primary bucket (addr: header) + primaryBucket, err := tx.CreateBucketIfNotExists(primaryBucket) + if err != nil { + return errors.Wrapf(err, "(%T) could not create primary bucket", db) + } + + data, err := obj.ToV2().StableMarshal(nil) + if err != nil { + return errors.Wrapf(err, "(%T) could not marshal the object", db) + } + + addrKey := addressKey(obj.Address()) + + // put header to primary bucket + if err := primaryBucket.Put(addrKey, data); err != nil { + return errors.Wrapf(err, "(%T) could not put item to primary bucket", db) + } + + // create bucket for indices + indexBucket, err := tx.CreateBucketIfNotExists(indexBucket) + if err != nil { + return errors.Wrapf(err, "(%T) could not create index bucket", db) + } + + // calculate indexed values for object + indices := objectIndices(obj) + + for i := range indices { + // create index bucket + keyBucket, err := indexBucket.CreateBucketIfNotExists([]byte(indices[i].key)) + if err != nil { + return errors.Wrapf(err, "(%T) could not create bucket for header key", db) + } + + // FIXME: here we can get empty slice that could not be the key + // Possible solutions: + // 1. add prefix byte (0 if empty); + v := []byte(indices[i].val) + + // put value to key bucket (it is needed for iteration over all values (Select)) + if err := keyBucket.Put(keyWithPrefix(v, false), nil); err != nil { + return errors.Wrapf(err, "(%T) could not put header value", db) + } + + // create address bucket for the value + valBucket, err := keyBucket.CreateBucketIfNotExists(keyWithPrefix(v, true)) + if err != nil { + return errors.Wrapf(err, "(%T) could not create bucket for header value", db) + } + + // put object address to value bucket + if err := valBucket.Put(addrKey, nil); err != nil { + return errors.Wrapf(err, "(%T) could not put item to header bucket", db) + } + } + + return nil + }) +} + +func keyWithPrefix(key []byte, bucket bool) []byte { + b := byte(0) + if bucket { + b = 1 + } + + return append([]byte{b}, key...) +} + +func keyWithoutPrefix(key []byte) ([]byte, bool) { + return key[1:], key[0] == 1 +} + +func addressKey(addr *objectSDK.Address) []byte { + return []byte(addr.String()) +} + +func objectIndices(obj *object.Object) []bucketItem { + as := obj.GetAttributes() + + res := make([]bucketItem, 0, 3+len(as)) + + res = append(res, + bucketItem{ + key: v2object.FilterHeaderVersion, + val: obj.GetVersion().String(), + }, + bucketItem{ + key: v2object.FilterHeaderContainerID, + val: obj.GetContainerID().String(), + }, + bucketItem{ + key: v2object.FilterHeaderOwnerID, + val: obj.GetOwnerID().String(), + }, + // TODO: add remaining fields after neofs-api#72 + ) + + for _, a := range as { + res = append(res, bucketItem{ + key: a.GetKey(), + val: a.GetValue(), + }) + } + + return res +} diff --git a/pkg/local_object_storage/metabase/select.go b/pkg/local_object_storage/metabase/select.go new file mode 100644 index 00000000..ac8cf116 --- /dev/null +++ b/pkg/local_object_storage/metabase/select.go @@ -0,0 +1,85 @@ +package meta + +import ( + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/pkg/errors" + "go.etcd.io/bbolt" +) + +// Select returns list of addresses of objects that match search filters. +func (db *DB) Select(fs object.SearchFilters) ([]*object.Address, error) { + res := make([]*object.Address, 0) + + err := db.boltDB.View(func(tx *bbolt.Tx) error { + // get indexed bucket + indexBucket := tx.Bucket(indexBucket) + if indexBucket == nil { + // empty storage + return nil + } + + // keep addresses that does not match some filter + mAddr := make(map[string]struct{}) + + for _, f := range fs { + matchFunc, ok := db.matchers[f.Operation()] + if !ok { + return errors.Errorf("no function for matcher %v", f.Operation()) + } + + key := f.Header() + + // get bucket with values + keyBucket := indexBucket.Bucket([]byte(key)) + if keyBucket == nil { + continue + } + + fVal := f.Value() + + // iterate over all existing values for the key + if err := keyBucket.ForEach(func(k, _ []byte) error { + if k, bucket := keyWithoutPrefix(k); !bucket { + if !matchFunc(string(k), fVal) { + // exclude all addresses with this value + return keyBucket.Bucket(keyWithPrefix(k, true)).ForEach(func(k, _ []byte) error { + mAddr[string(k)] = struct{}{} + + return nil + }) + } + } + + return nil + }); err != nil { + return errors.Wrapf(err, "(%T) could not iterate bucket %s", db, key) + } + } + + // get primary bucket + primaryBucket := tx.Bucket(primaryBucket) + if primaryBucket == nil { + // empty storage + return nil + } + + // iterate over all stored addresses + return primaryBucket.ForEach(func(k, v []byte) error { + if _, ok := mAddr[string(k)]; ok { + return nil + } + + addr := object.NewAddress() + if err := addr.Parse(string(k)); err != nil { + // TODO: storage was broken, so we need to handle it + return err + } + + res = append(res, addr) + + return nil + }) + }) + + return res, err +}