package control

import (
	"bytes"
	"context"
	"crypto/sha256"
	"encoding/hex"
	"errors"
	"fmt"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/server/ctrlmessage"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// errFailedToBuildListOfContainerNodes is returned by getContainerNodes when
// the network map cannot produce the node set for a container's placement
// policy.
var errFailedToBuildListOfContainerNodes = errors.New("can't build a list of container nodes")

// StartShardEvacuation handles a request to start evacuating shards.
//
// The request is validated first (failure maps to codes.PermissionDenied) and
// a request whose scope is NONE is rejected with codes.InvalidArgument. The
// evacuation parameters are then copied from the request body and passed to
// the storage engine, with replicateObject and replicateTree installed as the
// object and tree handlers. Engine errors that are logicerr.Logical map to
// codes.Aborted, all other errors to codes.Internal. On success a signed
// response with an empty body is returned.
func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartShardEvacuationRequest) (*control.StartShardEvacuationResponse, error) {
	err := s.isValidRequest(req)
	if err != nil {
		return nil, status.Error(codes.PermissionDenied, err.Error())
	}

	if req.GetBody().GetScope() == uint32(control.StartShardEvacuationRequest_Body_NONE) {
		return nil, status.Error(codes.InvalidArgument, "no evacuation scope")
	}

	prm := engine.EvacuateShardPrm{
		ShardID:              s.getShardIDList(req.GetBody().GetShard_ID()),
		IgnoreErrors:         req.GetBody().GetIgnoreErrors(),
		ObjectsHandler:       s.replicateObject,
		TreeHandler:          s.replicateTree,
		Scope:                engine.EvacuateScope(req.GetBody().GetScope()),
		ContainerWorkerCount: req.GetBody().GetContainerWorkerCount(),
		ObjectWorkerCount:    req.GetBody().GetObjectWorkerCount(),
		RepOneOnly:           req.GetBody().GetRepOneOnly(),
	}

	if err = s.s.Evacuate(ctx, prm); err != nil {
		var logicalErr logicerr.Logical
		if errors.As(err, &logicalErr) {
			return nil, status.Error(codes.Aborted, err.Error())
		}
		return nil, status.Error(codes.Internal, err.Error())
	}

	resp := &control.StartShardEvacuationResponse{
		Body: &control.StartShardEvacuationResponse_Body{},
	}

	err = ctrlmessage.Sign(s.key, resp)
	if err != nil {
		return nil, status.Error(codes.Internal, err.Error())
	}
	return resp, nil
}

// GetShardEvacuationStatus handles a request for the current evacuation state.
//
// The request is validated first (failure maps to codes.PermissionDenied).
// Engine errors that are logicerr.Logical map to codes.Aborted, all other
// engine errors to codes.Internal. The state is converted with
// stateToResponse, whose error is returned unchanged, and the resulting
// response is signed before being returned.
func (s *Server) GetShardEvacuationStatus(ctx context.Context, req *control.GetShardEvacuationStatusRequest) (*control.GetShardEvacuationStatusResponse, error) {
	err := s.isValidRequest(req)
	if err != nil {
		return nil, status.Error(codes.PermissionDenied, err.Error())
	}

	state, err := s.s.GetEvacuationState(ctx)
	if err != nil {
		var logicalErr logicerr.Logical
		if errors.As(err, &logicalErr) {
			return nil, status.Error(codes.Aborted, err.Error())
		}
		return nil, status.Error(codes.Internal, err.Error())
	}

	resp, err := stateToResponse(state)
	if err != nil {
		return nil, err
	}

	err = ctrlmessage.Sign(s.key, resp)
	if err != nil {
		return nil, status.Error(codes.Internal, err.Error())
	}
	return resp, nil
}

// StopShardEvacuation handles a request to stop the running evacuation.
//
// The request is validated first (failure maps to codes.PermissionDenied),
// then a stop is enqueued in the storage engine. Engine errors that are
// logicerr.Logical map to codes.Aborted, all other errors to codes.Internal.
// The per-shard evacuation status is reset only after the response has been
// signed successfully.
func (s *Server) StopShardEvacuation(ctx context.Context, req *control.StopShardEvacuationRequest) (*control.StopShardEvacuationResponse, error) {
	err := s.isValidRequest(req)
	if err != nil {
		return nil, status.Error(codes.PermissionDenied, err.Error())
	}

	err = s.s.EnqueRunningEvacuationStop(ctx)
	if err != nil {
		var logicalErr logicerr.Logical
		if errors.As(err, &logicalErr) {
			return nil, status.Error(codes.Aborted, err.Error())
		}
		return nil, status.Error(codes.Internal, err.Error())
	}

	resp := &control.StopShardEvacuationResponse{
		Body: &control.StopShardEvacuationResponse_Body{},
	}

	err = ctrlmessage.Sign(s.key, resp)
	if err != nil {
		return nil, status.Error(codes.Internal, err.Error())
	}

	s.s.ResetEvacuationStatusForShards()
	return resp, nil
}

// ResetShardEvacuationStatus handles a request to reset the evacuation status
// kept by the storage engine.
//
// The request is validated first (failure maps to codes.PermissionDenied).
// Engine errors that are logicerr.Logical map to codes.Aborted, all other
// errors to codes.Internal. On success a signed response with an empty body
// is returned.
func (s *Server) ResetShardEvacuationStatus(ctx context.Context, req *control.ResetShardEvacuationStatusRequest) (*control.ResetShardEvacuationStatusResponse, error) {
	err := s.isValidRequest(req)
	if err != nil {
		return nil, status.Error(codes.PermissionDenied, err.Error())
	}

	err = s.s.ResetEvacuationStatus(ctx)
	if err != nil {
		var logicalErr logicerr.Logical
		if errors.As(err, &logicalErr) {
			return nil, status.Error(codes.Aborted, err.Error())
		}
		return nil, status.Error(codes.Internal, err.Error())
	}

	resp := &control.ResetShardEvacuationStatusResponse{
		Body: &control.ResetShardEvacuationStatusResponse_Body{},
	}

	err = ctrlmessage.Sign(s.key, resp)
	if err != nil {
		return nil, status.Error(codes.Internal, err.Error())
	}
	return resp, nil
}

// replicateObject is the object handler passed to the engine by
// StartShardEvacuation. It hands obj to the replicator with the container's
// other nodes as candidates and a required copy count of one.
//
// It returns (true, nil) when at least one successful replication was
// reported, (false, nil) when the object has no container ID or the container
// has no candidate nodes, and (false, err) when the node list cannot be built
// or no replication succeeded.
func (s *Server) replicateObject(ctx context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) {
	cnrID, ok := obj.ContainerID()
	if !ok {
		// Return nil to prevent situations where a shard can't be evacuated
		// because of a single bad/corrupted object.
		return false, nil
	}

	nodes, err := s.getContainerNodes(cnrID)
	if err != nil {
		return false, err
	}

	if len(nodes) == 0 {
		return false, nil
	}

	var res replicatorResult
	task := replicator.Task{
		NumCopies: 1,
		Addr:      addr,
		Obj:       obj,
		Nodes:     nodes,
	}
	s.replicator.HandleReplicationTask(ctx, task, &res)

	if res.count == 0 {
		return false, errors.New("object was not replicated")
	}
	return true, nil
}

// replicateTree is the tree handler passed to the engine by
// StartShardEvacuation. It tries the container's other nodes in order and
// stops at the first one that accepts the whole tree.
//
// On success it returns true and the hex-encoded public key of the node that
// accepted the tree. It returns (false, "", nil) when the container has no
// candidate nodes, and (false, "", err) when the node list cannot be built or
// every node failed; in the latter case err is the error from the last node
// tried.
func (s *Server) replicateTree(ctx context.Context, contID cid.ID, treeID string, forest pilorama.Forest) (bool, string, error) {
	nodes, err := s.getContainerNodes(contID)
	if err != nil {
		return false, "", err
	}
	if len(nodes) == 0 {
		return false, "", nil
	}

	for _, node := range nodes {
		err = s.replicateTreeToNode(ctx, forest, contID, treeID, node)
		if err == nil {
			return true, hex.EncodeToString(node.PublicKey()), nil
		}
	}
	return false, "", err
}

// replicateTreeToNode sends the operation log of tree treeID in container
// contID to node, one signed apply request per operation.
//
// The log is read from forest starting at height 0. After each operation is
// sent, reading continues from that operation's Time + 1. An operation with
// Time == 0 ends the loop successfully. The first read, signing or send error
// aborts the loop and is returned.
func (s *Server) replicateTreeToNode(ctx context.Context, forest pilorama.Forest, contID cid.ID, treeID string, node netmap.NodeInfo) error {
	rawCID := make([]byte, sha256.Size)
	contID.Encode(rawCID)

	var height uint64
	for {
		op, err := forest.TreeGetOpLog(ctx, contID, treeID, height)
		if err != nil {
			return err
		}

		// A zero timestamp marks the end of the log.
		if op.Time == 0 {
			return nil
		}

		req := &tree.ApplyRequest{
			Body: &tree.ApplyRequest_Body{
				ContainerId: rawCID,
				TreeId:      treeID,
				Operation: &tree.LogMove{
					ParentId: op.Parent,
					Meta:     op.Meta.Bytes(),
					ChildId:  op.Child,
				},
			},
		}

		err = tree.SignMessage(req, s.key)
		if err != nil {
			// NOTE(review): the message text looks like a typo for "sign"; it
			// is left unchanged in case anything matches on it.
			return fmt.Errorf("can't message apply request: %w", err)
		}

		err = s.treeService.ReplicateTreeOp(ctx, node, req)
		if err != nil {
			return err
		}

		height = op.Time + 1
	}
}

// getContainerNodes returns the flattened list of nodes selected by the
// placement policy of container contID, with every entry whose public key
// equals this server's own key removed.
//
// Errors from the network map source and the container source are returned
// unchanged. A failure to build the node set is reported as
// errFailedToBuildListOfContainerNodes.
func (s *Server) getContainerNodes(contID cid.ID) ([]netmap.NodeInfo, error) {
	// NOTE(review): presumably 0 selects the current network map — confirm
	// against the netmap source's contract.
	nm, err := s.netMapSrc.GetNetMap(0)
	if err != nil {
		return nil, err
	}

	c, err := s.cnrSrc.Get(contID)
	if err != nil {
		return nil, err
	}

	binCnr := make([]byte, sha256.Size)
	contID.Encode(binCnr)

	ns, err := nm.ContainerNodes(c.Value.PlacementPolicy(), binCnr)
	if err != nil {
		return nil, errFailedToBuildListOfContainerNodes
	}

	nodes := placement.FlattenNodes(ns)
	self := (*keys.PublicKey)(&s.key.PublicKey).Bytes()

	// Filter in place, reusing the backing array. Every element is examined
	// exactly once, so consecutive entries for the local node are all dropped.
	// The earlier remove-and-advance loop skipped the element that was shifted
	// into the removed slot.
	others := nodes[:0]
	for i := range nodes {
		if !bytes.Equal(nodes[i].PublicKey(), self) {
			others = append(others, nodes[i])
		}
	}
	return others, nil
}

// replicatorResult counts the successful replications reported for one
// replication task.
type replicatorResult struct {
	count int
}

// SubmitSuccessfulReplication implements the replicator.TaskResult interface.
func (r *replicatorResult) SubmitSuccessfulReplication(_ netmap.NodeInfo) {
	r.count++
}