diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index b88c249b1..2e0344bfb 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -86,7 +86,6 @@ type EvacuateShardPrm struct { ObjectsHandler func(context.Context, oid.Address, *objectSDK.Object) (bool, error) TreeHandler func(context.Context, cid.ID, string, pilorama.Forest) (bool, string, error) IgnoreErrors bool - Async bool Scope EvacuateScope RepOneOnly bool @@ -211,10 +210,10 @@ var errMustHaveTwoShards = errors.New("must have at least 1 spare shard") // Evacuate moves data from one shard to the others. // The shard being moved must be in read-only mode. -func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*EvacuateShardRes, error) { +func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) error { select { case <-ctx.Done(): - return nil, ctx.Err() + return ctx.Err() default: } @@ -226,7 +225,6 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Evacuate", trace.WithAttributes( attribute.StringSlice("shardIDs", shardIDs), - attribute.Bool("async", prm.Async), attribute.Bool("ignoreErrors", prm.IgnoreErrors), attribute.Stringer("scope", prm.Scope), )) @@ -234,7 +232,7 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev shards, err := e.getActualShards(shardIDs, prm) if err != nil { - return nil, err + return err } shardsToEvacuate := make(map[string]*shard.Shard) @@ -247,10 +245,10 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev } res := NewEvacuateShardRes() - ctx = ctxOrBackground(ctx, prm.Async) - eg, egCtx, err := e.evacuateLimiter.TryStart(ctx, shardIDs, res) + ctx = context.WithoutCancel(ctx) + eg, ctx, err := e.evacuateLimiter.TryStart(ctx, shardIDs, res) if err != nil { - return nil, err + return err } var mtx sync.RWMutex @@ -262,21 +260,10 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev return t } eg.Go(func() error { - return e.evacuateShards(egCtx, shardIDs, prm, res, copyShards, shardsToEvacuate) + return e.evacuateShards(ctx, shardIDs, prm, res, copyShards, shardsToEvacuate) }) - if prm.Async { - return nil, nil - } - - return res, eg.Wait() -} - -func ctxOrBackground(ctx context.Context, background bool) context.Context { - if background { - return context.Background() - } - return ctx + return nil } func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, prm EvacuateShardPrm, res *EvacuateShardRes, @@ -286,7 +273,6 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards", trace.WithAttributes( attribute.StringSlice("shardIDs", shardIDs), - attribute.Bool("async", prm.Async), attribute.Bool("ignoreErrors", prm.IgnoreErrors), attribute.Stringer("scope", prm.Scope), attribute.Bool("repOneOnly", prm.RepOneOnly), diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index beab8384e..248c39155 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -140,16 +140,17 @@ func TestEvacuateShardObjects(t *testing.T) { prm.Scope = EvacuateScopeObjects t.Run("must be read-only", func(t *testing.T) { - res, err := e.Evacuate(context.Background(), prm) + err := e.Evacuate(context.Background(), prm) require.ErrorIs(t, err, ErrMustBeReadOnly) - require.Equal(t, uint64(0), res.ObjectsEvacuated()) }) require.NoError(t, e.shards[evacuateShardID].SetMode(context.Background(), mode.ReadOnly)) - res, err := e.Evacuate(context.Background(), prm) + err := e.Evacuate(context.Background(), prm) require.NoError(t, err) - require.Equal(t, uint64(objPerShard), res.ObjectsEvacuated()) + st := testWaitForEvacuationCompleted(t, e) + require.Equal(t, st.ErrorMessage(), "") + require.Equal(t, uint64(objPerShard), st.ObjectsEvacuated()) // We check that all objects are available both before and after shard removal. // First case is a real-world use-case. It ensures that an object can be put in presense @@ -186,9 +187,10 @@ func TestEvacuateShardObjects(t *testing.T) { } // Calling it again is OK, but all objects are already moved, so no new PUTs should be done. - res, err = e.Evacuate(context.Background(), prm) - require.NoError(t, err) - require.Equal(t, uint64(0), res.ObjectsEvacuated()) + require.NoError(t, e.Evacuate(context.Background(), prm)) + st = testWaitForEvacuationCompleted(t, e) + require.Equal(t, st.ErrorMessage(), "") + require.Equal(t, uint64(0), st.ObjectsEvacuated()) checkHasObjects(t) @@ -200,6 +202,17 @@ func TestEvacuateShardObjects(t *testing.T) { checkHasObjects(t) } +func testWaitForEvacuationCompleted(t *testing.T, e *StorageEngine) *EvacuationState { + var st *EvacuationState + var err error + require.Eventually(t, func() bool { + st, err = e.GetEvacuationState(context.Background()) + require.NoError(t, err) + return st.ProcessingStatus() == EvacuateProcessStateCompleted + }, 3*time.Second, 10*time.Millisecond) + return st +} + func TestEvacuateObjectsNetwork(t *testing.T) { t.Parallel() @@ -242,15 +255,15 @@ func TestEvacuateObjectsNetwork(t *testing.T) { prm.ShardID = ids[0:1] prm.Scope = EvacuateScopeObjects - res, err := e.Evacuate(context.Background(), prm) + err := e.Evacuate(context.Background(), prm) require.ErrorIs(t, err, errMustHaveTwoShards) - require.Equal(t, uint64(0), res.ObjectsEvacuated()) prm.ObjectsHandler = acceptOneOf(objects, 2) - res, err = e.Evacuate(context.Background(), prm) - require.ErrorIs(t, err, errReplication) - require.Equal(t, uint64(2), res.ObjectsEvacuated()) + require.NoError(t, e.Evacuate(context.Background(), prm)) + st := testWaitForEvacuationCompleted(t, e) + require.Contains(t, st.ErrorMessage(), errReplication.Error()) + require.Equal(t, uint64(2), st.ObjectsEvacuated()) }) t.Run("multiple shards, evacuate one", func(t *testing.T) { t.Parallel() @@ -267,16 +280,18 @@ func TestEvacuateObjectsNetwork(t *testing.T) { prm.ObjectsHandler = acceptOneOf(objects, 2) prm.Scope = EvacuateScopeObjects - res, err := e.Evacuate(context.Background(), prm) - require.ErrorIs(t, err, errReplication) - require.Equal(t, uint64(2), res.ObjectsEvacuated()) + require.NoError(t, e.Evacuate(context.Background(), prm)) + st := testWaitForEvacuationCompleted(t, e) + require.Contains(t, st.ErrorMessage(), errReplication.Error()) + require.Equal(t, uint64(2), st.ObjectsEvacuated()) t.Run("no errors", func(t *testing.T) { prm.ObjectsHandler = acceptOneOf(objects, 3) - res, err := e.Evacuate(context.Background(), prm) - require.NoError(t, err) - require.Equal(t, uint64(3), res.ObjectsEvacuated()) + require.NoError(t, e.Evacuate(context.Background(), prm)) + st := testWaitForEvacuationCompleted(t, e) + require.Equal(t, st.ErrorMessage(), "") + require.Equal(t, uint64(3), st.ObjectsEvacuated()) }) }) t.Run("multiple shards, evacuate many", func(t *testing.T) { @@ -305,16 +320,18 @@ func TestEvacuateObjectsNetwork(t *testing.T) { prm.ObjectsHandler = acceptOneOf(objects, totalCount-1) prm.Scope = EvacuateScopeObjects - res, err := e.Evacuate(context.Background(), prm) - require.ErrorIs(t, err, errReplication) - require.Equal(t, totalCount-1, res.ObjectsEvacuated()) + require.NoError(t, e.Evacuate(context.Background(), prm)) + st := testWaitForEvacuationCompleted(t, e) + require.Contains(t, st.ErrorMessage(), errReplication.Error()) + require.Equal(t, totalCount-1, st.ObjectsEvacuated()) t.Run("no errors", func(t *testing.T) { prm.ObjectsHandler = acceptOneOf(objects, totalCount) - res, err := e.Evacuate(context.Background(), prm) - require.NoError(t, err) - require.Equal(t, totalCount, res.ObjectsEvacuated()) + require.NoError(t, e.Evacuate(context.Background(), prm)) + st := testWaitForEvacuationCompleted(t, e) + require.Equal(t, st.ErrorMessage(), "") + require.Equal(t, totalCount, st.ObjectsEvacuated()) }) }) } @@ -344,9 +361,8 @@ func TestEvacuateCancellation(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() - res, err := e.Evacuate(ctx, prm) + err := e.Evacuate(ctx, prm) require.ErrorContains(t, err, "context canceled") - require.Equal(t, uint64(0), res.ObjectsEvacuated()) } func TestEvacuateCancellationByError(t *testing.T) { @@ -375,8 +391,9 @@ func TestEvacuateCancellationByError(t *testing.T) { prm.ObjectWorkerCount = 2 prm.ContainerWorkerCount = 2 - _, err := e.Evacuate(context.Background(), prm) - require.ErrorContains(t, err, "test error") + require.NoError(t, e.Evacuate(context.Background(), prm)) + st := testWaitForEvacuationCompleted(t, e) + require.Contains(t, st.ErrorMessage(), "test error") } func TestEvacuateSingleProcess(t *testing.T) { @@ -406,20 +423,19 @@ func TestEvacuateSingleProcess(t *testing.T) { eg, egCtx := errgroup.WithContext(context.Background()) eg.Go(func() error { - res, err := e.Evacuate(egCtx, prm) - require.NoError(t, err, "first evacuation failed") - require.Equal(t, uint64(3), res.ObjectsEvacuated()) + require.NoError(t, e.Evacuate(egCtx, prm), "first evacuation failed") return nil }) eg.Go(func() error { <-running - res, err := e.Evacuate(egCtx, prm) - require.ErrorContains(t, err, "evacuate is already running for shard ids", "second evacuation not failed") - require.Equal(t, uint64(0), res.ObjectsEvacuated()) + require.ErrorContains(t, e.Evacuate(egCtx, prm), "evacuate is already running for shard ids", "second evacuation not failed") close(blocker) return nil }) require.NoError(t, eg.Wait()) + st := testWaitForEvacuationCompleted(t, e) + require.Equal(t, uint64(3), st.ObjectsEvacuated()) + require.Equal(t, st.ErrorMessage(), "") } func TestEvacuateObjectsAsync(t *testing.T) { @@ -458,9 +474,9 @@ func TestEvacuateObjectsAsync(t *testing.T) { eg, egCtx := errgroup.WithContext(context.Background()) eg.Go(func() error { - res, err := e.Evacuate(egCtx, prm) - require.NoError(t, err, "first evacuation failed") - require.Equal(t, uint64(3), res.ObjectsEvacuated()) + require.NoError(t, e.Evacuate(egCtx, prm), "first evacuation failed") + st = testWaitForEvacuationCompleted(t, e) + require.Equal(t, uint64(3), st.ObjectsEvacuated(), "invalid final count") return nil }) @@ -483,12 +499,7 @@ func TestEvacuateObjectsAsync(t *testing.T) { close(blocker) - require.Eventually(t, func() bool { - st, err = e.GetEvacuationState(context.Background()) - return st.ProcessingStatus() == EvacuateProcessStateCompleted - }, 3*time.Second, 10*time.Millisecond, "invalid final state") - - require.NoError(t, err, "get final state failed") + st = testWaitForEvacuationCompleted(t, e) require.Equal(t, uint64(3), st.ObjectsEvacuated(), "invalid final count") require.NotNil(t, st.StartedAt(), "invalid final started at") require.NotNil(t, st.FinishedAt(), "invalid final finished at") @@ -534,14 +545,9 @@ func TestEvacuateTreesLocal(t *testing.T) { require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids") require.Equal(t, "", st.ErrorMessage(), "invalid init error message") - res, err := e.Evacuate(context.Background(), prm) - require.NotNil(t, res, "sync evacuation result must be not nil") - require.NoError(t, err, "evacuation failed") - - st, err = e.GetEvacuationState(context.Background()) - require.NoError(t, err, "get evacuation state failed") - require.Equal(t, EvacuateProcessStateCompleted, st.ProcessingStatus()) + require.NoError(t, e.Evacuate(context.Background(), prm), "evacuation failed") + st = testWaitForEvacuationCompleted(t, e) require.Equal(t, uint64(3), st.TreesTotal(), "invalid trees total count") require.Equal(t, uint64(3), st.TreesEvacuated(), "invalid trees evacuated count") require.Equal(t, uint64(0), st.TreesFailed(), "invalid trees failed count") @@ -632,15 +638,9 @@ func TestEvacuateTreesRemote(t *testing.T) { require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids") require.Equal(t, "", st.ErrorMessage(), "invalid init error message") - res, err := e.Evacuate(context.Background(), prm) - require.NotNil(t, res, "sync evacuation must return not nil") - require.NoError(t, err, "evacuation failed") + require.NoError(t, e.Evacuate(context.Background(), prm), "evacuation failed") + st = testWaitForEvacuationCompleted(t, e) - st, err = e.GetEvacuationState(context.Background()) - require.NoError(t, err, "get evacuation state failed") - require.Equal(t, EvacuateProcessStateCompleted, st.ProcessingStatus()) - - require.NoError(t, err, "get final state failed") require.Equal(t, uint64(6), st.TreesTotal(), "invalid trees total count") require.Equal(t, uint64(6), st.TreesEvacuated(), "invalid trees evacuated count") require.Equal(t, uint64(0), st.TreesFailed(), "invalid trees failed count") @@ -754,11 +754,12 @@ func TestEvacuateShardObjectsRepOneOnly(t *testing.T) { require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly)) - res, err := e.Evacuate(context.Background(), prm) - require.NoError(t, err) - require.Equal(t, uint64(4), res.ObjectsEvacuated()) - require.Equal(t, uint64(8), res.ObjectsSkipped()) - require.Equal(t, uint64(0), res.ObjectsFailed()) + require.NoError(t, e.Evacuate(context.Background(), prm)) + st := testWaitForEvacuationCompleted(t, e) + require.Equal(t, "", st.ErrorMessage()) + require.Equal(t, uint64(4), st.ObjectsEvacuated()) + require.Equal(t, uint64(8), st.ObjectsSkipped()) + require.Equal(t, uint64(0), st.ObjectsFailed()) } func TestEvacuateShardObjectsRepOneOnlyBench(t *testing.T) { @@ -812,7 +813,8 @@ func TestEvacuateShardObjectsRepOneOnlyBench(t *testing.T) { require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly)) start := time.Now() - _, err := e.Evacuate(context.Background(), prm) + err := e.Evacuate(context.Background(), prm) + testWaitForEvacuationCompleted(t, e) t.Logf("evacuate took %v\n", time.Since(start)) require.NoError(t, err) } diff --git a/pkg/services/control/server/evacuate_async.go b/pkg/services/control/server/evacuate_async.go index 04465eb26..da5401515 100644 --- a/pkg/services/control/server/evacuate_async.go +++ b/pkg/services/control/server/evacuate_async.go @@ -42,7 +42,6 @@ func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartSha IgnoreErrors: req.GetBody().GetIgnoreErrors(), ObjectsHandler: s.replicateObject, TreeHandler: s.replicateTree, - Async: true, Scope: engine.EvacuateScope(req.GetBody().GetScope()), ContainerWorkerCount: req.GetBody().GetContainerWorkerCount(), ObjectWorkerCount: req.GetBody().GetObjectWorkerCount(),