frostfs-node/pkg/local_object_storage/engine/inhume_test.go
Aleksey Savchuk 3f4df881c9
[#1403] engine: Add tests for object deletion on read-only shard
Currently, when an object on some shard is inhumed, the engine places
a tombstone on the same shard. If the target shard is read-only, the
engine fails.

In that case, we want the engine to correctly place a tombstone on
a different shard, ensuring that:
- An object becomes unavailable if a tombstone was placed on a different
  shard. See `TestObjectUnavailableIfTombstoneOnDifferentShard` test.
- GC deletes an object if a tombstone was placed on a different shard.
  See `TestObjectDeletedIfTombstoneOnDifferentShard` test.

Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
2024-10-07 16:16:24 +03:00

203 lines
5.2 KiB
Go

package engine
import (
"context"
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
objecttest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestStorageEngine_Inhume checks that once the engine inhumes a parent
// object's address with a tombstone, a root-filter Select over the container
// returns no addresses. Two layouts are covered:
//   - "delete small object": the parent itself is stored via the engine on a
//     single shard;
//   - "delete big object": only a child and a link object (both referring to
//     the parent and sharing one split ID) are stored, each on its own shard.
func TestStorageEngine_Inhume(t *testing.T) {
	ctx := context.Background()
	cnr := cidtest.ID()
	splitID := objectSDK.NewSplitID()

	// Filter used by every Select below: root objects only.
	rootFilter := objectSDK.SearchFilters{}
	rootFilter.AddRootFilter()

	tombstoneAddr := object.AddressOf(testutil.GenerateObjectWithCID(cnr))

	parent := testutil.GenerateObjectWithCID(cnr)
	parentID, _ := parent.ID()

	// child is a part of the split object headed by parent.
	child := testutil.GenerateObjectWithCID(cnr)
	child.SetParent(parent)
	child.SetParentID(parentID)
	child.SetSplitID(splitID)
	childID, _ := child.ID()

	// link references both the parent and the child of the same split.
	link := testutil.GenerateObjectWithCID(cnr)
	link.SetParent(parent)
	link.SetParentID(parentID)
	link.SetChildren(childID)
	link.SetSplitID(splitID)

	// inhumeParentAndExpectNothing inhumes the parent address with the
	// tombstone and asserts that a root-filter Select finds nothing afterwards.
	inhumeParentAndExpectNothing := func(t *testing.T, e *StorageEngine) {
		var prm InhumePrm
		prm.WithTarget(tombstoneAddr, object.AddressOf(parent))

		_, err := e.Inhume(ctx, prm)
		require.NoError(t, err)

		found, err := Select(ctx, e, cnr, rootFilter)
		require.NoError(t, err)
		require.Empty(t, found)
	}

	t.Run("delete small object", func(t *testing.T) {
		t.Parallel()

		e := testNewEngine(t).setShardsNum(t, 1).engine
		defer e.Close(ctx)

		require.NoError(t, Put(ctx, e, parent))

		inhumeParentAndExpectNothing(t, e)
	})

	t.Run("delete big object", func(t *testing.T) {
		t.Parallel()

		childShard, linkShard := testNewShard(t), testNewShard(t)
		e := testNewEngine(t).setInitializedShards(t, childShard, linkShard).engine
		defer e.Close(ctx)

		// Write directly to the shards so that the child and the link end up
		// on different ones.
		var childPrm shard.PutPrm
		childPrm.SetObject(child)
		_, err := childShard.Put(ctx, childPrm)
		require.NoError(t, err)

		var linkPrm shard.PutPrm
		linkPrm.SetObject(link)
		_, err = linkShard.Put(ctx, linkPrm)
		require.NoError(t, err)

		inhumeParentAndExpectNothing(t, e)
	})
}
// TestObjectUnavailableIfTombstoneOnDifferentShard runs the shared scenario
// testObjectUnavailableIfTombstoneOnDifferentShard twice, swapping which of the
// two shards (as ordered by engine.sortShards for the object's address) holds
// the object and which holds the tombstone. GC is disabled on both shards.
func TestObjectUnavailableIfTombstoneOnDifferentShard(t *testing.T) {
	cases := []struct {
		name string
		// Positions in the sortShards result of the shard that receives the
		// object and of the shard that receives the tombstone.
		objIdx, tsIdx int
	}{
		{name: "look first at the shard with the object", objIdx: 0, tsIdx: 1},
		{name: "look first at the shard with the tombstone", objIdx: 1, tsIdx: 0},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			obj := objecttest.Object().Parent()

			first := testNewShard(t, shard.WithDisabledGC())
			second := testNewShard(t, shard.WithDisabledGC())
			eng := testNewEngine(t).setInitializedShards(t, first, second).engine

			// NOTE(review): sortShards presumably yields shards in the order
			// the engine consults them for this address — confirm.
			ordered := eng.sortShards(object.AddressOf(obj))

			testObjectUnavailableIfTombstoneOnDifferentShard(
				t, obj, ordered[tc.objIdx].Shard, ordered[tc.tsIdx].Shard, eng,
			)
		})
	}
}
// testObjectUnavailableIfTombstoneOnDifferentShard is the scenario shared by
// the subtests of TestObjectUnavailableIfTombstoneOnDifferentShard.
//
// It puts obj on shard1, inhumes obj's address on shard2 using a freshly
// generated tombstone address placed in obj's container, and then asserts that
// engine.Get for obj fails with an "object already removed" error.
//
// obj must have its container ID set; the helper fails the test otherwise.
func testObjectUnavailableIfTombstoneOnDifferentShard(
	t *testing.T,
	obj *objectSDK.Object,
	shard1, shard2 *shard.Shard,
	engine *StorageEngine,
) {
	// Attribute assertion failures to the calling subtest, not to this helper.
	t.Helper()

	ctx := context.Background()

	// The tombstone address must be in the same container as the object.
	ts := oidtest.Address()
	cnr, set := obj.ContainerID()
	require.True(t, set)
	ts.SetContainer(cnr)

	// Store the object on the first shard.
	{
		prm := shard.PutPrm{}
		prm.SetObject(obj)

		_, err := shard1.Put(ctx, prm)
		require.NoError(t, err)
	}

	// Place the tombstone mark on the other shard.
	{
		prm := shard.InhumePrm{}
		prm.SetTarget(ts, object.AddressOf(obj))

		_, err := shard2.Inhume(ctx, prm)
		require.NoError(t, err)
	}

	// The engine must report the object as already removed.
	{
		prm := GetPrm{}
		prm.WithAddress(object.AddressOf(obj))

		_, err := engine.Get(ctx, prm)
		assert.Error(t, err)
		assert.True(t, client.IsErrObjectAlreadyRemoved(err))
	}
}
// TestObjectDeletedIfTombstoneOnDifferentShard stores an object on one shard,
// inhumes its address on a second shard, waits one second and then asserts
// that:
//   - Exists on the shard that received the object succeeds and reports false;
//   - Exists on the shard that received the tombstone fails with an
//     "object already removed" error.
//
// Both shards are created with a 200ms GC remover sleep interval.
func TestObjectDeletedIfTombstoneOnDifferentShard(t *testing.T) {
	const gcRemoverInterval = 200 * time.Millisecond

	ctx := context.Background()

	objShard := testNewShard(t, shard.WithGCRemoverSleepInterval(gcRemoverInterval))
	tsShard := testNewShard(t, shard.WithGCRemoverSleepInterval(gcRemoverInterval))

	obj := objecttest.Object().Parent()
	objAddr := object.AddressOf(obj)

	// The tombstone address must be in the same container as the object.
	tsAddr := oidtest.Address()
	cnr, ok := obj.ContainerID()
	require.True(t, ok)
	tsAddr.SetContainer(cnr)

	var putPrm shard.PutPrm
	putPrm.SetObject(obj)
	_, err := objShard.Put(ctx, putPrm)
	require.NoError(t, err)

	var inhumePrm shard.InhumePrm
	inhumePrm.SetTarget(tsAddr, objAddr)
	_, err = tsShard.Inhume(ctx, inhumePrm)
	require.NoError(t, err)

	// NOTE(review): the pause presumably gives the shards' GC several remover
	// intervals to act — confirm against the shard GC implementation.
	time.Sleep(time.Second)

	existsPrm := shard.ExistsPrm{Address: objAddr}

	res, err := objShard.Exists(ctx, existsPrm)
	assert.NoError(t, err)
	assert.False(t, res.Exists())

	_, err = tsShard.Exists(ctx, existsPrm)
	require.True(t, client.IsErrObjectAlreadyRemoved(err))
}