[#9999] engine: Drop Async flag from evacuation parameters
All checks were successful
DCO action / DCO (pull_request) Successful in 2m24s
Vulncheck / Vulncheck (pull_request) Successful in 2m51s
Tests and linters / Run gofumpt (pull_request) Successful in 3m33s
Tests and linters / gopls check (pull_request) Successful in 4m25s
Build / Build Components (pull_request) Successful in 4m42s
Tests and linters / Staticcheck (pull_request) Successful in 5m15s
Pre-commit hooks / Pre-commit (pull_request) Successful in 5m33s
Tests and linters / Lint (pull_request) Successful in 6m15s
Tests and linters / Tests (pull_request) Successful in 7m57s
Tests and linters / Tests with -race (pull_request) Successful in 8m6s

Now it is only async evacuation.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-12-10 16:53:19 +03:00
parent a0eedee0b4
commit 99ffd884e2
Signed by: dstepanov-yadro
GPG key ID: 237AF1A763293BC0
3 changed files with 74 additions and 87 deletions

View file

@ -86,7 +86,6 @@ type EvacuateShardPrm struct {
ObjectsHandler func(context.Context, oid.Address, *objectSDK.Object) (bool, error)
TreeHandler func(context.Context, cid.ID, string, pilorama.Forest) (bool, string, error)
IgnoreErrors bool
Async bool
Scope EvacuateScope
RepOneOnly bool
@ -211,10 +210,10 @@ var errMustHaveTwoShards = errors.New("must have at least 1 spare shard")
// Evacuate moves data from one shard to the others.
// The shard being moved must be in read-only mode.
func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*EvacuateShardRes, error) {
func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) error {
select {
case <-ctx.Done():
return nil, ctx.Err()
return ctx.Err()
default:
}
@ -226,7 +225,6 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Evacuate",
trace.WithAttributes(
attribute.StringSlice("shardIDs", shardIDs),
attribute.Bool("async", prm.Async),
attribute.Bool("ignoreErrors", prm.IgnoreErrors),
attribute.Stringer("scope", prm.Scope),
))
@ -234,7 +232,7 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev
shards, err := e.getActualShards(shardIDs, prm)
if err != nil {
return nil, err
return err
}
shardsToEvacuate := make(map[string]*shard.Shard)
@ -247,10 +245,10 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev
}
res := NewEvacuateShardRes()
ctx = ctxOrBackground(ctx, prm.Async)
eg, egCtx, err := e.evacuateLimiter.TryStart(ctx, shardIDs, res)
ctx = context.WithoutCancel(ctx)
eg, ctx, err := e.evacuateLimiter.TryStart(ctx, shardIDs, res)
if err != nil {
return nil, err
return err
}
var mtx sync.RWMutex
@ -262,21 +260,10 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev
return t
}
eg.Go(func() error {
return e.evacuateShards(egCtx, shardIDs, prm, res, copyShards, shardsToEvacuate)
return e.evacuateShards(ctx, shardIDs, prm, res, copyShards, shardsToEvacuate)
})
if prm.Async {
return nil, nil
}
return res, eg.Wait()
}
func ctxOrBackground(ctx context.Context, background bool) context.Context {
if background {
return context.Background()
}
return ctx
return nil
}
func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, prm EvacuateShardPrm, res *EvacuateShardRes,
@ -286,7 +273,6 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards",
trace.WithAttributes(
attribute.StringSlice("shardIDs", shardIDs),
attribute.Bool("async", prm.Async),
attribute.Bool("ignoreErrors", prm.IgnoreErrors),
attribute.Stringer("scope", prm.Scope),
attribute.Bool("repOneOnly", prm.RepOneOnly),

View file

@ -140,16 +140,17 @@ func TestEvacuateShardObjects(t *testing.T) {
prm.Scope = EvacuateScopeObjects
t.Run("must be read-only", func(t *testing.T) {
res, err := e.Evacuate(context.Background(), prm)
err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, ErrMustBeReadOnly)
require.Equal(t, uint64(0), res.ObjectsEvacuated())
})
require.NoError(t, e.shards[evacuateShardID].SetMode(context.Background(), mode.ReadOnly))
res, err := e.Evacuate(context.Background(), prm)
err := e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, uint64(objPerShard), res.ObjectsEvacuated())
st := testWaitForEvacuationCompleted(t, e)
require.Equal(t, st.ErrorMessage(), "")
require.Equal(t, uint64(objPerShard), st.ObjectsEvacuated())
// We check that all objects are available both before and after shard removal.
// First case is a real-world use-case. It ensures that an object can be put in presense
@ -186,9 +187,10 @@ func TestEvacuateShardObjects(t *testing.T) {
}
// Calling it again is OK, but all objects are already moved, so no new PUTs should be done.
res, err = e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, uint64(0), res.ObjectsEvacuated())
require.NoError(t, e.Evacuate(context.Background(), prm))
st = testWaitForEvacuationCompleted(t, e)
require.Equal(t, st.ErrorMessage(), "")
require.Equal(t, uint64(0), st.ObjectsEvacuated())
checkHasObjects(t)
@ -200,6 +202,17 @@ func TestEvacuateShardObjects(t *testing.T) {
checkHasObjects(t)
}
func testWaitForEvacuationCompleted(t *testing.T, e *StorageEngine) *EvacuationState {
var st *EvacuationState
var err error
require.Eventually(t, func() bool {
st, err = e.GetEvacuationState(context.Background())
require.NoError(t, err)
return st.ProcessingStatus() == EvacuateProcessStateCompleted
}, 3*time.Second, 10*time.Millisecond)
return st
}
func TestEvacuateObjectsNetwork(t *testing.T) {
t.Parallel()
@ -242,15 +255,15 @@ func TestEvacuateObjectsNetwork(t *testing.T) {
prm.ShardID = ids[0:1]
prm.Scope = EvacuateScopeObjects
res, err := e.Evacuate(context.Background(), prm)
err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errMustHaveTwoShards)
require.Equal(t, uint64(0), res.ObjectsEvacuated())
prm.ObjectsHandler = acceptOneOf(objects, 2)
res, err = e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errReplication)
require.Equal(t, uint64(2), res.ObjectsEvacuated())
require.NoError(t, e.Evacuate(context.Background(), prm))
st := testWaitForEvacuationCompleted(t, e)
require.Contains(t, st.ErrorMessage(), errReplication.Error())
require.Equal(t, uint64(2), st.ObjectsEvacuated())
})
t.Run("multiple shards, evacuate one", func(t *testing.T) {
t.Parallel()
@ -267,16 +280,18 @@ func TestEvacuateObjectsNetwork(t *testing.T) {
prm.ObjectsHandler = acceptOneOf(objects, 2)
prm.Scope = EvacuateScopeObjects
res, err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errReplication)
require.Equal(t, uint64(2), res.ObjectsEvacuated())
require.NoError(t, e.Evacuate(context.Background(), prm))
st := testWaitForEvacuationCompleted(t, e)
require.Contains(t, st.ErrorMessage(), errReplication.Error())
require.Equal(t, uint64(2), st.ObjectsEvacuated())
t.Run("no errors", func(t *testing.T) {
prm.ObjectsHandler = acceptOneOf(objects, 3)
res, err := e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, uint64(3), res.ObjectsEvacuated())
require.NoError(t, e.Evacuate(context.Background(), prm))
st := testWaitForEvacuationCompleted(t, e)
require.Equal(t, st.ErrorMessage(), "")
require.Equal(t, uint64(3), st.ObjectsEvacuated())
})
})
t.Run("multiple shards, evacuate many", func(t *testing.T) {
@ -305,16 +320,18 @@ func TestEvacuateObjectsNetwork(t *testing.T) {
prm.ObjectsHandler = acceptOneOf(objects, totalCount-1)
prm.Scope = EvacuateScopeObjects
res, err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errReplication)
require.Equal(t, totalCount-1, res.ObjectsEvacuated())
require.NoError(t, e.Evacuate(context.Background(), prm))
st := testWaitForEvacuationCompleted(t, e)
require.Contains(t, st.ErrorMessage(), errReplication.Error())
require.Equal(t, totalCount-1, st.ObjectsEvacuated())
t.Run("no errors", func(t *testing.T) {
prm.ObjectsHandler = acceptOneOf(objects, totalCount)
res, err := e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, totalCount, res.ObjectsEvacuated())
require.NoError(t, e.Evacuate(context.Background(), prm))
st := testWaitForEvacuationCompleted(t, e)
require.Equal(t, st.ErrorMessage(), "")
require.Equal(t, totalCount, st.ObjectsEvacuated())
})
})
}
@ -344,9 +361,8 @@ func TestEvacuateCancellation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()
res, err := e.Evacuate(ctx, prm)
err := e.Evacuate(ctx, prm)
require.ErrorContains(t, err, "context canceled")
require.Equal(t, uint64(0), res.ObjectsEvacuated())
}
func TestEvacuateCancellationByError(t *testing.T) {
@ -375,8 +391,9 @@ func TestEvacuateCancellationByError(t *testing.T) {
prm.ObjectWorkerCount = 2
prm.ContainerWorkerCount = 2
_, err := e.Evacuate(context.Background(), prm)
require.ErrorContains(t, err, "test error")
require.NoError(t, e.Evacuate(context.Background(), prm))
st := testWaitForEvacuationCompleted(t, e)
require.Contains(t, st.ErrorMessage(), "test error")
}
func TestEvacuateSingleProcess(t *testing.T) {
@ -406,20 +423,19 @@ func TestEvacuateSingleProcess(t *testing.T) {
eg, egCtx := errgroup.WithContext(context.Background())
eg.Go(func() error {
res, err := e.Evacuate(egCtx, prm)
require.NoError(t, err, "first evacuation failed")
require.Equal(t, uint64(3), res.ObjectsEvacuated())
require.NoError(t, e.Evacuate(egCtx, prm), "first evacuation failed")
return nil
})
eg.Go(func() error {
<-running
res, err := e.Evacuate(egCtx, prm)
require.ErrorContains(t, err, "evacuate is already running for shard ids", "second evacuation not failed")
require.Equal(t, uint64(0), res.ObjectsEvacuated())
require.ErrorContains(t, e.Evacuate(egCtx, prm), "evacuate is already running for shard ids", "second evacuation not failed")
close(blocker)
return nil
})
require.NoError(t, eg.Wait())
st := testWaitForEvacuationCompleted(t, e)
require.Equal(t, uint64(3), st.ObjectsEvacuated())
require.Equal(t, st.ErrorMessage(), "")
}
func TestEvacuateObjectsAsync(t *testing.T) {
@ -458,9 +474,9 @@ func TestEvacuateObjectsAsync(t *testing.T) {
eg, egCtx := errgroup.WithContext(context.Background())
eg.Go(func() error {
res, err := e.Evacuate(egCtx, prm)
require.NoError(t, err, "first evacuation failed")
require.Equal(t, uint64(3), res.ObjectsEvacuated())
require.NoError(t, e.Evacuate(egCtx, prm), "first evacuation failed")
st = testWaitForEvacuationCompleted(t, e)
require.Equal(t, uint64(3), st.ObjectsEvacuated(), "invalid final count")
return nil
})
@ -483,12 +499,7 @@ func TestEvacuateObjectsAsync(t *testing.T) {
close(blocker)
require.Eventually(t, func() bool {
st, err = e.GetEvacuationState(context.Background())
return st.ProcessingStatus() == EvacuateProcessStateCompleted
}, 3*time.Second, 10*time.Millisecond, "invalid final state")
require.NoError(t, err, "get final state failed")
st = testWaitForEvacuationCompleted(t, e)
require.Equal(t, uint64(3), st.ObjectsEvacuated(), "invalid final count")
require.NotNil(t, st.StartedAt(), "invalid final started at")
require.NotNil(t, st.FinishedAt(), "invalid final finished at")
@ -534,14 +545,9 @@ func TestEvacuateTreesLocal(t *testing.T) {
require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids")
require.Equal(t, "", st.ErrorMessage(), "invalid init error message")
res, err := e.Evacuate(context.Background(), prm)
require.NotNil(t, res, "sync evacuation result must be not nil")
require.NoError(t, err, "evacuation failed")
st, err = e.GetEvacuationState(context.Background())
require.NoError(t, err, "get evacuation state failed")
require.Equal(t, EvacuateProcessStateCompleted, st.ProcessingStatus())
require.NoError(t, e.Evacuate(context.Background(), prm), "evacuation failed")
st = testWaitForEvacuationCompleted(t, e)
require.Equal(t, uint64(3), st.TreesTotal(), "invalid trees total count")
require.Equal(t, uint64(3), st.TreesEvacuated(), "invalid trees evacuated count")
require.Equal(t, uint64(0), st.TreesFailed(), "invalid trees failed count")
@ -632,15 +638,9 @@ func TestEvacuateTreesRemote(t *testing.T) {
require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids")
require.Equal(t, "", st.ErrorMessage(), "invalid init error message")
res, err := e.Evacuate(context.Background(), prm)
require.NotNil(t, res, "sync evacuation must return not nil")
require.NoError(t, err, "evacuation failed")
require.NoError(t, e.Evacuate(context.Background(), prm), "evacuation failed")
st = testWaitForEvacuationCompleted(t, e)
st, err = e.GetEvacuationState(context.Background())
require.NoError(t, err, "get evacuation state failed")
require.Equal(t, EvacuateProcessStateCompleted, st.ProcessingStatus())
require.NoError(t, err, "get final state failed")
require.Equal(t, uint64(6), st.TreesTotal(), "invalid trees total count")
require.Equal(t, uint64(6), st.TreesEvacuated(), "invalid trees evacuated count")
require.Equal(t, uint64(0), st.TreesFailed(), "invalid trees failed count")
@ -754,11 +754,12 @@ func TestEvacuateShardObjectsRepOneOnly(t *testing.T) {
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
res, err := e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, uint64(4), res.ObjectsEvacuated())
require.Equal(t, uint64(8), res.ObjectsSkipped())
require.Equal(t, uint64(0), res.ObjectsFailed())
require.NoError(t, e.Evacuate(context.Background(), prm))
st := testWaitForEvacuationCompleted(t, e)
require.Equal(t, "", st.ErrorMessage())
require.Equal(t, uint64(4), st.ObjectsEvacuated())
require.Equal(t, uint64(8), st.ObjectsSkipped())
require.Equal(t, uint64(0), st.ObjectsFailed())
}
func TestEvacuateShardObjectsRepOneOnlyBench(t *testing.T) {
@ -812,7 +813,8 @@ func TestEvacuateShardObjectsRepOneOnlyBench(t *testing.T) {
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
start := time.Now()
_, err := e.Evacuate(context.Background(), prm)
err := e.Evacuate(context.Background(), prm)
testWaitForEvacuationCompleted(t, e)
t.Logf("evacuate took %v\n", time.Since(start))
require.NoError(t, err)
}

View file

@ -42,7 +42,6 @@ func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartSha
IgnoreErrors: req.GetBody().GetIgnoreErrors(),
ObjectsHandler: s.replicateObject,
TreeHandler: s.replicateTree,
Async: true,
Scope: engine.EvacuateScope(req.GetBody().GetScope()),
ContainerWorkerCount: req.GetBody().GetContainerWorkerCount(),
ObjectWorkerCount: req.GetBody().GetObjectWorkerCount(),