frostfs-node/pkg/local_object_storage/metabase/delete_test.go

209 lines
5.8 KiB
Go
Raw Normal View History

package meta_test
import (
"context"
"errors"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
)
// TestDB_Delete stores a child object that references a parent through its
// header, then checks that:
//   - deleting the parent address (the parent itself is never put) succeeds;
//   - after inhuming both addresses and deleting the child, an existence check
//     on either address reports "object already removed".
func TestDB_Delete(t *testing.T) {
	db := newDB(t)
	defer func() { require.NoError(t, db.Close()) }()

	cnr := cidtest.ID()

	parent := testutil.GenerateObjectWithCID(cnr)
	testutil.AddAttribute(parent, "foo", "bar")

	child := testutil.GenerateObjectWithCID(cnr)
	child.SetParent(parent)

	// The parent ID is what links the child to its parent; fail fast if the
	// generated object carries no ID instead of silently using the zero ID.
	idParent, hasID := parent.ID()
	require.True(t, hasID, "generated parent object must have an ID")
	child.SetParentID(idParent)

	// put object with parent
	require.NoError(t, putBig(db, child))

	// try to remove parent, should be no-op, error-free
	require.NoError(t, metaDelete(db, object.AddressOf(parent)))

	// inhume parent and child so they will be on graveyard;
	// each of them gets its own freshly generated object as the second address
	ts := testutil.GenerateObjectWithCID(cnr)
	require.NoError(t, metaInhume(db, object.AddressOf(child), object.AddressOf(ts)))

	ts = testutil.GenerateObjectWithCID(cnr)
	require.NoError(t, metaInhume(db, object.AddressOf(parent), object.AddressOf(ts)))

	// delete object
	require.NoError(t, metaDelete(db, object.AddressOf(child)))

	// check that both of them are reported as already removed
	ok, err := metaExists(db, object.AddressOf(child))
	require.True(t, client.IsErrObjectAlreadyRemoved(err))
	require.False(t, ok)

	ok, err = metaExists(db, object.AddressOf(parent))
	require.True(t, client.IsErrObjectAlreadyRemoved(err))
	require.False(t, ok)
}
// TestDeleteAllChildren stores two children that reference the same parent,
// checks that an existence check on the parent yields a split-info error, then
// deletes both children in one call and checks that the parent is no longer
// reported as existing.
func TestDeleteAllChildren(t *testing.T) {
	db := newDB(t)
	defer func() { require.NoError(t, db.Close()) }()

	cnr := cidtest.ID()

	// generate parent object
	parent := testutil.GenerateObjectWithCID(cnr)

	// Both children are linked to the parent by its ID; fail fast if the
	// generated object carries no ID instead of silently using the zero ID.
	idParent, hasID := parent.ID()
	require.True(t, hasID, "generated parent object must have an ID")

	// generate 2 children
	child1 := testutil.GenerateObjectWithCID(cnr)
	child1.SetParent(parent)
	child1.SetParentID(idParent)

	child2 := testutil.GenerateObjectWithCID(cnr)
	child2.SetParent(parent)
	child2.SetParentID(idParent)

	// put children
	require.NoError(t, putBig(db, child1))
	require.NoError(t, putBig(db, child2))

	// Exists should return split info for parent
	_, err := metaExists(db, object.AddressOf(parent))
	siErr := objectSDK.NewSplitInfoError(nil)
	require.True(t, errors.As(err, &siErr))

	// remove all children in single call
	err = metaDelete(db, object.AddressOf(child1), object.AddressOf(child2))
	require.NoError(t, err)

	// parent should not be found now
	ex, err := metaExists(db, object.AddressOf(parent))
	require.NoError(t, err)
	require.False(t, ex)
}
// TestGraveOnlyDelete inhumes an address that was never put into the metabase
// and checks that a subsequent delete of that address is error-free.
func TestGraveOnlyDelete(t *testing.T) {
	db := newDB(t)
	defer func() {
		require.NoError(t, db.Close())
	}()

	target := oidtest.Address()

	// no object was stored under target: only the inhume record is created
	inhumeErr := metaInhume(db, target, oidtest.Address())
	require.NoError(t, inhumeErr)

	// deleting the same address must still succeed
	deleteErr := metaDelete(db, target)
	require.NoError(t, deleteErr)
}
// TestExpiredObject runs the shared checkExpiredObjects scenario with a
// callback that deletes both the expired and the non-expired object and
// requires each deletion to succeed.
func TestExpiredObject(t *testing.T) {
	db := newDB(t, meta.WithEpochState(epochState{currEpoch}))
	defer func() {
		require.NoError(t, db.Close())
	}()

	// removal must be error-free for the expired object first, then for the
	// non-expired one
	deleteBoth := func(expired, alive *objectSDK.Object) {
		for _, obj := range []*objectSDK.Object{expired, alive} {
			require.NoError(t, metaDelete(db, object.AddressOf(obj)))
		}
	}

	checkExpiredObjects(t, db, deleteBoth)
}
// TestDelete puts and inhumes a batch of objects, checks that the garbage
// iterator reports every one of them, deletes all reported addresses in a
// single call and checks that the garbage iterator then reports nothing.
func TestDelete(t *testing.T) {
	// objCount is the number of objects put and inhumed by the test.
	const objCount = 10

	db := newDB(t, meta.WithEpochState(epochState{currEpoch}))
	defer func() { require.NoError(t, db.Close()) }()

	// collectGarbage returns every address currently reported by the garbage
	// iterator. A fresh parameter set and result slice are used on each call so
	// that consecutive collections cannot influence each other.
	collectGarbage := func() []oid.Address {
		var (
			addrs []oid.Address
			prm   meta.GarbageIterationPrm
		)
		prm.SetHandler(func(o meta.GarbageObject) error {
			addrs = append(addrs, o.Address())
			return nil
		})
		require.NoError(t, db.IterateOverGarbage(context.Background(), prm))
		return addrs
	}

	cnr := cidtest.ID()
	for range objCount {
		obj := testutil.GenerateObjectWithCID(cnr)

		var prm meta.PutPrm
		prm.SetObject(obj)
		prm.SetStorageID([]byte("0/0"))
		_, err := db.Put(context.Background(), prm)
		require.NoError(t, err)

		var inhumePrm meta.InhumePrm
		inhumePrm.SetAddresses(object.AddressOf(obj))
		_, err = db.Inhume(context.Background(), inhumePrm)
		require.NoError(t, err)
	}

	// every inhumed object must be visible to the garbage iterator
	addrs := collectGarbage()
	require.Len(t, addrs, objCount)

	var deletePrm meta.DeletePrm
	deletePrm.SetAddresses(addrs...)
	_, err := db.Delete(context.Background(), deletePrm)
	require.NoError(t, err)

	// nothing must be left for the garbage iterator after the delete
	require.Empty(t, collectGarbage())
}
// TestDeleteDropsGCMarkIfObjectNotFound inhumes, with a GC mark, an address
// that was never put into the metabase, checks that the garbage iterator
// reports exactly one entry, deletes the address and checks that the garbage
// iterator reports no entries afterwards.
func TestDeleteDropsGCMarkIfObjectNotFound(t *testing.T) {
	db := newDB(t, meta.WithEpochState(epochState{currEpoch}))
	defer func() {
		require.NoError(t, db.Close())
	}()

	ctx := context.Background()
	target := oidtest.Address()

	var inhumePrm meta.InhumePrm
	inhumePrm.SetAddresses(target)
	inhumePrm.SetGCMark()
	_, err := db.Inhume(ctx, inhumePrm)
	require.NoError(t, err)

	// the handler only counts entries; the same parameter set is reused for
	// both iterations, with the counter reset in between
	var (
		marked  int
		iterPrm meta.GarbageIterationPrm
	)
	iterPrm.SetHandler(func(meta.GarbageObject) error {
		marked++
		return nil
	})

	require.NoError(t, db.IterateOverGarbage(ctx, iterPrm))
	require.Equal(t, 1, marked)

	var deletePrm meta.DeletePrm
	deletePrm.SetAddresses(target)
	_, err = db.Delete(ctx, deletePrm)
	require.NoError(t, err)

	marked = 0
	require.NoError(t, db.IterateOverGarbage(ctx, iterPrm))
	require.Equal(t, 0, marked)
}
// metaDelete passes addrs to db.Delete in a single call using a background
// context. The result value of Delete is discarded; only its error is returned.
func metaDelete(db *meta.DB, addrs ...oid.Address) error {
	var prm meta.DeletePrm
	prm.SetAddresses(addrs...)

	if _, err := db.Delete(context.Background(), prm); err != nil {
		return err
	}
	return nil
}