[#219] metabase: Count parent references in Delete operation

Delete operation of Metabase is performed on group of objects. The set being
removed can contain descendants of a common parent. In the case when all
descendants of a parent object are deleted, it must also be deleted from
the metabase. In the previous implementation, this was not done due to the
chosen approach to counting references to the parent.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2021-02-11 20:39:48 +03:00 committed by Alex Vanin
parent 38727c2930
commit 7c3f828893
2 changed files with 88 additions and 10 deletions

View file

@ -34,21 +34,46 @@ func Delete(db *DB, addrs ...*objectSDK.Address) error {
return err return err
} }
type referenceNumber struct {
all, cur int
addr *objectSDK.Address
obj *object.Object
}
type referenceCounter map[string]*referenceNumber
// Delete removed object records from metabase indexes. // Delete removed object records from metabase indexes.
func (db *DB) Delete(prm *DeletePrm) (*DeleteRes, error) { func (db *DB) Delete(prm *DeletePrm) (*DeleteRes, error) {
return new(DeleteRes), db.boltDB.Update(func(tx *bbolt.Tx) error { return new(DeleteRes), db.boltDB.Update(func(tx *bbolt.Tx) error {
for i := range prm.addrs { return db.deleteGroup(tx, prm.addrs)
err := db.delete(tx, prm.addrs[i], false) })
}
func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []*objectSDK.Address) error {
refCounter := make(referenceCounter, len(addrs))
for i := range addrs {
err := db.delete(tx, addrs[i], refCounter)
if err != nil { if err != nil {
return err // maybe log and continue? return err // maybe log and continue?
} }
} }
return nil for _, refNum := range refCounter {
}) if refNum.cur == refNum.all {
err := db.deleteObject(tx, refNum.obj, true)
if err != nil {
return err // maybe log and continue?
}
}
} }
func (db *DB) delete(tx *bbolt.Tx, addr *objectSDK.Address, isParent bool) error { return nil
}
func (db *DB) delete(tx *bbolt.Tx, addr *objectSDK.Address, refCounter referenceCounter) error {
// unmarshal object, work only with physically stored (raw == true) objects // unmarshal object, work only with physically stored (raw == true) objects
obj, err := db.get(tx, addr, false, true) obj, err := db.get(tx, addr, false, true)
if err != nil { if err != nil {
@ -57,16 +82,25 @@ func (db *DB) delete(tx *bbolt.Tx, addr *objectSDK.Address, isParent bool) error
// if object is an only link to a parent, then remove parent // if object is an only link to a parent, then remove parent
if parent := obj.GetParent(); parent != nil { if parent := obj.GetParent(); parent != nil {
if parentLength(tx, parent.Address()) == 1 { parAddr := parent.Address()
err = db.deleteObject(tx, obj.GetParent(), true) sParAddr := parAddr.String()
if err != nil {
return err nRef, ok := refCounter[sParAddr]
if !ok {
nRef = &referenceNumber{
all: parentLength(tx, parent.Address()),
addr: parAddr,
obj: parent,
} }
refCounter[sParAddr] = nRef
} }
nRef.cur++
} }
// remove object // remove object
return db.deleteObject(tx, obj, isParent) return db.deleteObject(tx, obj, false)
} }
func (db *DB) deleteObject( func (db *DB) deleteObject(
@ -207,6 +241,11 @@ func delUniqueIndexes(obj *object.Object, isParent bool) ([]namedBucketItem, err
name: bucketName, name: bucketName,
key: objKey, key: objKey,
}) })
} else {
result = append(result, namedBucketItem{
name: parentBucketName(obj.ContainerID()),
key: objKey,
})
} }
result = append(result, result = append(result,

View file

@ -3,7 +3,9 @@ package meta_test
import ( import (
"testing" "testing"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
"github.com/pkg/errors"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -63,3 +65,40 @@ func TestDB_Delete(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.False(t, ok) require.False(t, ok)
} }
func TestDeleteAllChildren(t *testing.T) {
db := newDB(t)
defer releaseDB(db)
cid := testCID()
// generate parent object
parent := generateRawObjectWithCID(t, cid)
// generate 2 children
child1 := generateRawObjectWithCID(t, cid)
child1.SetParent(parent.Object().SDK())
child1.SetParentID(parent.ID())
child2 := generateRawObjectWithCID(t, cid)
child2.SetParent(parent.Object().SDK())
child2.SetParentID(parent.ID())
// put children
require.NoError(t, putBig(db, child1.Object()))
require.NoError(t, putBig(db, child2.Object()))
// Exists should return split info for parent
_, err := meta.Exists(db, parent.Object().Address())
siErr := objectSDK.NewSplitInfoError(nil)
require.True(t, errors.As(err, &siErr))
// remove all children in single call
err = meta.Delete(db, child1.Object().Address(), child2.Object().Address())
require.NoError(t, err)
// parent should not be found now
ex, err := meta.Exists(db, parent.Object().Address())
require.NoError(t, err)
require.False(t, ex)
}