diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index 87542eec1..0ce59a0be 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -74,7 +74,7 @@ func (s EvacuateScope) TreesOnly() bool { // EvacuateShardPrm represents parameters for the EvacuateShard operation. type EvacuateShardPrm struct { ShardID []*shard.ID - ObjectsHandler func(context.Context, oid.Address, *objectSDK.Object) error + ObjectsHandler func(context.Context, oid.Address, *objectSDK.Object) (bool, error) TreeHandler func(context.Context, cid.ID, string, pilorama.Forest) (string, error) IgnoreErrors bool Async bool @@ -668,13 +668,21 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to return fmt.Errorf("%w: %s", errPutShard, toEvacuate[i]) } - err = prm.ObjectsHandler(ctx, addr, getRes.Object()) + moved, err := prm.ObjectsHandler(ctx, addr, getRes.Object()) if err != nil { e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, zap.String("trace_id", tracingPkg.GetTraceID(ctx))) return err } - res.objEvacuated.Add(1) + if moved { + res.objEvacuated.Add(1) + } else if prm.IgnoreErrors { + res.objFailed.Add(1) + e.log.Warn(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, + zap.String("trace_id", tracingPkg.GetTraceID(ctx))) + } else { + return fmt.Errorf("object %s was not replicated", addr) + } } return nil } diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index e8d9a449f..f097665bf 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -145,18 +145,18 @@ func TestEvacuateObjectsNetwork(t *testing.T) { errReplication := errors.New("handler error") - acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) error { + acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) (bool, error) { var n uint64 - return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) error { + return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) { if n == max { - return errReplication + return false, errReplication } n++ for i := range objects { if addr == objectCore.AddressOf(objects[i]) { require.Equal(t, objects[i], obj) - return nil + return true, nil } } require.FailNow(t, "handler was called with an unexpected object: %s", addr) @@ -268,13 +268,13 @@ func TestEvacuateCancellation(t *testing.T) { var prm EvacuateShardPrm prm.ShardID = ids[1:2] - prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error { + prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) (bool, error) { select { case <-ctx.Done(): - return ctx.Err() + return false, ctx.Err() default: } - return nil + return true, nil } prm.Scope = EvacuateScopeObjects @@ -301,14 +301,14 @@ func TestEvacuateSingleProcess(t *testing.T) { var prm EvacuateShardPrm prm.ShardID = ids[1:2] prm.Scope = EvacuateScopeObjects - prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error { + prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) (bool, error) { select { case <-running: default: close(running) } <-blocker - return nil + return true, nil } eg, egCtx := errgroup.WithContext(context.Background()) @@ -344,14 +344,14 @@ func TestEvacuateObjectsAsync(t *testing.T) { var prm EvacuateShardPrm prm.ShardID = ids[1:2] prm.Scope = EvacuateScopeObjects - prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error { + prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) (bool, error) { select { case <-running: default: close(running) } <-blocker - return nil + return true, nil } st, err := e.GetEvacuationState(context.Background()) diff --git a/pkg/services/control/server/evacuate.go b/pkg/services/control/server/evacuate.go index 794bc199a..dd609caec 100644 --- a/pkg/services/control/server/evacuate.go +++ b/pkg/services/control/server/evacuate.go @@ -57,17 +57,21 @@ func (s *Server) EvacuateShard(ctx context.Context, req *control.EvacuateShardRe return resp, nil } -func (s *Server) replicateObject(ctx context.Context, addr oid.Address, obj *objectSDK.Object) error { +func (s *Server) replicateObject(ctx context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) { cid, ok := obj.ContainerID() if !ok { // Return nil to prevent situations where a shard can't be evacuated // because of a single bad/corrupted object. - return nil + return false, nil } nodes, err := s.getContainerNodes(cid) if err != nil { - return err + return false, err + } + + if len(nodes) == 0 { + return false, nil } var res replicatorResult @@ -80,9 +84,9 @@ func (s *Server) replicateObject(ctx context.Context, addr oid.Address, obj *obj s.replicator.HandleReplicationTask(ctx, task, &res) if res.count == 0 { - return errors.New("object was not replicated") + return false, errors.New("object was not replicated") } - return nil + return true, nil } func (s *Server) replicateTree(ctx context.Context, contID cid.ID, treeID string, forest pilorama.Forest) (string, error) {