From 20ed7c0d6183c39780e9bb9700c0a3cad4205bdc Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Thu, 29 Oct 2020 15:03:55 +0300 Subject: [PATCH] [#128] metabase: Implement Delete method Implement Delete method on DB structure that adds deleted addresses to tombstone index. Do not attach addresses from tombstone index to Select result. Return error from Get method if address is presented in tombstone index. Signed-off-by: Leonard Lyubich --- pkg/local_object_storage/metabase/db_test.go | 69 +++++++++++++++++--- pkg/local_object_storage/metabase/delete.go | 31 +++++++++ pkg/local_object_storage/metabase/get.go | 9 ++- pkg/local_object_storage/metabase/select.go | 5 ++ 4 files changed, 105 insertions(+), 9 deletions(-) create mode 100644 pkg/local_object_storage/metabase/delete.go diff --git a/pkg/local_object_storage/metabase/db_test.go b/pkg/local_object_storage/metabase/db_test.go index eb3186630..8939670c0 100644 --- a/pkg/local_object_storage/metabase/db_test.go +++ b/pkg/local_object_storage/metabase/db_test.go @@ -26,16 +26,32 @@ func testSelect(t *testing.T, db *DB, fs objectSDK.SearchFilters, exp ...*object } } +func testCID() *container.ID { + cs := [sha256.Size]byte{} + rand.Read(cs[:]) + + id := container.NewID() + id.SetSHA256(cs) + + return id +} + +func testOID() *objectSDK.ID { + cs := [sha256.Size]byte{} + rand.Read(cs[:]) + + id := objectSDK.NewID() + id.SetSHA256(cs) + + return id +} + func TestDB(t *testing.T) { version := pkg.NewVersion() version.SetMajor(2) version.SetMinor(1) - cs := [sha256.Size]byte{} - rand.Read(cs[:]) - - cid := container.NewID() - cid.SetSHA256(cs) + cid := testCID() w, err := owner.NEO3WalletFromPublicKey(&test.DecodeKey(-1).PublicKey) require.NoError(t, err) @@ -43,9 +59,7 @@ func TestDB(t *testing.T) { ownerID := owner.NewID() ownerID.SetNeo3Wallet(w) - rand.Read(cs[:]) - oid := objectSDK.NewID() - oid.SetSHA256(cs) + oid := testOID() obj := object.NewRaw() obj.SetID(oid) @@ -100,3 +114,42 @@ func TestDB(t *testing.T) { fs.AddFilter(k, v+"1", objectSDK.MatchStringEqual) testSelect(t, db, fs) } + +func TestDB_Delete(t *testing.T) { + path := "test.db" + + bdb, err := bbolt.Open(path, 0600, nil) + require.NoError(t, err) + + defer func() { + bdb.Close() + os.Remove(path) + }() + + db := NewDB(bdb) + + obj := object.NewRaw() + obj.SetContainerID(testCID()) + obj.SetID(testOID()) + + o := obj.Object() + + require.NoError(t, db.Put(o)) + + addr := o.Address() + + _, err = db.Get(addr) + require.NoError(t, err) + + fs := objectSDK.SearchFilters{} + fs.AddObjectContainerIDFilter(objectSDK.MatchStringEqual, o.GetContainerID()) + + testSelect(t, db, fs, o.Address()) + + require.NoError(t, db.Delete(addr)) + + _, err = db.Get(addr) + require.Error(t, err) + + testSelect(t, db, fs) +} diff --git a/pkg/local_object_storage/metabase/delete.go b/pkg/local_object_storage/metabase/delete.go new file mode 100644 index 000000000..0690b7060 --- /dev/null +++ b/pkg/local_object_storage/metabase/delete.go @@ -0,0 +1,31 @@ +package meta + +import ( + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/pkg/errors" + "go.etcd.io/bbolt" +) + +var tombstoneBucket = []byte("tombstones") + +// Delete marks object as deleted. +func (db *DB) Delete(addr *object.Address) error { + return db.boltDB.Update(func(tx *bbolt.Tx) error { + bucket, err := tx.CreateBucketIfNotExists(tombstoneBucket) + if err != nil { + return errors.Wrapf(err, "(%T) could not create tombstone bucket", db) + } + + if err := bucket.Put(addressKey(addr), nil); err != nil { + return errors.Wrapf(err, "(%T) could not put to tombstone bucket", db) + } + + return nil + }) +} + +func objectRemoved(tx *bbolt.Tx, addr []byte) bool { + tombstoneBucket := tx.Bucket(tombstoneBucket) + + return tombstoneBucket != nil && tombstoneBucket.Get(addr) != nil +} diff --git a/pkg/local_object_storage/metabase/get.go b/pkg/local_object_storage/metabase/get.go index 445c553b5..498cc2a06 100644 --- a/pkg/local_object_storage/metabase/get.go +++ b/pkg/local_object_storage/metabase/get.go @@ -15,12 +15,19 @@ func (db *DB) Get(addr *objectSDK.Address) (*object.Object, error) { var obj *object.Object if err := db.boltDB.View(func(tx *bbolt.Tx) error { + addrKey := addressKey(addr) + + // check if object marked as deleted + if objectRemoved(tx, addrKey) { + return errNotFound + } + primaryBucket := tx.Bucket(primaryBucket) if primaryBucket == nil { return errNotFound } - data := primaryBucket.Get(addressKey(addr)) + data := primaryBucket.Get(addrKey) if data == nil { return errNotFound } diff --git a/pkg/local_object_storage/metabase/select.go b/pkg/local_object_storage/metabase/select.go index 227f483c4..b363671ed 100644 --- a/pkg/local_object_storage/metabase/select.go +++ b/pkg/local_object_storage/metabase/select.go @@ -67,6 +67,11 @@ func (db *DB) Select(fs object.SearchFilters) ([]*object.Address, error) { return nil } + // check if object marked as deleted + if objectRemoved(tx, k) { + return nil + } + addr := object.NewAddress() if err := addr.Parse(string(k)); err != nil { // TODO: storage was broken, so we need to handle it