forked from TrueCloudLab/frostfs-node
Dmitrii Stepanov
76268e3ea2
Target container ID is taken from tombstone: cmd/frostfs-node/object.go:507 Also object of type `TOMBSTONE` contains objectID, so tombstone and tombstoned object must have the same containerID. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
136 lines
3.9 KiB
Go
136 lines
3.9 KiB
Go
package meta_test
|
|
|
|
import (
|
|
"context"
|
|
"testing"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
func TestDB_Inhume(t *testing.T) {
|
|
db := newDB(t)
|
|
defer func() { require.NoError(t, db.Close()) }()
|
|
|
|
raw := testutil.GenerateObject()
|
|
testutil.AddAttribute(raw, "foo", "bar")
|
|
|
|
err := putBig(db, raw)
|
|
require.NoError(t, err)
|
|
|
|
err = metaInhume(db, object.AddressOf(raw), oidtest.ID())
|
|
require.NoError(t, err)
|
|
|
|
_, err = metaExists(db, object.AddressOf(raw))
|
|
require.True(t, client.IsErrObjectAlreadyRemoved(err))
|
|
|
|
_, err = metaGet(db, object.AddressOf(raw), false)
|
|
require.True(t, client.IsErrObjectAlreadyRemoved(err))
|
|
}
|
|
|
|
func TestInhumeTombOnTomb(t *testing.T) {
|
|
db := newDB(t)
|
|
defer func() { require.NoError(t, db.Close()) }()
|
|
|
|
var (
|
|
err error
|
|
|
|
cnr = cidtest.ID()
|
|
addr1 = oidtest.Address()
|
|
addr2 = oidtest.Address()
|
|
addr3 = oidtest.Address()
|
|
addr4 = oidtest.Address()
|
|
inhumePrm meta.InhumePrm
|
|
existsPrm meta.ExistsPrm
|
|
)
|
|
|
|
addr1.SetContainer(cnr)
|
|
addr2.SetContainer(cnr)
|
|
addr3.SetContainer(cnr)
|
|
addr4.SetContainer(cnr)
|
|
|
|
inhumePrm.SetAddresses(addr1)
|
|
inhumePrm.SetTombstoneAddress(addr2)
|
|
|
|
// inhume addr1 via addr2
|
|
_, err = db.Inhume(context.Background(), inhumePrm)
|
|
require.NoError(t, err)
|
|
|
|
existsPrm.SetAddress(addr1)
|
|
|
|
// addr1 should become inhumed {addr1:addr2}
|
|
_, err = db.Exists(context.Background(), existsPrm)
|
|
require.True(t, client.IsErrObjectAlreadyRemoved(err))
|
|
|
|
inhumePrm.SetAddresses(addr3)
|
|
inhumePrm.SetTombstoneAddress(addr1)
|
|
|
|
// try to inhume addr3 via addr1
|
|
_, err = db.Inhume(context.Background(), inhumePrm)
|
|
require.NoError(t, err)
|
|
|
|
// record with {addr1:addr2} should be removed from graveyard
|
|
// as a tomb-on-tomb; metabase should return ObjectNotFound
|
|
// NOT ObjectAlreadyRemoved since that record has been removed
|
|
// from graveyard but addr1 is still marked with GC
|
|
_, err = db.Exists(context.Background(), existsPrm)
|
|
require.True(t, client.IsErrObjectNotFound(err))
|
|
|
|
existsPrm.SetAddress(addr3)
|
|
|
|
// addr3 should be inhumed {addr3: addr1}
|
|
_, err = db.Exists(context.Background(), existsPrm)
|
|
require.True(t, client.IsErrObjectAlreadyRemoved(err))
|
|
|
|
inhumePrm.SetAddresses(addr1)
|
|
inhumePrm.SetTombstoneAddress(addr4)
|
|
|
|
// try to inhume addr1 (which is already a tombstone in graveyard)
|
|
_, err = db.Inhume(context.Background(), inhumePrm)
|
|
require.NoError(t, err)
|
|
|
|
existsPrm.SetAddress(addr1)
|
|
|
|
// record with addr1 key should not appear in graveyard
|
|
// (tomb can not be inhumed) but should be kept as object
|
|
// with GC mark
|
|
_, err = db.Exists(context.Background(), existsPrm)
|
|
require.True(t, client.IsErrObjectNotFound(err))
|
|
}
|
|
|
|
func TestInhumeLocked(t *testing.T) {
|
|
db := newDB(t)
|
|
defer func() { require.NoError(t, db.Close()) }()
|
|
|
|
locked := oidtest.Address()
|
|
|
|
err := db.Lock(context.Background(), locked.Container(), oidtest.ID(), []oid.ID{locked.Object()})
|
|
require.NoError(t, err)
|
|
|
|
var prm meta.InhumePrm
|
|
prm.SetAddresses(locked)
|
|
|
|
_, err = db.Inhume(context.Background(), prm)
|
|
|
|
var e *apistatus.ObjectLocked
|
|
require.ErrorAs(t, err, &e)
|
|
}
|
|
|
|
func metaInhume(db *meta.DB, target oid.Address, tomb oid.ID) error {
|
|
var inhumePrm meta.InhumePrm
|
|
inhumePrm.SetAddresses(target)
|
|
var tombAddr oid.Address
|
|
tombAddr.SetContainer(target.Container())
|
|
tombAddr.SetObject(tomb)
|
|
inhumePrm.SetTombstoneAddress(tombAddr)
|
|
|
|
_, err := db.Inhume(context.Background(), inhumePrm)
|
|
return err
|
|
}
|