forked from TrueCloudLab/frostfs-node
Dmitrii Stepanov
a6eb66bf9c
Drop methods to make it easier to extend. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
395 lines
12 KiB
Go
395 lines
12 KiB
Go
package engine
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"path/filepath"
|
|
"strconv"
|
|
"testing"
|
|
"time"
|
|
|
|
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
|
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"github.com/stretchr/testify/require"
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEngine, []*shard.ID, []*objectSDK.Object) {
|
|
dir := t.TempDir()
|
|
|
|
te := testNewEngine(t).
|
|
setShardsNumOpts(t, shardNum, func(id int) []shard.Option {
|
|
return []shard.Option{
|
|
shard.WithLogger(test.NewLogger(t)),
|
|
shard.WithBlobStorOptions(
|
|
blobstor.WithStorages([]blobstor.SubStorage{{
|
|
Storage: fstree.New(
|
|
fstree.WithPath(filepath.Join(dir, strconv.Itoa(id))),
|
|
fstree.WithDepth(1)),
|
|
}})),
|
|
shard.WithMetaBaseOptions(
|
|
meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", id))),
|
|
meta.WithPermissions(0o700),
|
|
meta.WithEpochState(epochState{})),
|
|
}
|
|
})
|
|
e, ids := te.engine, te.shardIDs
|
|
require.NoError(t, e.Open(context.Background()))
|
|
require.NoError(t, e.Init(context.Background()))
|
|
|
|
objects := make([]*objectSDK.Object, 0, objPerShard*len(ids))
|
|
|
|
for _, sh := range ids {
|
|
obj := testutil.GenerateObjectWithCID(cidtest.ID())
|
|
objects = append(objects, obj)
|
|
|
|
var putPrm shard.PutPrm
|
|
putPrm.SetObject(obj)
|
|
_, err := e.shards[sh.String()].Put(context.Background(), putPrm)
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
for i := 0; ; i++ {
|
|
objects = append(objects, testutil.GenerateObjectWithCID(cidtest.ID()))
|
|
|
|
var putPrm PutPrm
|
|
putPrm.WithObject(objects[len(objects)-1])
|
|
|
|
err := e.Put(context.Background(), putPrm)
|
|
require.NoError(t, err)
|
|
|
|
res, err := e.shards[ids[len(ids)-1].String()].List(context.Background())
|
|
require.NoError(t, err)
|
|
if len(res.AddressList()) == objPerShard {
|
|
break
|
|
}
|
|
}
|
|
return e, ids, objects
|
|
}
|
|
|
|
func TestEvacuateShard(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
const objPerShard = 3
|
|
|
|
e, ids, objects := newEngineEvacuate(t, 3, objPerShard)
|
|
defer func() {
|
|
require.NoError(t, e.Close(context.Background()))
|
|
}()
|
|
|
|
evacuateShardID := ids[2].String()
|
|
|
|
checkHasObjects := func(t *testing.T) {
|
|
for i := range objects {
|
|
var prm GetPrm
|
|
prm.WithAddress(objectCore.AddressOf(objects[i]))
|
|
|
|
_, err := e.Get(context.Background(), prm)
|
|
require.NoError(t, err)
|
|
}
|
|
}
|
|
|
|
checkHasObjects(t)
|
|
|
|
var prm EvacuateShardPrm
|
|
prm.ShardID = ids[2:3]
|
|
|
|
t.Run("must be read-only", func(t *testing.T) {
|
|
res, err := e.Evacuate(context.Background(), prm)
|
|
require.ErrorIs(t, err, ErrMustBeReadOnly)
|
|
require.Equal(t, uint64(0), res.Evacuated())
|
|
})
|
|
|
|
require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly))
|
|
|
|
res, err := e.Evacuate(context.Background(), prm)
|
|
require.NoError(t, err)
|
|
require.Equal(t, uint64(objPerShard), res.Evacuated())
|
|
|
|
// We check that all objects are available both before and after shard removal.
|
|
// First case is a real-world use-case. It ensures that an object can be put in presense
|
|
// of all metabase checks/marks.
|
|
// Second case ensures that all objects are indeed moved and available.
|
|
checkHasObjects(t)
|
|
|
|
// Calling it again is OK, but all objects are already moved, so no new PUTs should be done.
|
|
res, err = e.Evacuate(context.Background(), prm)
|
|
require.NoError(t, err)
|
|
require.Equal(t, uint64(0), res.Evacuated())
|
|
|
|
checkHasObjects(t)
|
|
|
|
e.mtx.Lock()
|
|
delete(e.shards, evacuateShardID)
|
|
delete(e.shardPools, evacuateShardID)
|
|
e.mtx.Unlock()
|
|
|
|
checkHasObjects(t)
|
|
}
|
|
|
|
func TestEvacuateNetwork(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
errReplication := errors.New("handler error")
|
|
|
|
acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) error {
|
|
var n uint64
|
|
return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) error {
|
|
if n == max {
|
|
return errReplication
|
|
}
|
|
|
|
n++
|
|
for i := range objects {
|
|
if addr == objectCore.AddressOf(objects[i]) {
|
|
require.Equal(t, objects[i], obj)
|
|
return nil
|
|
}
|
|
}
|
|
require.FailNow(t, "handler was called with an unexpected object: %s", addr)
|
|
panic("unreachable")
|
|
}
|
|
}
|
|
|
|
t.Run("single shard", func(t *testing.T) {
|
|
t.Parallel()
|
|
e, ids, objects := newEngineEvacuate(t, 1, 3)
|
|
defer func() {
|
|
require.NoError(t, e.Close(context.Background()))
|
|
}()
|
|
|
|
evacuateShardID := ids[0].String()
|
|
|
|
require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly))
|
|
|
|
var prm EvacuateShardPrm
|
|
prm.ShardID = ids[0:1]
|
|
|
|
res, err := e.Evacuate(context.Background(), prm)
|
|
require.ErrorIs(t, err, errMustHaveTwoShards)
|
|
require.Equal(t, uint64(0), res.Evacuated())
|
|
|
|
prm.Handler = acceptOneOf(objects, 2)
|
|
|
|
res, err = e.Evacuate(context.Background(), prm)
|
|
require.ErrorIs(t, err, errReplication)
|
|
require.Equal(t, uint64(2), res.Evacuated())
|
|
})
|
|
t.Run("multiple shards, evacuate one", func(t *testing.T) {
|
|
t.Parallel()
|
|
e, ids, objects := newEngineEvacuate(t, 2, 3)
|
|
defer func() {
|
|
require.NoError(t, e.Close(context.Background()))
|
|
}()
|
|
|
|
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
|
|
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
|
|
|
|
var prm EvacuateShardPrm
|
|
prm.ShardID = ids[1:2]
|
|
prm.Handler = acceptOneOf(objects, 2)
|
|
|
|
res, err := e.Evacuate(context.Background(), prm)
|
|
require.ErrorIs(t, err, errReplication)
|
|
require.Equal(t, uint64(2), res.Evacuated())
|
|
|
|
t.Run("no errors", func(t *testing.T) {
|
|
prm.Handler = acceptOneOf(objects, 3)
|
|
|
|
res, err := e.Evacuate(context.Background(), prm)
|
|
require.NoError(t, err)
|
|
require.Equal(t, uint64(3), res.Evacuated())
|
|
})
|
|
})
|
|
t.Run("multiple shards, evacuate many", func(t *testing.T) {
|
|
t.Parallel()
|
|
e, ids, objects := newEngineEvacuate(t, 4, 5)
|
|
defer func() {
|
|
require.NoError(t, e.Close(context.Background()))
|
|
}()
|
|
|
|
evacuateIDs := ids[0:3]
|
|
|
|
var totalCount uint64
|
|
for i := range evacuateIDs {
|
|
res, err := e.shards[ids[i].String()].List(context.Background())
|
|
require.NoError(t, err)
|
|
|
|
totalCount += uint64(len(res.AddressList()))
|
|
}
|
|
|
|
for i := range ids {
|
|
require.NoError(t, e.shards[ids[i].String()].SetMode(mode.ReadOnly))
|
|
}
|
|
|
|
var prm EvacuateShardPrm
|
|
prm.ShardID = evacuateIDs
|
|
prm.Handler = acceptOneOf(objects, totalCount-1)
|
|
|
|
res, err := e.Evacuate(context.Background(), prm)
|
|
require.ErrorIs(t, err, errReplication)
|
|
require.Equal(t, totalCount-1, res.Evacuated())
|
|
|
|
t.Run("no errors", func(t *testing.T) {
|
|
prm.Handler = acceptOneOf(objects, totalCount)
|
|
|
|
res, err := e.Evacuate(context.Background(), prm)
|
|
require.NoError(t, err)
|
|
require.Equal(t, totalCount, res.Evacuated())
|
|
})
|
|
})
|
|
}
|
|
|
|
func TestEvacuateCancellation(t *testing.T) {
|
|
t.Parallel()
|
|
e, ids, _ := newEngineEvacuate(t, 2, 3)
|
|
defer func() {
|
|
require.NoError(t, e.Close(context.Background()))
|
|
}()
|
|
|
|
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
|
|
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
|
|
|
|
var prm EvacuateShardPrm
|
|
prm.ShardID = ids[1:2]
|
|
prm.Handler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
return nil
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
cancel()
|
|
|
|
res, err := e.Evacuate(ctx, prm)
|
|
require.ErrorContains(t, err, "context canceled")
|
|
require.Equal(t, uint64(0), res.Evacuated())
|
|
}
|
|
|
|
func TestEvacuateSingleProcess(t *testing.T) {
|
|
e, ids, _ := newEngineEvacuate(t, 2, 3)
|
|
defer func() {
|
|
require.NoError(t, e.Close(context.Background()))
|
|
}()
|
|
|
|
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
|
|
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
|
|
|
|
blocker := make(chan interface{})
|
|
running := make(chan interface{})
|
|
|
|
var prm EvacuateShardPrm
|
|
prm.ShardID = ids[1:2]
|
|
prm.Handler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
|
|
select {
|
|
case <-running:
|
|
default:
|
|
close(running)
|
|
}
|
|
<-blocker
|
|
return nil
|
|
}
|
|
|
|
eg, egCtx := errgroup.WithContext(context.Background())
|
|
eg.Go(func() error {
|
|
res, err := e.Evacuate(egCtx, prm)
|
|
require.NoError(t, err, "first evacuation failed")
|
|
require.Equal(t, uint64(3), res.Evacuated())
|
|
return nil
|
|
})
|
|
eg.Go(func() error {
|
|
<-running
|
|
res, err := e.Evacuate(egCtx, prm)
|
|
require.ErrorContains(t, err, "evacuate is already running for shard ids", "second evacuation not failed")
|
|
require.Equal(t, uint64(0), res.Evacuated())
|
|
close(blocker)
|
|
return nil
|
|
})
|
|
require.NoError(t, eg.Wait())
|
|
}
|
|
|
|
func TestEvacuateAsync(t *testing.T) {
|
|
e, ids, _ := newEngineEvacuate(t, 2, 3)
|
|
defer func() {
|
|
require.NoError(t, e.Close(context.Background()))
|
|
}()
|
|
|
|
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
|
|
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
|
|
|
|
blocker := make(chan interface{})
|
|
running := make(chan interface{})
|
|
|
|
var prm EvacuateShardPrm
|
|
prm.ShardID = ids[1:2]
|
|
prm.Handler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
|
|
select {
|
|
case <-running:
|
|
default:
|
|
close(running)
|
|
}
|
|
<-blocker
|
|
return nil
|
|
}
|
|
|
|
st, err := e.GetEvacuationState(context.Background())
|
|
require.NoError(t, err, "get init state failed")
|
|
require.Equal(t, EvacuateProcessStateUndefined, st.ProcessingStatus(), "invalid init state")
|
|
require.Equal(t, uint64(0), st.Evacuated(), "invalid init count")
|
|
require.Nil(t, st.StartedAt(), "invalid init started at")
|
|
require.Nil(t, st.FinishedAt(), "invalid init finished at")
|
|
require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids")
|
|
require.Equal(t, "", st.ErrorMessage(), "invalid init error message")
|
|
|
|
eg, egCtx := errgroup.WithContext(context.Background())
|
|
eg.Go(func() error {
|
|
res, err := e.Evacuate(egCtx, prm)
|
|
require.NoError(t, err, "first evacuation failed")
|
|
require.Equal(t, uint64(3), res.Evacuated())
|
|
return nil
|
|
})
|
|
|
|
<-running
|
|
|
|
st, err = e.GetEvacuationState(context.Background())
|
|
require.NoError(t, err, "get running state failed")
|
|
require.Equal(t, EvacuateProcessStateRunning, st.ProcessingStatus(), "invalid running state")
|
|
require.Equal(t, uint64(0), st.Evacuated(), "invalid running count")
|
|
require.NotNil(t, st.StartedAt(), "invalid running started at")
|
|
require.Nil(t, st.FinishedAt(), "invalid init finished at")
|
|
expectedShardIDs := make([]string, 0, 2)
|
|
for _, id := range ids[1:2] {
|
|
expectedShardIDs = append(expectedShardIDs, id.String())
|
|
}
|
|
require.ElementsMatch(t, expectedShardIDs, st.ShardIDs(), "invalid running shard ids")
|
|
require.Equal(t, "", st.ErrorMessage(), "invalid init error message")
|
|
|
|
close(blocker)
|
|
|
|
require.Eventually(t, func() bool {
|
|
st, err = e.GetEvacuationState(context.Background())
|
|
return st.ProcessingStatus() == EvacuateProcessStateCompleted
|
|
}, 3*time.Second, 10*time.Millisecond, "invalid final state")
|
|
|
|
require.NoError(t, err, "get final state failed")
|
|
require.Equal(t, uint64(3), st.Evacuated(), "invalid final count")
|
|
require.NotNil(t, st.StartedAt(), "invalid final started at")
|
|
require.NotNil(t, st.FinishedAt(), "invalid final finished at")
|
|
require.ElementsMatch(t, expectedShardIDs, st.ShardIDs(), "invalid final shard ids")
|
|
require.Equal(t, "", st.ErrorMessage(), "invalid final error message")
|
|
|
|
require.NoError(t, eg.Wait())
|
|
}
|