From 7fb87aac851de607be33dd23953d842e4a68d75f Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Fri, 27 Nov 2020 20:49:03 +0300 Subject: [PATCH] [#199] Support Delete operation in metabase Signed-off-by: Alex Vanin --- .../metabase/v2/delete.go | 217 ++++++++++++++++++ .../metabase/v2/delete_test.go | 60 +++++ 2 files changed, 277 insertions(+) create mode 100644 pkg/local_object_storage/metabase/v2/delete.go create mode 100644 pkg/local_object_storage/metabase/v2/delete_test.go diff --git a/pkg/local_object_storage/metabase/v2/delete.go b/pkg/local_object_storage/metabase/v2/delete.go new file mode 100644 index 000000000..913f0489f --- /dev/null +++ b/pkg/local_object_storage/metabase/v2/delete.go @@ -0,0 +1,217 @@ +package meta + +import ( + "bytes" + "errors" + "fmt" + + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "go.etcd.io/bbolt" +) + +var ErrVirtualObject = errors.New("do not remove virtual object directly") + +// DeleteObjects marks list of objects as deleted. +func (db *DB) Delete(lst ...*objectSDK.Address) error { + return db.boltDB.Update(func(tx *bbolt.Tx) error { + for i := range lst { + err := db.delete(tx, lst[i], false) + if err != nil { + return err // maybe log and continue? + } + } + + return nil + }) +} + +func (db *DB) delete(tx *bbolt.Tx, addr *objectSDK.Address, isParent bool) error { + pl := parentLength(tx, addr) // parentLength of address, for virtual objects it is > 0 + + // do not remove virtual objects directly + if !isParent && pl > 0 { + return ErrVirtualObject + } + + // unmarshal object + obj, err := db.get(tx, addr, false) + if err != nil { + return err + } + + // if object is an only link to a parent, then remove parent + if parent := obj.GetParent(); parent != nil { + if parentLength(tx, parent.Address()) == 1 { + err = db.deleteObject(tx, obj.GetParent(), true) + if err != nil { + return err + } + } + } + + // remove object + return db.deleteObject(tx, obj, isParent) +} + +func (db *DB) deleteObject( + tx *bbolt.Tx, + obj *object.Object, + isParent bool, +) error { + uniqueIndexes, err := delUniqueIndexes(obj, isParent) + if err != nil { + return fmt.Errorf("can' build unique indexes: %w", err) + } + + // delete unique indexes + for i := range uniqueIndexes { + delUniqueIndexItem(tx, uniqueIndexes[i]) + } + + // build list indexes + listIndexes, err := listIndexes(obj) + if err != nil { + return fmt.Errorf("can' build list indexes: %w", err) + } + + // delete list indexes + for i := range listIndexes { + delListIndexItem(tx, listIndexes[i]) + } + + // build fake bucket tree indexes + fkbtIndexes, err := fkbtIndexes(obj) + if err != nil { + return fmt.Errorf("can' build fake bucket tree indexes: %w", err) + } + + // delete fkbt indexes + for i := range fkbtIndexes { + delFKBTIndexItem(tx, fkbtIndexes[i]) + } + + return nil +} + +// parentLength returns amount of available children from parentid index. +func parentLength(tx *bbolt.Tx, addr *objectSDK.Address) int { + bkt := tx.Bucket(parentBucketName(addr.ContainerID())) + if bkt == nil { + return 0 + } + + lst, err := decodeList(bkt.Get(objectKey(addr.ObjectID()))) + if err != nil { + return 0 + } + + return len(lst) +} + +func delUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) { + bkt := tx.Bucket(item.name) + if bkt != nil { + _ = bkt.Delete(item.key) // ignore error, best effort there + } +} + +func delFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) { + bkt := tx.Bucket(item.name) + if bkt == nil { + return + } + + fkbtRoot := bkt.Bucket(item.key) + if fkbtRoot == nil { + return + } + + _ = fkbtRoot.Delete(item.val) // ignore error, best effort there +} + +func delListIndexItem(tx *bbolt.Tx, item namedBucketItem) { + bkt := tx.Bucket(item.name) + if bkt == nil { + return + } + + lst, err := decodeList(bkt.Get(item.key)) + if err != nil || len(lst) == 0 { + return + } + + // remove element from the list + newLst := make([][]byte, 0, len(lst)) + + for i := range lst { + if !bytes.Equal(item.val, lst[i]) { + newLst = append(newLst, lst[i]) + } + } + + // if list empty, remove the key from bucket + if len(newLst) == 0 { + _ = bkt.Delete(item.key) // ignore error, best effort there + + return + } + + // if list is not empty, then update it + encodedLst, err := encodeList(lst) + if err != nil { + return // ignore error, best effort there + } + + _ = bkt.Put(item.key, encodedLst) // ignore error, best effort there +} + +func delUniqueIndexes(obj *object.Object, isParent bool) ([]namedBucketItem, error) { + addr := obj.Address() + objKey := objectKey(addr.ObjectID()) + addrKey := addressKey(addr) + + result := make([]namedBucketItem, 0, 5) + + // add value to primary unique bucket + if !isParent { + var bucketName []byte + + switch obj.Type() { + case objectSDK.TypeRegular: + bucketName = primaryBucketName(addr.ContainerID()) + case objectSDK.TypeTombstone: + bucketName = tombstoneBucketName(addr.ContainerID()) + case objectSDK.TypeStorageGroup: + bucketName = storageGroupBucketName(addr.ContainerID()) + default: + return nil, ErrUnknownObjectType + } + + result = append(result, namedBucketItem{ + name: bucketName, + key: objKey, + }) + } + + result = append(result, + namedBucketItem{ // remove from small blobovnicza id index + name: smallBucketName(addr.ContainerID()), + key: objKey, + }, + namedBucketItem{ // remove from root index + name: rootBucketName(addr.ContainerID()), + key: objKey, + }, + namedBucketItem{ // remove from graveyard index + name: graveyardBucketName, + key: addrKey, + }, + namedBucketItem{ // remove from ToMoveIt index + name: toMoveItBucketName, + key: addrKey, + }, + ) + + return result, nil +} diff --git a/pkg/local_object_storage/metabase/v2/delete_test.go b/pkg/local_object_storage/metabase/v2/delete_test.go new file mode 100644 index 000000000..c056a9b3e --- /dev/null +++ b/pkg/local_object_storage/metabase/v2/delete_test.go @@ -0,0 +1,60 @@ +package meta_test + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestDB_Delete(t *testing.T) { + db := newDB(t) + defer releaseDB(db) + + cid := testCID() + parent := generateRawObjectWithCID(t, cid) + addAttribute(parent, "foo", "bar") + + child := generateRawObjectWithCID(t, cid) + child.SetParent(parent.Object().SDK()) + child.SetParentID(parent.ID()) + + // put object with parent + err := db.Put(child.Object(), nil) + require.NoError(t, err) + + // fill ToMoveIt index + err = db.ToMoveIt(child.Object().Address()) + require.NoError(t, err) + + // check if Movable list is not empty + l, err := db.Movable() + require.NoError(t, err) + require.Len(t, l, 1) + + // inhume parent and child so they will be on graveyard + ts := generateRawObjectWithCID(t, cid) + + err = db.Inhume(child.Object().Address(), ts.Object().Address()) + require.NoError(t, err) + + err = db.Inhume(child.Object().Address(), ts.Object().Address()) + require.NoError(t, err) + + // delete object + err = db.Delete(child.Object().Address()) + require.NoError(t, err) + + // check if there is no data in Movable index + l, err = db.Movable() + require.NoError(t, err) + require.Len(t, l, 0) + + // check if they removed from graveyard + ok, err := db.Exists(child.Object().Address()) + require.NoError(t, err) + require.False(t, ok) + + ok, err = db.Exists(parent.Object().Address()) + require.NoError(t, err) + require.False(t, ok) +}