package engine import ( "context" "errors" "fmt" "path/filepath" "strconv" "sync" "sync/atomic" "testing" "time" coreContainer "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" ) type containerStorage struct { cntmap map[cid.ID]*container.Container latency time.Duration } func (cs *containerStorage) Get(id cid.ID) (*coreContainer.Container, error) { time.Sleep(cs.latency) v, ok := cs.cntmap[id] if !ok { return nil, new(apistatus.ContainerNotFound) } coreCnt := coreContainer.Container{ Value: *v, } return &coreCnt, nil } func (cs *containerStorage) DeletionInfo(cid.ID) (*coreContainer.DelInfo, error) { return nil, nil } func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEngine, []*shard.ID, []*objectSDK.Object) { dir := t.TempDir() te := testNewEngine(t). setShardsNumOpts(t, shardNum, func(id int) []shard.Option { return []shard.Option{ shard.WithLogger(test.NewLogger(t)), shard.WithBlobStorOptions( blobstor.WithStorages([]blobstor.SubStorage{{ Storage: fstree.New( fstree.WithPath(filepath.Join(dir, strconv.Itoa(id))), fstree.WithDepth(1)), }})), shard.WithMetaBaseOptions( meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", id))), meta.WithPermissions(0o700), meta.WithEpochState(epochState{})), shard.WithPiloramaOptions( pilorama.WithPath(filepath.Join(dir, fmt.Sprintf("%d.pilorama", id))), pilorama.WithPerm(0o700), ), } }). prepare(t) e, ids := te.engine, te.shardIDs objects := make([]*objectSDK.Object, 0, objPerShard*len(ids)) treeID := "version" meta := []pilorama.KeyValue{ {Key: pilorama.AttributeVersion, Value: []byte("XXX")}, {Key: pilorama.AttributeFilename, Value: []byte("file.txt")}, } cnrMap := make(map[cid.ID]*container.Container) for _, sh := range ids { for i := range objPerShard { // Create dummy container cnr1 := container.Container{} cnr1.SetAttribute("cnr", "cnr"+strconv.Itoa(i)) contID := cidtest.ID() cnrMap[contID] = &cnr1 obj := testutil.GenerateObjectWithCID(contID) objects = append(objects, obj) var putPrm shard.PutPrm putPrm.SetObject(obj) _, err := e.shards[sh.String()].Put(context.Background(), putPrm) require.NoError(t, err) _, err = e.shards[sh.String()].TreeAddByPath(context.Background(), pilorama.CIDDescriptor{CID: contID, Position: 0, Size: 1}, treeID, pilorama.AttributeFilename, []string{"path", "to", "the", "file"}, meta) require.NoError(t, err) } } e.SetContainerSource(&containerStorage{cntmap: cnrMap}) return e, ids, objects } func TestEvacuateShardObjects(t *testing.T) { t.Parallel() const objPerShard = 3 e, ids, objects := newEngineEvacuate(t, 3, objPerShard) defer func() { require.NoError(t, e.Close(context.Background())) }() evacuateShardID := ids[2].String() checkHasObjects := func(t *testing.T) { for i := range objects { var prm GetPrm prm.WithAddress(objectCore.AddressOf(objects[i])) _, err := e.Get(context.Background(), prm) require.NoError(t, err) } } checkHasObjects(t) var prm EvacuateShardPrm prm.ShardID = ids[2:3] prm.Scope = EvacuateScopeObjects t.Run("must be read-only", func(t *testing.T) { err := e.Evacuate(context.Background(), prm) require.ErrorIs(t, err, ErrMustBeReadOnly) }) require.NoError(t, e.shards[evacuateShardID].SetMode(context.Background(), mode.ReadOnly)) err := e.Evacuate(context.Background(), prm) require.NoError(t, err) st := testWaitForEvacuationCompleted(t, e) require.Equal(t, st.ErrorMessage(), "") require.Equal(t, uint64(objPerShard), st.ObjectsEvacuated()) // We check that all objects are available both before and after shard removal. // First case is a real-world use-case. It ensures that an object can be put in presense // of all metabase checks/marks. // Second case ensures that all objects are indeed moved and available. checkHasObjects(t) // Objects on evacuated shards should be logically unavailable, but persisted on disk. // This is necessary to prevent removing it by policer in case of `REP 1` policy. for _, obj := range objects[len(objects)-objPerShard:] { var prmGet shard.GetPrm prmGet.SetAddress(objectCore.AddressOf(obj)) _, err = e.shards[evacuateShardID].Get(context.Background(), prmGet) require.Error(t, err) prmGet.SkipEvacCheck(true) _, err = e.shards[evacuateShardID].Get(context.Background(), prmGet) require.NoError(t, err) var prmHead shard.HeadPrm prmHead.SetAddress(objectCore.AddressOf(obj)) _, err = e.shards[evacuateShardID].Head(context.Background(), prmHead) require.Error(t, err) var existsPrm shard.ExistsPrm existsPrm.Address = objectCore.AddressOf(obj) _, err = e.shards[evacuateShardID].Exists(context.Background(), existsPrm) require.Error(t, err) var rngPrm shard.RngPrm rngPrm.SetAddress(objectCore.AddressOf(obj)) _, err = e.shards[evacuateShardID].GetRange(context.Background(), rngPrm) require.Error(t, err) } // Calling it again is OK, but all objects are already moved, so no new PUTs should be done. require.NoError(t, e.Evacuate(context.Background(), prm)) st = testWaitForEvacuationCompleted(t, e) require.Equal(t, st.ErrorMessage(), "") require.Equal(t, uint64(0), st.ObjectsEvacuated()) checkHasObjects(t) e.mtx.Lock() delete(e.shards, evacuateShardID) delete(e.shardPools, evacuateShardID) e.mtx.Unlock() checkHasObjects(t) } func testWaitForEvacuationCompleted(t *testing.T, e *StorageEngine) *EvacuationState { var st *EvacuationState var err error require.Eventually(t, func() bool { st, err = e.GetEvacuationState(context.Background()) require.NoError(t, err) return st.ProcessingStatus() == EvacuateProcessStateCompleted }, 3*time.Second, 10*time.Millisecond) return st } func TestEvacuateObjectsNetwork(t *testing.T) { t.Parallel() errReplication := errors.New("handler error") acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) (bool, error) { var n atomic.Uint64 var mtx sync.Mutex return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) { mtx.Lock() defer mtx.Unlock() if n.Load() == max { return false, errReplication } n.Add(1) for i := range objects { if addr == objectCore.AddressOf(objects[i]) { require.Equal(t, objects[i], obj) return true, nil } } require.FailNow(t, "handler was called with an unexpected object: %s", addr) panic("unreachable") } } t.Run("single shard", func(t *testing.T) { t.Parallel() e, ids, objects := newEngineEvacuate(t, 1, 3) defer func() { require.NoError(t, e.Close(context.Background())) }() evacuateShardID := ids[0].String() require.NoError(t, e.shards[evacuateShardID].SetMode(context.Background(), mode.ReadOnly)) var prm EvacuateShardPrm prm.ShardID = ids[0:1] prm.Scope = EvacuateScopeObjects err := e.Evacuate(context.Background(), prm) require.ErrorIs(t, err, errMustHaveTwoShards) prm.ObjectsHandler = acceptOneOf(objects, 2) require.NoError(t, e.Evacuate(context.Background(), prm)) st := testWaitForEvacuationCompleted(t, e) require.Contains(t, st.ErrorMessage(), errReplication.Error()) require.Equal(t, uint64(2), st.ObjectsEvacuated()) }) t.Run("multiple shards, evacuate one", func(t *testing.T) { t.Parallel() e, ids, objects := newEngineEvacuate(t, 2, 3) defer func() { require.NoError(t, e.Close(context.Background())) }() require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly)) require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly)) var prm EvacuateShardPrm prm.ShardID = ids[1:2] prm.ObjectsHandler = acceptOneOf(objects, 2) prm.Scope = EvacuateScopeObjects require.NoError(t, e.Evacuate(context.Background(), prm)) st := testWaitForEvacuationCompleted(t, e) require.Contains(t, st.ErrorMessage(), errReplication.Error()) require.Equal(t, uint64(2), st.ObjectsEvacuated()) t.Run("no errors", func(t *testing.T) { prm.ObjectsHandler = acceptOneOf(objects, 3) require.NoError(t, e.Evacuate(context.Background(), prm)) st := testWaitForEvacuationCompleted(t, e) require.Equal(t, st.ErrorMessage(), "") require.Equal(t, uint64(3), st.ObjectsEvacuated()) }) }) t.Run("multiple shards, evacuate many", func(t *testing.T) { t.Parallel() e, ids, objects := newEngineEvacuate(t, 4, 5) defer func() { require.NoError(t, e.Close(context.Background())) }() evacuateIDs := ids[0:3] var totalCount uint64 for i := range evacuateIDs { res, err := e.shards[ids[i].String()].List(context.Background()) require.NoError(t, err) totalCount += uint64(len(res.AddressList())) } for i := range ids { require.NoError(t, e.shards[ids[i].String()].SetMode(context.Background(), mode.ReadOnly)) } var prm EvacuateShardPrm prm.ShardID = evacuateIDs prm.ObjectsHandler = acceptOneOf(objects, totalCount-1) prm.Scope = EvacuateScopeObjects require.NoError(t, e.Evacuate(context.Background(), prm)) st := testWaitForEvacuationCompleted(t, e) require.Contains(t, st.ErrorMessage(), errReplication.Error()) require.Equal(t, totalCount-1, st.ObjectsEvacuated()) t.Run("no errors", func(t *testing.T) { prm.ObjectsHandler = acceptOneOf(objects, totalCount) require.NoError(t, e.Evacuate(context.Background(), prm)) st := testWaitForEvacuationCompleted(t, e) require.Equal(t, st.ErrorMessage(), "") require.Equal(t, totalCount, st.ObjectsEvacuated()) }) }) } func TestEvacuateCancellation(t *testing.T) { t.Parallel() e, ids, _ := newEngineEvacuate(t, 2, 3) defer func() { require.NoError(t, e.Close(context.Background())) }() require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly)) require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly)) var prm EvacuateShardPrm prm.ShardID = ids[1:2] prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) (bool, error) { select { case <-ctx.Done(): return false, ctx.Err() default: } return true, nil } prm.Scope = EvacuateScopeObjects ctx, cancel := context.WithCancel(context.Background()) cancel() err := e.Evacuate(ctx, prm) require.ErrorContains(t, err, "context canceled") } func TestEvacuateCancellationByError(t *testing.T) { t.Parallel() e, ids, _ := newEngineEvacuate(t, 2, 10) defer func() { require.NoError(t, e.Close(context.Background())) }() require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly)) require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly)) var prm EvacuateShardPrm prm.ShardID = ids[1:2] var once atomic.Bool prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) (bool, error) { var err error flag := true if once.CompareAndSwap(false, true) { err = errors.New("test error") flag = false } return flag, err } prm.Scope = EvacuateScopeObjects prm.ObjectWorkerCount = 2 prm.ContainerWorkerCount = 2 require.NoError(t, e.Evacuate(context.Background(), prm)) st := testWaitForEvacuationCompleted(t, e) require.Contains(t, st.ErrorMessage(), "test error") } func TestEvacuateSingleProcess(t *testing.T) { e, ids, _ := newEngineEvacuate(t, 2, 3) defer func() { require.NoError(t, e.Close(context.Background())) }() require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly)) require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly)) blocker := make(chan interface{}) running := make(chan interface{}) var prm EvacuateShardPrm prm.ShardID = ids[1:2] prm.Scope = EvacuateScopeObjects prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) (bool, error) { select { case <-running: default: close(running) } <-blocker return true, nil } eg, egCtx := errgroup.WithContext(context.Background()) eg.Go(func() error { require.NoError(t, e.Evacuate(egCtx, prm), "first evacuation failed") return nil }) eg.Go(func() error { <-running require.ErrorContains(t, e.Evacuate(egCtx, prm), "evacuate is already running for shard ids", "second evacuation not failed") close(blocker) return nil }) require.NoError(t, eg.Wait()) st := testWaitForEvacuationCompleted(t, e) require.Equal(t, uint64(3), st.ObjectsEvacuated()) require.Equal(t, st.ErrorMessage(), "") } func TestEvacuateObjectsAsync(t *testing.T) { e, ids, _ := newEngineEvacuate(t, 2, 3) defer func() { require.NoError(t, e.Close(context.Background())) }() require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly)) require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly)) blocker := make(chan interface{}) running := make(chan interface{}) var prm EvacuateShardPrm prm.ShardID = ids[1:2] prm.Scope = EvacuateScopeObjects prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) (bool, error) { select { case <-running: default: close(running) } <-blocker return true, nil } st, err := e.GetEvacuationState(context.Background()) require.NoError(t, err, "get init state failed") require.Equal(t, EvacuateProcessStateUndefined, st.ProcessingStatus(), "invalid init state") require.Equal(t, uint64(0), st.ObjectsEvacuated(), "invalid init count") require.Nil(t, st.StartedAt(), "invalid init started at") require.Nil(t, st.FinishedAt(), "invalid init finished at") require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids") require.Equal(t, "", st.ErrorMessage(), "invalid init error message") eg, egCtx := errgroup.WithContext(context.Background()) eg.Go(func() error { require.NoError(t, e.Evacuate(egCtx, prm), "first evacuation failed") st = testWaitForEvacuationCompleted(t, e) require.Equal(t, uint64(3), st.ObjectsEvacuated(), "invalid final count") return nil }) <-running st, err = e.GetEvacuationState(context.Background()) require.NoError(t, err, "get running state failed") require.Equal(t, EvacuateProcessStateRunning, st.ProcessingStatus(), "invalid running state") require.Equal(t, uint64(0), st.ObjectsEvacuated(), "invalid running count") require.NotNil(t, st.StartedAt(), "invalid running started at") require.Nil(t, st.FinishedAt(), "invalid init finished at") expectedShardIDs := make([]string, 0, 2) for _, id := range ids[1:2] { expectedShardIDs = append(expectedShardIDs, id.String()) } require.ElementsMatch(t, expectedShardIDs, st.ShardIDs(), "invalid running shard ids") require.Equal(t, "", st.ErrorMessage(), "invalid init error message") require.Error(t, e.ResetEvacuationStatus(context.Background())) close(blocker) st = testWaitForEvacuationCompleted(t, e) require.Equal(t, uint64(3), st.ObjectsEvacuated(), "invalid final count") require.NotNil(t, st.StartedAt(), "invalid final started at") require.NotNil(t, st.FinishedAt(), "invalid final finished at") require.ElementsMatch(t, expectedShardIDs, st.ShardIDs(), "invalid final shard ids") require.Equal(t, "", st.ErrorMessage(), "invalid final error message") require.NoError(t, eg.Wait()) require.NoError(t, e.ResetEvacuationStatus(context.Background())) st, err = e.GetEvacuationState(context.Background()) require.NoError(t, err, "get state after reset failed") require.Equal(t, EvacuateProcessStateUndefined, st.ProcessingStatus(), "invalid state after reset") require.Equal(t, uint64(0), st.ObjectsEvacuated(), "invalid count after reset") require.Nil(t, st.StartedAt(), "invalid started at after reset") require.Nil(t, st.FinishedAt(), "invalid finished at after reset") require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid shard ids after reset") require.Equal(t, "", st.ErrorMessage(), "invalid error message after reset") } func TestEvacuateTreesLocal(t *testing.T) { e, ids, _ := newEngineEvacuate(t, 2, 3) defer func() { require.NoError(t, e.Close(context.Background())) }() require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly)) var prm EvacuateShardPrm prm.ShardID = ids[0:1] prm.Scope = EvacuateScopeTrees expectedShardIDs := make([]string, 0, 1) for _, id := range ids[0:1] { expectedShardIDs = append(expectedShardIDs, id.String()) } st, err := e.GetEvacuationState(context.Background()) require.NoError(t, err, "get init state failed") require.Equal(t, EvacuateProcessStateUndefined, st.ProcessingStatus(), "invalid init state") require.Equal(t, uint64(0), st.TreesEvacuated(), "invalid init count") require.Nil(t, st.StartedAt(), "invalid init started at") require.Nil(t, st.FinishedAt(), "invalid init finished at") require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids") require.Equal(t, "", st.ErrorMessage(), "invalid init error message") require.NoError(t, e.Evacuate(context.Background(), prm), "evacuation failed") st = testWaitForEvacuationCompleted(t, e) require.Equal(t, uint64(3), st.TreesTotal(), "invalid trees total count") require.Equal(t, uint64(3), st.TreesEvacuated(), "invalid trees evacuated count") require.Equal(t, uint64(0), st.TreesFailed(), "invalid trees failed count") require.NotNil(t, st.StartedAt(), "invalid final started at") require.NotNil(t, st.FinishedAt(), "invalid final finished at") require.ElementsMatch(t, expectedShardIDs, st.ShardIDs(), "invalid final shard ids") require.Equal(t, "", st.ErrorMessage(), "invalid final error message") sourceTrees, err := pilorama.TreeListAll(context.Background(), e.shards[ids[0].String()]) require.NoError(t, err, "list source trees failed") require.Len(t, sourceTrees, 3) for _, tr := range sourceTrees { exists, err := e.shards[ids[1].String()].TreeExists(context.Background(), tr.CID, tr.TreeID) require.NoError(t, err, "failed to check tree existance") require.True(t, exists, "tree doesn't exists on target shard") var height uint64 var sourceOps []pilorama.Move for { op, err := e.shards[ids[0].String()].TreeGetOpLog(context.Background(), tr.CID, tr.TreeID, height) require.NoError(t, err) if op.Time == 0 { break } sourceOps = append(sourceOps, op) height = op.Time + 1 } height = 0 var targetOps []pilorama.Move for { op, err := e.shards[ids[1].String()].TreeGetOpLog(context.Background(), tr.CID, tr.TreeID, height) require.NoError(t, err) if op.Time == 0 { break } targetOps = append(targetOps, op) height = op.Time + 1 } require.Equal(t, sourceOps, targetOps) } } func TestEvacuateTreesRemote(t *testing.T) { e, ids, _ := newEngineEvacuate(t, 2, 3) defer func() { require.NoError(t, e.Close(context.Background())) }() require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly)) require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly)) mutex := sync.Mutex{} evacuatedTreeOps := make(map[string][]*pilorama.Move) var prm EvacuateShardPrm prm.ShardID = ids prm.Scope = EvacuateScopeTrees prm.TreeHandler = func(ctx context.Context, contID cid.ID, treeID string, f pilorama.Forest) (bool, string, error) { key := contID.String() + treeID var height uint64 for { op, err := f.TreeGetOpLog(ctx, contID, treeID, height) require.NoError(t, err) if op.Time == 0 { return true, "", nil } mutex.Lock() evacuatedTreeOps[key] = append(evacuatedTreeOps[key], &op) mutex.Unlock() height = op.Time + 1 } } expectedShardIDs := make([]string, 0, len(ids)) for _, id := range ids { expectedShardIDs = append(expectedShardIDs, id.String()) } st, err := e.GetEvacuationState(context.Background()) require.NoError(t, err, "get init state failed") require.Equal(t, EvacuateProcessStateUndefined, st.ProcessingStatus(), "invalid init state") require.Equal(t, uint64(0), st.TreesEvacuated(), "invalid init count") require.Nil(t, st.StartedAt(), "invalid init started at") require.Nil(t, st.FinishedAt(), "invalid init finished at") require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids") require.Equal(t, "", st.ErrorMessage(), "invalid init error message") require.NoError(t, e.Evacuate(context.Background(), prm), "evacuation failed") st = testWaitForEvacuationCompleted(t, e) require.Equal(t, uint64(6), st.TreesTotal(), "invalid trees total count") require.Equal(t, uint64(6), st.TreesEvacuated(), "invalid trees evacuated count") require.Equal(t, uint64(0), st.TreesFailed(), "invalid trees failed count") require.NotNil(t, st.StartedAt(), "invalid final started at") require.NotNil(t, st.FinishedAt(), "invalid final finished at") require.ElementsMatch(t, expectedShardIDs, st.ShardIDs(), "invalid final shard ids") require.Equal(t, "", st.ErrorMessage(), "invalid final error message") expectedTreeOps := make(map[string][]*pilorama.Move) for i := range len(e.shards) { sourceTrees, err := pilorama.TreeListAll(context.Background(), e.shards[ids[i].String()]) require.NoError(t, err, "list source trees failed") require.Len(t, sourceTrees, 3) for _, tr := range sourceTrees { key := tr.CID.String() + tr.TreeID var height uint64 for { op, err := e.shards[ids[i].String()].TreeGetOpLog(context.Background(), tr.CID, tr.TreeID, height) require.NoError(t, err) if op.Time == 0 { break } expectedTreeOps[key] = append(expectedTreeOps[key], &op) height = op.Time + 1 } } } require.Equal(t, expectedTreeOps, evacuatedTreeOps) } func TestEvacuateShardObjectsRepOneOnly(t *testing.T) { e, ids, _ := newEngineEvacuate(t, 2, 0) defer func() { require.NoError(t, e.Close(context.Background())) }() // Create container with policy REP 2 cnr1 := container.Container{} p1 := netmap.PlacementPolicy{} p1.SetContainerBackupFactor(1) x1 := netmap.ReplicaDescriptor{} x1.SetNumberOfObjects(2) p1.AddReplicas(x1) x1 = netmap.ReplicaDescriptor{} x1.SetNumberOfObjects(1) p1.AddReplicas(x1) cnr1.SetPlacementPolicy(p1) cnr1.SetAttribute("cnr", "cnr1") var idCnr1 cid.ID container.CalculateID(&idCnr1, cnr1) cnrmap := make(map[cid.ID]*container.Container) var cids []cid.ID cnrmap[idCnr1] = &cnr1 cids = append(cids, idCnr1) // Create container with policy REP 1 cnr2 := container.Container{} p2 := netmap.PlacementPolicy{} p2.SetContainerBackupFactor(1) x2 := netmap.ReplicaDescriptor{} x2.SetNumberOfObjects(1) p2.AddReplicas(x2) x2 = netmap.ReplicaDescriptor{} x2.SetNumberOfObjects(1) p2.AddReplicas(x2) cnr2.SetPlacementPolicy(p2) cnr2.SetAttribute("cnr", "cnr2") var idCnr2 cid.ID container.CalculateID(&idCnr2, cnr2) cnrmap[idCnr2] = &cnr2 cids = append(cids, idCnr2) // Create container for simulate removing cnr3 := container.Container{} p3 := netmap.PlacementPolicy{} p3.SetContainerBackupFactor(1) x3 := netmap.ReplicaDescriptor{} x3.SetNumberOfObjects(1) p3.AddReplicas(x3) cnr3.SetPlacementPolicy(p3) cnr3.SetAttribute("cnr", "cnr3") var idCnr3 cid.ID container.CalculateID(&idCnr3, cnr3) cids = append(cids, idCnr3) e.SetContainerSource(&containerStorage{cntmap: cnrmap}) for _, sh := range ids { for j := range 3 { for range 4 { obj := testutil.GenerateObjectWithCID(cids[j]) var putPrm shard.PutPrm putPrm.SetObject(obj) _, err := e.shards[sh.String()].Put(context.Background(), putPrm) require.NoError(t, err) } } } var prm EvacuateShardPrm prm.ShardID = ids[0:1] prm.Scope = EvacuateScopeObjects prm.RepOneOnly = true require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly)) require.NoError(t, e.Evacuate(context.Background(), prm)) st := testWaitForEvacuationCompleted(t, e) require.Equal(t, "", st.ErrorMessage()) require.Equal(t, uint64(4), st.ObjectsEvacuated()) require.Equal(t, uint64(8), st.ObjectsSkipped()) require.Equal(t, uint64(0), st.ObjectsFailed()) } func TestEvacuateShardObjectsRepOneOnlyBench(t *testing.T) { t.Skip() e, ids, _ := newEngineEvacuate(t, 2, 0) defer func() { require.NoError(t, e.Close(context.Background())) }() cnrmap := make(map[cid.ID]*container.Container) var cids []cid.ID // Create containers with policy REP 1 for i := range 10_000 { cnr1 := container.Container{} p1 := netmap.PlacementPolicy{} p1.SetContainerBackupFactor(1) x1 := netmap.ReplicaDescriptor{} x1.SetNumberOfObjects(2) p1.AddReplicas(x1) cnr1.SetPlacementPolicy(p1) cnr1.SetAttribute("i", strconv.Itoa(i)) var idCnr1 cid.ID container.CalculateID(&idCnr1, cnr1) cnrmap[idCnr1] = &cnr1 cids = append(cids, idCnr1) } e.SetContainerSource(&containerStorage{ cntmap: cnrmap, latency: time.Millisecond * 100, }) for _, cnt := range cids { for range 1 { obj := testutil.GenerateObjectWithCID(cnt) var putPrm shard.PutPrm putPrm.SetObject(obj) _, err := e.shards[ids[0].String()].Put(context.Background(), putPrm) require.NoError(t, err) } } var prm EvacuateShardPrm prm.ShardID = ids[0:1] prm.Scope = EvacuateScopeObjects prm.RepOneOnly = true prm.ContainerWorkerCount = 10 require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly)) start := time.Now() err := e.Evacuate(context.Background(), prm) testWaitForEvacuationCompleted(t, e) t.Logf("evacuate took %v\n", time.Since(start)) require.NoError(t, err) }