package meta_test import ( "context" "errors" "testing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" "github.com/stretchr/testify/require" ) func TestDB_Delete(t *testing.T) { db := newDB(t) defer func() { require.NoError(t, db.Close()) }() cnr := cidtest.ID() parent := testutil.GenerateObjectWithCID(cnr) testutil.AddAttribute(parent, "foo", "bar") child := testutil.GenerateObjectWithCID(cnr) child.SetParent(parent) idParent, _ := parent.ID() child.SetParentID(idParent) // put object with parent err := putBig(db, child) require.NoError(t, err) // try to remove parent, should be no-op, error-free err = metaDelete(db, object.AddressOf(parent)) require.NoError(t, err) // inhume parent and child so they will be on graveyard ts := testutil.GenerateObjectWithCID(cnr) err = metaInhume(db, object.AddressOf(child), object.AddressOf(ts)) require.NoError(t, err) ts = testutil.GenerateObjectWithCID(cnr) err = metaInhume(db, object.AddressOf(parent), object.AddressOf(ts)) require.NoError(t, err) // delete object err = metaDelete(db, object.AddressOf(child)) require.NoError(t, err) // check if they marked as already removed ok, err := metaExists(db, object.AddressOf(child)) require.True(t, client.IsErrObjectAlreadyRemoved(err)) require.False(t, ok) ok, err = metaExists(db, object.AddressOf(parent)) require.True(t, client.IsErrObjectAlreadyRemoved(err)) require.False(t, ok) } func TestDeleteAllChildren(t *testing.T) { db := newDB(t) defer func() { require.NoError(t, db.Close()) }() cnr := cidtest.ID() // generate parent object parent := testutil.GenerateObjectWithCID(cnr) // generate 2 children child1 := testutil.GenerateObjectWithCID(cnr) child1.SetParent(parent) idParent, _ := parent.ID() child1.SetParentID(idParent) child2 := testutil.GenerateObjectWithCID(cnr) child2.SetParent(parent) child2.SetParentID(idParent) // put children require.NoError(t, putBig(db, child1)) require.NoError(t, putBig(db, child2)) // Exists should return split info for parent _, err := metaExists(db, object.AddressOf(parent)) siErr := objectSDK.NewSplitInfoError(nil) require.True(t, errors.As(err, &siErr)) // remove all children in single call err = metaDelete(db, object.AddressOf(child1), object.AddressOf(child2)) require.NoError(t, err) // parent should not be found now ex, err := metaExists(db, object.AddressOf(parent)) require.NoError(t, err) require.False(t, ex) } func TestGraveOnlyDelete(t *testing.T) { db := newDB(t) defer func() { require.NoError(t, db.Close()) }() addr := oidtest.Address() // inhume non-existent object by address require.NoError(t, metaInhume(db, addr, oidtest.Address())) // delete the object data require.NoError(t, metaDelete(db, addr)) } func TestExpiredObject(t *testing.T) { db := newDB(t, meta.WithEpochState(epochState{currEpoch})) defer func() { require.NoError(t, db.Close()) }() checkExpiredObjects(t, db, func(exp, nonExp *objectSDK.Object) { // removing expired object should be error-free require.NoError(t, metaDelete(db, object.AddressOf(exp))) require.NoError(t, metaDelete(db, object.AddressOf(nonExp))) }) } func TestDelete(t *testing.T) { db := newDB(t, meta.WithEpochState(epochState{currEpoch})) defer func() { require.NoError(t, db.Close()) }() cnr := cidtest.ID() for i := 0; i < 10; i++ { obj := testutil.GenerateObjectWithCID(cnr) var prm meta.PutPrm prm.SetObject(obj) prm.SetStorageID([]byte("0/0")) _, err := db.Put(context.Background(), prm) require.NoError(t, err) var inhumePrm meta.InhumePrm inhumePrm.SetAddresses(object.AddressOf(obj)) _, err = db.Inhume(context.Background(), inhumePrm) require.NoError(t, err) } var addrs []oid.Address var iprm meta.GarbageIterationPrm iprm.SetHandler(func(o meta.GarbageObject) error { addrs = append(addrs, o.Address()) return nil }) require.NoError(t, db.IterateOverGarbage(context.Background(), iprm)) require.Equal(t, 10, len(addrs)) var deletePrm meta.DeletePrm deletePrm.SetAddresses(addrs...) _, err := db.Delete(context.Background(), deletePrm) require.NoError(t, err) addrs = nil iprm.SetHandler(func(o meta.GarbageObject) error { addrs = append(addrs, o.Address()) return nil }) require.NoError(t, db.IterateOverGarbage(context.Background(), iprm)) require.Equal(t, 0, len(addrs)) } func TestDeleteDropsGCMarkIfObjectNotFound(t *testing.T) { db := newDB(t, meta.WithEpochState(epochState{currEpoch})) defer func() { require.NoError(t, db.Close()) }() addr := oidtest.Address() var prm meta.InhumePrm prm.SetAddresses(addr) prm.SetGCMark() _, err := db.Inhume(context.Background(), prm) require.NoError(t, err) var garbageCount int var itPrm meta.GarbageIterationPrm itPrm.SetHandler(func(g meta.GarbageObject) error { garbageCount++ return nil }) require.NoError(t, db.IterateOverGarbage(context.Background(), itPrm)) require.Equal(t, 1, garbageCount) var delPrm meta.DeletePrm delPrm.SetAddresses(addr) _, err = db.Delete(context.Background(), delPrm) require.NoError(t, err) garbageCount = 0 require.NoError(t, db.IterateOverGarbage(context.Background(), itPrm)) require.Equal(t, 0, garbageCount) } func metaDelete(db *meta.DB, addrs ...oid.Address) error { var deletePrm meta.DeletePrm deletePrm.SetAddresses(addrs...) _, err := db.Delete(context.Background(), deletePrm) return err }