frostfs-node/pkg/local_object_storage/metabase/graveyard_test.go
Dmitrii Stepanov a52a0f4204 [#1385] metabase: Validate that tombstone and target have the same container ID
Target container ID is taken from tombstone: cmd/frostfs-node/object.go:507
Also, an object of type `TOMBSTONE` contains objectID, so the tombstone and
the tombstoned object must have the same containerID.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-09-20 15:18:47 +03:00

450 lines
12 KiB
Go

package meta_test
import (
"context"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
)
// TestDB_IterateDeletedObjects_EmptyDB verifies that, on a database into
// which nothing has been put, both the graveyard iteration and the garbage
// iteration finish without an error and never call the handler.
func TestDB_IterateDeletedObjects_EmptyDB(t *testing.T) {
	db := newDB(t)
	defer func() { require.NoError(t, db.Close()) }()

	ctx := context.Background()

	// visited is shared by both handlers; it must stay at zero throughout.
	visited := 0

	var gravePrm meta.GraveyardIterationPrm
	gravePrm.SetHandler(func(meta.TombstonedObject) error {
		visited++
		return nil
	})
	require.NoError(t, db.IterateOverGraveyard(ctx, gravePrm))
	require.Zero(t, visited)

	var garbagePrm meta.GarbageIterationPrm
	garbagePrm.SetHandler(func(meta.GarbageObject) error {
		visited++
		return nil
	})
	require.NoError(t, db.IterateOverGarbage(ctx, garbagePrm))
	require.Zero(t, visited)
}
// TestDB_Iterate_OffsetNotFound checks garbage iteration when the address
// used as the offset is not itself stored in the database: only obj1 is put
// and marked for GC, while the offsets (obj2's address and addr3) refer to
// objects that were never put.
func TestDB_Iterate_OffsetNotFound(t *testing.T) {
	db := newDB(t)
	defer func() { require.NoError(t, db.Close()) }()

	obj1 := testutil.GenerateObject()
	obj2 := testutil.GenerateObject()

	// Hard-coded addresses are used instead of random ones: the expectations
	// below rely on how these addresses are ordered relative to each other.
	var addr1 oid.Address
	err := addr1.DecodeString("AUSF6rhReoAdPVKYUZWW9o2LbtTvekn54B3JXi7pdzmn/2daLhLB7yVXbjBaKkckkuvjX22BxRYuSHy9RPxuH9PZS")
	require.NoError(t, err)

	var addr2 oid.Address
	err = addr2.DecodeString("CwYYr6sFLU1zK6DeBTVd8SReADUoxYobUhSrxgXYxCVn/ANYbnJoQqdjmU5Dhk3LkxYj5E9nJHQFf8LjTEcap9TxM")
	require.NoError(t, err)

	var addr3 oid.Address
	err = addr3.DecodeString("6ay4GfhR9RgN28d5ufg63toPetkYHGcpcW7G3b7QWSek/ANYbnJoQqdjmU5Dhk3LkxYj5E9nJHQFf8LjTEcap9TxM")
	require.NoError(t, err)

	obj1.SetContainerID(addr1.Container())
	obj1.SetID(addr1.Object())

	obj2.SetContainerID(addr2.Container())
	obj2.SetID(addr2.Object())

	// Only the first object is stored; obj2 exists solely to provide an
	// offset address that is absent from the database.
	err = putBig(db, obj1)
	require.NoError(t, err)

	var inhumePrm meta.InhumePrm
	inhumePrm.SetAddresses(object.AddressOf(obj1))
	inhumePrm.SetGCMark()

	_, err = db.Inhume(context.Background(), inhumePrm)
	require.NoError(t, err)

	// counter accumulates handler calls across both iterations below.
	var counter int

	var iterGCPRM meta.GarbageIterationPrm
	iterGCPRM.SetOffset(object.AddressOf(obj2))
	iterGCPRM.SetHandler(func(garbage meta.GarbageObject) error {
		// testify expects (expected, actual); obj1 is the only GC-marked object.
		require.Equal(t, addr1, garbage.Address())
		counter++

		return nil
	})

	err = db.IterateOverGarbage(context.Background(), iterGCPRM)
	require.NoError(t, err)

	// the second object would be put after the
	// first, so it is expected that iteration
	// will not receive the first object
	require.Equal(t, 0, counter)

	iterGCPRM.SetOffset(addr3)
	iterGCPRM.SetHandler(func(garbage meta.GarbageObject) error {
		require.Equal(t, addr1, garbage.Address())
		counter++

		return nil
	})

	err = db.IterateOverGarbage(context.Background(), iterGCPRM)
	require.NoError(t, err)

	// the third object would be put before the
	// first, so it is expected that iteration
	// will receive the first object
	require.Equal(t, 1, counter)
}
// TestDB_IterateDeletedObjects puts four objects of one container, inhumes
// two of them with a tombstone and the other two with a GC mark, and then
// checks what the graveyard and garbage iterations report.
func TestDB_IterateDeletedObjects(t *testing.T) {
	db := newDB(t)
	defer func() { require.NoError(t, db.Close()) }()

	ctx := context.Background()
	cnr := cidtest.ID()

	// Generate four objects in the same container and store all of them.
	obj1 := testutil.GenerateObjectWithCID(cnr)
	obj2 := testutil.GenerateObjectWithCID(cnr)
	obj3 := testutil.GenerateObjectWithCID(cnr)
	obj4 := testutil.GenerateObjectWithCID(cnr)

	require.NoError(t, putBig(db, obj1))
	require.NoError(t, putBig(db, obj2))
	require.NoError(t, putBig(db, obj3))
	require.NoError(t, putBig(db, obj4))

	a1, a2 := object.AddressOf(obj1), object.AddressOf(obj2)
	a3, a4 := object.AddressOf(obj3), object.AddressOf(obj4)

	// The tombstone address lives in the same container as the objects.
	addrTombstone := oidtest.Address()
	addrTombstone.SetContainer(cnr)

	var inhumePrm meta.InhumePrm

	// First pair: inhume with the tombstone.
	inhumePrm.SetAddresses(a1, a2)
	inhumePrm.SetTombstoneAddress(addrTombstone)
	_, err := db.Inhume(ctx, inhumePrm)
	require.NoError(t, err)

	// Second pair: inhume with a GC mark, reusing the same parameters value.
	inhumePrm.SetAddresses(a3, a4)
	inhumePrm.SetGCMark()
	_, err = db.Inhume(ctx, inhumePrm)
	require.NoError(t, err)

	var (
		seen      int
		fromGrave []oid.Address
		fromGC    []oid.Address
	)

	var gravePrm meta.GraveyardIterationPrm
	gravePrm.SetHandler(func(grave meta.TombstonedObject) error {
		require.Equal(t, addrTombstone, grave.Tombstone())
		fromGrave = append(fromGrave, grave.Address())
		seen++
		return nil
	})
	require.NoError(t, db.IterateOverGraveyard(ctx, gravePrm))

	var garbagePrm meta.GarbageIterationPrm
	garbagePrm.SetHandler(func(item meta.GarbageObject) error {
		fromGC = append(fromGC, item.Address())
		seen++
		return nil
	})
	require.NoError(t, db.IterateOverGarbage(ctx, garbagePrm))

	// Objects covered with a tombstone also receive a GC mark, so all four
	// addresses are expected in the garbage and only the first two in the
	// graveyard.
	wantGarbage := []oid.Address{a1, a2, a3, a4}
	wantGraveyard := []oid.Address{a1, a2}

	require.Equal(t, len(wantGarbage)+len(wantGraveyard), seen)
	require.ElementsMatch(t, wantGraveyard, fromGrave)
	require.ElementsMatch(t, wantGarbage, fromGC)
}
// TestDB_IterateOverGraveyard_Offset checks that a graveyard iteration can be
// interrupted by the handler and later continued from an offset, and that
// using the last received address as the offset yields no handler calls.
func TestDB_IterateOverGraveyard_Offset(t *testing.T) {
	db := newDB(t)
	defer func() { require.NoError(t, db.Close()) }()

	ctx := context.Background()
	cnr := cidtest.ID()

	// Generate four objects in the same container and store all of them.
	obj1 := testutil.GenerateObjectWithCID(cnr)
	obj2 := testutil.GenerateObjectWithCID(cnr)
	obj3 := testutil.GenerateObjectWithCID(cnr)
	obj4 := testutil.GenerateObjectWithCID(cnr)

	require.NoError(t, putBig(db, obj1))
	require.NoError(t, putBig(db, obj2))
	require.NoError(t, putBig(db, obj3))
	require.NoError(t, putBig(db, obj4))

	// Inhume all four with one tombstone from the same container.
	addrTombstone := oidtest.Address()
	addrTombstone.SetContainer(cnr)

	var inhumePrm meta.InhumePrm
	inhumePrm.SetAddresses(
		object.AddressOf(obj1), object.AddressOf(obj2),
		object.AddressOf(obj3), object.AddressOf(obj4),
	)
	inhumePrm.SetTombstoneAddress(addrTombstone)

	_, err := db.Inhume(ctx, inhumePrm)
	require.NoError(t, err)

	expectedGraveyard := []oid.Address{
		object.AddressOf(obj1), object.AddressOf(obj2),
		object.AddressOf(obj3), object.AddressOf(obj4),
	}

	half := len(expectedGraveyard) / 2
	visited := 0
	var collected []oid.Address

	// Phase 1: stop the iteration from inside the handler after half of the
	// expected entries have been received.
	var prm meta.GraveyardIterationPrm
	prm.SetHandler(func(grave meta.TombstonedObject) error {
		require.Equal(t, addrTombstone, grave.Tombstone())
		collected = append(collected, grave.Address())
		visited++

		if visited != half {
			return nil
		}
		return meta.ErrInterruptIterator
	})

	require.NoError(t, db.IterateOverGraveyard(ctx, prm))
	require.Equal(t, half, visited)
	require.Equal(t, half, len(collected))

	// Phase 2: continue from the last received address and read the rest.
	prm.SetOffset(collected[len(collected)-1])
	prm.SetHandler(func(grave meta.TombstonedObject) error {
		require.Equal(t, addrTombstone, grave.Tombstone())
		collected = append(collected, grave.Address())
		visited++
		return nil
	})

	require.NoError(t, db.IterateOverGraveyard(ctx, prm))
	require.Equal(t, len(expectedGraveyard), visited)
	require.ElementsMatch(t, collected, expectedGraveyard)

	// Phase 3: the last received object (last in db) used as the offset
	// should lead to no iteration at all.
	prm.SetOffset(collected[len(collected)-1])

	handlerInvoked := false
	prm.SetHandler(func(meta.TombstonedObject) error {
		handlerInvoked = true
		return nil
	})

	require.NoError(t, db.IterateOverGraveyard(ctx, prm))
	require.False(t, handlerInvoked)
}
// TestDB_IterateOverGarbage_Offset checks that a garbage iteration can be
// interrupted by the handler and later continued from an offset, and that
// using the last received address as the offset yields no handler calls.
func TestDB_IterateOverGarbage_Offset(t *testing.T) {
	db := newDB(t)
	defer func() { require.NoError(t, db.Close()) }()

	ctx := context.Background()

	// Generate four objects and store all of them.
	obj1 := testutil.GenerateObject()
	obj2 := testutil.GenerateObject()
	obj3 := testutil.GenerateObject()
	obj4 := testutil.GenerateObject()

	require.NoError(t, putBig(db, obj1))
	require.NoError(t, putBig(db, obj2))
	require.NoError(t, putBig(db, obj3))
	require.NoError(t, putBig(db, obj4))

	// Mark all four objects for GC in a single call.
	var inhumePrm meta.InhumePrm
	inhumePrm.SetAddresses(
		object.AddressOf(obj1), object.AddressOf(obj2),
		object.AddressOf(obj3), object.AddressOf(obj4),
	)
	inhumePrm.SetGCMark()

	_, err := db.Inhume(ctx, inhumePrm)
	require.NoError(t, err)

	expectedGarbage := []oid.Address{
		object.AddressOf(obj1), object.AddressOf(obj2),
		object.AddressOf(obj3), object.AddressOf(obj4),
	}

	half := len(expectedGarbage) / 2
	visited := 0
	var collected []oid.Address

	// Phase 1: stop the iteration from inside the handler after half of the
	// expected entries have been received.
	var prm meta.GarbageIterationPrm
	prm.SetHandler(func(item meta.GarbageObject) error {
		collected = append(collected, item.Address())
		visited++

		if visited != half {
			return nil
		}
		return meta.ErrInterruptIterator
	})

	require.NoError(t, db.IterateOverGarbage(ctx, prm))
	require.Equal(t, half, visited)
	require.Equal(t, half, len(collected))

	// Phase 2: continue from the last received address and read the rest.
	prm.SetOffset(collected[len(collected)-1])
	prm.SetHandler(func(item meta.GarbageObject) error {
		collected = append(collected, item.Address())
		visited++
		return nil
	})

	require.NoError(t, db.IterateOverGarbage(ctx, prm))
	require.Equal(t, len(expectedGarbage), visited)
	require.ElementsMatch(t, collected, expectedGarbage)

	// Phase 3: the last received object (last in db) used as the offset
	// should lead to no iteration at all.
	prm.SetOffset(collected[len(collected)-1])

	handlerInvoked := false
	prm.SetHandler(func(meta.GarbageObject) error {
		handlerInvoked = true
		return nil
	})

	require.NoError(t, db.IterateOverGarbage(ctx, prm))
	require.False(t, handlerInvoked)
}
// TestDB_DropGraves inhumes two objects with a tombstone, collects the
// resulting graveyard entries, passes them to DropGraves and then checks
// that a further graveyard iteration reports nothing.
func TestDB_DropGraves(t *testing.T) {
	db := newDB(t)
	defer func() { require.NoError(t, db.Close()) }()

	ctx := context.Background()
	cnr := cidtest.ID()

	// Generate two objects in the same container and store both.
	obj1 := testutil.GenerateObjectWithCID(cnr)
	obj2 := testutil.GenerateObjectWithCID(cnr)

	require.NoError(t, putBig(db, obj1))
	require.NoError(t, putBig(db, obj2))

	// Inhume both with a tombstone from the same container.
	addrTombstone := oidtest.Address()
	addrTombstone.SetContainer(cnr)

	var inhumePrm meta.InhumePrm
	inhumePrm.SetAddresses(object.AddressOf(obj1), object.AddressOf(obj2))
	inhumePrm.SetTombstoneAddress(addrTombstone)

	_, err := db.Inhume(ctx, inhumePrm)
	require.NoError(t, err)

	// Collect every graveyard entry; exactly two are expected.
	var graves []meta.TombstonedObject
	found := 0

	var gravePrm meta.GraveyardIterationPrm
	gravePrm.SetHandler(func(grave meta.TombstonedObject) error {
		graves = append(graves, grave)
		found++
		return nil
	})

	require.NoError(t, db.IterateOverGraveyard(ctx, gravePrm))
	require.Equal(t, 2, found)

	require.NoError(t, db.DropGraves(ctx, graves))

	// After dropping, the graveyard iteration must not call the handler.
	remaining := 0
	gravePrm.SetHandler(func(meta.TombstonedObject) error {
		remaining++
		return nil
	})

	require.NoError(t, db.IterateOverGraveyard(ctx, gravePrm))
	require.Zero(t, remaining)
}