diff --git a/CHANGELOG.md b/CHANGELOG.md index 37079902bc..51dabb1d19 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ Changelog for NeoFS Node - Missing check of new state value in `ControlService.SetNetmapStatus` (#1797) - Correlation of object session to request (#1420) - Do not increase error counter in `engine.Inhume` if shard is read-only (#1839) +- `control drop-objects` can remove split objects (#1830) ### Removed - Remove WIF and NEP2 support in `neofs-cli`'s --wallet flag (#1128) diff --git a/pkg/local_object_storage/engine/delete.go b/pkg/local_object_storage/engine/delete.go index c73ed798eb..fe0cc46034 100644 --- a/pkg/local_object_storage/engine/delete.go +++ b/pkg/local_object_storage/engine/delete.go @@ -62,21 +62,29 @@ func (e *StorageEngine) delete(prm DeletePrm) (DeleteRes, error) { is bool err apistatus.ObjectLocked } + var splitInfo *objectSDK.SplitInfo + // Removal of a big object is done in multiple stages: + // 1. Remove the parent object. If it is locked or already removed, return immediately. + // 2. Otherwise, search for all objects with a particular SplitID and delete them too. e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) { var existsPrm shard.ExistsPrm existsPrm.SetAddress(prm.addr) resExists, err := sh.Exists(existsPrm) if err != nil { - _, ok := err.(*objectSDK.SplitInfoError) - if ok || shard.IsErrRemoved(err) || shard.IsErrObjectExpired(err) { + if shard.IsErrRemoved(err) || shard.IsErrObjectExpired(err) { return true } - if !shard.IsErrNotFound(err) { - e.reportShardError(sh, "could not check object existence", err) + + splitErr, ok := err.(*objectSDK.SplitInfoError) + if !ok { + if !shard.IsErrNotFound(err) { + e.reportShardError(sh, "could not check object existence", err) + } + return false } - return false + splitInfo = splitErr.SplitInfo() } else if !resExists.Exists() { return false } @@ -102,12 +110,54 @@ func (e *StorageEngine) delete(prm DeletePrm) (DeleteRes, error) { return locked.is } - return true + // If a parent object is removed we should set GC mark on each shard. + return splitInfo == nil }) if locked.is { return DeleteRes{}, locked.err } + if splitInfo != nil { + e.deleteChildren(prm.addr, prm.forceRemoval, splitInfo.SplitID()) + } + return DeleteRes{}, nil } + +func (e *StorageEngine) deleteChildren(addr oid.Address, force bool, splitID *objectSDK.SplitID) { + var fs objectSDK.SearchFilters + fs.AddSplitIDFilter(objectSDK.MatchStringEqual, splitID) + + var selectPrm shard.SelectPrm + selectPrm.SetFilters(fs) + selectPrm.SetContainerID(addr.Container()) + + var inhumePrm shard.InhumePrm + if force { + inhumePrm.ForceRemoval() + } + + e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) { + res, err := sh.Select(selectPrm) + if err != nil { + e.log.Warn("error during searching for object children", + zap.Stringer("addr", addr), + zap.String("error", err.Error())) + return false + } + + for _, addr := range res.AddressList() { + inhumePrm.MarkAsGarbage(addr) + + _, err = sh.Inhume(inhumePrm) + if err != nil { + e.log.Debug("could not inhume object in shard", + zap.Stringer("addr", addr), + zap.String("err", err.Error())) + continue + } + } + return false + }) +} diff --git a/pkg/local_object_storage/engine/delete_test.go b/pkg/local_object_storage/engine/delete_test.go new file mode 100644 index 0000000000..0fde179d0d --- /dev/null +++ b/pkg/local_object_storage/engine/delete_test.go @@ -0,0 +1,99 @@ +package engine + +import ( + "os" + "testing" + + "github.com/nspcc-dev/neofs-node/pkg/core/object" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" + objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" +) + +func TestDeleteBigObject(t *testing.T) { + defer os.RemoveAll(t.Name()) + + cnr := cidtest.ID() + parentID := oidtest.ID() + splitID := objectSDK.NewSplitID() + + parent := generateObjectWithCID(t, cnr) + parent.SetID(parentID) + parent.SetPayload(nil) + + const childCount = 10 + children := make([]*objectSDK.Object, childCount) + childIDs := make([]oid.ID, childCount) + for i := range children { + children[i] = generateObjectWithCID(t, cnr) + if i != 0 { + children[i].SetPreviousID(childIDs[i-1]) + } + if i == len(children)-1 { + children[i].SetParent(parent) + } + children[i].SetSplitID(splitID) + children[i].SetPayload([]byte{byte(i), byte(i + 1), byte(i + 2)}) + childIDs[i], _ = children[i].ID() + } + + link := generateObjectWithCID(t, cnr) + link.SetParent(parent) + link.SetParentID(parentID) + link.SetSplitID(splitID) + link.SetChildren(childIDs...) + + s1 := testNewShard(t, 1) + s2 := testNewShard(t, 2) + s3 := testNewShard(t, 3) + + e := testNewEngineWithShards(s1, s2, s3) + e.log = zaptest.NewLogger(t) + defer e.Close() + + for i := range children { + require.NoError(t, Put(e, children[i])) + } + require.NoError(t, Put(e, link)) + + var splitErr *objectSDK.SplitInfoError + + addrParent := object.AddressOf(parent) + checkGetError(t, e, addrParent, &splitErr) + + addrLink := object.AddressOf(link) + checkGetError(t, e, addrLink, nil) + + for i := range children { + checkGetError(t, e, object.AddressOf(children[i]), nil) + } + + var deletePrm DeletePrm + deletePrm.WithForceRemoval() + deletePrm.WithAddress(addrParent) + + _, err := e.Delete(deletePrm) + require.NoError(t, err) + + checkGetError(t, e, addrParent, &apistatus.ObjectNotFound{}) + checkGetError(t, e, addrLink, &apistatus.ObjectNotFound{}) + for i := range children { + checkGetError(t, e, object.AddressOf(children[i]), &apistatus.ObjectNotFound{}) + } +} + +func checkGetError(t *testing.T, e *StorageEngine, addr oid.Address, expected interface{}) { + var getPrm GetPrm + getPrm.WithAddress(addr) + + _, err := e.Get(getPrm) + if expected != nil { + require.ErrorAs(t, err, expected) + } else { + require.NoError(t, err) + } +}