Dmitrii Stepanov
41da27dad5
All checks were successful
Tests and linters / Run gofumpt (pull_request) Successful in 4m4s
DCO action / DCO (pull_request) Successful in 5m4s
Pre-commit hooks / Pre-commit (pull_request) Successful in 5m5s
Vulncheck / Vulncheck (pull_request) Successful in 4m56s
Tests and linters / Staticcheck (pull_request) Successful in 5m10s
Build / Build Components (pull_request) Successful in 5m32s
Tests and linters / Tests with -race (pull_request) Successful in 6m24s
Tests and linters / Lint (pull_request) Successful in 6m36s
Tests and linters / Tests (pull_request) Successful in 6m46s
Tests and linters / gopls check (pull_request) Successful in 7m9s
Tests and linters / Run gofumpt (push) Successful in 1m34s
Tests and linters / Staticcheck (push) Successful in 3m22s
Vulncheck / Vulncheck (push) Successful in 3m43s
Tests and linters / Lint (push) Successful in 4m16s
Build / Build Components (push) Successful in 5m0s
Pre-commit hooks / Pre-commit (push) Successful in 5m3s
Tests and linters / Tests (push) Successful in 5m9s
Tests and linters / Tests with -race (push) Successful in 5m13s
Tests and linters / gopls check (push) Successful in 5m54s
Now it is only async evacuation. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
820 lines
26 KiB
Go
820 lines
26 KiB
Go
package engine
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"path/filepath"
|
|
"strconv"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
coreContainer "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
|
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
|
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"github.com/stretchr/testify/require"
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
type containerStorage struct {
|
|
cntmap map[cid.ID]*container.Container
|
|
latency time.Duration
|
|
}
|
|
|
|
func (cs *containerStorage) Get(id cid.ID) (*coreContainer.Container, error) {
|
|
time.Sleep(cs.latency)
|
|
v, ok := cs.cntmap[id]
|
|
if !ok {
|
|
return nil, new(apistatus.ContainerNotFound)
|
|
}
|
|
coreCnt := coreContainer.Container{
|
|
Value: *v,
|
|
}
|
|
return &coreCnt, nil
|
|
}
|
|
|
|
func (cs *containerStorage) DeletionInfo(cid.ID) (*coreContainer.DelInfo, error) {
|
|
return nil, nil
|
|
}
|
|
|
|
func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEngine, []*shard.ID, []*objectSDK.Object) {
|
|
dir := t.TempDir()
|
|
|
|
te := testNewEngine(t).
|
|
setShardsNumOpts(t, shardNum, func(id int) []shard.Option {
|
|
return []shard.Option{
|
|
shard.WithLogger(test.NewLogger(t)),
|
|
shard.WithBlobStorOptions(
|
|
blobstor.WithStorages([]blobstor.SubStorage{{
|
|
Storage: fstree.New(
|
|
fstree.WithPath(filepath.Join(dir, strconv.Itoa(id))),
|
|
fstree.WithDepth(1)),
|
|
}})),
|
|
shard.WithMetaBaseOptions(
|
|
meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", id))),
|
|
meta.WithPermissions(0o700),
|
|
meta.WithEpochState(epochState{})),
|
|
shard.WithPiloramaOptions(
|
|
pilorama.WithPath(filepath.Join(dir, fmt.Sprintf("%d.pilorama", id))),
|
|
pilorama.WithPerm(0o700),
|
|
),
|
|
}
|
|
}).
|
|
prepare(t)
|
|
e, ids := te.engine, te.shardIDs
|
|
|
|
objects := make([]*objectSDK.Object, 0, objPerShard*len(ids))
|
|
treeID := "version"
|
|
meta := []pilorama.KeyValue{
|
|
{Key: pilorama.AttributeVersion, Value: []byte("XXX")},
|
|
{Key: pilorama.AttributeFilename, Value: []byte("file.txt")},
|
|
}
|
|
cnrMap := make(map[cid.ID]*container.Container)
|
|
for _, sh := range ids {
|
|
for i := range objPerShard {
|
|
// Create dummy container
|
|
cnr1 := container.Container{}
|
|
cnr1.SetAttribute("cnr", "cnr"+strconv.Itoa(i))
|
|
contID := cidtest.ID()
|
|
cnrMap[contID] = &cnr1
|
|
|
|
obj := testutil.GenerateObjectWithCID(contID)
|
|
objects = append(objects, obj)
|
|
|
|
var putPrm shard.PutPrm
|
|
putPrm.SetObject(obj)
|
|
_, err := e.shards[sh.String()].Put(context.Background(), putPrm)
|
|
require.NoError(t, err)
|
|
|
|
_, err = e.shards[sh.String()].TreeAddByPath(context.Background(), pilorama.CIDDescriptor{CID: contID, Position: 0, Size: 1},
|
|
treeID, pilorama.AttributeFilename, []string{"path", "to", "the", "file"}, meta)
|
|
require.NoError(t, err)
|
|
}
|
|
}
|
|
e.SetContainerSource(&containerStorage{cntmap: cnrMap})
|
|
return e, ids, objects
|
|
}
|
|
|
|
func TestEvacuateShardObjects(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
const objPerShard = 3
|
|
|
|
e, ids, objects := newEngineEvacuate(t, 3, objPerShard)
|
|
defer func() {
|
|
require.NoError(t, e.Close(context.Background()))
|
|
}()
|
|
|
|
evacuateShardID := ids[2].String()
|
|
|
|
checkHasObjects := func(t *testing.T) {
|
|
for i := range objects {
|
|
var prm GetPrm
|
|
prm.WithAddress(objectCore.AddressOf(objects[i]))
|
|
|
|
_, err := e.Get(context.Background(), prm)
|
|
require.NoError(t, err)
|
|
}
|
|
}
|
|
|
|
checkHasObjects(t)
|
|
|
|
var prm EvacuateShardPrm
|
|
prm.ShardID = ids[2:3]
|
|
prm.Scope = EvacuateScopeObjects
|
|
|
|
t.Run("must be read-only", func(t *testing.T) {
|
|
err := e.Evacuate(context.Background(), prm)
|
|
require.ErrorIs(t, err, ErrMustBeReadOnly)
|
|
})
|
|
|
|
require.NoError(t, e.shards[evacuateShardID].SetMode(context.Background(), mode.ReadOnly))
|
|
|
|
err := e.Evacuate(context.Background(), prm)
|
|
require.NoError(t, err)
|
|
st := testWaitForEvacuationCompleted(t, e)
|
|
require.Equal(t, st.ErrorMessage(), "")
|
|
require.Equal(t, uint64(objPerShard), st.ObjectsEvacuated())
|
|
|
|
// We check that all objects are available both before and after shard removal.
|
|
// First case is a real-world use-case. It ensures that an object can be put in presense
|
|
// of all metabase checks/marks.
|
|
// Second case ensures that all objects are indeed moved and available.
|
|
checkHasObjects(t)
|
|
|
|
// Objects on evacuated shards should be logically unavailable, but persisted on disk.
|
|
// This is necessary to prevent removing it by policer in case of `REP 1` policy.
|
|
for _, obj := range objects[len(objects)-objPerShard:] {
|
|
var prmGet shard.GetPrm
|
|
prmGet.SetAddress(objectCore.AddressOf(obj))
|
|
_, err = e.shards[evacuateShardID].Get(context.Background(), prmGet)
|
|
require.Error(t, err)
|
|
|
|
prmGet.SkipEvacCheck(true)
|
|
_, err = e.shards[evacuateShardID].Get(context.Background(), prmGet)
|
|
require.NoError(t, err)
|
|
|
|
var prmHead shard.HeadPrm
|
|
prmHead.SetAddress(objectCore.AddressOf(obj))
|
|
_, err = e.shards[evacuateShardID].Head(context.Background(), prmHead)
|
|
require.Error(t, err)
|
|
|
|
var existsPrm shard.ExistsPrm
|
|
existsPrm.Address = objectCore.AddressOf(obj)
|
|
_, err = e.shards[evacuateShardID].Exists(context.Background(), existsPrm)
|
|
require.Error(t, err)
|
|
|
|
var rngPrm shard.RngPrm
|
|
rngPrm.SetAddress(objectCore.AddressOf(obj))
|
|
_, err = e.shards[evacuateShardID].GetRange(context.Background(), rngPrm)
|
|
require.Error(t, err)
|
|
}
|
|
|
|
// Calling it again is OK, but all objects are already moved, so no new PUTs should be done.
|
|
require.NoError(t, e.Evacuate(context.Background(), prm))
|
|
st = testWaitForEvacuationCompleted(t, e)
|
|
require.Equal(t, st.ErrorMessage(), "")
|
|
require.Equal(t, uint64(0), st.ObjectsEvacuated())
|
|
|
|
checkHasObjects(t)
|
|
|
|
e.mtx.Lock()
|
|
delete(e.shards, evacuateShardID)
|
|
delete(e.shardPools, evacuateShardID)
|
|
e.mtx.Unlock()
|
|
|
|
checkHasObjects(t)
|
|
}
|
|
|
|
func testWaitForEvacuationCompleted(t *testing.T, e *StorageEngine) *EvacuationState {
|
|
var st *EvacuationState
|
|
var err error
|
|
require.Eventually(t, func() bool {
|
|
st, err = e.GetEvacuationState(context.Background())
|
|
require.NoError(t, err)
|
|
return st.ProcessingStatus() == EvacuateProcessStateCompleted
|
|
}, 3*time.Second, 10*time.Millisecond)
|
|
return st
|
|
}
|
|
|
|
func TestEvacuateObjectsNetwork(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
errReplication := errors.New("handler error")
|
|
|
|
acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) (bool, error) {
|
|
var n atomic.Uint64
|
|
var mtx sync.Mutex
|
|
return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) {
|
|
mtx.Lock()
|
|
defer mtx.Unlock()
|
|
if n.Load() == max {
|
|
return false, errReplication
|
|
}
|
|
|
|
n.Add(1)
|
|
for i := range objects {
|
|
if addr == objectCore.AddressOf(objects[i]) {
|
|
require.Equal(t, objects[i], obj)
|
|
return true, nil
|
|
}
|
|
}
|
|
require.FailNow(t, "handler was called with an unexpected object: %s", addr)
|
|
panic("unreachable")
|
|
}
|
|
}
|
|
|
|
t.Run("single shard", func(t *testing.T) {
|
|
t.Parallel()
|
|
e, ids, objects := newEngineEvacuate(t, 1, 3)
|
|
defer func() {
|
|
require.NoError(t, e.Close(context.Background()))
|
|
}()
|
|
|
|
evacuateShardID := ids[0].String()
|
|
|
|
require.NoError(t, e.shards[evacuateShardID].SetMode(context.Background(), mode.ReadOnly))
|
|
|
|
var prm EvacuateShardPrm
|
|
prm.ShardID = ids[0:1]
|
|
prm.Scope = EvacuateScopeObjects
|
|
|
|
err := e.Evacuate(context.Background(), prm)
|
|
require.ErrorIs(t, err, errMustHaveTwoShards)
|
|
|
|
prm.ObjectsHandler = acceptOneOf(objects, 2)
|
|
|
|
require.NoError(t, e.Evacuate(context.Background(), prm))
|
|
st := testWaitForEvacuationCompleted(t, e)
|
|
require.Contains(t, st.ErrorMessage(), errReplication.Error())
|
|
require.Equal(t, uint64(2), st.ObjectsEvacuated())
|
|
})
|
|
t.Run("multiple shards, evacuate one", func(t *testing.T) {
|
|
t.Parallel()
|
|
e, ids, objects := newEngineEvacuate(t, 2, 3)
|
|
defer func() {
|
|
require.NoError(t, e.Close(context.Background()))
|
|
}()
|
|
|
|
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
|
|
require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly))
|
|
|
|
var prm EvacuateShardPrm
|
|
prm.ShardID = ids[1:2]
|
|
prm.ObjectsHandler = acceptOneOf(objects, 2)
|
|
prm.Scope = EvacuateScopeObjects
|
|
|
|
require.NoError(t, e.Evacuate(context.Background(), prm))
|
|
st := testWaitForEvacuationCompleted(t, e)
|
|
require.Contains(t, st.ErrorMessage(), errReplication.Error())
|
|
require.Equal(t, uint64(2), st.ObjectsEvacuated())
|
|
|
|
t.Run("no errors", func(t *testing.T) {
|
|
prm.ObjectsHandler = acceptOneOf(objects, 3)
|
|
|
|
require.NoError(t, e.Evacuate(context.Background(), prm))
|
|
st := testWaitForEvacuationCompleted(t, e)
|
|
require.Equal(t, st.ErrorMessage(), "")
|
|
require.Equal(t, uint64(3), st.ObjectsEvacuated())
|
|
})
|
|
})
|
|
t.Run("multiple shards, evacuate many", func(t *testing.T) {
|
|
t.Parallel()
|
|
e, ids, objects := newEngineEvacuate(t, 4, 5)
|
|
defer func() {
|
|
require.NoError(t, e.Close(context.Background()))
|
|
}()
|
|
|
|
evacuateIDs := ids[0:3]
|
|
|
|
var totalCount uint64
|
|
for i := range evacuateIDs {
|
|
res, err := e.shards[ids[i].String()].List(context.Background())
|
|
require.NoError(t, err)
|
|
|
|
totalCount += uint64(len(res.AddressList()))
|
|
}
|
|
|
|
for i := range ids {
|
|
require.NoError(t, e.shards[ids[i].String()].SetMode(context.Background(), mode.ReadOnly))
|
|
}
|
|
|
|
var prm EvacuateShardPrm
|
|
prm.ShardID = evacuateIDs
|
|
prm.ObjectsHandler = acceptOneOf(objects, totalCount-1)
|
|
prm.Scope = EvacuateScopeObjects
|
|
|
|
require.NoError(t, e.Evacuate(context.Background(), prm))
|
|
st := testWaitForEvacuationCompleted(t, e)
|
|
require.Contains(t, st.ErrorMessage(), errReplication.Error())
|
|
require.Equal(t, totalCount-1, st.ObjectsEvacuated())
|
|
|
|
t.Run("no errors", func(t *testing.T) {
|
|
prm.ObjectsHandler = acceptOneOf(objects, totalCount)
|
|
|
|
require.NoError(t, e.Evacuate(context.Background(), prm))
|
|
st := testWaitForEvacuationCompleted(t, e)
|
|
require.Equal(t, st.ErrorMessage(), "")
|
|
require.Equal(t, totalCount, st.ObjectsEvacuated())
|
|
})
|
|
})
|
|
}
|
|
|
|
func TestEvacuateCancellation(t *testing.T) {
|
|
t.Parallel()
|
|
e, ids, _ := newEngineEvacuate(t, 2, 3)
|
|
defer func() {
|
|
require.NoError(t, e.Close(context.Background()))
|
|
}()
|
|
|
|
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
|
|
require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly))
|
|
|
|
var prm EvacuateShardPrm
|
|
prm.ShardID = ids[1:2]
|
|
prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) (bool, error) {
|
|
select {
|
|
case <-ctx.Done():
|
|
return false, ctx.Err()
|
|
default:
|
|
}
|
|
return true, nil
|
|
}
|
|
prm.Scope = EvacuateScopeObjects
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
cancel()
|
|
|
|
err := e.Evacuate(ctx, prm)
|
|
require.ErrorContains(t, err, "context canceled")
|
|
}
|
|
|
|
func TestEvacuateCancellationByError(t *testing.T) {
|
|
t.Parallel()
|
|
e, ids, _ := newEngineEvacuate(t, 2, 10)
|
|
defer func() {
|
|
require.NoError(t, e.Close(context.Background()))
|
|
}()
|
|
|
|
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
|
|
require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly))
|
|
|
|
var prm EvacuateShardPrm
|
|
prm.ShardID = ids[1:2]
|
|
var once atomic.Bool
|
|
prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) (bool, error) {
|
|
var err error
|
|
flag := true
|
|
if once.CompareAndSwap(false, true) {
|
|
err = errors.New("test error")
|
|
flag = false
|
|
}
|
|
return flag, err
|
|
}
|
|
prm.Scope = EvacuateScopeObjects
|
|
prm.ObjectWorkerCount = 2
|
|
prm.ContainerWorkerCount = 2
|
|
|
|
require.NoError(t, e.Evacuate(context.Background(), prm))
|
|
st := testWaitForEvacuationCompleted(t, e)
|
|
require.Contains(t, st.ErrorMessage(), "test error")
|
|
}
|
|
|
|
func TestEvacuateSingleProcess(t *testing.T) {
|
|
e, ids, _ := newEngineEvacuate(t, 2, 3)
|
|
defer func() {
|
|
require.NoError(t, e.Close(context.Background()))
|
|
}()
|
|
|
|
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
|
|
require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly))
|
|
|
|
blocker := make(chan interface{})
|
|
running := make(chan interface{})
|
|
|
|
var prm EvacuateShardPrm
|
|
prm.ShardID = ids[1:2]
|
|
prm.Scope = EvacuateScopeObjects
|
|
prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) (bool, error) {
|
|
select {
|
|
case <-running:
|
|
default:
|
|
close(running)
|
|
}
|
|
<-blocker
|
|
return true, nil
|
|
}
|
|
|
|
eg, egCtx := errgroup.WithContext(context.Background())
|
|
eg.Go(func() error {
|
|
require.NoError(t, e.Evacuate(egCtx, prm), "first evacuation failed")
|
|
return nil
|
|
})
|
|
eg.Go(func() error {
|
|
<-running
|
|
require.ErrorContains(t, e.Evacuate(egCtx, prm), "evacuate is already running for shard ids", "second evacuation not failed")
|
|
close(blocker)
|
|
return nil
|
|
})
|
|
require.NoError(t, eg.Wait())
|
|
st := testWaitForEvacuationCompleted(t, e)
|
|
require.Equal(t, uint64(3), st.ObjectsEvacuated())
|
|
require.Equal(t, st.ErrorMessage(), "")
|
|
}
|
|
|
|
func TestEvacuateObjectsAsync(t *testing.T) {
|
|
e, ids, _ := newEngineEvacuate(t, 2, 3)
|
|
defer func() {
|
|
require.NoError(t, e.Close(context.Background()))
|
|
}()
|
|
|
|
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
|
|
require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly))
|
|
|
|
blocker := make(chan interface{})
|
|
running := make(chan interface{})
|
|
|
|
var prm EvacuateShardPrm
|
|
prm.ShardID = ids[1:2]
|
|
prm.Scope = EvacuateScopeObjects
|
|
prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) (bool, error) {
|
|
select {
|
|
case <-running:
|
|
default:
|
|
close(running)
|
|
}
|
|
<-blocker
|
|
return true, nil
|
|
}
|
|
|
|
st, err := e.GetEvacuationState(context.Background())
|
|
require.NoError(t, err, "get init state failed")
|
|
require.Equal(t, EvacuateProcessStateUndefined, st.ProcessingStatus(), "invalid init state")
|
|
require.Equal(t, uint64(0), st.ObjectsEvacuated(), "invalid init count")
|
|
require.Nil(t, st.StartedAt(), "invalid init started at")
|
|
require.Nil(t, st.FinishedAt(), "invalid init finished at")
|
|
require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids")
|
|
require.Equal(t, "", st.ErrorMessage(), "invalid init error message")
|
|
|
|
eg, egCtx := errgroup.WithContext(context.Background())
|
|
eg.Go(func() error {
|
|
require.NoError(t, e.Evacuate(egCtx, prm), "first evacuation failed")
|
|
st = testWaitForEvacuationCompleted(t, e)
|
|
require.Equal(t, uint64(3), st.ObjectsEvacuated(), "invalid final count")
|
|
return nil
|
|
})
|
|
|
|
<-running
|
|
|
|
st, err = e.GetEvacuationState(context.Background())
|
|
require.NoError(t, err, "get running state failed")
|
|
require.Equal(t, EvacuateProcessStateRunning, st.ProcessingStatus(), "invalid running state")
|
|
require.Equal(t, uint64(0), st.ObjectsEvacuated(), "invalid running count")
|
|
require.NotNil(t, st.StartedAt(), "invalid running started at")
|
|
require.Nil(t, st.FinishedAt(), "invalid init finished at")
|
|
expectedShardIDs := make([]string, 0, 2)
|
|
for _, id := range ids[1:2] {
|
|
expectedShardIDs = append(expectedShardIDs, id.String())
|
|
}
|
|
require.ElementsMatch(t, expectedShardIDs, st.ShardIDs(), "invalid running shard ids")
|
|
require.Equal(t, "", st.ErrorMessage(), "invalid init error message")
|
|
|
|
require.Error(t, e.ResetEvacuationStatus(context.Background()))
|
|
|
|
close(blocker)
|
|
|
|
st = testWaitForEvacuationCompleted(t, e)
|
|
require.Equal(t, uint64(3), st.ObjectsEvacuated(), "invalid final count")
|
|
require.NotNil(t, st.StartedAt(), "invalid final started at")
|
|
require.NotNil(t, st.FinishedAt(), "invalid final finished at")
|
|
require.ElementsMatch(t, expectedShardIDs, st.ShardIDs(), "invalid final shard ids")
|
|
require.Equal(t, "", st.ErrorMessage(), "invalid final error message")
|
|
|
|
require.NoError(t, eg.Wait())
|
|
|
|
require.NoError(t, e.ResetEvacuationStatus(context.Background()))
|
|
st, err = e.GetEvacuationState(context.Background())
|
|
require.NoError(t, err, "get state after reset failed")
|
|
require.Equal(t, EvacuateProcessStateUndefined, st.ProcessingStatus(), "invalid state after reset")
|
|
require.Equal(t, uint64(0), st.ObjectsEvacuated(), "invalid count after reset")
|
|
require.Nil(t, st.StartedAt(), "invalid started at after reset")
|
|
require.Nil(t, st.FinishedAt(), "invalid finished at after reset")
|
|
require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid shard ids after reset")
|
|
require.Equal(t, "", st.ErrorMessage(), "invalid error message after reset")
|
|
}
|
|
|
|
func TestEvacuateTreesLocal(t *testing.T) {
|
|
e, ids, _ := newEngineEvacuate(t, 2, 3)
|
|
defer func() {
|
|
require.NoError(t, e.Close(context.Background()))
|
|
}()
|
|
|
|
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
|
|
|
|
var prm EvacuateShardPrm
|
|
prm.ShardID = ids[0:1]
|
|
prm.Scope = EvacuateScopeTrees
|
|
|
|
expectedShardIDs := make([]string, 0, 1)
|
|
for _, id := range ids[0:1] {
|
|
expectedShardIDs = append(expectedShardIDs, id.String())
|
|
}
|
|
|
|
st, err := e.GetEvacuationState(context.Background())
|
|
require.NoError(t, err, "get init state failed")
|
|
require.Equal(t, EvacuateProcessStateUndefined, st.ProcessingStatus(), "invalid init state")
|
|
require.Equal(t, uint64(0), st.TreesEvacuated(), "invalid init count")
|
|
require.Nil(t, st.StartedAt(), "invalid init started at")
|
|
require.Nil(t, st.FinishedAt(), "invalid init finished at")
|
|
require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids")
|
|
require.Equal(t, "", st.ErrorMessage(), "invalid init error message")
|
|
|
|
require.NoError(t, e.Evacuate(context.Background(), prm), "evacuation failed")
|
|
|
|
st = testWaitForEvacuationCompleted(t, e)
|
|
require.Equal(t, uint64(3), st.TreesTotal(), "invalid trees total count")
|
|
require.Equal(t, uint64(3), st.TreesEvacuated(), "invalid trees evacuated count")
|
|
require.Equal(t, uint64(0), st.TreesFailed(), "invalid trees failed count")
|
|
require.NotNil(t, st.StartedAt(), "invalid final started at")
|
|
require.NotNil(t, st.FinishedAt(), "invalid final finished at")
|
|
require.ElementsMatch(t, expectedShardIDs, st.ShardIDs(), "invalid final shard ids")
|
|
require.Equal(t, "", st.ErrorMessage(), "invalid final error message")
|
|
|
|
sourceTrees, err := pilorama.TreeListAll(context.Background(), e.shards[ids[0].String()])
|
|
require.NoError(t, err, "list source trees failed")
|
|
require.Len(t, sourceTrees, 3)
|
|
|
|
for _, tr := range sourceTrees {
|
|
exists, err := e.shards[ids[1].String()].TreeExists(context.Background(), tr.CID, tr.TreeID)
|
|
require.NoError(t, err, "failed to check tree existance")
|
|
require.True(t, exists, "tree doesn't exists on target shard")
|
|
|
|
var height uint64
|
|
var sourceOps []pilorama.Move
|
|
for {
|
|
op, err := e.shards[ids[0].String()].TreeGetOpLog(context.Background(), tr.CID, tr.TreeID, height)
|
|
require.NoError(t, err)
|
|
if op.Time == 0 {
|
|
break
|
|
}
|
|
sourceOps = append(sourceOps, op)
|
|
height = op.Time + 1
|
|
}
|
|
|
|
height = 0
|
|
var targetOps []pilorama.Move
|
|
for {
|
|
op, err := e.shards[ids[1].String()].TreeGetOpLog(context.Background(), tr.CID, tr.TreeID, height)
|
|
require.NoError(t, err)
|
|
if op.Time == 0 {
|
|
break
|
|
}
|
|
targetOps = append(targetOps, op)
|
|
height = op.Time + 1
|
|
}
|
|
|
|
require.Equal(t, sourceOps, targetOps)
|
|
}
|
|
}
|
|
|
|
func TestEvacuateTreesRemote(t *testing.T) {
|
|
e, ids, _ := newEngineEvacuate(t, 2, 3)
|
|
defer func() {
|
|
require.NoError(t, e.Close(context.Background()))
|
|
}()
|
|
|
|
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
|
|
require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly))
|
|
|
|
mutex := sync.Mutex{}
|
|
evacuatedTreeOps := make(map[string][]*pilorama.Move)
|
|
var prm EvacuateShardPrm
|
|
prm.ShardID = ids
|
|
prm.Scope = EvacuateScopeTrees
|
|
prm.TreeHandler = func(ctx context.Context, contID cid.ID, treeID string, f pilorama.Forest) (bool, string, error) {
|
|
key := contID.String() + treeID
|
|
var height uint64
|
|
for {
|
|
op, err := f.TreeGetOpLog(ctx, contID, treeID, height)
|
|
require.NoError(t, err)
|
|
|
|
if op.Time == 0 {
|
|
return true, "", nil
|
|
}
|
|
mutex.Lock()
|
|
evacuatedTreeOps[key] = append(evacuatedTreeOps[key], &op)
|
|
mutex.Unlock()
|
|
height = op.Time + 1
|
|
}
|
|
}
|
|
|
|
expectedShardIDs := make([]string, 0, len(ids))
|
|
for _, id := range ids {
|
|
expectedShardIDs = append(expectedShardIDs, id.String())
|
|
}
|
|
|
|
st, err := e.GetEvacuationState(context.Background())
|
|
require.NoError(t, err, "get init state failed")
|
|
require.Equal(t, EvacuateProcessStateUndefined, st.ProcessingStatus(), "invalid init state")
|
|
require.Equal(t, uint64(0), st.TreesEvacuated(), "invalid init count")
|
|
require.Nil(t, st.StartedAt(), "invalid init started at")
|
|
require.Nil(t, st.FinishedAt(), "invalid init finished at")
|
|
require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids")
|
|
require.Equal(t, "", st.ErrorMessage(), "invalid init error message")
|
|
|
|
require.NoError(t, e.Evacuate(context.Background(), prm), "evacuation failed")
|
|
st = testWaitForEvacuationCompleted(t, e)
|
|
|
|
require.Equal(t, uint64(6), st.TreesTotal(), "invalid trees total count")
|
|
require.Equal(t, uint64(6), st.TreesEvacuated(), "invalid trees evacuated count")
|
|
require.Equal(t, uint64(0), st.TreesFailed(), "invalid trees failed count")
|
|
require.NotNil(t, st.StartedAt(), "invalid final started at")
|
|
require.NotNil(t, st.FinishedAt(), "invalid final finished at")
|
|
require.ElementsMatch(t, expectedShardIDs, st.ShardIDs(), "invalid final shard ids")
|
|
require.Equal(t, "", st.ErrorMessage(), "invalid final error message")
|
|
|
|
expectedTreeOps := make(map[string][]*pilorama.Move)
|
|
for i := range len(e.shards) {
|
|
sourceTrees, err := pilorama.TreeListAll(context.Background(), e.shards[ids[i].String()])
|
|
require.NoError(t, err, "list source trees failed")
|
|
require.Len(t, sourceTrees, 3)
|
|
|
|
for _, tr := range sourceTrees {
|
|
key := tr.CID.String() + tr.TreeID
|
|
var height uint64
|
|
for {
|
|
op, err := e.shards[ids[i].String()].TreeGetOpLog(context.Background(), tr.CID, tr.TreeID, height)
|
|
require.NoError(t, err)
|
|
|
|
if op.Time == 0 {
|
|
break
|
|
}
|
|
expectedTreeOps[key] = append(expectedTreeOps[key], &op)
|
|
height = op.Time + 1
|
|
}
|
|
}
|
|
}
|
|
|
|
require.Equal(t, expectedTreeOps, evacuatedTreeOps)
|
|
}
|
|
|
|
func TestEvacuateShardObjectsRepOneOnly(t *testing.T) {
|
|
e, ids, _ := newEngineEvacuate(t, 2, 0)
|
|
defer func() {
|
|
require.NoError(t, e.Close(context.Background()))
|
|
}()
|
|
|
|
// Create container with policy REP 2
|
|
cnr1 := container.Container{}
|
|
p1 := netmap.PlacementPolicy{}
|
|
p1.SetContainerBackupFactor(1)
|
|
x1 := netmap.ReplicaDescriptor{}
|
|
x1.SetNumberOfObjects(2)
|
|
p1.AddReplicas(x1)
|
|
x1 = netmap.ReplicaDescriptor{}
|
|
x1.SetNumberOfObjects(1)
|
|
p1.AddReplicas(x1)
|
|
cnr1.SetPlacementPolicy(p1)
|
|
cnr1.SetAttribute("cnr", "cnr1")
|
|
|
|
var idCnr1 cid.ID
|
|
container.CalculateID(&idCnr1, cnr1)
|
|
|
|
cnrmap := make(map[cid.ID]*container.Container)
|
|
var cids []cid.ID
|
|
cnrmap[idCnr1] = &cnr1
|
|
cids = append(cids, idCnr1)
|
|
|
|
// Create container with policy REP 1
|
|
cnr2 := container.Container{}
|
|
p2 := netmap.PlacementPolicy{}
|
|
p2.SetContainerBackupFactor(1)
|
|
x2 := netmap.ReplicaDescriptor{}
|
|
x2.SetNumberOfObjects(1)
|
|
p2.AddReplicas(x2)
|
|
x2 = netmap.ReplicaDescriptor{}
|
|
x2.SetNumberOfObjects(1)
|
|
p2.AddReplicas(x2)
|
|
cnr2.SetPlacementPolicy(p2)
|
|
cnr2.SetAttribute("cnr", "cnr2")
|
|
|
|
var idCnr2 cid.ID
|
|
container.CalculateID(&idCnr2, cnr2)
|
|
cnrmap[idCnr2] = &cnr2
|
|
cids = append(cids, idCnr2)
|
|
|
|
// Create container for simulate removing
|
|
cnr3 := container.Container{}
|
|
p3 := netmap.PlacementPolicy{}
|
|
p3.SetContainerBackupFactor(1)
|
|
x3 := netmap.ReplicaDescriptor{}
|
|
x3.SetNumberOfObjects(1)
|
|
p3.AddReplicas(x3)
|
|
cnr3.SetPlacementPolicy(p3)
|
|
cnr3.SetAttribute("cnr", "cnr3")
|
|
|
|
var idCnr3 cid.ID
|
|
container.CalculateID(&idCnr3, cnr3)
|
|
cids = append(cids, idCnr3)
|
|
|
|
e.SetContainerSource(&containerStorage{cntmap: cnrmap})
|
|
|
|
for _, sh := range ids {
|
|
for j := range 3 {
|
|
for range 4 {
|
|
obj := testutil.GenerateObjectWithCID(cids[j])
|
|
var putPrm shard.PutPrm
|
|
putPrm.SetObject(obj)
|
|
_, err := e.shards[sh.String()].Put(context.Background(), putPrm)
|
|
require.NoError(t, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
var prm EvacuateShardPrm
|
|
prm.ShardID = ids[0:1]
|
|
prm.Scope = EvacuateScopeObjects
|
|
prm.RepOneOnly = true
|
|
|
|
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
|
|
|
|
require.NoError(t, e.Evacuate(context.Background(), prm))
|
|
st := testWaitForEvacuationCompleted(t, e)
|
|
require.Equal(t, "", st.ErrorMessage())
|
|
require.Equal(t, uint64(4), st.ObjectsEvacuated())
|
|
require.Equal(t, uint64(8), st.ObjectsSkipped())
|
|
require.Equal(t, uint64(0), st.ObjectsFailed())
|
|
}
|
|
|
|
func TestEvacuateShardObjectsRepOneOnlyBench(t *testing.T) {
|
|
t.Skip()
|
|
e, ids, _ := newEngineEvacuate(t, 2, 0)
|
|
defer func() {
|
|
require.NoError(t, e.Close(context.Background()))
|
|
}()
|
|
|
|
cnrmap := make(map[cid.ID]*container.Container)
|
|
var cids []cid.ID
|
|
// Create containers with policy REP 1
|
|
for i := range 10_000 {
|
|
cnr1 := container.Container{}
|
|
p1 := netmap.PlacementPolicy{}
|
|
p1.SetContainerBackupFactor(1)
|
|
x1 := netmap.ReplicaDescriptor{}
|
|
x1.SetNumberOfObjects(2)
|
|
p1.AddReplicas(x1)
|
|
cnr1.SetPlacementPolicy(p1)
|
|
cnr1.SetAttribute("i", strconv.Itoa(i))
|
|
|
|
var idCnr1 cid.ID
|
|
container.CalculateID(&idCnr1, cnr1)
|
|
|
|
cnrmap[idCnr1] = &cnr1
|
|
cids = append(cids, idCnr1)
|
|
}
|
|
|
|
e.SetContainerSource(&containerStorage{
|
|
cntmap: cnrmap,
|
|
latency: time.Millisecond * 100,
|
|
})
|
|
|
|
for _, cnt := range cids {
|
|
for range 1 {
|
|
obj := testutil.GenerateObjectWithCID(cnt)
|
|
var putPrm shard.PutPrm
|
|
putPrm.SetObject(obj)
|
|
_, err := e.shards[ids[0].String()].Put(context.Background(), putPrm)
|
|
require.NoError(t, err)
|
|
}
|
|
}
|
|
|
|
var prm EvacuateShardPrm
|
|
prm.ShardID = ids[0:1]
|
|
prm.Scope = EvacuateScopeObjects
|
|
prm.RepOneOnly = true
|
|
prm.ContainerWorkerCount = 10
|
|
|
|
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
|
|
|
|
start := time.Now()
|
|
err := e.Evacuate(context.Background(), prm)
|
|
testWaitForEvacuationCompleted(t, e)
|
|
t.Logf("evacuate took %v\n", time.Since(start))
|
|
require.NoError(t, err)
|
|
}
|