From 4e043a801c260565e210e5877cb8bc552709f044 Mon Sep 17 00:00:00 2001
From: Evgenii Stratonikov <evgeniy@morphbits.ru>
Date: Mon, 19 Sep 2022 13:57:59 +0300
Subject: [PATCH] [#1731] services/control: Replicate object over network in
 EvacuateShard RPC

Signed-off-by: Evgenii Stratonikov <evgeniy@morphbits.ru>
---
 CHANGELOG.md                            |  4 +-
 cmd/neofs-node/config.go                |  3 ++
 cmd/neofs-node/control.go               |  2 +
 cmd/neofs-node/object.go                |  4 +-
 pkg/services/control/server/evacuate.go | 68 +++++++++++++++++++++++++
 pkg/services/control/server/server.go   | 20 ++++++++
 pkg/services/replicator/process.go      | 19 ++++---
 pkg/services/replicator/task.go         | 12 +++++
 8 files changed, 121 insertions(+), 11 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 49ec7b8be..0a1435fc4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,7 +7,9 @@ Changelog for NeoFS Node
 
 - Changelog updates CI step (#1808)
 
-### Changed 
+### Changed
+
+- Allow to evacuate shard data with `EvacuateShard` control RPC (#1800)
 
 ### Fixed
 
diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go
index d8c11ace4..c11322e3b 100644
--- a/cmd/neofs-node/config.go
+++ b/cmd/neofs-node/config.go
@@ -43,6 +43,7 @@ import (
 	getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get"
 	"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/tombstone"
 	tsourse "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/tombstone/source"
+	"github.com/nspcc-dev/neofs-node/pkg/services/replicator"
 	trustcontroller "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/controller"
 	truststorage "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/storage"
 	"github.com/nspcc-dev/neofs-node/pkg/services/tree"
@@ -116,6 +117,8 @@ type cfg struct {
 
 	respSvc *response.Service
 
+	replicator *replicator.Replicator
+
 	cfgControlService cfgControlService
 
 	treeService *tree.Service
diff --git a/cmd/neofs-node/control.go b/cmd/neofs-node/control.go
index d173ced8e..2e6d8b325 100644
--- a/cmd/neofs-node/control.go
+++ b/cmd/neofs-node/control.go
@@ -30,6 +30,8 @@ func initControlService(c *cfg) {
 		controlSvc.WithAuthorizedKeys(rawPubs),
 		controlSvc.WithHealthChecker(c),
 		controlSvc.WithNetMapSource(c.netMapSource),
+		controlSvc.WithContainerSource(c.cfgObject.cnrSource),
+		controlSvc.WithReplicator(c.replicator),
 		controlSvc.WithNodeState(c),
 		controlSvc.WithLocalStorage(c.cfgObject.cfgLocalStorage.localStorage),
 		controlSvc.WithTreeService(c.treeService),
diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go
index d205e54a3..278c157e0 100644
--- a/cmd/neofs-node/object.go
+++ b/cmd/neofs-node/object.go
@@ -207,7 +207,7 @@ func initObjectService(c *cfg) {
 		log:     c.log,
 	}
 
-	repl := replicator.New(
+	c.replicator = replicator.New(
 		replicator.WithLogger(c.log),
 		replicator.WithPutTimeout(
 			replicatorconfig.PutTimeout(c.appCfg),
@@ -232,7 +232,7 @@ func initObjectService(c *cfg) {
 		policer.WithHeadTimeout(
 			policerconfig.HeadTimeout(c.appCfg),
 		),
-		policer.WithReplicator(repl),
+		policer.WithReplicator(c.replicator),
 		policer.WithRedundantCopyCallback(func(addr oid.Address) {
 			var inhumePrm engine.InhumePrm
 			inhumePrm.MarkAsGarbage(addr)
diff --git a/pkg/services/control/server/evacuate.go b/pkg/services/control/server/evacuate.go
index 8208a00ab..d232129aa 100644
--- a/pkg/services/control/server/evacuate.go
+++ b/pkg/services/control/server/evacuate.go
@@ -1,11 +1,20 @@
 package control
 
 import (
+	"bytes"
 	"context"
+	"crypto/sha256"
+	"errors"
+	"fmt"
 
+	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
 	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
 	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
 	"github.com/nspcc-dev/neofs-node/pkg/services/control"
+	"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
+	"github.com/nspcc-dev/neofs-node/pkg/services/replicator"
+	objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
+	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )
@@ -21,6 +30,7 @@ func (s *Server) EvacuateShard(_ context.Context, req *control.EvacuateShardRequ
 	var prm engine.EvacuateShardPrm
 	prm.WithShardID(shardID)
 	prm.WithIgnoreErrors(req.GetBody().GetIgnoreErrors())
+	prm.WithFaultHandler(s.replicate)
 
 	res, err := s.s.Evacuate(prm)
 	if err != nil {
@@ -39,3 +49,61 @@ func (s *Server) EvacuateShard(_ context.Context, req *control.EvacuateShardRequ
 	}
 	return resp, nil
 }
+
+func (s *Server) replicate(addr oid.Address, obj *objectSDK.Object) error {
+	cid, ok := obj.ContainerID()
+	if !ok {
+		// Return nil to prevent situations where a shard can't be evacuated
+		// because of a single bad/corrupted object.
+		return nil
+	}
+
+	nm, err := s.netMapSrc.GetNetMap(0)
+	if err != nil {
+		return err
+	}
+
+	c, err := s.cnrSrc.Get(cid)
+	if err != nil {
+		return err
+	}
+
+	binCnr := make([]byte, sha256.Size)
+	cid.Encode(binCnr)
+
+	ns, err := nm.ContainerNodes(c.Value.PlacementPolicy(), binCnr)
+	if err != nil {
+		return fmt.Errorf("can't build a list of container nodes")
+	}
+
+	nodes := placement.FlattenNodes(ns)
+	bs := (*keys.PublicKey)(&s.key.PublicKey).Bytes()
+	for i := 0; i < len(nodes); i++ {
+		if bytes.Equal(nodes[i].PublicKey(), bs) {
+			copy(nodes[i:], nodes[i+1:])
+			nodes = nodes[:len(nodes)-1]
+		}
+	}
+
+	var res replicatorResult
+	task := new(replicator.Task).
+		WithObject(obj).
+		WithObjectAddress(addr).
+		WithCopiesNumber(1).
+		WithNodes(nodes)
+	s.replicator.HandleTask(context.TODO(), task, &res)
+
+	if res.count == 0 {
+		return errors.New("object was not replicated")
+	}
+	return nil
+}
+
+type replicatorResult struct {
+	count int
+}
+
+// SubmitSuccessfulReplication implements the replicator.TaskResult interface.
+func (r *replicatorResult) SubmitSuccessfulReplication(_ uint64) {
+	r.count++
+}
diff --git a/pkg/services/control/server/server.go b/pkg/services/control/server/server.go
index 682900ad5..920e9bfd3 100644
--- a/pkg/services/control/server/server.go
+++ b/pkg/services/control/server/server.go
@@ -3,9 +3,11 @@ package control
 import (
 	"crypto/ecdsa"
 
+	"github.com/nspcc-dev/neofs-node/pkg/core/container"
 	"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
 	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
 	"github.com/nspcc-dev/neofs-node/pkg/services/control"
+	"github.com/nspcc-dev/neofs-node/pkg/services/replicator"
 )
 
 // Server is an entity that serves
@@ -47,6 +49,10 @@ type cfg struct {
 
 	netMapSrc netmap.Source
 
+	cnrSrc container.Source
+
+	replicator *replicator.Replicator
+
 	nodeState NodeState
 
 	treeService TreeService
@@ -102,6 +108,20 @@ func WithNetMapSource(netMapSrc netmap.Source) Option {
 	}
 }
 
+// WithContainerSource returns option to set container storage.
+func WithContainerSource(cnrSrc container.Source) Option {
+	return func(c *cfg) {
+		c.cnrSrc = cnrSrc
+	}
+}
+
+// WithReplicator returns option to set network map storage.
+func WithReplicator(r *replicator.Replicator) Option {
+	return func(c *cfg) {
+		c.replicator = r
+	}
+}
+
 // WithNodeState returns option to set node network state component.
 func WithNodeState(state NodeState) Option {
 	return func(c *cfg) {
diff --git a/pkg/services/replicator/process.go b/pkg/services/replicator/process.go
index 91d8fd3d0..8b4554cc9 100644
--- a/pkg/services/replicator/process.go
+++ b/pkg/services/replicator/process.go
@@ -26,17 +26,20 @@ func (p *Replicator) HandleTask(ctx context.Context, task *Task, res TaskResult)
 		)
 	}()
 
-	obj, err := engine.Get(p.localStorage, task.addr)
-	if err != nil {
-		p.log.Error("could not get object from local storage",
-			zap.Stringer("object", task.addr),
-			zap.Error(err))
+	if task.obj == nil {
+		var err error
+		task.obj, err = engine.Get(p.localStorage, task.addr)
+		if err != nil {
+			p.log.Error("could not get object from local storage",
+				zap.Stringer("object", task.addr),
+				zap.Error(err))
 
-		return
+			return
+		}
 	}
 
 	prm := new(putsvc.RemotePutPrm).
-		WithObject(obj)
+		WithObject(task.obj)
 
 	for i := 0; task.quantity > 0 && i < len(task.nodes); i++ {
 		select {
@@ -52,7 +55,7 @@ func (p *Replicator) HandleTask(ctx context.Context, task *Task, res TaskResult)
 
 		callCtx, cancel := context.WithTimeout(ctx, p.putTimeout)
 
-		err = p.remoteSender.PutObject(callCtx, prm.WithNodeInfo(task.nodes[i]))
+		err := p.remoteSender.PutObject(callCtx, prm.WithNodeInfo(task.nodes[i]))
 
 		cancel()
 
diff --git a/pkg/services/replicator/task.go b/pkg/services/replicator/task.go
index 99d1c5139..3d9da9b6b 100644
--- a/pkg/services/replicator/task.go
+++ b/pkg/services/replicator/task.go
@@ -2,6 +2,7 @@ package replicator
 
 import (
 	"github.com/nspcc-dev/neofs-sdk-go/netmap"
+	objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
 	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
 )
 
@@ -11,6 +12,8 @@ type Task struct {
 
 	addr oid.Address
 
+	obj *objectSDK.Object
+
 	nodes []netmap.NodeInfo
 }
 
@@ -32,6 +35,15 @@ func (t *Task) WithObjectAddress(v oid.Address) *Task {
 	return t
 }
 
+// WithObject sets object to avoid fetching it from the local storage.
+func (t *Task) WithObject(obj *objectSDK.Object) *Task {
+	if t != nil {
+		t.obj = obj
+	}
+
+	return t
+}
+
 // WithNodes sets a list of potential object holders.
 func (t *Task) WithNodes(v []netmap.NodeInfo) *Task {
 	if t != nil {