diff --git a/pkg/local_object_storage/metabase/delete.go b/pkg/local_object_storage/metabase/delete.go index 1b93d0dd..bc12eaea 100644 --- a/pkg/local_object_storage/metabase/delete.go +++ b/pkg/local_object_storage/metabase/delete.go @@ -34,21 +34,46 @@ func Delete(db *DB, addrs ...*objectSDK.Address) error { return err } +type referenceNumber struct { + all, cur int + + addr *objectSDK.Address + + obj *object.Object +} + +type referenceCounter map[string]*referenceNumber + // Delete removed object records from metabase indexes. func (db *DB) Delete(prm *DeletePrm) (*DeleteRes, error) { return new(DeleteRes), db.boltDB.Update(func(tx *bbolt.Tx) error { - for i := range prm.addrs { - err := db.delete(tx, prm.addrs[i], false) + return db.deleteGroup(tx, prm.addrs) + }) +} + +func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []*objectSDK.Address) error { + refCounter := make(referenceCounter, len(addrs)) + + for i := range addrs { + err := db.delete(tx, addrs[i], refCounter) + if err != nil { + return err // maybe log and continue? + } + } + + for _, refNum := range refCounter { + if refNum.cur == refNum.all { + err := db.deleteObject(tx, refNum.obj, true) if err != nil { return err // maybe log and continue? } } + } - return nil - }) + return nil } -func (db *DB) delete(tx *bbolt.Tx, addr *objectSDK.Address, isParent bool) error { +func (db *DB) delete(tx *bbolt.Tx, addr *objectSDK.Address, refCounter referenceCounter) error { // unmarshal object, work only with physically stored (raw == true) objects obj, err := db.get(tx, addr, false, true) if err != nil { @@ -57,16 +82,25 @@ func (db *DB) delete(tx *bbolt.Tx, addr *objectSDK.Address, isParent bool) error // if object is an only link to a parent, then remove parent if parent := obj.GetParent(); parent != nil { - if parentLength(tx, parent.Address()) == 1 { - err = db.deleteObject(tx, obj.GetParent(), true) - if err != nil { - return err + parAddr := parent.Address() + sParAddr := parAddr.String() + + nRef, ok := refCounter[sParAddr] + if !ok { + nRef = &referenceNumber{ + all: parentLength(tx, parent.Address()), + addr: parAddr, + obj: parent, } + + refCounter[sParAddr] = nRef } + + nRef.cur++ } // remove object - return db.deleteObject(tx, obj, isParent) + return db.deleteObject(tx, obj, false) } func (db *DB) deleteObject( @@ -207,6 +241,11 @@ func delUniqueIndexes(obj *object.Object, isParent bool) ([]namedBucketItem, err name: bucketName, key: objKey, }) + } else { + result = append(result, namedBucketItem{ + name: parentBucketName(obj.ContainerID()), + key: objKey, + }) } result = append(result, diff --git a/pkg/local_object_storage/metabase/delete_test.go b/pkg/local_object_storage/metabase/delete_test.go index c9c3e112..25cde6b9 100644 --- a/pkg/local_object_storage/metabase/delete_test.go +++ b/pkg/local_object_storage/metabase/delete_test.go @@ -3,7 +3,9 @@ package meta_test import ( "testing" + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" + "github.com/pkg/errors" "github.com/stretchr/testify/require" ) @@ -63,3 +65,40 @@ func TestDB_Delete(t *testing.T) { require.NoError(t, err) require.False(t, ok) } + +func TestDeleteAllChildren(t *testing.T) { + db := newDB(t) + defer releaseDB(db) + + cid := testCID() + + // generate parent object + parent := generateRawObjectWithCID(t, cid) + + // generate 2 children + child1 := generateRawObjectWithCID(t, cid) + child1.SetParent(parent.Object().SDK()) + child1.SetParentID(parent.ID()) + + child2 := generateRawObjectWithCID(t, cid) + child2.SetParent(parent.Object().SDK()) + child2.SetParentID(parent.ID()) + + // put children + require.NoError(t, putBig(db, child1.Object())) + require.NoError(t, putBig(db, child2.Object())) + + // Exists should return split info for parent + _, err := meta.Exists(db, parent.Object().Address()) + siErr := objectSDK.NewSplitInfoError(nil) + require.True(t, errors.As(err, &siErr)) + + // remove all children in single call + err = meta.Delete(db, child1.Object().Address(), child2.Object().Address()) + require.NoError(t, err) + + // parent should not be found now + ex, err := meta.Exists(db, parent.Object().Address()) + require.NoError(t, err) + require.False(t, ex) +}