From ac0511d21423df1d191539f8272f962c23dfb62b Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 10 Dec 2024 16:52:46 +0300 Subject: [PATCH 1/2] [#1549] controlSvc: Drop deprecated EvacuateShard rpc Signed-off-by: Dmitrii Stepanov --- .../modules/control/evacuate_shard.go | 56 ------ cmd/frostfs-cli/modules/control/evacuation.go | 9 +- cmd/frostfs-cli/modules/control/shards.go | 2 - pkg/services/control/rpc.go | 14 -- pkg/services/control/server/evacuate.go | 188 ------------------ pkg/services/control/server/evacuate_async.go | 148 +++++++++++++- pkg/services/control/service.proto | 5 - pkg/services/control/service_grpc.pb.go | 43 ---- 8 files changed, 151 insertions(+), 314 deletions(-) delete mode 100644 cmd/frostfs-cli/modules/control/evacuate_shard.go delete mode 100644 pkg/services/control/server/evacuate.go diff --git a/cmd/frostfs-cli/modules/control/evacuate_shard.go b/cmd/frostfs-cli/modules/control/evacuate_shard.go deleted file mode 100644 index 1e48c1df4..000000000 --- a/cmd/frostfs-cli/modules/control/evacuate_shard.go +++ /dev/null @@ -1,56 +0,0 @@ -package control - -import ( - "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key" - commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client" - "github.com/spf13/cobra" -) - -const ignoreErrorsFlag = "no-errors" - -var evacuateShardCmd = &cobra.Command{ - Use: "evacuate", - Short: "Evacuate objects from shard", - Long: "Evacuate objects from shard to other shards", - Run: evacuateShard, - Deprecated: "use frostfs-cli control shards evacuation start", -} - -func evacuateShard(cmd *cobra.Command, _ []string) { - pk := key.Get(cmd) - - req := &control.EvacuateShardRequest{Body: new(control.EvacuateShardRequest_Body)} - req.Body.Shard_ID = getShardIDList(cmd) - req.Body.IgnoreErrors, _ = cmd.Flags().GetBool(ignoreErrorsFlag) - - signRequest(cmd, pk, req) - - cli := getClient(cmd, pk) - - var resp *control.EvacuateShardResponse - var err error - err = cli.ExecRaw(func(client *client.Client) error { - resp, err = control.EvacuateShard(client, req) - return err - }) - commonCmd.ExitOnErr(cmd, "rpc error: %w", err) - - cmd.Printf("Objects moved: %d\n", resp.GetBody().GetCount()) - - verifyResponse(cmd, resp.GetSignature(), resp.GetBody()) - - cmd.Println("Shard has successfully been evacuated.") -} - -func initControlEvacuateShardCmd() { - initControlFlags(evacuateShardCmd) - - flags := evacuateShardCmd.Flags() - flags.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding") - flags.Bool(shardAllFlag, false, "Process all shards") - flags.Bool(ignoreErrorsFlag, false, "Skip invalid/unreadable objects") - - evacuateShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag) -} diff --git a/cmd/frostfs-cli/modules/control/evacuation.go b/cmd/frostfs-cli/modules/control/evacuation.go index 73700e56d..8032bf09a 100644 --- a/cmd/frostfs-cli/modules/control/evacuation.go +++ b/cmd/frostfs-cli/modules/control/evacuation.go @@ -17,10 +17,11 @@ import ( ) const ( - awaitFlag = "await" - noProgressFlag = "no-progress" - scopeFlag = "scope" - repOneOnlyFlag = "rep-one-only" + awaitFlag = "await" + noProgressFlag = "no-progress" + scopeFlag = "scope" + repOneOnlyFlag = "rep-one-only" + ignoreErrorsFlag = "no-errors" containerWorkerCountFlag = "container-worker-count" objectWorkerCountFlag = "object-worker-count" diff --git 
a/cmd/frostfs-cli/modules/control/shards.go b/cmd/frostfs-cli/modules/control/shards.go index 329cb9100..3483f5d62 100644 --- a/cmd/frostfs-cli/modules/control/shards.go +++ b/cmd/frostfs-cli/modules/control/shards.go @@ -13,7 +13,6 @@ var shardsCmd = &cobra.Command{ func initControlShardsCmd() { shardsCmd.AddCommand(listShardsCmd) shardsCmd.AddCommand(setShardModeCmd) - shardsCmd.AddCommand(evacuateShardCmd) shardsCmd.AddCommand(evacuationShardCmd) shardsCmd.AddCommand(flushCacheCmd) shardsCmd.AddCommand(doctorCmd) @@ -23,7 +22,6 @@ func initControlShardsCmd() { initControlShardsListCmd() initControlSetShardModeCmd() - initControlEvacuateShardCmd() initControlEvacuationShardCmd() initControlFlushCacheCmd() initControlDoctorCmd() diff --git a/pkg/services/control/rpc.go b/pkg/services/control/rpc.go index 514061db4..6982d780d 100644 --- a/pkg/services/control/rpc.go +++ b/pkg/services/control/rpc.go @@ -15,7 +15,6 @@ const ( rpcListShards = "ListShards" rpcSetShardMode = "SetShardMode" rpcSynchronizeTree = "SynchronizeTree" - rpcEvacuateShard = "EvacuateShard" rpcStartShardEvacuation = "StartShardEvacuation" rpcGetShardEvacuationStatus = "GetShardEvacuationStatus" rpcResetShardEvacuationStatus = "ResetShardEvacuationStatus" @@ -162,19 +161,6 @@ func SynchronizeTree(cli *client.Client, req *SynchronizeTreeRequest, opts ...cl return wResp.message, nil } -// EvacuateShard executes ControlService.EvacuateShard RPC. -func EvacuateShard(cli *client.Client, req *EvacuateShardRequest, opts ...client.CallOption) (*EvacuateShardResponse, error) { - wResp := newResponseWrapper[EvacuateShardResponse]() - wReq := &requestWrapper{m: req} - - err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcEvacuateShard), wReq, wResp, opts...) - if err != nil { - return nil, err - } - - return wResp.message, nil -} - // StartShardEvacuation executes ControlService.StartShardEvacuation RPC. 
func StartShardEvacuation(cli *client.Client, req *StartShardEvacuationRequest, opts ...client.CallOption) (*StartShardEvacuationResponse, error) { wResp := newResponseWrapper[StartShardEvacuationResponse]() diff --git a/pkg/services/control/server/evacuate.go b/pkg/services/control/server/evacuate.go deleted file mode 100644 index ae3413373..000000000 --- a/pkg/services/control/server/evacuate.go +++ /dev/null @@ -1,188 +0,0 @@ -package control - -import ( - "bytes" - "context" - "crypto/sha256" - "encoding/hex" - "errors" - "fmt" - - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/server/ctrlmessage" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree" - cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" - objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "github.com/nspcc-dev/neo-go/pkg/crypto/keys" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" -) - -var errFailedToBuildListOfContainerNodes = errors.New("can't build a list of container nodes") - -func (s *Server) EvacuateShard(ctx context.Context, req *control.EvacuateShardRequest) (*control.EvacuateShardResponse, error) { - err := s.isValidRequest(req) - if err != nil { - return nil, status.Error(codes.PermissionDenied, err.Error()) - } - - prm := engine.EvacuateShardPrm{ - ShardID: s.getShardIDList(req.GetBody().GetShard_ID()), - IgnoreErrors: req.GetBody().GetIgnoreErrors(), - ObjectsHandler: s.replicateObject, - Scope: engine.EvacuateScopeObjects, - } - - res, err := s.s.Evacuate(ctx, prm) - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - - resp := &control.EvacuateShardResponse{ - Body: &control.EvacuateShardResponse_Body{ - Count: uint32(res.ObjectsEvacuated()), - }, - } - - err = ctrlmessage.Sign(s.key, resp) - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - return resp, nil -} - -func (s *Server) replicateObject(ctx context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) { - cid, ok := obj.ContainerID() - if !ok { - // Return nil to prevent situations where a shard can't be evacuated - // because of a single bad/corrupted object. 
- return false, nil - } - - nodes, err := s.getContainerNodes(cid) - if err != nil { - return false, err - } - - if len(nodes) == 0 { - return false, nil - } - - var res replicatorResult - task := replicator.Task{ - NumCopies: 1, - Addr: addr, - Obj: obj, - Nodes: nodes, - } - s.replicator.HandleReplicationTask(ctx, task, &res) - - if res.count == 0 { - return false, errors.New("object was not replicated") - } - return true, nil -} - -func (s *Server) replicateTree(ctx context.Context, contID cid.ID, treeID string, forest pilorama.Forest) (bool, string, error) { - nodes, err := s.getContainerNodes(contID) - if err != nil { - return false, "", err - } - if len(nodes) == 0 { - return false, "", nil - } - - for _, node := range nodes { - err = s.replicateTreeToNode(ctx, forest, contID, treeID, node) - if err == nil { - return true, hex.EncodeToString(node.PublicKey()), nil - } - } - return false, "", err -} - -func (s *Server) replicateTreeToNode(ctx context.Context, forest pilorama.Forest, contID cid.ID, treeID string, node netmap.NodeInfo) error { - rawCID := make([]byte, sha256.Size) - contID.Encode(rawCID) - - var height uint64 - for { - op, err := forest.TreeGetOpLog(ctx, contID, treeID, height) - if err != nil { - return err - } - - if op.Time == 0 { - return nil - } - - req := &tree.ApplyRequest{ - Body: &tree.ApplyRequest_Body{ - ContainerId: rawCID, - TreeId: treeID, - Operation: &tree.LogMove{ - ParentId: op.Parent, - Meta: op.Meta.Bytes(), - ChildId: op.Child, - }, - }, - } - - err = tree.SignMessage(req, s.key) - if err != nil { - return fmt.Errorf("can't message apply request: %w", err) - } - - err = s.treeService.ReplicateTreeOp(ctx, node, req) - if err != nil { - return err - } - - height = op.Time + 1 - } -} - -func (s *Server) getContainerNodes(contID cid.ID) ([]netmap.NodeInfo, error) { - nm, err := s.netMapSrc.GetNetMap(0) - if err != nil { - return nil, err - } - - c, err := s.cnrSrc.Get(contID) - if err != nil { - return nil, err - } - - binCnr := make([]byte, sha256.Size) - contID.Encode(binCnr) - - ns, err := nm.ContainerNodes(c.Value.PlacementPolicy(), binCnr) - if err != nil { - return nil, errFailedToBuildListOfContainerNodes - } - - nodes := placement.FlattenNodes(ns) - bs := (*keys.PublicKey)(&s.key.PublicKey).Bytes() - for i := 0; i < len(nodes); i++ { // don't use range, slice mutates in body - if bytes.Equal(nodes[i].PublicKey(), bs) { - copy(nodes[i:], nodes[i+1:]) - nodes = nodes[:len(nodes)-1] - } - } - return nodes, nil -} - -type replicatorResult struct { - count int -} - -// SubmitSuccessfulReplication implements the replicator.TaskResult interface. 
-func (r *replicatorResult) SubmitSuccessfulReplication(_ netmap.NodeInfo) { - r.count++ -} diff --git a/pkg/services/control/server/evacuate_async.go b/pkg/services/control/server/evacuate_async.go index 146ac7e16..04465eb26 100644 --- a/pkg/services/control/server/evacuate_async.go +++ b/pkg/services/control/server/evacuate_async.go @@ -1,17 +1,32 @@ package control import ( + "bytes" "context" + "crypto/sha256" + "encoding/hex" "errors" + "fmt" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/server/ctrlmessage" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) +var errFailedToBuildListOfContainerNodes = errors.New("can't build a list of container nodes") + func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartShardEvacuationRequest) (*control.StartShardEvacuationResponse, error) { err := s.isValidRequest(req) if err != nil { @@ -34,8 +49,7 @@ func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartSha RepOneOnly: req.GetBody().GetRepOneOnly(), } - _, err = s.s.Evacuate(ctx, prm) - if err != nil { + if err = s.s.Evacuate(ctx, prm); err != nil { var logicalErr logicerr.Logical if errors.As(err, &logicalErr) { return nil, status.Error(codes.Aborted, err.Error()) @@ -135,3 +149,133 @@ func (s *Server) ResetShardEvacuationStatus(ctx context.Context, req *control.Re } return resp, nil } + +func (s *Server) replicateObject(ctx context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) { + cid, ok := obj.ContainerID() + if !ok { + // Return nil to prevent situations where a shard can't be evacuated + // because of a single bad/corrupted object. 
+ return false, nil + } + + nodes, err := s.getContainerNodes(cid) + if err != nil { + return false, err + } + + if len(nodes) == 0 { + return false, nil + } + + var res replicatorResult + task := replicator.Task{ + NumCopies: 1, + Addr: addr, + Obj: obj, + Nodes: nodes, + } + s.replicator.HandleReplicationTask(ctx, task, &res) + + if res.count == 0 { + return false, errors.New("object was not replicated") + } + return true, nil +} + +func (s *Server) replicateTree(ctx context.Context, contID cid.ID, treeID string, forest pilorama.Forest) (bool, string, error) { + nodes, err := s.getContainerNodes(contID) + if err != nil { + return false, "", err + } + if len(nodes) == 0 { + return false, "", nil + } + + for _, node := range nodes { + err = s.replicateTreeToNode(ctx, forest, contID, treeID, node) + if err == nil { + return true, hex.EncodeToString(node.PublicKey()), nil + } + } + return false, "", err +} + +func (s *Server) replicateTreeToNode(ctx context.Context, forest pilorama.Forest, contID cid.ID, treeID string, node netmap.NodeInfo) error { + rawCID := make([]byte, sha256.Size) + contID.Encode(rawCID) + + var height uint64 + for { + op, err := forest.TreeGetOpLog(ctx, contID, treeID, height) + if err != nil { + return err + } + + if op.Time == 0 { + return nil + } + + req := &tree.ApplyRequest{ + Body: &tree.ApplyRequest_Body{ + ContainerId: rawCID, + TreeId: treeID, + Operation: &tree.LogMove{ + ParentId: op.Parent, + Meta: op.Meta.Bytes(), + ChildId: op.Child, + }, + }, + } + + err = tree.SignMessage(req, s.key) + if err != nil { + return fmt.Errorf("can't sign apply request: %w", err) + } + + err = s.treeService.ReplicateTreeOp(ctx, node, req) + if err != nil { + return err + } + + height = op.Time + 1 + } +} + +func (s *Server) getContainerNodes(contID cid.ID) ([]netmap.NodeInfo, error) { + nm, err := s.netMapSrc.GetNetMap(0) + if err != nil { + return nil, err + } + + c, err := s.cnrSrc.Get(contID) + if err != nil { + return nil, err + } + + binCnr := make([]byte, sha256.Size) + contID.Encode(binCnr) + + ns, err := nm.ContainerNodes(c.Value.PlacementPolicy(), binCnr) + if err != nil { + return nil, errFailedToBuildListOfContainerNodes + } + + nodes := placement.FlattenNodes(ns) + bs := (*keys.PublicKey)(&s.key.PublicKey).Bytes() + for i := 0; i < len(nodes); i++ { // don't use range, slice mutates in body + if bytes.Equal(nodes[i].PublicKey(), bs) { + copy(nodes[i:], nodes[i+1:]) + nodes = nodes[:len(nodes)-1] + } + } + return nodes, nil +} + +type replicatorResult struct { + count int +} + +// SubmitSuccessfulReplication implements the replicator.TaskResult interface. +func (r *replicatorResult) SubmitSuccessfulReplication(_ netmap.NodeInfo) { + r.count++ +} diff --git a/pkg/services/control/service.proto b/pkg/services/control/service.proto index ae1939e13..97ecf9a8c 100644 --- a/pkg/services/control/service.proto +++ b/pkg/services/control/service.proto @@ -30,11 +30,6 @@ service ControlService { // Synchronizes all log operations for the specified tree. rpc SynchronizeTree(SynchronizeTreeRequest) returns (SynchronizeTreeResponse); - // EvacuateShard moves all data from one shard to the others. - // Deprecated: Use - // StartShardEvacuation/GetShardEvacuationStatus/StopShardEvacuation - rpc EvacuateShard(EvacuateShardRequest) returns (EvacuateShardResponse); - - // StartShardEvacuation starts moving all data from one shard to the others. 
rpc StartShardEvacuation(StartShardEvacuationRequest) returns (StartShardEvacuationResponse); diff --git a/pkg/services/control/service_grpc.pb.go b/pkg/services/control/service_grpc.pb.go index f5cfefa85..987e08c59 100644 --- a/pkg/services/control/service_grpc.pb.go +++ b/pkg/services/control/service_grpc.pb.go @@ -26,7 +26,6 @@ const ( ControlService_ListShards_FullMethodName = "/control.ControlService/ListShards" ControlService_SetShardMode_FullMethodName = "/control.ControlService/SetShardMode" ControlService_SynchronizeTree_FullMethodName = "/control.ControlService/SynchronizeTree" - ControlService_EvacuateShard_FullMethodName = "/control.ControlService/EvacuateShard" ControlService_StartShardEvacuation_FullMethodName = "/control.ControlService/StartShardEvacuation" ControlService_GetShardEvacuationStatus_FullMethodName = "/control.ControlService/GetShardEvacuationStatus" ControlService_ResetShardEvacuationStatus_FullMethodName = "/control.ControlService/ResetShardEvacuationStatus" @@ -62,10 +61,6 @@ type ControlServiceClient interface { SetShardMode(ctx context.Context, in *SetShardModeRequest, opts ...grpc.CallOption) (*SetShardModeResponse, error) // Synchronizes all log operations for the specified tree. SynchronizeTree(ctx context.Context, in *SynchronizeTreeRequest, opts ...grpc.CallOption) (*SynchronizeTreeResponse, error) - // EvacuateShard moves all data from one shard to the others. - // Deprecated: Use - // StartShardEvacuation/GetShardEvacuationStatus/StopShardEvacuation - EvacuateShard(ctx context.Context, in *EvacuateShardRequest, opts ...grpc.CallOption) (*EvacuateShardResponse, error) // StartShardEvacuation starts moving all data from one shard to the others. StartShardEvacuation(ctx context.Context, in *StartShardEvacuationRequest, opts ...grpc.CallOption) (*StartShardEvacuationResponse, error) // GetShardEvacuationStatus returns evacuation status. @@ -173,15 +168,6 @@ func (c *controlServiceClient) SynchronizeTree(ctx context.Context, in *Synchron return out, nil } -func (c *controlServiceClient) EvacuateShard(ctx context.Context, in *EvacuateShardRequest, opts ...grpc.CallOption) (*EvacuateShardResponse, error) { - out := new(EvacuateShardResponse) - err := c.cc.Invoke(ctx, ControlService_EvacuateShard_FullMethodName, in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - func (c *controlServiceClient) StartShardEvacuation(ctx context.Context, in *StartShardEvacuationRequest, opts ...grpc.CallOption) (*StartShardEvacuationResponse, error) { out := new(StartShardEvacuationResponse) err := c.cc.Invoke(ctx, ControlService_StartShardEvacuation_FullMethodName, in, out, opts...) @@ -335,10 +321,6 @@ type ControlServiceServer interface { SetShardMode(context.Context, *SetShardModeRequest) (*SetShardModeResponse, error) // Synchronizes all log operations for the specified tree. SynchronizeTree(context.Context, *SynchronizeTreeRequest) (*SynchronizeTreeResponse, error) - // EvacuateShard moves all data from one shard to the others. - // Deprecated: Use - // StartShardEvacuation/GetShardEvacuationStatus/StopShardEvacuation - EvacuateShard(context.Context, *EvacuateShardRequest) (*EvacuateShardResponse, error) // StartShardEvacuation starts moving all data from one shard to the others. StartShardEvacuation(context.Context, *StartShardEvacuationRequest) (*StartShardEvacuationResponse, error) // GetShardEvacuationStatus returns evacuation status. 
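
Migration note: with the EvacuateShard RPC gone, callers switch to the task-based StartShardEvacuation / GetShardEvacuationStatus / StopShardEvacuation family (CLI: frostfs-cli control shards evacuation start, optionally with --await). A minimal sketch of the replacement call path, mirroring the deleted evacuate_shard.go above; it assumes cmd, cli, and a signed req are set up exactly as in that deleted code, and is illustrative rather than taken from this patch:

    // cli is the control-service client obtained via getClient(cmd, pk);
    // req is a signed *control.StartShardEvacuationRequest.
    var resp *control.StartShardEvacuationResponse
    err := cli.ExecRaw(func(c *client.Client) error {
        var callErr error
        resp, callErr = control.StartShardEvacuation(c, req)
        return callErr
    })
    commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
    verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
    // Completion and progress are then observed via the GetShardEvacuationStatus
    // RPC (the --await flow) rather than from this response.
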
@@ -400,9 +382,6 @@ func (UnimplementedControlServiceServer) SetShardMode(context.Context, *SetShard func (UnimplementedControlServiceServer) SynchronizeTree(context.Context, *SynchronizeTreeRequest) (*SynchronizeTreeResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method SynchronizeTree not implemented") } -func (UnimplementedControlServiceServer) EvacuateShard(context.Context, *EvacuateShardRequest) (*EvacuateShardResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method EvacuateShard not implemented") -} func (UnimplementedControlServiceServer) StartShardEvacuation(context.Context, *StartShardEvacuationRequest) (*StartShardEvacuationResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method StartShardEvacuation not implemented") } @@ -586,24 +565,6 @@ func _ControlService_SynchronizeTree_Handler(srv interface{}, ctx context.Contex return interceptor(ctx, in, info, handler) } -func _ControlService_EvacuateShard_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(EvacuateShardRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ControlServiceServer).EvacuateShard(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: ControlService_EvacuateShard_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ControlServiceServer).EvacuateShard(ctx, req.(*EvacuateShardRequest)) - } - return interceptor(ctx, in, info, handler) -} - func _ControlService_StartShardEvacuation_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(StartShardEvacuationRequest) if err := dec(in); err != nil { @@ -909,10 +870,6 @@ var ControlService_ServiceDesc = grpc.ServiceDesc{ MethodName: "SynchronizeTree", Handler: _ControlService_SynchronizeTree_Handler, }, - { - MethodName: "EvacuateShard", - Handler: _ControlService_EvacuateShard_Handler, - }, { MethodName: "StartShardEvacuation", Handler: _ControlService_StartShardEvacuation_Handler, -- 2.45.3 From 41da27dad51b2906b15157c0e3dc4e87c991da02 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 10 Dec 2024 16:53:19 +0300 Subject: [PATCH 2/2] [#1549] engine: Drop Async flag from evacuation parameters Evacuation is now always asynchronous. Signed-off-by: Dmitrii Stepanov --- pkg/local_object_storage/engine/evacuate.go | 30 ++-- .../engine/evacuate_test.go | 130 +++++++++--------- pkg/services/control/server/evacuate_async.go | 1 - 3 files changed, 74 insertions(+), 87 deletions(-) diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index b88c249b1..2e0344bfb 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -86,7 +86,6 @@ type EvacuateShardPrm struct { ObjectsHandler func(context.Context, oid.Address, *objectSDK.Object) (bool, error) TreeHandler func(context.Context, cid.ID, string, pilorama.Forest) (bool, string, error) IgnoreErrors bool - Async bool Scope EvacuateScope RepOneOnly bool @@ -211,10 +210,10 @@ var errMustHaveTwoShards = errors.New("must have at least 1 spare shard") // Evacuate moves data from one shard to the others. // The shard being moved must be in read-only mode. 
-func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*EvacuateShardRes, error) { +func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) error { select { case <-ctx.Done(): - return nil, ctx.Err() + return ctx.Err() default: } @@ -226,7 +225,6 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Evacuate", trace.WithAttributes( attribute.StringSlice("shardIDs", shardIDs), - attribute.Bool("async", prm.Async), attribute.Bool("ignoreErrors", prm.IgnoreErrors), attribute.Stringer("scope", prm.Scope), )) @@ -234,7 +232,7 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev shards, err := e.getActualShards(shardIDs, prm) if err != nil { - return nil, err + return err } shardsToEvacuate := make(map[string]*shard.Shard) @@ -247,10 +245,10 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev } res := NewEvacuateShardRes() - ctx = ctxOrBackground(ctx, prm.Async) - eg, egCtx, err := e.evacuateLimiter.TryStart(ctx, shardIDs, res) + ctx = context.WithoutCancel(ctx) + eg, ctx, err := e.evacuateLimiter.TryStart(ctx, shardIDs, res) if err != nil { - return nil, err + return err } var mtx sync.RWMutex @@ -262,21 +260,10 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev return t } eg.Go(func() error { - return e.evacuateShards(egCtx, shardIDs, prm, res, copyShards, shardsToEvacuate) + return e.evacuateShards(ctx, shardIDs, prm, res, copyShards, shardsToEvacuate) }) - if prm.Async { - return nil, nil - } - - return res, eg.Wait() -} - -func ctxOrBackground(ctx context.Context, background bool) context.Context { - if background { - return context.Background() - } - return ctx + return nil } func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, prm EvacuateShardPrm, res *EvacuateShardRes, @@ -286,7 +273,6 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards", trace.WithAttributes( attribute.StringSlice("shardIDs", shardIDs), - attribute.Bool("async", prm.Async), attribute.Bool("ignoreErrors", prm.IgnoreErrors), attribute.Stringer("scope", prm.Scope), attribute.Bool("repOneOnly", prm.RepOneOnly), diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index beab8384e..248c39155 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -140,16 +140,17 @@ func TestEvacuateShardObjects(t *testing.T) { prm.Scope = EvacuateScopeObjects t.Run("must be read-only", func(t *testing.T) { - res, err := e.Evacuate(context.Background(), prm) + err := e.Evacuate(context.Background(), prm) require.ErrorIs(t, err, ErrMustBeReadOnly) - require.Equal(t, uint64(0), res.ObjectsEvacuated()) }) require.NoError(t, e.shards[evacuateShardID].SetMode(context.Background(), mode.ReadOnly)) - res, err := e.Evacuate(context.Background(), prm) + err := e.Evacuate(context.Background(), prm) require.NoError(t, err) - require.Equal(t, uint64(objPerShard), res.ObjectsEvacuated()) + st := testWaitForEvacuationCompleted(t, e) + require.Equal(t, st.ErrorMessage(), "") + require.Equal(t, uint64(objPerShard), st.ObjectsEvacuated()) // We check that all objects are available both before and after shard removal. // First case is a real-world use-case. 
It ensures that an object can be put in presence @@ -186,9 +187,10 @@ } // Calling it again is OK, but all objects are already moved, so no new PUTs should be done. - res, err = e.Evacuate(context.Background(), prm) - require.NoError(t, err) - require.Equal(t, uint64(0), res.ObjectsEvacuated()) + require.NoError(t, e.Evacuate(context.Background(), prm)) + st = testWaitForEvacuationCompleted(t, e) + require.Equal(t, st.ErrorMessage(), "") + require.Equal(t, uint64(0), st.ObjectsEvacuated()) checkHasObjects(t) @@ -200,6 +202,17 @@ checkHasObjects(t) } +func testWaitForEvacuationCompleted(t *testing.T, e *StorageEngine) *EvacuationState { + var st *EvacuationState + var err error + require.Eventually(t, func() bool { + st, err = e.GetEvacuationState(context.Background()) + require.NoError(t, err) + return st.ProcessingStatus() == EvacuateProcessStateCompleted + }, 3*time.Second, 10*time.Millisecond) + return st +} + func TestEvacuateObjectsNetwork(t *testing.T) { t.Parallel() @@ -242,15 +255,15 @@ func TestEvacuateObjectsNetwork(t *testing.T) { prm.ShardID = ids[0:1] prm.Scope = EvacuateScopeObjects - res, err := e.Evacuate(context.Background(), prm) + err := e.Evacuate(context.Background(), prm) require.ErrorIs(t, err, errMustHaveTwoShards) - require.Equal(t, uint64(0), res.ObjectsEvacuated()) prm.ObjectsHandler = acceptOneOf(objects, 2) - res, err = e.Evacuate(context.Background(), prm) - require.ErrorIs(t, err, errReplication) - require.Equal(t, uint64(2), res.ObjectsEvacuated()) + require.NoError(t, e.Evacuate(context.Background(), prm)) + st := testWaitForEvacuationCompleted(t, e) + require.Contains(t, st.ErrorMessage(), errReplication.Error()) + require.Equal(t, uint64(2), st.ObjectsEvacuated()) }) t.Run("multiple shards, evacuate one", func(t *testing.T) { t.Parallel() @@ -267,16 +280,18 @@ func TestEvacuateObjectsNetwork(t *testing.T) { prm.ObjectsHandler = acceptOneOf(objects, 2) prm.Scope = EvacuateScopeObjects - res, err := e.Evacuate(context.Background(), prm) - require.ErrorIs(t, err, errReplication) - require.Equal(t, uint64(2), res.ObjectsEvacuated()) + require.NoError(t, e.Evacuate(context.Background(), prm)) + st := testWaitForEvacuationCompleted(t, e) + require.Contains(t, st.ErrorMessage(), errReplication.Error()) + require.Equal(t, uint64(2), st.ObjectsEvacuated()) t.Run("no errors", func(t *testing.T) { prm.ObjectsHandler = acceptOneOf(objects, 3) - res, err := e.Evacuate(context.Background(), prm) - require.NoError(t, err) - require.Equal(t, uint64(3), res.ObjectsEvacuated()) + require.NoError(t, e.Evacuate(context.Background(), prm)) + st := testWaitForEvacuationCompleted(t, e) + require.Equal(t, st.ErrorMessage(), "") + require.Equal(t, uint64(3), st.ObjectsEvacuated()) }) }) t.Run("multiple shards, evacuate many", func(t *testing.T) { @@ -305,16 +320,18 @@ func TestEvacuateObjectsNetwork(t *testing.T) { prm.ObjectsHandler = acceptOneOf(objects, totalCount-1) prm.Scope = EvacuateScopeObjects - res, err := e.Evacuate(context.Background(), prm) - require.ErrorIs(t, err, errReplication) - require.Equal(t, totalCount-1, res.ObjectsEvacuated()) + require.NoError(t, e.Evacuate(context.Background(), prm)) + st := testWaitForEvacuationCompleted(t, e) + require.Contains(t, st.ErrorMessage(), errReplication.Error()) + require.Equal(t, totalCount-1, st.ObjectsEvacuated()) t.Run("no errors", func(t *testing.T) { prm.ObjectsHandler = acceptOneOf(objects, totalCount) - res, err := 
e.Evacuate(context.Background(), prm) - require.NoError(t, err) - require.Equal(t, totalCount, res.ObjectsEvacuated()) + require.NoError(t, e.Evacuate(context.Background(), prm)) + st := testWaitForEvacuationCompleted(t, e) + require.Equal(t, st.ErrorMessage(), "") + require.Equal(t, totalCount, st.ObjectsEvacuated()) }) }) } @@ -344,9 +361,8 @@ func TestEvacuateCancellation(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() - res, err := e.Evacuate(ctx, prm) + err := e.Evacuate(ctx, prm) require.ErrorContains(t, err, "context canceled") - require.Equal(t, uint64(0), res.ObjectsEvacuated()) } func TestEvacuateCancellationByError(t *testing.T) { @@ -375,8 +391,9 @@ func TestEvacuateCancellationByError(t *testing.T) { prm.ObjectWorkerCount = 2 prm.ContainerWorkerCount = 2 - _, err := e.Evacuate(context.Background(), prm) - require.ErrorContains(t, err, "test error") + require.NoError(t, e.Evacuate(context.Background(), prm)) + st := testWaitForEvacuationCompleted(t, e) + require.Contains(t, st.ErrorMessage(), "test error") } func TestEvacuateSingleProcess(t *testing.T) { @@ -406,20 +423,19 @@ func TestEvacuateSingleProcess(t *testing.T) { eg, egCtx := errgroup.WithContext(context.Background()) eg.Go(func() error { - res, err := e.Evacuate(egCtx, prm) - require.NoError(t, err, "first evacuation failed") - require.Equal(t, uint64(3), res.ObjectsEvacuated()) + require.NoError(t, e.Evacuate(egCtx, prm), "first evacuation failed") return nil }) eg.Go(func() error { <-running - res, err := e.Evacuate(egCtx, prm) - require.ErrorContains(t, err, "evacuate is already running for shard ids", "second evacuation not failed") - require.Equal(t, uint64(0), res.ObjectsEvacuated()) + require.ErrorContains(t, e.Evacuate(egCtx, prm), "evacuate is already running for shard ids", "second evacuation not failed") close(blocker) return nil }) require.NoError(t, eg.Wait()) + st := testWaitForEvacuationCompleted(t, e) + require.Equal(t, uint64(3), st.ObjectsEvacuated()) + require.Equal(t, st.ErrorMessage(), "") } func TestEvacuateObjectsAsync(t *testing.T) { @@ -458,9 +474,9 @@ func TestEvacuateObjectsAsync(t *testing.T) { eg, egCtx := errgroup.WithContext(context.Background()) eg.Go(func() error { - res, err := e.Evacuate(egCtx, prm) - require.NoError(t, err, "first evacuation failed") - require.Equal(t, uint64(3), res.ObjectsEvacuated()) + require.NoError(t, e.Evacuate(egCtx, prm), "first evacuation failed") + st = testWaitForEvacuationCompleted(t, e) + require.Equal(t, uint64(3), st.ObjectsEvacuated(), "invalid final count") return nil }) @@ -483,12 +499,7 @@ func TestEvacuateObjectsAsync(t *testing.T) { close(blocker) - require.Eventually(t, func() bool { - st, err = e.GetEvacuationState(context.Background()) - return st.ProcessingStatus() == EvacuateProcessStateCompleted - }, 3*time.Second, 10*time.Millisecond, "invalid final state") - - require.NoError(t, err, "get final state failed") + st = testWaitForEvacuationCompleted(t, e) require.Equal(t, uint64(3), st.ObjectsEvacuated(), "invalid final count") require.NotNil(t, st.StartedAt(), "invalid final started at") require.NotNil(t, st.FinishedAt(), "invalid final finished at") @@ -534,14 +545,9 @@ func TestEvacuateTreesLocal(t *testing.T) { require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids") require.Equal(t, "", st.ErrorMessage(), "invalid init error message") - res, err := e.Evacuate(context.Background(), prm) - require.NotNil(t, res, "sync evacuation result must be not nil") - require.NoError(t, err, 
"evacuation failed") - - st, err = e.GetEvacuationState(context.Background()) - require.NoError(t, err, "get evacuation state failed") - require.Equal(t, EvacuateProcessStateCompleted, st.ProcessingStatus()) + require.NoError(t, e.Evacuate(context.Background(), prm), "evacuation failed") + st = testWaitForEvacuationCompleted(t, e) require.Equal(t, uint64(3), st.TreesTotal(), "invalid trees total count") require.Equal(t, uint64(3), st.TreesEvacuated(), "invalid trees evacuated count") require.Equal(t, uint64(0), st.TreesFailed(), "invalid trees failed count") @@ -632,15 +638,9 @@ func TestEvacuateTreesRemote(t *testing.T) { require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids") require.Equal(t, "", st.ErrorMessage(), "invalid init error message") - res, err := e.Evacuate(context.Background(), prm) - require.NotNil(t, res, "sync evacuation must return not nil") - require.NoError(t, err, "evacuation failed") + require.NoError(t, e.Evacuate(context.Background(), prm), "evacuation failed") + st = testWaitForEvacuationCompleted(t, e) - st, err = e.GetEvacuationState(context.Background()) - require.NoError(t, err, "get evacuation state failed") - require.Equal(t, EvacuateProcessStateCompleted, st.ProcessingStatus()) - - require.NoError(t, err, "get final state failed") require.Equal(t, uint64(6), st.TreesTotal(), "invalid trees total count") require.Equal(t, uint64(6), st.TreesEvacuated(), "invalid trees evacuated count") require.Equal(t, uint64(0), st.TreesFailed(), "invalid trees failed count") @@ -754,11 +754,12 @@ func TestEvacuateShardObjectsRepOneOnly(t *testing.T) { require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly)) - res, err := e.Evacuate(context.Background(), prm) - require.NoError(t, err) - require.Equal(t, uint64(4), res.ObjectsEvacuated()) - require.Equal(t, uint64(8), res.ObjectsSkipped()) - require.Equal(t, uint64(0), res.ObjectsFailed()) + require.NoError(t, e.Evacuate(context.Background(), prm)) + st := testWaitForEvacuationCompleted(t, e) + require.Equal(t, "", st.ErrorMessage()) + require.Equal(t, uint64(4), st.ObjectsEvacuated()) + require.Equal(t, uint64(8), st.ObjectsSkipped()) + require.Equal(t, uint64(0), st.ObjectsFailed()) } func TestEvacuateShardObjectsRepOneOnlyBench(t *testing.T) { @@ -812,7 +813,8 @@ func TestEvacuateShardObjectsRepOneOnlyBench(t *testing.T) { require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly)) start := time.Now() - _, err := e.Evacuate(context.Background(), prm) + err := e.Evacuate(context.Background(), prm) + testWaitForEvacuationCompleted(t, e) t.Logf("evacuate took %v\n", time.Since(start)) require.NoError(t, err) } diff --git a/pkg/services/control/server/evacuate_async.go b/pkg/services/control/server/evacuate_async.go index 04465eb26..da5401515 100644 --- a/pkg/services/control/server/evacuate_async.go +++ b/pkg/services/control/server/evacuate_async.go @@ -42,7 +42,6 @@ func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartSha IgnoreErrors: req.GetBody().GetIgnoreErrors(), ObjectsHandler: s.replicateObject, TreeHandler: s.replicateTree, - Async: true, Scope: engine.EvacuateScope(req.GetBody().GetScope()), ContainerWorkerCount: req.GetBody().GetContainerWorkerCount(), ObjectWorkerCount: req.GetBody().GetObjectWorkerCount(), -- 2.45.3