From 2bac82cd6fd378b1958e1e9633b5ef98d2ff6553 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 3 Jul 2024 09:55:04 +0300 Subject: [PATCH] [#1222] engine: Fix object evacuation Do not fail evacuation if it unable to evacuate object to other node. Signed-off-by: Dmitrii Stepanov --- pkg/local_object_storage/engine/evacuate.go | 14 +++++++++--- .../engine/evacuate_test.go | 22 +++++++++---------- pkg/services/control/server/evacuate.go | 14 +++++++----- 3 files changed, 31 insertions(+), 19 deletions(-) diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index 87542eec..0ce59a0b 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -74,7 +74,7 @@ func (s EvacuateScope) TreesOnly() bool { // EvacuateShardPrm represents parameters for the EvacuateShard operation. type EvacuateShardPrm struct { ShardID []*shard.ID - ObjectsHandler func(context.Context, oid.Address, *objectSDK.Object) error + ObjectsHandler func(context.Context, oid.Address, *objectSDK.Object) (bool, error) TreeHandler func(context.Context, cid.ID, string, pilorama.Forest) (string, error) IgnoreErrors bool Async bool @@ -668,13 +668,21 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to return fmt.Errorf("%w: %s", errPutShard, toEvacuate[i]) } - err = prm.ObjectsHandler(ctx, addr, getRes.Object()) + moved, err := prm.ObjectsHandler(ctx, addr, getRes.Object()) if err != nil { e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, zap.String("trace_id", tracingPkg.GetTraceID(ctx))) return err } - res.objEvacuated.Add(1) + if moved { + res.objEvacuated.Add(1) + } else if prm.IgnoreErrors { + res.objFailed.Add(1) + e.log.Warn(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, + zap.String("trace_id", tracingPkg.GetTraceID(ctx))) + } else { + return fmt.Errorf("object %s was not replicated", addr) + } } return nil } diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index e8d9a449..f097665b 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -145,18 +145,18 @@ func TestEvacuateObjectsNetwork(t *testing.T) { errReplication := errors.New("handler error") - acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) error { + acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) (bool, error) { var n uint64 - return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) error { + return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) { if n == max { - return errReplication + return false, errReplication } n++ for i := range objects { if addr == objectCore.AddressOf(objects[i]) { require.Equal(t, objects[i], obj) - return nil + return true, nil } } require.FailNow(t, "handler was called with an unexpected object: %s", addr) @@ -268,13 +268,13 @@ func TestEvacuateCancellation(t *testing.T) { var prm EvacuateShardPrm prm.ShardID = ids[1:2] - prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error { + prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) (bool, error) { select { case <-ctx.Done(): - return ctx.Err() + return false, ctx.Err() default: } - return nil + return true, nil } prm.Scope = EvacuateScopeObjects @@ -301,14 +301,14 @@ func TestEvacuateSingleProcess(t *testing.T) { var prm EvacuateShardPrm prm.ShardID = ids[1:2] prm.Scope = EvacuateScopeObjects - prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error { + prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) (bool, error) { select { case <-running: default: close(running) } <-blocker - return nil + return true, nil } eg, egCtx := errgroup.WithContext(context.Background()) @@ -344,14 +344,14 @@ func TestEvacuateObjectsAsync(t *testing.T) { var prm EvacuateShardPrm prm.ShardID = ids[1:2] prm.Scope = EvacuateScopeObjects - prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error { + prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) (bool, error) { select { case <-running: default: close(running) } <-blocker - return nil + return true, nil } st, err := e.GetEvacuationState(context.Background()) diff --git a/pkg/services/control/server/evacuate.go b/pkg/services/control/server/evacuate.go index 794bc199..dd609cae 100644 --- a/pkg/services/control/server/evacuate.go +++ b/pkg/services/control/server/evacuate.go @@ -57,17 +57,21 @@ func (s *Server) EvacuateShard(ctx context.Context, req *control.EvacuateShardRe return resp, nil } -func (s *Server) replicateObject(ctx context.Context, addr oid.Address, obj *objectSDK.Object) error { +func (s *Server) replicateObject(ctx context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) { cid, ok := obj.ContainerID() if !ok { // Return nil to prevent situations where a shard can't be evacuated // because of a single bad/corrupted object. - return nil + return false, nil } nodes, err := s.getContainerNodes(cid) if err != nil { - return err + return false, err + } + + if len(nodes) == 0 { + return false, nil } var res replicatorResult @@ -80,9 +84,9 @@ func (s *Server) replicateObject(ctx context.Context, addr oid.Address, obj *obj s.replicator.HandleReplicationTask(ctx, task, &res) if res.count == 0 { - return errors.New("object was not replicated") + return false, errors.New("object was not replicated") } - return nil + return true, nil } func (s *Server) replicateTree(ctx context.Context, contID cid.ID, treeID string, forest pilorama.Forest) (string, error) {