Dmitrii Stepanov
76268e3ea2
Target container ID is taken from the tombstone: cmd/frostfs-node/object.go:507. Also, an object of type `TOMBSTONE` contains the objectID, so the tombstone and the tombstoned object must have the same containerID. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
208 lines
5.8 KiB
Go
208 lines
5.8 KiB
Go
package meta_test
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"testing"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
func TestDB_Delete(t *testing.T) {
|
|
db := newDB(t)
|
|
defer func() { require.NoError(t, db.Close()) }()
|
|
|
|
cnr := cidtest.ID()
|
|
parent := testutil.GenerateObjectWithCID(cnr)
|
|
testutil.AddAttribute(parent, "foo", "bar")
|
|
|
|
child := testutil.GenerateObjectWithCID(cnr)
|
|
child.SetParent(parent)
|
|
idParent, _ := parent.ID()
|
|
child.SetParentID(idParent)
|
|
|
|
// put object with parent
|
|
err := putBig(db, child)
|
|
require.NoError(t, err)
|
|
|
|
// try to remove parent, should be no-op, error-free
|
|
err = metaDelete(db, object.AddressOf(parent))
|
|
require.NoError(t, err)
|
|
|
|
// inhume parent and child so they will be on graveyard
|
|
ts := testutil.GenerateObjectWithCID(cnr)
|
|
|
|
err = metaInhume(db, object.AddressOf(child), object.AddressOf(ts).Object())
|
|
require.NoError(t, err)
|
|
|
|
ts = testutil.GenerateObjectWithCID(cnr)
|
|
|
|
err = metaInhume(db, object.AddressOf(parent), object.AddressOf(ts).Object())
|
|
require.NoError(t, err)
|
|
|
|
// delete object
|
|
err = metaDelete(db, object.AddressOf(child))
|
|
require.NoError(t, err)
|
|
|
|
// check if they marked as already removed
|
|
|
|
ok, err := metaExists(db, object.AddressOf(child))
|
|
require.True(t, client.IsErrObjectAlreadyRemoved(err))
|
|
require.False(t, ok)
|
|
|
|
ok, err = metaExists(db, object.AddressOf(parent))
|
|
require.True(t, client.IsErrObjectAlreadyRemoved(err))
|
|
require.False(t, ok)
|
|
}
|
|
|
|
func TestDeleteAllChildren(t *testing.T) {
|
|
db := newDB(t)
|
|
defer func() { require.NoError(t, db.Close()) }()
|
|
|
|
cnr := cidtest.ID()
|
|
|
|
// generate parent object
|
|
parent := testutil.GenerateObjectWithCID(cnr)
|
|
|
|
// generate 2 children
|
|
child1 := testutil.GenerateObjectWithCID(cnr)
|
|
child1.SetParent(parent)
|
|
idParent, _ := parent.ID()
|
|
child1.SetParentID(idParent)
|
|
|
|
child2 := testutil.GenerateObjectWithCID(cnr)
|
|
child2.SetParent(parent)
|
|
child2.SetParentID(idParent)
|
|
|
|
// put children
|
|
require.NoError(t, putBig(db, child1))
|
|
require.NoError(t, putBig(db, child2))
|
|
|
|
// Exists should return split info for parent
|
|
_, err := metaExists(db, object.AddressOf(parent))
|
|
siErr := objectSDK.NewSplitInfoError(nil)
|
|
require.True(t, errors.As(err, &siErr))
|
|
|
|
// remove all children in single call
|
|
err = metaDelete(db, object.AddressOf(child1), object.AddressOf(child2))
|
|
require.NoError(t, err)
|
|
|
|
// parent should not be found now
|
|
ex, err := metaExists(db, object.AddressOf(parent))
|
|
require.NoError(t, err)
|
|
require.False(t, ex)
|
|
}
|
|
|
|
func TestGraveOnlyDelete(t *testing.T) {
|
|
db := newDB(t)
|
|
defer func() { require.NoError(t, db.Close()) }()
|
|
|
|
addr := oidtest.Address()
|
|
|
|
// inhume non-existent object by address
|
|
require.NoError(t, metaInhume(db, addr, oidtest.ID()))
|
|
|
|
// delete the object data
|
|
require.NoError(t, metaDelete(db, addr))
|
|
}
|
|
|
|
func TestExpiredObject(t *testing.T) {
|
|
db := newDB(t, meta.WithEpochState(epochState{currEpoch}))
|
|
defer func() { require.NoError(t, db.Close()) }()
|
|
|
|
checkExpiredObjects(t, db, func(exp, nonExp *objectSDK.Object) {
|
|
// removing expired object should be error-free
|
|
require.NoError(t, metaDelete(db, object.AddressOf(exp)))
|
|
|
|
require.NoError(t, metaDelete(db, object.AddressOf(nonExp)))
|
|
})
|
|
}
|
|
|
|
func TestDelete(t *testing.T) {
|
|
db := newDB(t, meta.WithEpochState(epochState{currEpoch}))
|
|
defer func() { require.NoError(t, db.Close()) }()
|
|
|
|
cnr := cidtest.ID()
|
|
for range 10 {
|
|
obj := testutil.GenerateObjectWithCID(cnr)
|
|
|
|
var prm meta.PutPrm
|
|
prm.SetObject(obj)
|
|
prm.SetStorageID([]byte("0/0"))
|
|
_, err := db.Put(context.Background(), prm)
|
|
require.NoError(t, err)
|
|
|
|
var inhumePrm meta.InhumePrm
|
|
inhumePrm.SetAddresses(object.AddressOf(obj))
|
|
_, err = db.Inhume(context.Background(), inhumePrm)
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
var addrs []oid.Address
|
|
var iprm meta.GarbageIterationPrm
|
|
iprm.SetHandler(func(o meta.GarbageObject) error {
|
|
addrs = append(addrs, o.Address())
|
|
return nil
|
|
})
|
|
require.NoError(t, db.IterateOverGarbage(context.Background(), iprm))
|
|
require.Equal(t, 10, len(addrs))
|
|
var deletePrm meta.DeletePrm
|
|
deletePrm.SetAddresses(addrs...)
|
|
_, err := db.Delete(context.Background(), deletePrm)
|
|
require.NoError(t, err)
|
|
|
|
addrs = nil
|
|
iprm.SetHandler(func(o meta.GarbageObject) error {
|
|
addrs = append(addrs, o.Address())
|
|
return nil
|
|
})
|
|
require.NoError(t, db.IterateOverGarbage(context.Background(), iprm))
|
|
require.Equal(t, 0, len(addrs))
|
|
}
|
|
|
|
func TestDeleteDropsGCMarkIfObjectNotFound(t *testing.T) {
|
|
db := newDB(t, meta.WithEpochState(epochState{currEpoch}))
|
|
defer func() { require.NoError(t, db.Close()) }()
|
|
|
|
addr := oidtest.Address()
|
|
|
|
var prm meta.InhumePrm
|
|
prm.SetAddresses(addr)
|
|
prm.SetGCMark()
|
|
_, err := db.Inhume(context.Background(), prm)
|
|
require.NoError(t, err)
|
|
|
|
var garbageCount int
|
|
var itPrm meta.GarbageIterationPrm
|
|
itPrm.SetHandler(func(g meta.GarbageObject) error {
|
|
garbageCount++
|
|
return nil
|
|
})
|
|
require.NoError(t, db.IterateOverGarbage(context.Background(), itPrm))
|
|
require.Equal(t, 1, garbageCount)
|
|
|
|
var delPrm meta.DeletePrm
|
|
delPrm.SetAddresses(addr)
|
|
_, err = db.Delete(context.Background(), delPrm)
|
|
require.NoError(t, err)
|
|
|
|
garbageCount = 0
|
|
require.NoError(t, db.IterateOverGarbage(context.Background(), itPrm))
|
|
require.Equal(t, 0, garbageCount)
|
|
}
|
|
|
|
func metaDelete(db *meta.DB, addrs ...oid.Address) error {
|
|
var deletePrm meta.DeletePrm
|
|
deletePrm.SetAddresses(addrs...)
|
|
|
|
_, err := db.Delete(context.Background(), deletePrm)
|
|
return err
|
|
}
|