frostfs-node/pkg/local_object_storage/engine/evacuate_test.go
Ekaterina Lebedeva 8a81af5a3b [] Add context parameter to Open functions
Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2023-09-07 18:03:29 +03:00

372 lines
11 KiB
Go

package engine
import (
"context"
"errors"
"fmt"
"path/filepath"
"strconv"
"testing"
"time"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEngine, []*shard.ID, []*objectSDK.Object) {
dir := t.TempDir()
te := testNewEngine(t).
setShardsNumOpts(t, shardNum, func(id int) []shard.Option {
return []shard.Option{
shard.WithLogger(test.NewLogger(t, true)),
shard.WithBlobStorOptions(
blobstor.WithStorages([]blobstor.SubStorage{{
Storage: fstree.New(
fstree.WithPath(filepath.Join(dir, strconv.Itoa(id))),
fstree.WithDepth(1)),
}})),
shard.WithMetaBaseOptions(
meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", id))),
meta.WithPermissions(0700),
meta.WithEpochState(epochState{})),
}
})
e, ids := te.engine, te.shardIDs
require.NoError(t, e.Open(context.Background()))
require.NoError(t, e.Init(context.Background()))
objects := make([]*objectSDK.Object, 0, objPerShard*len(ids))
for _, sh := range ids {
obj := testutil.GenerateObjectWithCID(cidtest.ID())
objects = append(objects, obj)
var putPrm shard.PutPrm
putPrm.SetObject(obj)
_, err := e.shards[sh.String()].Put(context.Background(), putPrm)
require.NoError(t, err)
}
for i := 0; ; i++ {
objects = append(objects, testutil.GenerateObjectWithCID(cidtest.ID()))
var putPrm PutPrm
putPrm.WithObject(objects[len(objects)-1])
err := e.Put(context.Background(), putPrm)
require.NoError(t, err)
res, err := e.shards[ids[len(ids)-1].String()].List(context.Background())
require.NoError(t, err)
if len(res.AddressList()) == objPerShard {
break
}
}
return e, ids, objects
}
func TestEvacuateShard(t *testing.T) {
t.Parallel()
const objPerShard = 3
e, ids, objects := newEngineEvacuate(t, 3, objPerShard)
evacuateShardID := ids[2].String()
checkHasObjects := func(t *testing.T) {
for i := range objects {
var prm GetPrm
prm.WithAddress(objectCore.AddressOf(objects[i]))
_, err := e.Get(context.Background(), prm)
require.NoError(t, err)
}
}
checkHasObjects(t)
var prm EvacuateShardPrm
prm.WithShardIDList(ids[2:3])
t.Run("must be read-only", func(t *testing.T) {
res, err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, ErrMustBeReadOnly)
require.Equal(t, uint64(0), res.Evacuated())
})
require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly))
res, err := e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, uint64(objPerShard), res.Evacuated())
// We check that all objects are available both before and after shard removal.
// First case is a real-world use-case. It ensures that an object can be put in presense
// of all metabase checks/marks.
// Second case ensures that all objects are indeed moved and available.
checkHasObjects(t)
// Calling it again is OK, but all objects are already moved, so no new PUTs should be done.
res, err = e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, uint64(0), res.Evacuated())
checkHasObjects(t)
e.mtx.Lock()
delete(e.shards, evacuateShardID)
delete(e.shardPools, evacuateShardID)
e.mtx.Unlock()
checkHasObjects(t)
}
func TestEvacuateNetwork(t *testing.T) {
t.Parallel()
var errReplication = errors.New("handler error")
acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) error {
var n uint64
return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) error {
if n == max {
return errReplication
}
n++
for i := range objects {
if addr == objectCore.AddressOf(objects[i]) {
require.Equal(t, objects[i], obj)
return nil
}
}
require.FailNow(t, "handler was called with an unexpected object: %s", addr)
panic("unreachable")
}
}
t.Run("single shard", func(t *testing.T) {
t.Parallel()
e, ids, objects := newEngineEvacuate(t, 1, 3)
evacuateShardID := ids[0].String()
require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly))
var prm EvacuateShardPrm
prm.shardID = ids[0:1]
res, err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errMustHaveTwoShards)
require.Equal(t, uint64(0), res.Evacuated())
prm.handler = acceptOneOf(objects, 2)
res, err = e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errReplication)
require.Equal(t, uint64(2), res.Evacuated())
})
t.Run("multiple shards, evacuate one", func(t *testing.T) {
t.Parallel()
e, ids, objects := newEngineEvacuate(t, 2, 3)
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
var prm EvacuateShardPrm
prm.shardID = ids[1:2]
prm.handler = acceptOneOf(objects, 2)
res, err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errReplication)
require.Equal(t, uint64(2), res.Evacuated())
t.Run("no errors", func(t *testing.T) {
prm.handler = acceptOneOf(objects, 3)
res, err := e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, uint64(3), res.Evacuated())
})
})
t.Run("multiple shards, evacuate many", func(t *testing.T) {
t.Parallel()
e, ids, objects := newEngineEvacuate(t, 4, 5)
evacuateIDs := ids[0:3]
var totalCount uint64
for i := range evacuateIDs {
res, err := e.shards[ids[i].String()].List(context.Background())
require.NoError(t, err)
totalCount += uint64(len(res.AddressList()))
}
for i := range ids {
require.NoError(t, e.shards[ids[i].String()].SetMode(mode.ReadOnly))
}
var prm EvacuateShardPrm
prm.shardID = evacuateIDs
prm.handler = acceptOneOf(objects, totalCount-1)
res, err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errReplication)
require.Equal(t, totalCount-1, res.Evacuated())
t.Run("no errors", func(t *testing.T) {
prm.handler = acceptOneOf(objects, totalCount)
res, err := e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, totalCount, res.Evacuated())
})
})
}
func TestEvacuateCancellation(t *testing.T) {
t.Parallel()
e, ids, _ := newEngineEvacuate(t, 2, 3)
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
var prm EvacuateShardPrm
prm.shardID = ids[1:2]
prm.handler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
return nil
}
ctx, cancel := context.WithCancel(context.Background())
cancel()
res, err := e.Evacuate(ctx, prm)
require.ErrorContains(t, err, "context canceled")
require.Equal(t, uint64(0), res.Evacuated())
}
func TestEvacuateSingleProcess(t *testing.T) {
e, ids, _ := newEngineEvacuate(t, 2, 3)
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
blocker := make(chan interface{})
running := make(chan interface{})
var prm EvacuateShardPrm
prm.shardID = ids[1:2]
prm.handler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
select {
case <-running:
default:
close(running)
}
<-blocker
return nil
}
eg, egCtx := errgroup.WithContext(context.Background())
eg.Go(func() error {
res, err := e.Evacuate(egCtx, prm)
require.NoError(t, err, "first evacuation failed")
require.Equal(t, uint64(3), res.Evacuated())
return nil
})
eg.Go(func() error {
<-running
res, err := e.Evacuate(egCtx, prm)
require.ErrorContains(t, err, "evacuate is already running for shard ids", "second evacuation not failed")
require.Equal(t, uint64(0), res.Evacuated())
close(blocker)
return nil
})
require.NoError(t, eg.Wait())
}
func TestEvacuateAsync(t *testing.T) {
e, ids, _ := newEngineEvacuate(t, 2, 3)
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
blocker := make(chan interface{})
running := make(chan interface{})
var prm EvacuateShardPrm
prm.shardID = ids[1:2]
prm.handler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
select {
case <-running:
default:
close(running)
}
<-blocker
return nil
}
st, err := e.GetEvacuationState(context.Background())
require.NoError(t, err, "get init state failed")
require.Equal(t, EvacuateProcessStateUndefined, st.ProcessingStatus(), "invalid init state")
require.Equal(t, uint64(0), st.Evacuated(), "invalid init count")
require.Nil(t, st.StartedAt(), "invalid init started at")
require.Nil(t, st.FinishedAt(), "invalid init finished at")
require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids")
require.Equal(t, "", st.ErrorMessage(), "invalid init error message")
eg, egCtx := errgroup.WithContext(context.Background())
eg.Go(func() error {
res, err := e.Evacuate(egCtx, prm)
require.NoError(t, err, "first evacuation failed")
require.Equal(t, uint64(3), res.Evacuated())
return nil
})
<-running
st, err = e.GetEvacuationState(context.Background())
require.NoError(t, err, "get running state failed")
require.Equal(t, EvacuateProcessStateRunning, st.ProcessingStatus(), "invalid running state")
require.Equal(t, uint64(0), st.Evacuated(), "invalid running count")
require.NotNil(t, st.StartedAt(), "invalid running started at")
require.Nil(t, st.FinishedAt(), "invalid init finished at")
expectedShardIDs := make([]string, 0, 2)
for _, id := range ids[1:2] {
expectedShardIDs = append(expectedShardIDs, id.String())
}
require.ElementsMatch(t, expectedShardIDs, st.ShardIDs(), "invalid running shard ids")
require.Equal(t, "", st.ErrorMessage(), "invalid init error message")
close(blocker)
require.Eventually(t, func() bool {
st, err = e.GetEvacuationState(context.Background())
return st.ProcessingStatus() == EvacuateProcessStateCompleted
}, 3*time.Second, 10*time.Millisecond, "invalid final state")
require.NoError(t, err, "get final state failed")
require.Equal(t, uint64(3), st.Evacuated(), "invalid final count")
require.NotNil(t, st.StartedAt(), "invalid final started at")
require.NotNil(t, st.FinishedAt(), "invalid final finished at")
require.ElementsMatch(t, expectedShardIDs, st.ShardIDs(), "invalid final shard ids")
require.Equal(t, "", st.ErrorMessage(), "invalid final error message")
require.NoError(t, eg.Wait())
}