package meta_test

import (
	"context"
	"testing"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
	"github.com/stretchr/testify/require"
)

// TestDB_IterateDeletedObjects_EmptyDB checks that iterating over the
// graveyard and over the garbage of a DB into which nothing was put
// succeeds and never invokes the handler.
func TestDB_IterateDeletedObjects_EmptyDB(t *testing.T) {
	db := newDB(t)
	defer func() { require.NoError(t, db.Close(context.Background())) }()

	// counter is shared by both handlers; it must stay zero throughout.
	var counter int

	var iterGravePRM meta.GraveyardIterationPrm
	iterGravePRM.SetHandler(func(_ meta.TombstonedObject) error {
		counter++
		return nil
	})

	err := db.IterateOverGraveyard(context.Background(), iterGravePRM)
	require.NoError(t, err)
	require.Zero(t, counter)

	var iterGCPRM meta.GarbageIterationPrm
	iterGCPRM.SetHandler(func(_ meta.GarbageObject) error {
		counter++
		return nil
	})

	err = db.IterateOverGarbage(context.Background(), iterGCPRM)
	require.NoError(t, err)
	require.Zero(t, counter)
}

// TestDB_Iterate_OffsetNotFound checks garbage iteration when the offset
// address itself is not stored in the DB: a single GC-marked object is
// expected to be skipped or visited depending on which fixed address is
// used as the offset.
func TestDB_Iterate_OffsetNotFound(t *testing.T) {
	db := newDB(t)
	defer func() { require.NoError(t, db.Close(context.Background())) }()

	obj1 := testutil.GenerateObject()
	obj2 := testutil.GenerateObject()

	// Fixed addresses make the relative order of the stored object and
	// the offsets deterministic.
	var addr1 oid.Address
	err := addr1.DecodeString("AUSF6rhReoAdPVKYUZWW9o2LbtTvekn54B3JXi7pdzmn/2daLhLB7yVXbjBaKkckkuvjX22BxRYuSHy9RPxuH9PZS")
	require.NoError(t, err)

	var addr2 oid.Address
	err = addr2.DecodeString("CwYYr6sFLU1zK6DeBTVd8SReADUoxYobUhSrxgXYxCVn/ANYbnJoQqdjmU5Dhk3LkxYj5E9nJHQFf8LjTEcap9TxM")
	require.NoError(t, err)

	var addr3 oid.Address
	err = addr3.DecodeString("6ay4GfhR9RgN28d5ufg63toPetkYHGcpcW7G3b7QWSek/ANYbnJoQqdjmU5Dhk3LkxYj5E9nJHQFf8LjTEcap9TxM")
	require.NoError(t, err)

	obj1.SetContainerID(addr1.Container())
	obj1.SetID(addr1.Object())

	obj2.SetContainerID(addr2.Container())
	obj2.SetID(addr2.Object())

	// only the first object is stored and marked as garbage;
	// the second one is used solely as an offset
	err = putBig(db, obj1)
	require.NoError(t, err)

	var inhumePrm meta.InhumePrm
	inhumePrm.SetAddresses(object.AddressOf(obj1))
	inhumePrm.SetGCMark()

	_, err = db.Inhume(context.Background(), inhumePrm)
	require.NoError(t, err)

	var counter int

	var iterGCPRM meta.GarbageIterationPrm
	iterGCPRM.SetOffset(object.AddressOf(obj2))
	iterGCPRM.SetHandler(func(garbage meta.GarbageObject) error {
		require.Equal(t, addr1, garbage.Address())
		counter++

		return nil
	})

	err = db.IterateOverGarbage(context.Background(), iterGCPRM)
	require.NoError(t, err)

	// the second object would be put after the
	// first, so it is expected that iteration
	// will not receive the first object
	require.Equal(t, 0, counter)

	iterGCPRM.SetOffset(addr3)
	iterGCPRM.SetHandler(func(garbage meta.GarbageObject) error {
		require.Equal(t, addr1, garbage.Address())
		counter++

		return nil
	})

	err = db.IterateOverGarbage(context.Background(), iterGCPRM)
	require.NoError(t, err)

	// the third object would be put before the
	// first, so it is expected that iteration
	// will receive the first object
	require.Equal(t, 1, counter)
}

// TestDB_IterateDeletedObjects inhumes two objects with a tombstone and two
// with a GC mark, then checks which addresses are reported by graveyard and
// garbage iteration.
func TestDB_IterateDeletedObjects(t *testing.T) {
	db := newDB(t)
	defer func() { require.NoError(t, db.Close(context.Background())) }()

	cnr := cidtest.ID()

	// generate and put 4 objects
	obj1 := testutil.GenerateObjectWithCID(cnr)
	obj2 := testutil.GenerateObjectWithCID(cnr)
	obj3 := testutil.GenerateObjectWithCID(cnr)
	obj4 := testutil.GenerateObjectWithCID(cnr)

	require.NoError(t, putBig(db, obj1))
	require.NoError(t, putBig(db, obj2))
	require.NoError(t, putBig(db, obj3))
	require.NoError(t, putBig(db, obj4))

	var inhumePrm meta.InhumePrm

	// inhume with tombstone
	addrTombstone := oidtest.Address()
	addrTombstone.SetContainer(cnr)

	inhumePrm.SetAddresses(object.AddressOf(obj1), object.AddressOf(obj2))
	inhumePrm.SetTombstoneAddress(addrTombstone)

	_, err := db.Inhume(context.Background(), inhumePrm)
	require.NoError(t, err)

	inhumePrm.SetAddresses(object.AddressOf(obj3), object.AddressOf(obj4))
	inhumePrm.SetGCMark()

	// inhume with GC mark
	_, err = db.Inhume(context.Background(), inhumePrm)
	require.NoError(t, err)

	var (
		counterAll         int
		buriedTS, buriedGC []oid.Address
	)

	var iterGravePRM meta.GraveyardIterationPrm
	iterGravePRM.SetHandler(func(tombstoned meta.TombstonedObject) error {
		require.Equal(t, addrTombstone, tombstoned.Tombstone())

		buriedTS = append(buriedTS, tombstoned.Address())
		counterAll++

		return nil
	})

	err = db.IterateOverGraveyard(context.Background(), iterGravePRM)
	require.NoError(t, err)

	var iterGCPRM meta.GarbageIterationPrm
	iterGCPRM.SetHandler(func(garbage meta.GarbageObject) error {
		buriedGC = append(buriedGC, garbage.Address())
		counterAll++

		return nil
	})

	err = db.IterateOverGarbage(context.Background(), iterGCPRM)
	require.NoError(t, err)

	// objects covered with a tombstone
	// also receive GC mark
	garbageExpected := []oid.Address{
		object.AddressOf(obj1), object.AddressOf(obj2),
		object.AddressOf(obj3), object.AddressOf(obj4),
	}

	graveyardExpected := []oid.Address{
		object.AddressOf(obj1), object.AddressOf(obj2),
	}

	require.Equal(t, len(garbageExpected)+len(graveyardExpected), counterAll)
	require.ElementsMatch(t, graveyardExpected, buriedTS)
	require.ElementsMatch(t, garbageExpected, buriedGC)
}

// TestDB_IterateOverGraveyard_Offset checks that graveyard iteration can be
// interrupted with meta.ErrInterruptIterator and then resumed from the last
// received address used as an offset.
func TestDB_IterateOverGraveyard_Offset(t *testing.T) {
	db := newDB(t)
	defer func() { require.NoError(t, db.Close(context.Background())) }()

	cnr := cidtest.ID()

	// generate and put 4 objects
	obj1 := testutil.GenerateObjectWithCID(cnr)
	obj2 := testutil.GenerateObjectWithCID(cnr)
	obj3 := testutil.GenerateObjectWithCID(cnr)
	obj4 := testutil.GenerateObjectWithCID(cnr)

	require.NoError(t, putBig(db, obj1))
	require.NoError(t, putBig(db, obj2))
	require.NoError(t, putBig(db, obj3))
	require.NoError(t, putBig(db, obj4))

	// inhume with tombstone
	addrTombstone := oidtest.Address()
	addrTombstone.SetContainer(cnr)

	var inhumePrm meta.InhumePrm
	inhumePrm.SetAddresses(
		object.AddressOf(obj1), object.AddressOf(obj2),
		object.AddressOf(obj3), object.AddressOf(obj4))
	inhumePrm.SetTombstoneAddress(addrTombstone)

	_, err := db.Inhume(context.Background(), inhumePrm)
	require.NoError(t, err)

	expectedGraveyard := []oid.Address{
		object.AddressOf(obj1), object.AddressOf(obj2),
		object.AddressOf(obj3), object.AddressOf(obj4),
	}

	var (
		counter            int
		firstIterationSize = len(expectedGraveyard) / 2

		gotGraveyard []oid.Address
	)

	var iterGraveyardPrm meta.GraveyardIterationPrm
	iterGraveyardPrm.SetHandler(func(tombstoned meta.TombstonedObject) error {
		require.Equal(t, addrTombstone, tombstoned.Tombstone())

		gotGraveyard = append(gotGraveyard, tombstoned.Address())

		// stop after the first half has been received
		counter++
		if counter == firstIterationSize {
			return meta.ErrInterruptIterator
		}

		return nil
	})

	// an interrupted iteration is not reported as an error
	err = db.IterateOverGraveyard(context.Background(), iterGraveyardPrm)
	require.NoError(t, err)
	require.Equal(t, firstIterationSize, counter)
	require.Len(t, gotGraveyard, firstIterationSize)

	// last received address is an offset
	offset := gotGraveyard[len(gotGraveyard)-1]
	iterGraveyardPrm.SetOffset(offset)
	iterGraveyardPrm.SetHandler(func(tombstoned meta.TombstonedObject) error {
		require.Equal(t, addrTombstone, tombstoned.Tombstone())

		gotGraveyard = append(gotGraveyard, tombstoned.Address())
		counter++

		return nil
	})

	err = db.IterateOverGraveyard(context.Background(), iterGraveyardPrm)
	require.NoError(t, err)
	require.Equal(t, len(expectedGraveyard), counter)
	require.ElementsMatch(t, expectedGraveyard, gotGraveyard)

	// last received object (last in db) as offset
	// should lead to no iteration at all
	offset = gotGraveyard[len(gotGraveyard)-1]
	iterGraveyardPrm.SetOffset(offset)

	iWasCalled := false

	iterGraveyardPrm.SetHandler(func(_ meta.TombstonedObject) error {
		iWasCalled = true
		return nil
	})

	err = db.IterateOverGraveyard(context.Background(), iterGraveyardPrm)
	require.NoError(t, err)
	require.False(t, iWasCalled)
}

// TestDB_IterateOverGarbage_Offset checks that garbage iteration can be
// interrupted with meta.ErrInterruptIterator and then resumed from the last
// received address used as an offset.
func TestDB_IterateOverGarbage_Offset(t *testing.T) {
	db := newDB(t)
	defer func() { require.NoError(t, db.Close(context.Background())) }()

	// generate and put 4 objects
	obj1 := testutil.GenerateObject()
	obj2 := testutil.GenerateObject()
	obj3 := testutil.GenerateObject()
	obj4 := testutil.GenerateObject()

	require.NoError(t, putBig(db, obj1))
	require.NoError(t, putBig(db, obj2))
	require.NoError(t, putBig(db, obj3))
	require.NoError(t, putBig(db, obj4))

	var inhumePrm meta.InhumePrm
	inhumePrm.SetAddresses(
		object.AddressOf(obj1), object.AddressOf(obj2),
		object.AddressOf(obj3), object.AddressOf(obj4))
	inhumePrm.SetGCMark()

	_, err := db.Inhume(context.Background(), inhumePrm)
	require.NoError(t, err)

	expectedGarbage := []oid.Address{
		object.AddressOf(obj1), object.AddressOf(obj2),
		object.AddressOf(obj3), object.AddressOf(obj4),
	}

	var (
		counter            int
		firstIterationSize = len(expectedGarbage) / 2

		gotGarbage []oid.Address
	)

	var iterGarbagePrm meta.GarbageIterationPrm
	iterGarbagePrm.SetHandler(func(garbage meta.GarbageObject) error {
		gotGarbage = append(gotGarbage, garbage.Address())

		// stop after the first half has been received
		counter++
		if counter == firstIterationSize {
			return meta.ErrInterruptIterator
		}

		return nil
	})

	// an interrupted iteration is not reported as an error
	err = db.IterateOverGarbage(context.Background(), iterGarbagePrm)
	require.NoError(t, err)
	require.Equal(t, firstIterationSize, counter)
	require.Len(t, gotGarbage, firstIterationSize)

	// last received address is an offset
	offset := gotGarbage[len(gotGarbage)-1]
	iterGarbagePrm.SetOffset(offset)
	iterGarbagePrm.SetHandler(func(garbage meta.GarbageObject) error {
		gotGarbage = append(gotGarbage, garbage.Address())
		counter++

		return nil
	})

	err = db.IterateOverGarbage(context.Background(), iterGarbagePrm)
	require.NoError(t, err)
	require.Equal(t, len(expectedGarbage), counter)
	require.ElementsMatch(t, expectedGarbage, gotGarbage)

	// last received object (last in db) as offset
	// should lead to no iteration at all
	offset = gotGarbage[len(gotGarbage)-1]
	iterGarbagePrm.SetOffset(offset)

	iWasCalled := false

	iterGarbagePrm.SetHandler(func(_ meta.GarbageObject) error {
		iWasCalled = true
		return nil
	})

	err = db.IterateOverGarbage(context.Background(), iterGarbagePrm)
	require.NoError(t, err)
	require.False(t, iWasCalled)
}

// TestDB_InhumeTombstones stores two objects and a tombstone object covering
// them, inhumes the objects with that tombstone, passes the collected
// graveyard records to InhumeTombstones and checks the reported counters and
// that the graveyard is empty afterwards.
func TestDB_InhumeTombstones(t *testing.T) {
	db := newDB(t)
	defer func() { require.NoError(t, db.Close(context.Background())) }()

	cnr := cidtest.ID()

	// generate and put 2 objects
	obj1 := testutil.GenerateObjectWithCID(cnr)
	obj2 := testutil.GenerateObjectWithCID(cnr)

	require.NoError(t, putBig(db, obj1))
	require.NoError(t, putBig(db, obj2))

	// the tombstone members must be real identifiers, so make sure
	// both objects actually have their IDs set
	id1, ok := obj1.ID()
	require.True(t, ok)
	id2, ok := obj2.ID()
	require.True(t, ok)

	ts := objectSDK.NewTombstone()
	ts.SetMembers([]oid.ID{id1, id2})

	data, err := ts.Marshal()
	require.NoError(t, err)

	objTs := objectSDK.New()
	objTs.SetContainerID(cnr)
	objTs.SetType(objectSDK.TypeTombstone)
	objTs.SetPayload(data)

	require.NoError(t, objectSDK.CalculateAndSetID(objTs))
	require.NoError(t, putBig(db, objTs))

	addrTombstone := object.AddressOf(objTs)

	var inhumePrm meta.InhumePrm
	inhumePrm.SetAddresses(object.AddressOf(obj1), object.AddressOf(obj2))
	inhumePrm.SetTombstoneAddress(addrTombstone)

	_, err = db.Inhume(context.Background(), inhumePrm)
	require.NoError(t, err)

	var (
		buriedTS []meta.TombstonedObject
		counter  int
	)

	var iterGravePRM meta.GraveyardIterationPrm
	iterGravePRM.SetHandler(func(tombstoned meta.TombstonedObject) error {
		buriedTS = append(buriedTS, tombstoned)
		counter++

		return nil
	})

	err = db.IterateOverGraveyard(context.Background(), iterGravePRM)
	require.NoError(t, err)
	require.Equal(t, 2, counter)

	res, err := db.InhumeTombstones(context.Background(), buriedTS)
	require.NoError(t, err)
	require.EqualValues(t, 1, res.LogicInhumed())
	require.EqualValues(t, 0, res.UserInhumed())
	require.EqualValues(t, map[cid.ID]meta.ObjectCounters{cnr: {Logic: 1}}, res.InhumedByCnrID())

	// no graveyard records are expected to remain
	counter = 0
	iterGravePRM.SetHandler(func(_ meta.TombstonedObject) error {
		counter++
		return nil
	})

	err = db.IterateOverGraveyard(context.Background(), iterGravePRM)
	require.NoError(t, err)
	require.Zero(t, counter)
}