From 7c3f828893e6575824ad876375a07b0f489655c4 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Thu, 11 Feb 2021 20:39:48 +0300 Subject: [PATCH] [#219] metabase: Count parent references in Delete operation Delete operation of Metabase is performed on group of objects. The set being removed can contain descendants of a common parent. In the case when all descendants of a parent object are deleted, it must also be deleted from the metabase. In the previous implementation, this was not done due to the chosen approach to counting references to the parent. Signed-off-by: Leonard Lyubich --- pkg/local_object_storage/metabase/delete.go | 59 +++++++++++++++---- .../metabase/delete_test.go | 39 ++++++++++++ 2 files changed, 88 insertions(+), 10 deletions(-) diff --git a/pkg/local_object_storage/metabase/delete.go b/pkg/local_object_storage/metabase/delete.go index 1b93d0dd..bc12eaea 100644 --- a/pkg/local_object_storage/metabase/delete.go +++ b/pkg/local_object_storage/metabase/delete.go @@ -34,21 +34,46 @@ func Delete(db *DB, addrs ...*objectSDK.Address) error { return err } +type referenceNumber struct { + all, cur int + + addr *objectSDK.Address + + obj *object.Object +} + +type referenceCounter map[string]*referenceNumber + // Delete removed object records from metabase indexes. func (db *DB) Delete(prm *DeletePrm) (*DeleteRes, error) { return new(DeleteRes), db.boltDB.Update(func(tx *bbolt.Tx) error { - for i := range prm.addrs { - err := db.delete(tx, prm.addrs[i], false) + return db.deleteGroup(tx, prm.addrs) + }) +} + +func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []*objectSDK.Address) error { + refCounter := make(referenceCounter, len(addrs)) + + for i := range addrs { + err := db.delete(tx, addrs[i], refCounter) + if err != nil { + return err // maybe log and continue? + } + } + + for _, refNum := range refCounter { + if refNum.cur == refNum.all { + err := db.deleteObject(tx, refNum.obj, true) if err != nil { return err // maybe log and continue? } } + } - return nil - }) + return nil } -func (db *DB) delete(tx *bbolt.Tx, addr *objectSDK.Address, isParent bool) error { +func (db *DB) delete(tx *bbolt.Tx, addr *objectSDK.Address, refCounter referenceCounter) error { // unmarshal object, work only with physically stored (raw == true) objects obj, err := db.get(tx, addr, false, true) if err != nil { @@ -57,16 +82,25 @@ func (db *DB) delete(tx *bbolt.Tx, addr *objectSDK.Address, isParent bool) error // if object is an only link to a parent, then remove parent if parent := obj.GetParent(); parent != nil { - if parentLength(tx, parent.Address()) == 1 { - err = db.deleteObject(tx, obj.GetParent(), true) - if err != nil { - return err + parAddr := parent.Address() + sParAddr := parAddr.String() + + nRef, ok := refCounter[sParAddr] + if !ok { + nRef = &referenceNumber{ + all: parentLength(tx, parent.Address()), + addr: parAddr, + obj: parent, } + + refCounter[sParAddr] = nRef } + + nRef.cur++ } // remove object - return db.deleteObject(tx, obj, isParent) + return db.deleteObject(tx, obj, false) } func (db *DB) deleteObject( @@ -207,6 +241,11 @@ func delUniqueIndexes(obj *object.Object, isParent bool) ([]namedBucketItem, err name: bucketName, key: objKey, }) + } else { + result = append(result, namedBucketItem{ + name: parentBucketName(obj.ContainerID()), + key: objKey, + }) } result = append(result, diff --git a/pkg/local_object_storage/metabase/delete_test.go b/pkg/local_object_storage/metabase/delete_test.go index c9c3e112..25cde6b9 100644 --- a/pkg/local_object_storage/metabase/delete_test.go +++ b/pkg/local_object_storage/metabase/delete_test.go @@ -3,7 +3,9 @@ package meta_test import ( "testing" + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" + "github.com/pkg/errors" "github.com/stretchr/testify/require" ) @@ -63,3 +65,40 @@ func TestDB_Delete(t *testing.T) { require.NoError(t, err) require.False(t, ok) } + +func TestDeleteAllChildren(t *testing.T) { + db := newDB(t) + defer releaseDB(db) + + cid := testCID() + + // generate parent object + parent := generateRawObjectWithCID(t, cid) + + // generate 2 children + child1 := generateRawObjectWithCID(t, cid) + child1.SetParent(parent.Object().SDK()) + child1.SetParentID(parent.ID()) + + child2 := generateRawObjectWithCID(t, cid) + child2.SetParent(parent.Object().SDK()) + child2.SetParentID(parent.ID()) + + // put children + require.NoError(t, putBig(db, child1.Object())) + require.NoError(t, putBig(db, child2.Object())) + + // Exists should return split info for parent + _, err := meta.Exists(db, parent.Object().Address()) + siErr := objectSDK.NewSplitInfoError(nil) + require.True(t, errors.As(err, &siErr)) + + // remove all children in single call + err = meta.Delete(db, child1.Object().Address(), child2.Object().Address()) + require.NoError(t, err) + + // parent should not be found now + ex, err := meta.Exists(db, parent.Object().Address()) + require.NoError(t, err) + require.False(t, ex) +}