[#1549] engine: Drop Async flag from evacuation parameters
All checks were successful
Tests and linters / Run gofumpt (pull_request) Successful in 4m4s
DCO action / DCO (pull_request) Successful in 5m4s
Pre-commit hooks / Pre-commit (pull_request) Successful in 5m5s
Vulncheck / Vulncheck (pull_request) Successful in 4m56s
Tests and linters / Staticcheck (pull_request) Successful in 5m10s
Build / Build Components (pull_request) Successful in 5m32s
Tests and linters / Tests with -race (pull_request) Successful in 6m24s
Tests and linters / Lint (pull_request) Successful in 6m36s
Tests and linters / Tests (pull_request) Successful in 6m46s
Tests and linters / gopls check (pull_request) Successful in 7m9s
Tests and linters / Run gofumpt (push) Successful in 1m34s
Tests and linters / Staticcheck (push) Successful in 3m22s
Vulncheck / Vulncheck (push) Successful in 3m43s
Tests and linters / Lint (push) Successful in 4m16s
Build / Build Components (push) Successful in 5m0s
Pre-commit hooks / Pre-commit (push) Successful in 5m3s
Tests and linters / Tests (push) Successful in 5m9s
Tests and linters / Tests with -race (push) Successful in 5m13s
Tests and linters / gopls check (push) Successful in 5m54s

Now it is only async evacuation.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-12-10 16:53:19 +03:00
parent ac0511d214
commit 41da27dad5
Signed by: dstepanov-yadro
GPG key ID: 237AF1A763293BC0
3 changed files with 74 additions and 87 deletions

View file

@ -86,7 +86,6 @@ type EvacuateShardPrm struct {
ObjectsHandler func(context.Context, oid.Address, *objectSDK.Object) (bool, error) ObjectsHandler func(context.Context, oid.Address, *objectSDK.Object) (bool, error)
TreeHandler func(context.Context, cid.ID, string, pilorama.Forest) (bool, string, error) TreeHandler func(context.Context, cid.ID, string, pilorama.Forest) (bool, string, error)
IgnoreErrors bool IgnoreErrors bool
Async bool
Scope EvacuateScope Scope EvacuateScope
RepOneOnly bool RepOneOnly bool
@ -211,10 +210,10 @@ var errMustHaveTwoShards = errors.New("must have at least 1 spare shard")
// Evacuate moves data from one shard to the others. // Evacuate moves data from one shard to the others.
// The shard being moved must be in read-only mode. // The shard being moved must be in read-only mode.
func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*EvacuateShardRes, error) { func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) error {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return nil, ctx.Err() return ctx.Err()
default: default:
} }
@ -226,7 +225,6 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Evacuate", ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Evacuate",
trace.WithAttributes( trace.WithAttributes(
attribute.StringSlice("shardIDs", shardIDs), attribute.StringSlice("shardIDs", shardIDs),
attribute.Bool("async", prm.Async),
attribute.Bool("ignoreErrors", prm.IgnoreErrors), attribute.Bool("ignoreErrors", prm.IgnoreErrors),
attribute.Stringer("scope", prm.Scope), attribute.Stringer("scope", prm.Scope),
)) ))
@ -234,7 +232,7 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev
shards, err := e.getActualShards(shardIDs, prm) shards, err := e.getActualShards(shardIDs, prm)
if err != nil { if err != nil {
return nil, err return err
} }
shardsToEvacuate := make(map[string]*shard.Shard) shardsToEvacuate := make(map[string]*shard.Shard)
@ -247,10 +245,10 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev
} }
res := NewEvacuateShardRes() res := NewEvacuateShardRes()
ctx = ctxOrBackground(ctx, prm.Async) ctx = context.WithoutCancel(ctx)
eg, egCtx, err := e.evacuateLimiter.TryStart(ctx, shardIDs, res) eg, ctx, err := e.evacuateLimiter.TryStart(ctx, shardIDs, res)
if err != nil { if err != nil {
return nil, err return err
} }
var mtx sync.RWMutex var mtx sync.RWMutex
@ -262,21 +260,10 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev
return t return t
} }
eg.Go(func() error { eg.Go(func() error {
return e.evacuateShards(egCtx, shardIDs, prm, res, copyShards, shardsToEvacuate) return e.evacuateShards(ctx, shardIDs, prm, res, copyShards, shardsToEvacuate)
}) })
if prm.Async { return nil
return nil, nil
}
return res, eg.Wait()
}
func ctxOrBackground(ctx context.Context, background bool) context.Context {
if background {
return context.Background()
}
return ctx
} }
func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, prm EvacuateShardPrm, res *EvacuateShardRes, func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, prm EvacuateShardPrm, res *EvacuateShardRes,
@ -286,7 +273,6 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards", ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards",
trace.WithAttributes( trace.WithAttributes(
attribute.StringSlice("shardIDs", shardIDs), attribute.StringSlice("shardIDs", shardIDs),
attribute.Bool("async", prm.Async),
attribute.Bool("ignoreErrors", prm.IgnoreErrors), attribute.Bool("ignoreErrors", prm.IgnoreErrors),
attribute.Stringer("scope", prm.Scope), attribute.Stringer("scope", prm.Scope),
attribute.Bool("repOneOnly", prm.RepOneOnly), attribute.Bool("repOneOnly", prm.RepOneOnly),

View file

@ -140,16 +140,17 @@ func TestEvacuateShardObjects(t *testing.T) {
prm.Scope = EvacuateScopeObjects prm.Scope = EvacuateScopeObjects
t.Run("must be read-only", func(t *testing.T) { t.Run("must be read-only", func(t *testing.T) {
res, err := e.Evacuate(context.Background(), prm) err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, ErrMustBeReadOnly) require.ErrorIs(t, err, ErrMustBeReadOnly)
require.Equal(t, uint64(0), res.ObjectsEvacuated())
}) })
require.NoError(t, e.shards[evacuateShardID].SetMode(context.Background(), mode.ReadOnly)) require.NoError(t, e.shards[evacuateShardID].SetMode(context.Background(), mode.ReadOnly))
res, err := e.Evacuate(context.Background(), prm) err := e.Evacuate(context.Background(), prm)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(objPerShard), res.ObjectsEvacuated()) st := testWaitForEvacuationCompleted(t, e)
require.Equal(t, st.ErrorMessage(), "")
require.Equal(t, uint64(objPerShard), st.ObjectsEvacuated())
// We check that all objects are available both before and after shard removal. // We check that all objects are available both before and after shard removal.
// First case is a real-world use-case. It ensures that an object can be put in presense // First case is a real-world use-case. It ensures that an object can be put in presense
@ -186,9 +187,10 @@ func TestEvacuateShardObjects(t *testing.T) {
} }
// Calling it again is OK, but all objects are already moved, so no new PUTs should be done. // Calling it again is OK, but all objects are already moved, so no new PUTs should be done.
res, err = e.Evacuate(context.Background(), prm) require.NoError(t, e.Evacuate(context.Background(), prm))
require.NoError(t, err) st = testWaitForEvacuationCompleted(t, e)
require.Equal(t, uint64(0), res.ObjectsEvacuated()) require.Equal(t, st.ErrorMessage(), "")
require.Equal(t, uint64(0), st.ObjectsEvacuated())
checkHasObjects(t) checkHasObjects(t)
@ -200,6 +202,17 @@ func TestEvacuateShardObjects(t *testing.T) {
checkHasObjects(t) checkHasObjects(t)
} }
func testWaitForEvacuationCompleted(t *testing.T, e *StorageEngine) *EvacuationState {
var st *EvacuationState
var err error
require.Eventually(t, func() bool {
st, err = e.GetEvacuationState(context.Background())
require.NoError(t, err)
return st.ProcessingStatus() == EvacuateProcessStateCompleted
}, 3*time.Second, 10*time.Millisecond)
return st
}
func TestEvacuateObjectsNetwork(t *testing.T) { func TestEvacuateObjectsNetwork(t *testing.T) {
t.Parallel() t.Parallel()
@ -242,15 +255,15 @@ func TestEvacuateObjectsNetwork(t *testing.T) {
prm.ShardID = ids[0:1] prm.ShardID = ids[0:1]
prm.Scope = EvacuateScopeObjects prm.Scope = EvacuateScopeObjects
res, err := e.Evacuate(context.Background(), prm) err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errMustHaveTwoShards) require.ErrorIs(t, err, errMustHaveTwoShards)
require.Equal(t, uint64(0), res.ObjectsEvacuated())
prm.ObjectsHandler = acceptOneOf(objects, 2) prm.ObjectsHandler = acceptOneOf(objects, 2)
res, err = e.Evacuate(context.Background(), prm) require.NoError(t, e.Evacuate(context.Background(), prm))
require.ErrorIs(t, err, errReplication) st := testWaitForEvacuationCompleted(t, e)
require.Equal(t, uint64(2), res.ObjectsEvacuated()) require.Contains(t, st.ErrorMessage(), errReplication.Error())
require.Equal(t, uint64(2), st.ObjectsEvacuated())
}) })
t.Run("multiple shards, evacuate one", func(t *testing.T) { t.Run("multiple shards, evacuate one", func(t *testing.T) {
t.Parallel() t.Parallel()
@ -267,16 +280,18 @@ func TestEvacuateObjectsNetwork(t *testing.T) {
prm.ObjectsHandler = acceptOneOf(objects, 2) prm.ObjectsHandler = acceptOneOf(objects, 2)
prm.Scope = EvacuateScopeObjects prm.Scope = EvacuateScopeObjects
res, err := e.Evacuate(context.Background(), prm) require.NoError(t, e.Evacuate(context.Background(), prm))
require.ErrorIs(t, err, errReplication) st := testWaitForEvacuationCompleted(t, e)
require.Equal(t, uint64(2), res.ObjectsEvacuated()) require.Contains(t, st.ErrorMessage(), errReplication.Error())
require.Equal(t, uint64(2), st.ObjectsEvacuated())
t.Run("no errors", func(t *testing.T) { t.Run("no errors", func(t *testing.T) {
prm.ObjectsHandler = acceptOneOf(objects, 3) prm.ObjectsHandler = acceptOneOf(objects, 3)
res, err := e.Evacuate(context.Background(), prm) require.NoError(t, e.Evacuate(context.Background(), prm))
require.NoError(t, err) st := testWaitForEvacuationCompleted(t, e)
require.Equal(t, uint64(3), res.ObjectsEvacuated()) require.Equal(t, st.ErrorMessage(), "")
require.Equal(t, uint64(3), st.ObjectsEvacuated())
}) })
}) })
t.Run("multiple shards, evacuate many", func(t *testing.T) { t.Run("multiple shards, evacuate many", func(t *testing.T) {
@ -305,16 +320,18 @@ func TestEvacuateObjectsNetwork(t *testing.T) {
prm.ObjectsHandler = acceptOneOf(objects, totalCount-1) prm.ObjectsHandler = acceptOneOf(objects, totalCount-1)
prm.Scope = EvacuateScopeObjects prm.Scope = EvacuateScopeObjects
res, err := e.Evacuate(context.Background(), prm) require.NoError(t, e.Evacuate(context.Background(), prm))
require.ErrorIs(t, err, errReplication) st := testWaitForEvacuationCompleted(t, e)
require.Equal(t, totalCount-1, res.ObjectsEvacuated()) require.Contains(t, st.ErrorMessage(), errReplication.Error())
require.Equal(t, totalCount-1, st.ObjectsEvacuated())
t.Run("no errors", func(t *testing.T) { t.Run("no errors", func(t *testing.T) {
prm.ObjectsHandler = acceptOneOf(objects, totalCount) prm.ObjectsHandler = acceptOneOf(objects, totalCount)
res, err := e.Evacuate(context.Background(), prm) require.NoError(t, e.Evacuate(context.Background(), prm))
require.NoError(t, err) st := testWaitForEvacuationCompleted(t, e)
require.Equal(t, totalCount, res.ObjectsEvacuated()) require.Equal(t, st.ErrorMessage(), "")
require.Equal(t, totalCount, st.ObjectsEvacuated())
}) })
}) })
} }
@ -344,9 +361,8 @@ func TestEvacuateCancellation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
cancel() cancel()
res, err := e.Evacuate(ctx, prm) err := e.Evacuate(ctx, prm)
require.ErrorContains(t, err, "context canceled") require.ErrorContains(t, err, "context canceled")
require.Equal(t, uint64(0), res.ObjectsEvacuated())
} }
func TestEvacuateCancellationByError(t *testing.T) { func TestEvacuateCancellationByError(t *testing.T) {
@ -375,8 +391,9 @@ func TestEvacuateCancellationByError(t *testing.T) {
prm.ObjectWorkerCount = 2 prm.ObjectWorkerCount = 2
prm.ContainerWorkerCount = 2 prm.ContainerWorkerCount = 2
_, err := e.Evacuate(context.Background(), prm) require.NoError(t, e.Evacuate(context.Background(), prm))
require.ErrorContains(t, err, "test error") st := testWaitForEvacuationCompleted(t, e)
require.Contains(t, st.ErrorMessage(), "test error")
} }
func TestEvacuateSingleProcess(t *testing.T) { func TestEvacuateSingleProcess(t *testing.T) {
@ -406,20 +423,19 @@ func TestEvacuateSingleProcess(t *testing.T) {
eg, egCtx := errgroup.WithContext(context.Background()) eg, egCtx := errgroup.WithContext(context.Background())
eg.Go(func() error { eg.Go(func() error {
res, err := e.Evacuate(egCtx, prm) require.NoError(t, e.Evacuate(egCtx, prm), "first evacuation failed")
require.NoError(t, err, "first evacuation failed")
require.Equal(t, uint64(3), res.ObjectsEvacuated())
return nil return nil
}) })
eg.Go(func() error { eg.Go(func() error {
<-running <-running
res, err := e.Evacuate(egCtx, prm) require.ErrorContains(t, e.Evacuate(egCtx, prm), "evacuate is already running for shard ids", "second evacuation not failed")
require.ErrorContains(t, err, "evacuate is already running for shard ids", "second evacuation not failed")
require.Equal(t, uint64(0), res.ObjectsEvacuated())
close(blocker) close(blocker)
return nil return nil
}) })
require.NoError(t, eg.Wait()) require.NoError(t, eg.Wait())
st := testWaitForEvacuationCompleted(t, e)
require.Equal(t, uint64(3), st.ObjectsEvacuated())
require.Equal(t, st.ErrorMessage(), "")
} }
func TestEvacuateObjectsAsync(t *testing.T) { func TestEvacuateObjectsAsync(t *testing.T) {
@ -458,9 +474,9 @@ func TestEvacuateObjectsAsync(t *testing.T) {
eg, egCtx := errgroup.WithContext(context.Background()) eg, egCtx := errgroup.WithContext(context.Background())
eg.Go(func() error { eg.Go(func() error {
res, err := e.Evacuate(egCtx, prm) require.NoError(t, e.Evacuate(egCtx, prm), "first evacuation failed")
require.NoError(t, err, "first evacuation failed") st = testWaitForEvacuationCompleted(t, e)
require.Equal(t, uint64(3), res.ObjectsEvacuated()) require.Equal(t, uint64(3), st.ObjectsEvacuated(), "invalid final count")
return nil return nil
}) })
@ -483,12 +499,7 @@ func TestEvacuateObjectsAsync(t *testing.T) {
close(blocker) close(blocker)
require.Eventually(t, func() bool { st = testWaitForEvacuationCompleted(t, e)
st, err = e.GetEvacuationState(context.Background())
return st.ProcessingStatus() == EvacuateProcessStateCompleted
}, 3*time.Second, 10*time.Millisecond, "invalid final state")
require.NoError(t, err, "get final state failed")
require.Equal(t, uint64(3), st.ObjectsEvacuated(), "invalid final count") require.Equal(t, uint64(3), st.ObjectsEvacuated(), "invalid final count")
require.NotNil(t, st.StartedAt(), "invalid final started at") require.NotNil(t, st.StartedAt(), "invalid final started at")
require.NotNil(t, st.FinishedAt(), "invalid final finished at") require.NotNil(t, st.FinishedAt(), "invalid final finished at")
@ -534,14 +545,9 @@ func TestEvacuateTreesLocal(t *testing.T) {
require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids") require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids")
require.Equal(t, "", st.ErrorMessage(), "invalid init error message") require.Equal(t, "", st.ErrorMessage(), "invalid init error message")
res, err := e.Evacuate(context.Background(), prm) require.NoError(t, e.Evacuate(context.Background(), prm), "evacuation failed")
require.NotNil(t, res, "sync evacuation result must be not nil")
require.NoError(t, err, "evacuation failed")
st, err = e.GetEvacuationState(context.Background())
require.NoError(t, err, "get evacuation state failed")
require.Equal(t, EvacuateProcessStateCompleted, st.ProcessingStatus())
st = testWaitForEvacuationCompleted(t, e)
require.Equal(t, uint64(3), st.TreesTotal(), "invalid trees total count") require.Equal(t, uint64(3), st.TreesTotal(), "invalid trees total count")
require.Equal(t, uint64(3), st.TreesEvacuated(), "invalid trees evacuated count") require.Equal(t, uint64(3), st.TreesEvacuated(), "invalid trees evacuated count")
require.Equal(t, uint64(0), st.TreesFailed(), "invalid trees failed count") require.Equal(t, uint64(0), st.TreesFailed(), "invalid trees failed count")
@ -632,15 +638,9 @@ func TestEvacuateTreesRemote(t *testing.T) {
require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids") require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids")
require.Equal(t, "", st.ErrorMessage(), "invalid init error message") require.Equal(t, "", st.ErrorMessage(), "invalid init error message")
res, err := e.Evacuate(context.Background(), prm) require.NoError(t, e.Evacuate(context.Background(), prm), "evacuation failed")
require.NotNil(t, res, "sync evacuation must return not nil") st = testWaitForEvacuationCompleted(t, e)
require.NoError(t, err, "evacuation failed")
st, err = e.GetEvacuationState(context.Background())
require.NoError(t, err, "get evacuation state failed")
require.Equal(t, EvacuateProcessStateCompleted, st.ProcessingStatus())
require.NoError(t, err, "get final state failed")
require.Equal(t, uint64(6), st.TreesTotal(), "invalid trees total count") require.Equal(t, uint64(6), st.TreesTotal(), "invalid trees total count")
require.Equal(t, uint64(6), st.TreesEvacuated(), "invalid trees evacuated count") require.Equal(t, uint64(6), st.TreesEvacuated(), "invalid trees evacuated count")
require.Equal(t, uint64(0), st.TreesFailed(), "invalid trees failed count") require.Equal(t, uint64(0), st.TreesFailed(), "invalid trees failed count")
@ -754,11 +754,12 @@ func TestEvacuateShardObjectsRepOneOnly(t *testing.T) {
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly)) require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
res, err := e.Evacuate(context.Background(), prm) require.NoError(t, e.Evacuate(context.Background(), prm))
require.NoError(t, err) st := testWaitForEvacuationCompleted(t, e)
require.Equal(t, uint64(4), res.ObjectsEvacuated()) require.Equal(t, "", st.ErrorMessage())
require.Equal(t, uint64(8), res.ObjectsSkipped()) require.Equal(t, uint64(4), st.ObjectsEvacuated())
require.Equal(t, uint64(0), res.ObjectsFailed()) require.Equal(t, uint64(8), st.ObjectsSkipped())
require.Equal(t, uint64(0), st.ObjectsFailed())
} }
func TestEvacuateShardObjectsRepOneOnlyBench(t *testing.T) { func TestEvacuateShardObjectsRepOneOnlyBench(t *testing.T) {
@ -812,7 +813,8 @@ func TestEvacuateShardObjectsRepOneOnlyBench(t *testing.T) {
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly)) require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
start := time.Now() start := time.Now()
_, err := e.Evacuate(context.Background(), prm) err := e.Evacuate(context.Background(), prm)
testWaitForEvacuationCompleted(t, e)
t.Logf("evacuate took %v\n", time.Since(start)) t.Logf("evacuate took %v\n", time.Since(start))
require.NoError(t, err) require.NoError(t, err)
} }

View file

@ -42,7 +42,6 @@ func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartSha
IgnoreErrors: req.GetBody().GetIgnoreErrors(), IgnoreErrors: req.GetBody().GetIgnoreErrors(),
ObjectsHandler: s.replicateObject, ObjectsHandler: s.replicateObject,
TreeHandler: s.replicateTree, TreeHandler: s.replicateTree,
Async: true,
Scope: engine.EvacuateScope(req.GetBody().GetScope()), Scope: engine.EvacuateScope(req.GetBody().GetScope()),
ContainerWorkerCount: req.GetBody().GetContainerWorkerCount(), ContainerWorkerCount: req.GetBody().GetContainerWorkerCount(),
ObjectWorkerCount: req.GetBody().GetObjectWorkerCount(), ObjectWorkerCount: req.GetBody().GetObjectWorkerCount(),