From eca5c210dd21c5c6f30c5817c45b808589a2c651 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 2 May 2023 14:16:13 +0300 Subject: [PATCH] [#299] evacuate: Add context cancel checks Signed-off-by: Dmitrii Stepanov --- internal/logs/logs.go | 3 +- pkg/local_object_storage/engine/evacuate.go | 35 ++++++++++++++----- .../engine/evacuate_test.go | 29 +++++++++++++-- pkg/services/control/server/evacuate.go | 4 +-- 4 files changed, 57 insertions(+), 14 deletions(-) diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 6c1c931f..ab7b8f63 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -239,7 +239,8 @@ const ( EngineShardIsMovedInDegradedModeDueToErrorThreshold = "shard is moved in degraded mode due to error threshold" // Info in ../node/pkg/local_object_storage/engine/engine.go EngineModeChangeIsInProgressIgnoringSetmodeRequest = "mode change is in progress, ignoring set-mode request" // Debug in ../node/pkg/local_object_storage/engine/engine.go EngineStartedShardsEvacuation = "started shards evacuation" // Info in ../node/pkg/local_object_storage/engine/evacuate.go - EngineFinishedShardsEvacuation = "finished shards evacuation" // Info in ../node/pkg/local_object_storage/engine/evacuate.go + EngineFinishedSuccessfullyShardsEvacuation = "shards evacuation finished successfully" // Info in ../node/pkg/local_object_storage/engine/evacuate.go + EngineFinishedWithErrorShardsEvacuation = "shards evacuation finished with error" // Error in ../node/pkg/local_object_storage/engine/evacuate.go EngineObjectIsMovedToAnotherShard = "object is moved to another shard" // Debug in ../node/pkg/local_object_storage/engine/evacuate.go MetabaseMissingMatcher = "missing matcher" // Debug in ../node/pkg/local_object_storage/metabase/select.go MetabaseErrorInFKBTSelection = "error in FKBT selection" // Debug in ../node/pkg/local_object_storage/metabase/select.go diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index e212784a..761ed24b 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -22,7 +22,7 @@ var ErrMustBeReadOnly = logicerr.New("shard must be in read-only mode") // EvacuateShardPrm represents parameters for the EvacuateShard operation. type EvacuateShardPrm struct { shardID []*shard.ID - handler func(oid.Address, *objectSDK.Object) error + handler func(context.Context, oid.Address, *objectSDK.Object) error ignoreErrors bool } @@ -42,7 +42,7 @@ func (p *EvacuateShardPrm) WithIgnoreErrors(ignore bool) { } // WithFaultHandler sets handler to call for objects which cannot be saved on other shards. -func (p *EvacuateShardPrm) WithFaultHandler(f func(oid.Address, *objectSDK.Object) error) { +func (p *EvacuateShardPrm) WithFaultHandler(f func(context.Context, oid.Address, *objectSDK.Object) error) { p.handler = f } @@ -89,11 +89,12 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (Eva for _, shardID := range shardIDs { if err = e.evacuateShard(ctx, shardID, prm, &res, shards, weights, shardsToEvacuate); err != nil { + e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs)) return res, err } } - e.log.Info(logs.EngineFinishedShardsEvacuation, zap.Strings("shard_ids", shardIDs)) + e.log.Info(logs.EngineFinishedSuccessfullyShardsEvacuation, zap.Strings("shard_ids", shardIDs)) return res, nil } @@ -168,6 +169,11 @@ func (e *StorageEngine) getActualShards(shardIDs []string, handlerDefined bool) func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.AddressWithType, prm EvacuateShardPrm, res *EvacuateShardRes, shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) error { for i := range toEvacuate { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } addr := toEvacuate[i].Address var getPrm shard.GetPrm @@ -181,7 +187,12 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to return err } - if e.tryEvacuateObject(ctx, addr, getRes.Object(), sh, res, shards, weights, shardsToEvacuate) { + evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), sh, res, shards, weights, shardsToEvacuate) + if err != nil { + return err + } + + if evacuatedLocal { continue } @@ -191,7 +202,7 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to return fmt.Errorf("%w: %s", errPutShard, toEvacuate[i]) } - err = prm.handler(addr, getRes.Object()) + err = prm.handler(ctx, addr, getRes.Object()) if err != nil { return err } @@ -200,10 +211,16 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to return nil } -func (e *StorageEngine) tryEvacuateObject(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard, res *EvacuateShardRes, - shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) bool { +func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard, res *EvacuateShardRes, + shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) (bool, error) { hrw.SortHasherSliceByWeightValue(shards, weights, hrw.Hash([]byte(addr.EncodeToString()))) for j := range shards { + select { + case <-ctx.Done(): + return false, ctx.Err() + default: + } + if _, ok := shardsToEvacuate[shards[j].ID().String()]; ok { continue } @@ -216,9 +233,9 @@ func (e *StorageEngine) tryEvacuateObject(ctx context.Context, addr oid.Address, zap.Stringer("addr", addr)) res.count++ } - return true + return true, nil } } - return false + return false, nil } diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index fc9da5e3..11cb362e 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -137,9 +137,9 @@ func TestEvacuateShard(t *testing.T) { func TestEvacuateNetwork(t *testing.T) { var errReplication = errors.New("handler error") - acceptOneOf := func(objects []*objectSDK.Object, max int) func(oid.Address, *objectSDK.Object) error { + acceptOneOf := func(objects []*objectSDK.Object, max int) func(context.Context, oid.Address, *objectSDK.Object) error { var n int - return func(addr oid.Address, obj *objectSDK.Object) error { + return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) error { if n == max { return errReplication } @@ -230,3 +230,28 @@ func TestEvacuateNetwork(t *testing.T) { }) }) } + +func TestEvacuateCancellation(t *testing.T) { + e, ids, _ := newEngineEvacuate(t, 2, 3) + + require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) + require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly)) + + var prm EvacuateShardPrm + prm.shardID = ids[1:2] + prm.handler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + return nil + } + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + res, err := e.Evacuate(ctx, prm) + require.ErrorContains(t, err, "context canceled") + require.Equal(t, 0, res.Count()) +} diff --git a/pkg/services/control/server/evacuate.go b/pkg/services/control/server/evacuate.go index b64a9188..afa4011b 100644 --- a/pkg/services/control/server/evacuate.go +++ b/pkg/services/control/server/evacuate.go @@ -48,7 +48,7 @@ func (s *Server) EvacuateShard(ctx context.Context, req *control.EvacuateShardRe return resp, nil } -func (s *Server) replicate(addr oid.Address, obj *objectSDK.Object) error { +func (s *Server) replicate(ctx context.Context, addr oid.Address, obj *objectSDK.Object) error { cid, ok := obj.ContainerID() if !ok { // Return nil to prevent situations where a shard can't be evacuated @@ -89,7 +89,7 @@ func (s *Server) replicate(addr oid.Address, obj *objectSDK.Object) error { task.SetObjectAddress(addr) task.SetCopiesNumber(1) task.SetNodes(nodes) - s.replicator.HandleTask(context.TODO(), task, &res) + s.replicator.HandleTask(ctx, task, &res) if res.count == 0 { return errors.New("object was not replicated")