package engine

import (
	"context"
	"testing"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
	cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
	"github.com/stretchr/testify/require"
)

// bigObject groups the parts of a split ("big") object built for the delete
// tests: the parent header object, the link object and the payload children.
type bigObject struct {
	// parent is never put into the engine itself; it is only referenced by
	// the last child and by the link object.
	parent *objectSDK.Object
	// link lists the IDs of all children and references parent.
	link *objectSDK.Object
	// children are chained through their previous-ID field, in slice order.
	children []*objectSDK.Object
}

// newBigObject builds a split object with childCount children inside a fresh
// container. Every child and the link share one split ID; child i points at
// child i-1 as its predecessor, and only the last child carries the parent.
func newBigObject(t *testing.T, childCount int) bigObject {
	t.Helper()

	cnr := cidtest.ID()
	parentID := oidtest.ID()
	splitID := objectSDK.NewSplitID()

	parent := testutil.GenerateObjectWithCID(cnr)
	parent.SetID(parentID)
	parent.SetPayload(nil)

	children := make([]*objectSDK.Object, childCount)
	childIDs := make([]oid.ID, childCount)
	for i := range children {
		child := testutil.GenerateObjectWithCID(cnr)
		if i != 0 {
			child.SetPreviousID(childIDs[i-1])
		}
		if i == childCount-1 {
			child.SetParent(parent)
		}
		child.SetSplitID(splitID)
		child.SetPayload([]byte{byte(i), byte(i + 1), byte(i + 2)})

		// The ID is chained into the next child and into the link object, so
		// an unset ID must fail here rather than surface later as a confusing
		// lookup error.
		id, ok := child.ID()
		require.True(t, ok, "generated child object must carry an ID")

		children[i] = child
		childIDs[i] = id
	}

	link := testutil.GenerateObjectWithCID(cnr)
	link.SetParent(parent)
	link.SetParentID(parentID)
	link.SetSplitID(splitID)
	link.SetChildren(childIDs...)

	return bigObject{
		parent:   parent,
		link:     link,
		children: children,
	}
}

// putBigObject stores all children and then the link object in the engine.
// The parent object itself is not stored.
func putBigObject(t *testing.T, e *StorageEngine, obj bigObject) {
	t.Helper()

	for _, child := range obj.children {
		require.NoError(t, Put(context.Background(), e, child))
	}
	require.NoError(t, Put(context.Background(), e, obj.link))
}

// requireBigObjectStored asserts the state expected right after putBigObject:
// Get on the parent address fails with a SplitInfoError, while the link and
// every child are returned without error.
func requireBigObjectStored(t *testing.T, e *StorageEngine, obj bigObject) {
	t.Helper()

	checkGetError[*objectSDK.SplitInfoError](t, e, object.AddressOf(obj.parent), true)
	checkGetError[error](t, e, object.AddressOf(obj.link), false)
	for _, child := range obj.children {
		checkGetError[error](t, e, object.AddressOf(child), false)
	}
}

// requireBigObjectNotFound asserts that Get fails with ObjectNotFound for the
// parent, the link and every child.
func requireBigObjectNotFound(t *testing.T, e *StorageEngine, obj bigObject) {
	t.Helper()

	checkGetError[*apistatus.ObjectNotFound](t, e, object.AddressOf(obj.parent), true)
	checkGetError[*apistatus.ObjectNotFound](t, e, object.AddressOf(obj.link), true)
	for _, child := range obj.children {
		checkGetError[*apistatus.ObjectNotFound](t, e, object.AddressOf(child), true)
	}
}

// TestDeleteBigObject stores a ten-part split object in an engine with three
// shards, force-deletes it by the parent address and checks that the parent,
// the link and all children are reported as not found afterwards.
func TestDeleteBigObject(t *testing.T) {
	t.Parallel()

	const childCount = 10
	obj := newBigObject(t, childCount)

	s1 := testNewShard(t, 1)
	s2 := testNewShard(t, 2)
	s3 := testNewShard(t, 3)

	e := testNewEngine(t).setInitializedShards(t, s1, s2, s3).engine
	e.log = test.NewLogger(t)
	defer e.Close(context.Background())

	putBigObject(t, e, obj)
	requireBigObjectStored(t, e, obj)

	var deletePrm DeletePrm
	deletePrm.WithForceRemoval()
	deletePrm.WithAddress(object.AddressOf(obj.parent))

	_, err := e.Delete(context.Background(), deletePrm)
	require.NoError(t, err)

	requireBigObjectNotFound(t, e, obj)
}

// TestDeleteBigObjectWithoutGC runs the same scenario as TestDeleteBigObject
// on a single shard created with shard.WithDisabledGC. After the engine-level
// delete it additionally deletes every part directly on the shard and checks
// that both the shard deletes succeed and the parts stay not found.
func TestDeleteBigObjectWithoutGC(t *testing.T) {
	t.Parallel()

	const childCount = 3
	obj := newBigObject(t, childCount)

	s1 := testNewShard(t, 1, shard.WithDisabledGC())

	e := testNewEngine(t).setInitializedShards(t, s1).engine
	e.log = test.NewLogger(t)
	defer e.Close(context.Background())

	putBigObject(t, e, obj)
	requireBigObjectStored(t, e, obj)

	addrParent := object.AddressOf(obj.parent)
	addrLink := object.AddressOf(obj.link)

	// delete logical
	var deletePrm DeletePrm
	deletePrm.WithForceRemoval()
	deletePrm.WithAddress(addrParent)

	_, err := e.Delete(context.Background(), deletePrm)
	require.NoError(t, err)

	requireBigObjectNotFound(t, e, obj)

	// delete physical
	var delPrm shard.DeletePrm

	delPrm.SetAddresses(addrParent)
	_, err = s1.Delete(context.Background(), delPrm)
	require.NoError(t, err)

	delPrm.SetAddresses(addrLink)
	_, err = s1.Delete(context.Background(), delPrm)
	require.NoError(t, err)

	for _, child := range obj.children {
		delPrm.SetAddresses(object.AddressOf(child))
		_, err = s1.Delete(context.Background(), delPrm)
		require.NoError(t, err)
	}

	requireBigObjectNotFound(t, e, obj)
}

// checkGetError performs an engine Get for addr and asserts on its error.
//
// When shouldFail is true the returned error must be matchable (via
// errors.As semantics) to the type parameter E. When shouldFail is false the
// Get must succeed and E is not consulted.
func checkGetError[E error](t *testing.T, e *StorageEngine, addr oid.Address, shouldFail bool) {
	t.Helper()

	var getPrm GetPrm
	getPrm.WithAddress(addr)

	_, err := e.Get(context.Background(), getPrm)
	if !shouldFail {
		require.NoError(t, err)
		return
	}

	var target E
	require.ErrorAs(t, err, &target)
}