frostfs-node/pkg/local_object_storage/engine/delete_test.go
Aleksey Savchuk 7fc6101bec
[#1491] engine/test: Rework engine test utils
- Remove `testNewShard` and `setInitializedShards` because they
violated the default engine workflow. The correct workflow is:
first use `New()`, followed by `Open()`, and then `Init()`. As a
result, adding new logic to `(*StorageEngine).Init` caused several
tests to fail with a panic when attempting to access uninitialized
resources. Now, all engines created with the test utils must be
initialized manually. The new helper method `prepare` can be used
for that purpose.
- Additionally, `setInitializedShards` hardcoded the shard worker
pool size, which prevented it from being configured in tests and
benchmarks. This has been fixed as well.
- Ensure engine initialization is done wherever it was missing.
- Refactor `setShardsNumOpts`, `setShardsNumAdditionalOpts`, and
`setShardsNum`. Make them all depend on `setShardsNumOpts`.

Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
2024-11-13 14:42:53 +03:00

187 lines
5.4 KiB
Go

package engine
import (
"context"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
)
func TestDeleteBigObject(t *testing.T) {
t.Parallel()
cnr := cidtest.ID()
parentID := oidtest.ID()
splitID := objectSDK.NewSplitID()
parent := testutil.GenerateObjectWithCID(cnr)
parent.SetID(parentID)
parent.SetPayload(nil)
const childCount = 10
children := make([]*objectSDK.Object, childCount)
childIDs := make([]oid.ID, childCount)
for i := range children {
children[i] = testutil.GenerateObjectWithCID(cnr)
if i != 0 {
children[i].SetPreviousID(childIDs[i-1])
}
if i == len(children)-1 {
children[i].SetParent(parent)
}
children[i].SetSplitID(splitID)
children[i].SetPayload([]byte{byte(i), byte(i + 1), byte(i + 2)})
childIDs[i], _ = children[i].ID()
}
link := testutil.GenerateObjectWithCID(cnr)
link.SetParent(parent)
link.SetParentID(parentID)
link.SetSplitID(splitID)
link.SetChildren(childIDs...)
e := testNewEngine(t).setShardsNum(t, 3).prepare(t).engine
defer func() { require.NoError(t, e.Close(context.Background())) }()
for i := range children {
require.NoError(t, Put(context.Background(), e, children[i], false))
}
require.NoError(t, Put(context.Background(), e, link, false))
addrParent := object.AddressOf(parent)
checkGetError[*objectSDK.SplitInfoError](t, e, addrParent, true)
addrLink := object.AddressOf(link)
checkGetError[error](t, e, addrLink, false)
for i := range children {
checkGetError[error](t, e, object.AddressOf(children[i]), false)
}
var deletePrm DeletePrm
deletePrm.WithForceRemoval()
deletePrm.WithAddress(addrParent)
_, err := e.Delete(context.Background(), deletePrm)
require.NoError(t, err)
checkGetError[*apistatus.ObjectNotFound](t, e, addrParent, true)
checkGetError[*apistatus.ObjectNotFound](t, e, addrLink, true)
for i := range children {
checkGetError[*apistatus.ObjectNotFound](t, e, object.AddressOf(children[i]), true)
}
}
func TestDeleteBigObjectWithoutGC(t *testing.T) {
t.Parallel()
cnr := cidtest.ID()
parentID := oidtest.ID()
splitID := objectSDK.NewSplitID()
parent := testutil.GenerateObjectWithCID(cnr)
parent.SetID(parentID)
parent.SetPayload(nil)
const childCount = 3
children := make([]*objectSDK.Object, childCount)
childIDs := make([]oid.ID, childCount)
for i := range children {
children[i] = testutil.GenerateObjectWithCID(cnr)
if i != 0 {
children[i].SetPreviousID(childIDs[i-1])
}
if i == len(children)-1 {
children[i].SetParent(parent)
}
children[i].SetSplitID(splitID)
children[i].SetPayload([]byte{byte(i), byte(i + 1), byte(i + 2)})
childIDs[i], _ = children[i].ID()
}
link := testutil.GenerateObjectWithCID(cnr)
link.SetParent(parent)
link.SetParentID(parentID)
link.SetSplitID(splitID)
link.SetChildren(childIDs...)
te := testNewEngine(t).setShardsNumAdditionalOpts(t, 1, func(_ int) []shard.Option {
return []shard.Option{shard.WithDisabledGC()}
}).prepare(t)
e := te.engine
defer func() { require.NoError(t, e.Close(context.Background())) }()
s1 := te.shards[0]
for i := range children {
require.NoError(t, Put(context.Background(), e, children[i], false))
}
require.NoError(t, Put(context.Background(), e, link, false))
addrParent := object.AddressOf(parent)
checkGetError[*objectSDK.SplitInfoError](t, e, addrParent, true)
addrLink := object.AddressOf(link)
checkGetError[error](t, e, addrLink, false)
for i := range children {
checkGetError[error](t, e, object.AddressOf(children[i]), false)
}
// delete logical
var deletePrm DeletePrm
deletePrm.WithForceRemoval()
deletePrm.WithAddress(addrParent)
_, err := e.Delete(context.Background(), deletePrm)
require.NoError(t, err)
checkGetError[*apistatus.ObjectNotFound](t, e, addrParent, true)
checkGetError[*apistatus.ObjectNotFound](t, e, addrLink, true)
for i := range children {
checkGetError[*apistatus.ObjectNotFound](t, e, object.AddressOf(children[i]), true)
}
// delete physical
var delPrm shard.DeletePrm
delPrm.SetAddresses(addrParent)
_, err = s1.Delete(context.Background(), delPrm)
require.NoError(t, err)
delPrm.SetAddresses(addrLink)
_, err = s1.Delete(context.Background(), delPrm)
require.NoError(t, err)
for i := range children {
delPrm.SetAddresses(object.AddressOf(children[i]))
_, err = s1.Delete(context.Background(), delPrm)
require.NoError(t, err)
}
checkGetError[*apistatus.ObjectNotFound](t, e, addrParent, true)
checkGetError[*apistatus.ObjectNotFound](t, e, addrLink, true)
for i := range children {
checkGetError[*apistatus.ObjectNotFound](t, e, object.AddressOf(children[i]), true)
}
}
func checkGetError[E error](t *testing.T, e *StorageEngine, addr oid.Address, shouldFail bool) {
var getPrm GetPrm
getPrm.WithAddress(addr)
_, err := e.Get(context.Background(), getPrm)
if shouldFail {
var target E
require.ErrorAs(t, err, &target)
} else {
require.NoError(t, err)
}
}