From 0352b5b191ad110070043ada4f8d1f0a998bb74c Mon Sep 17 00:00:00 2001 From: Nikita Zinkevich Date: Thu, 7 Nov 2024 14:29:55 +0300 Subject: [PATCH] [#185] Implement rpc/client for tree service Signed-off-by: Nikita Zinkevich --- api/rpc/tree.go | 178 ++++ api/tree/convert.go | 1591 +++++++++++++++++++++++++++++++++++ api/tree/types.go | 953 +++++++++++++++++++++ pool/tree/client.go | 81 +- pool/tree/pool.go | 218 ++--- pool/tree/pool_signature.go | 6 +- pool/tree/pool_test.go | 12 +- 7 files changed, 2875 insertions(+), 164 deletions(-) create mode 100644 api/rpc/tree.go create mode 100644 api/tree/convert.go create mode 100644 api/tree/types.go diff --git a/api/rpc/tree.go b/api/rpc/tree.go new file mode 100644 index 0000000..8f5bf0a --- /dev/null +++ b/api/rpc/tree.go @@ -0,0 +1,178 @@ +package rpc + +import ( + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/common" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/tree" +) + +const serviceTree = "tree.TreeService" + +const ( + rpcTreeAdd = "Add" + rpcTreeAddByPath = "AddByPath" + rpcTreeRemove = "Remove" + rpcTreeMove = "Move" + rpcTreeGetNodeByPath = "GetNodeByPath" + rpcTreeGetSubTree = "GetSubTree" + rpcTreeList = "TreeList" + rpcTreeApply = "Apply" + rpcTreeGetOpLog = "GetOpLog" + rpcTreeHealthcheck = "Healthcheck" +) + +func Add( + cli *client.Client, + req *tree.AddRequest, + opts ...client.CallOption, +) (*tree.AddResponse, error) { + resp := new(tree.AddResponse) + err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeAdd), req, resp, opts...) + if err != nil { + return nil, err + } + + return resp, nil +} + +func AddByPath( + cli *client.Client, + req *tree.AddByPathRequest, + opts ...client.CallOption, +) (*tree.AddByPathResponse, error) { + resp := new(tree.AddByPathResponse) + err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeAddByPath), req, resp, opts...) + if err != nil { + return nil, err + } + + return resp, nil +} + +func Remove(cli *client.Client, + req *tree.RemoveRequest, + opts ...client.CallOption, +) (*tree.RemoveResponse, error) { + resp := new(tree.RemoveResponse) + err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeRemove), req, resp, opts...) + if err != nil { + return nil, err + } + + return resp, nil +} + +func Move(cli *client.Client, + req *tree.MoveRequest, + opts ...client.CallOption, +) (*tree.MoveResponse, error) { + resp := new(tree.MoveResponse) + err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeMove), req, resp, opts...) + if err != nil { + return nil, err + } + + return resp, nil +} + +func GetNodeByPath(cli *client.Client, + req *tree.GetNodeByPathRequest, + opts ...client.CallOption, +) (*tree.GetNodeByPathResponse, error) { + resp := new(tree.GetNodeByPathResponse) + err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeGetNodeByPath), req, resp, opts...) + if err != nil { + return nil, err + } + + return resp, nil +} + +type GetSubTreeResponseReader struct { + r client.MessageReader +} + +// Read reads response from the stream. +// +// Returns io.EOF of streaming is finished. +func (r *GetSubTreeResponseReader) Read(resp *tree.GetSubTreeResponse) error { + return r.r.ReadMessage(resp) +} + +func GetSubTree(cli *client.Client, + req *tree.GetSubTreeRequest, + opts ...client.CallOption, +) (*GetSubTreeResponseReader, error) { + wc, err := client.OpenServerStream(cli, common.CallMethodInfoServerStream(serviceTree, rpcTreeGetSubTree), req, opts...) + if err != nil { + return nil, err + } + + return &GetSubTreeResponseReader{ + r: wc, + }, nil +} + +func TreeList(cli *client.Client, + req *tree.ListRequest, + opts ...client.CallOption, +) (*tree.ListResponse, error) { + resp := new(tree.ListResponse) + err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeList), req, resp, opts...) + if err != nil { + return nil, err + } + + return resp, nil +} + +func Apply(cli *client.Client, + req *tree.ApplyRequest, + opts ...client.CallOption, +) (*tree.ApplyResponse, error) { + resp := new(tree.ApplyResponse) + err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeApply), req, resp, opts...) + if err != nil { + return nil, err + } + + return resp, nil +} + +type TreeServiceGetOpLogResponseReader struct { + r client.MessageReader +} + +// Read reads response from the stream. +// +// Returns io.EOF of streaming is finished. +func (r *TreeServiceGetOpLogResponseReader) Read(resp *tree.GetOpLogResponse) error { + return r.r.ReadMessage(resp) +} + +func GetOpLog(cli *client.Client, + req *tree.GetOpLogRequest, + opts ...client.CallOption, +) (*TreeServiceGetOpLogResponseReader, error) { + wc, err := client.OpenServerStream(cli, common.CallMethodInfoServerStream(serviceTree, rpcTreeGetOpLog), req, opts...) + if err != nil { + return nil, err + } + + return &TreeServiceGetOpLogResponseReader{ + r: wc, + }, nil +} + +func Healthcheck(cli *client.Client, + req *tree.HealthcheckRequest, + opts ...client.CallOption, +) (*tree.HealthcheckResponse, error) { + resp := new(tree.HealthcheckResponse) + err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeHealthcheck), req, resp, opts...) + if err != nil { + return nil, err + } + + return resp, nil +} diff --git a/api/tree/convert.go b/api/tree/convert.go new file mode 100644 index 0000000..1611504 --- /dev/null +++ b/api/tree/convert.go @@ -0,0 +1,1591 @@ +package tree + +import ( + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/grpc" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/message" + tree "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service" +) + +func metaToGRPC(m []*KeyValue) (res []*tree.KeyValue) { + if m != nil { + res = make([]*tree.KeyValue, 0, len(m)) + + for i := range m { + res = append(res, m[i].ToGRPCMessage().(*tree.KeyValue)) + } + } + + return +} + +func metaFromGRPC(m []*tree.KeyValue) (res []*KeyValue, err error) { + if m != nil { + res = make([]*KeyValue, len(m)) + + for i := range m { + res[i] = new(KeyValue) + err = res[i].FromGRPCMessage(m[i]) + if err != nil { + return + } + } + } + + return +} + +func (r *AddResponse) ToGRPCMessage() grpc.Message { + var m *tree.AddResponse + + if r != nil { + m = new(tree.AddResponse) + + m.Body = r.body.ToGRPCMessage().(*tree.AddResponse_Body) + m.SetSignature(r.signature.ToGRPCMessage().(*tree.Signature)) + } + + return m +} + +func (r *AddResponse) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.AddResponse) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + + var err error + + body := v.GetBody() + if body == nil { + r.body = nil + } else { + if r.body == nil { + r.body = new(AddResponseBody) + } + + err = r.body.FromGRPCMessage(body) + } + + return err +} + +func (r *AddResponseBody) ToGRPCMessage() grpc.Message { + var m *tree.AddResponse_Body + + if r != nil { + m = new(tree.AddResponse_Body) + m.NodeId = r.nodeID + } + + return m +} + +func (r *AddResponseBody) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.AddResponse_Body) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + + r.nodeID = v.GetNodeId() + + return nil +} + +func (r *AddByPathRequest) ToGRPCMessage() grpc.Message { + var m *tree.AddByPathRequest + + if r != nil { + m = new(tree.AddByPathRequest) + + m.Body = r.body.ToGRPCMessage().(*tree.AddByPathRequest_Body) + m.SetSignature(r.signature.ToGRPCMessage().(*tree.Signature)) + } + + return m +} + +func (r *AddByPathRequest) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.AddByPathRequest) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + + var err error + + body := v.GetBody() + if body == nil { + r.body = nil + } else { + if r.body == nil { + r.body = new(AddByPathRequestBody) + } + + err = r.body.FromGRPCMessage(body) + if err != nil { + return err + } + } + sig := v.GetSignature() + if sig == nil { + r.signature = nil + } else { + if r.signature == nil { + r.signature = new(Signature) + } + err = r.signature.FromGRPCMessage(sig) + } + + return err +} + +func (r *AddByPathRequestBody) ToGRPCMessage() grpc.Message { + var m *tree.AddByPathRequest_Body + + if r != nil { + m = new(tree.AddByPathRequest_Body) + m.ContainerId = r.containerID + m.Path = r.path + m.BearerToken = r.bearerToken + m.PathAttribute = r.pathAttribute + m.TreeId = r.treeID + m.Meta = metaToGRPC(r.meta) + } + + return m +} + +func (r *AddByPathRequestBody) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.AddByPathRequest_Body) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + + var err error + + meta := v.GetMeta() + if meta == nil { + r.meta = nil + } else { + r.meta, err = metaFromGRPC(meta) + if err != nil { + return err + } + } + + r.containerID = v.GetContainerId() + r.bearerToken = v.GetBearerToken() + r.path = v.GetPath() + r.pathAttribute = v.GetPathAttribute() + r.treeID = v.GetTreeId() + + return err +} + +func (r *MoveResponse) ToGRPCMessage() grpc.Message { + var m *tree.MoveResponse + + if r != nil { + m = new(tree.MoveResponse) + + m.Body = r.body.ToGRPCMessage().(*tree.MoveResponse_Body) + m.SetSignature(r.signature.ToGRPCMessage().(*tree.Signature)) + } + + return m +} + +func (r *MoveResponse) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.MoveResponse) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + + var err error + + body := v.GetBody() + if body == nil { + r.body = nil + } else { + if r.body == nil { + r.body = new(MoveResponseBody) + } + + err = r.body.FromGRPCMessage(body) + } + + return err +} + +func (r *MoveResponseBody) ToGRPCMessage() grpc.Message { + return new(tree.MoveResponse_Body) +} + +func (r *MoveResponseBody) FromGRPCMessage(grpc.Message) error { + return nil +} + +func (r *MoveRequest) ToGRPCMessage() grpc.Message { + var m *tree.MoveRequest + + if r != nil { + m = new(tree.MoveRequest) + m.Body = r.body.ToGRPCMessage().(*tree.MoveRequest_Body) + m.SetSignature(r.signature.ToGRPCMessage().(*tree.Signature)) + } + + return m +} + +func (r *MoveRequest) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.MoveRequest) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + + var err error + + body := v.GetBody() + if body == nil { + r.body = nil + } else { + if r.body == nil { + r.body = new(MoveRequestBody) + } + + err = r.body.FromGRPCMessage(body) + if err != nil { + return err + } + } + + sig := v.GetSignature() + if sig == nil { + r.signature = nil + } else { + if r.signature == nil { + r.signature = new(Signature) + } + + err = r.signature.FromGRPCMessage(sig) + if err != nil { + return err + } + } + + return nil +} + +func (r *MoveRequestBody) ToGRPCMessage() grpc.Message { + var m *tree.MoveRequest_Body + + if r != nil { + m = new(tree.MoveRequest_Body) + + m.ContainerId = r.containerID + m.Meta = metaToGRPC(r.meta) + m.TreeId = r.treeID + m.ParentId = r.parentID + m.BearerToken = r.bearerToken + m.NodeId = r.nodeID + } + + return m +} + +func (r *MoveRequestBody) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.MoveRequest_Body) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + + var err error + r.meta, err = metaFromGRPC(v.GetMeta()) + if err != nil { + return err + } + r.treeID = v.GetTreeId() + r.nodeID = v.GetNodeId() + r.containerID = v.GetContainerId() + r.parentID = v.GetParentId() + r.bearerToken = v.GetBearerToken() + + return nil +} + +func (r *RemoveRequest) ToGRPCMessage() grpc.Message { + var m *tree.RemoveRequest + + if r != nil { + m = new(tree.RemoveRequest) + m.Body = r.body.ToGRPCMessage().(*tree.RemoveRequest_Body) + m.SetSignature(r.signature.ToGRPCMessage().(*tree.Signature)) + } + + return m +} + +func (r *RemoveRequest) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.RemoveRequest) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + + var err error + + body := v.GetBody() + if body == nil { + r.body = nil + } else { + if r.body == nil { + r.body = new(RemoveRequestBody) + } + + err = r.body.FromGRPCMessage(body) + } + + return err +} + +func (r *RemoveRequestBody) ToGRPCMessage() grpc.Message { + var m *tree.RemoveRequest_Body + + if r != nil { + m = new(tree.RemoveRequest_Body) + m.NodeId = r.nodeID + m.TreeId = r.treeID + m.BearerToken = r.bearerToken + m.ContainerId = r.containerID + } + + return m +} + +func (r *RemoveRequestBody) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.RemoveRequest_Body) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + + r.nodeID = v.GetNodeId() + r.treeID = v.GetTreeId() + r.containerID = v.GetContainerId() + r.bearerToken = v.GetBearerToken() + + return nil +} + +func (r *AddByPathResponse) ToGRPCMessage() grpc.Message { + var m *tree.AddByPathResponse + + if r != nil { + m = new(tree.AddByPathResponse) + m.Body = r.body.ToGRPCMessage().(*tree.AddByPathResponse_Body) + m.SetSignature(r.signature.ToGRPCMessage().(*tree.Signature)) + } + + return m +} + +func (r *AddByPathResponse) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.AddByPathResponse) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + + var err error + + body := v.GetBody() + if body == nil { + r.body = nil + } else { + if r.body == nil { + r.body = new(AddByPathResponseBody) + } + + err = r.body.FromGRPCMessage(body) + } + + return err +} + +func (r *AddByPathResponseBody) ToGRPCMessage() grpc.Message { + var m *tree.AddByPathResponse_Body + + if r != nil { + m = new(tree.AddByPathResponse_Body) + + m.ParentId = r.parentID + m.Nodes = r.nodes + } + + return m +} + +func (r *AddByPathResponseBody) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.AddByPathResponse_Body) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + r.nodes = v.GetNodes() + r.parentID = v.GetParentId() + + return nil +} + +func (r *AddRequest) ToGRPCMessage() grpc.Message { + var m *tree.AddRequest + + if r != nil { + m = new(tree.AddRequest) + + m.Body = r.body.ToGRPCMessage().(*tree.AddRequest_Body) + m.SetSignature(r.signature.ToGRPCMessage().(*tree.Signature)) + } + + return m +} + +func (r *AddRequest) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.AddRequest) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + + var err error + + body := v.GetBody() + if body == nil { + r.body = nil + } else { + if r.body == nil { + r.body = new(AddRequestBody) + } + + err = r.body.FromGRPCMessage(body) + } + + sig := v.GetSignature() + if sig == nil { + r.signature = nil + } else { + if r.signature == nil { + r.signature = new(Signature) + } + err = r.signature.FromGRPCMessage(sig) + } + + return err +} + +func (r *AddRequestBody) ToGRPCMessage() grpc.Message { + var m *tree.AddRequest_Body + + if r != nil { + m = new(tree.AddRequest_Body) + + m.Meta = metaToGRPC(r.meta) + m.TreeId = r.treeID + m.ParentId = r.parentID + m.ContainerId = r.containerID + m.BearerToken = r.bearerToken + } + + return m +} + +func (r *AddRequestBody) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.AddRequest_Body) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + + var err error + + r.meta, err = metaFromGRPC(v.GetMeta()) + if err != nil { + return err + } + r.bearerToken = v.GetBearerToken() + r.treeID = v.GetTreeId() + r.containerID = v.GetContainerId() + r.parentID = v.GetParentId() + + return err +} + +func (r *RemoveResponse) ToGRPCMessage() grpc.Message { + var m *tree.RemoveResponse + + if r != nil { + m = new(tree.RemoveResponse) + m.Body = r.body.ToGRPCMessage().(*tree.RemoveResponse_Body) + } + + return m +} + +func (r *RemoveResponse) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.RemoveResponse) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + + var err error + + body := v.GetBody() + if body == nil { + r.body = nil + } else { + if r.body == nil { + r.body = new(RemoveResponseBody) + } + + err = r.body.FromGRPCMessage(body) + } + + sig := v.GetSignature() + if sig == nil { + r.signature = nil + } else { + if r.signature == nil { + r.signature = new(Signature) + } + err = r.signature.FromGRPCMessage(sig) + } + + return err +} + +func (r *RemoveResponseBody) ToGRPCMessage() grpc.Message { + return new(tree.RemoveResponse_Body) +} + +func (r *RemoveResponseBody) FromGRPCMessage(grpc.Message) error { + return nil +} + +func (r *GetNodeByPathRequest) ToGRPCMessage() grpc.Message { + var m *tree.GetNodeByPathRequest + + if r != nil { + m = new(tree.GetNodeByPathRequest) + + m.Body = r.body.ToGRPCMessage().(*tree.GetNodeByPathRequest_Body) + m.Signature = r.signature.ToGRPCMessage().(*tree.Signature) + } + + return m +} + +func (r *GetNodeByPathRequest) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.GetNodeByPathRequest) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + + var err error + + body := v.GetBody() + if body == nil { + r.body = nil + } else { + if r.body == nil { + r.body = new(GetNodeByPathRequestBody) + } + + err = r.body.FromGRPCMessage(body) + } + + return err +} + +func (r *GetNodeByPathRequestBody) ToGRPCMessage() grpc.Message { + var m *tree.GetNodeByPathRequest_Body + + if r != nil { + m = new(tree.GetNodeByPathRequest_Body) + + m.TreeId = r.treeID + m.Path = r.path + m.BearerToken = r.bearerToken + m.ContainerId = r.containerID + m.Attributes = r.attributes + m.PathAttribute = r.pathAttribute + m.AllAttributes = r.allAttributes + m.LatestOnly = r.latestOnly + } + + return m +} + +func (r *GetNodeByPathRequestBody) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.GetNodeByPathRequest_Body) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + + r.treeID = v.GetTreeId() + r.path = v.GetPath() + r.attributes = v.GetAttributes() + r.latestOnly = v.GetLatestOnly() + r.allAttributes = v.GetAllAttributes() + r.pathAttribute = v.GetPathAttribute() + r.containerID = v.GetContainerId() + r.bearerToken = v.GetBearerToken() + + return nil +} + +func (r *GetNodeByPathResponse) ToGRPCMessage() grpc.Message { + var m *tree.GetNodeByPathResponse + + if r != nil { + m = new(tree.GetNodeByPathResponse) + + m.Body = r.body.ToGRPCMessage().(*tree.GetNodeByPathResponse_Body) + m.SetSignature(r.signature.ToGRPCMessage().(*tree.Signature)) + } + + return m +} + +func (r *GetNodeByPathResponse) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.GetNodeByPathResponse) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + + var err error + + body := v.GetBody() + if body == nil { + r.body = nil + } else { + if r.body == nil { + r.body = new(GetNodeByPathResponseBody) + } + + err = r.body.FromGRPCMessage(body) + if err != nil { + return err + } + } + sig := v.GetSignature() + if sig == nil { + r.signature = nil + } else { + if r.signature == nil { + r.signature = new(Signature) + } + err = r.signature.FromGRPCMessage(sig) + } + + return err +} + +func (r *GetNodeByPathResponseBody) ToGRPCMessage() grpc.Message { + var m *tree.GetNodeByPathResponse_Body + + if r != nil { + m = new(tree.GetNodeByPathResponse_Body) + m.Nodes = GetNodeByPathInfoToGRPC(r.nodes) + } + + return m +} + +func (r *GetNodeByPathResponseBody) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.GetNodeByPathResponse_Body) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + + var err error + + r.nodes, err = GetNodeByPathInfoFromGRPC(v.GetNodes()) + + return err +} + +func GetNodeByPathInfoToGRPC(m []*GetNodeByPathResponseInfo) (res []*tree.GetNodeByPathResponse_Info) { + if m != nil { + res = make([]*tree.GetNodeByPathResponse_Info, 0, len(m)) + + for i := range m { + res = append(res, m[i].ToGRPCMessage().(*tree.GetNodeByPathResponse_Info)) + } + } + + return +} + +func GetNodeByPathInfoFromGRPC(m []*tree.GetNodeByPathResponse_Info) (res []*GetNodeByPathResponseInfo, err error) { + if m != nil { + res = make([]*GetNodeByPathResponseInfo, len(m)) + + for i := range m { + res[i] = new(GetNodeByPathResponseInfo) + err = res[i].FromGRPCMessage(m[i]) + if err != nil { + return + } + } + } + + return +} + +func (r *GetNodeByPathResponseInfo) ToGRPCMessage() grpc.Message { + var m *tree.GetNodeByPathResponse_Info + + if r != nil { + m = new(tree.GetNodeByPathResponse_Info) + + m.Meta = metaToGRPC(r.meta) + m.NodeId = r.nodeID + m.Timestamp = r.timestamp + m.ParentId = r.parentID + } + + return m +} + +func (r *GetNodeByPathResponseInfo) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.GetNodeByPathResponse_Info) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + + var err error + + r.meta, err = metaFromGRPC(v.GetMeta()) + if err != nil { + return err + } + r.timestamp = v.GetTimestamp() + r.nodeID = v.GetNodeId() + r.parentID = v.GetParentId() + + return err +} + +func (r *ListRequest) ToGRPCMessage() grpc.Message { + var m *tree.TreeListRequest + + if r != nil { + m = new(tree.TreeListRequest) + + m.Body = r.body.ToGRPCMessage().(*tree.TreeListRequest_Body) + m.SetSignature(r.signature.ToGRPCMessage().(*tree.Signature)) + } + + return m +} + +func (r *ListRequest) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.TreeListRequest) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + + var err error + + body := v.GetBody() + if body == nil { + r.body = nil + } else { + if r.body == nil { + r.body = new(ListRequestBody) + } + + err = r.body.FromGRPCMessage(body) + if err != nil { + return err + } + } + sig := v.GetSignature() + if sig == nil { + r.signature = nil + } else { + if r.signature == nil { + r.signature = new(Signature) + } + err = r.signature.FromGRPCMessage(sig) + } + + return err +} + +func (r *ListRequestBody) ToGRPCMessage() grpc.Message { + var m *tree.TreeListRequest_Body + + if r != nil { + m = new(tree.TreeListRequest_Body) + + m.ContainerId = r.containerID + } + + return m +} + +func (r *ListRequestBody) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.TreeListRequest_Body) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + r.containerID = v.GetContainerId() + + return nil +} + +func (r *ListResponse) ToGRPCMessage() grpc.Message { + var m *tree.TreeListResponse + + if r != nil { + m = new(tree.TreeListResponse) + + m.Body = r.body.ToGRPCMessage().(*tree.TreeListResponse_Body) + m.Signature = r.signature.ToGRPCMessage().(*tree.Signature) + } + + return m +} + +func (r *ListResponse) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.TreeListResponse) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + + var err error + + body := v.GetBody() + if body == nil { + r.body = nil + } else { + if r.body == nil { + r.body = new(ListResponseBody) + } + + err = r.body.FromGRPCMessage(body) + if err != nil { + return err + } + } + + sig := v.GetSignature() + if sig == nil { + r.signature = nil + } else { + if r.signature == nil { + r.signature = new(Signature) + } + err = r.signature.FromGRPCMessage(sig) + } + + return err +} + +func (r *ListResponseBody) ToGRPCMessage() grpc.Message { + var m *tree.TreeListResponse_Body + + if r != nil { + m = new(tree.TreeListResponse_Body) + m.Ids = r.ids + } + + return m +} + +func (r *ListResponseBody) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.TreeListResponse_Body) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + r.ids = v.GetIds() + + return nil +} + +func (r *ApplyRequest) ToGRPCMessage() grpc.Message { + var m *tree.ApplyRequest + + if r != nil { + m = new(tree.ApplyRequest) + + m.Body = r.body.ToGRPCMessage().(*tree.ApplyRequest_Body) + m.SetSignature(r.signature.ToGRPCMessage().(*tree.Signature)) + } + + return m +} + +func (r *ApplyRequest) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.ApplyRequest) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + + var err error + + body := v.GetBody() + if body == nil { + r.body = nil + } else { + if r.body == nil { + r.body = new(ApplyRequestBody) + } + + err = r.body.FromGRPCMessage(body) + if err != nil { + return err + } + } + + sig := v.GetSignature() + if sig == nil { + r.signature = nil + } else { + if r.signature == nil { + r.signature = new(Signature) + } + err = r.signature.FromGRPCMessage(sig) + } + + return err +} + +func (r *ApplyRequestBody) ToGRPCMessage() grpc.Message { + var m *tree.ApplyRequest_Body + + if r != nil { + m = new(tree.ApplyRequest_Body) + + m.TreeId = r.treeID + m.ContainerId = r.containerID + m.Operation = r.operation.ToGRPCMessage().(*tree.LogMove) + } + + return m +} + +func (r *ApplyRequestBody) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.ApplyRequest_Body) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + + var err error + r.treeID = v.GetTreeId() + r.containerID = v.GetContainerId() + op := v.GetOperation() + if op == nil { + r.operation = nil + } else { + if r.operation == nil { + r.operation = new(LogMove) + } + err = r.operation.FromGRPCMessage(op) + } + + return err +} + +func (r *ApplyResponse) ToGRPCMessage() grpc.Message { + var m *tree.ApplyResponse + + if r != nil { + m = new(tree.ApplyResponse) + + m.Body = r.body.ToGRPCMessage().(*tree.ApplyResponse_Body) + m.SetSignature(r.signature.ToGRPCMessage().(*tree.Signature)) + } + + return m +} + +func (r *ApplyResponse) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.ApplyResponse) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + + var err error + + body := v.GetBody() + if body == nil { + r.body = nil + } else { + if r.body == nil { + r.body = new(ApplyResponseBody) + } + + err = r.body.FromGRPCMessage(body) + if err != nil { + return err + } + } + + sig := v.GetSignature() + if sig == nil { + r.signature = nil + } else { + if r.signature == nil { + r.signature = new(Signature) + } + err = r.signature.FromGRPCMessage(sig) + } + + return err +} + +func (r *ApplyResponseBody) ToGRPCMessage() grpc.Message { + return new(tree.ApplyResponse_Body) +} + +func (r *ApplyResponseBody) FromGRPCMessage(grpc.Message) error { + return nil +} + +func (r *HealthcheckRequest) ToGRPCMessage() grpc.Message { + var m *tree.HealthcheckRequest + + if r != nil { + m = new(tree.HealthcheckRequest) + + m.Body = r.body.ToGRPCMessage().(*tree.HealthcheckRequest_Body) + m.SetSignature(r.signature.ToGRPCMessage().(*tree.Signature)) + } + + return m +} + +func (r *HealthcheckRequest) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.HealthcheckRequest) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + + var err error + + body := v.GetBody() + if body == nil { + r.body = nil + } else { + if r.body == nil { + r.body = new(HealthcheckRequestBody) + } + + err = r.body.FromGRPCMessage(body) + if err != nil { + return err + } + } + + sig := v.GetSignature() + if sig == nil { + r.signature = nil + } else { + if r.signature == nil { + r.signature = new(Signature) + } + err = r.signature.FromGRPCMessage(sig) + } + + return err +} + +func (r *HealthcheckRequestBody) ToGRPCMessage() grpc.Message { + return new(tree.HealthcheckRequest_Body) +} + +func (r *HealthcheckRequestBody) FromGRPCMessage(grpc.Message) error { + return nil +} + +func (r *HealthcheckResponse) ToGRPCMessage() grpc.Message { + var m *tree.HealthcheckResponse + + if r != nil { + m = new(tree.HealthcheckResponse) + + m.Body = r.body.ToGRPCMessage().(*tree.HealthcheckResponse_Body) + m.SetSignature(r.signature.ToGRPCMessage().(*tree.Signature)) + } + + return m +} + +func (r *HealthcheckResponse) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.HealthcheckResponse) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + + var err error + + body := v.GetBody() + if body == nil { + r.body = nil + } else { + if r.body == nil { + r.body = new(HealthcheckResponseBody) + } + + err = r.body.FromGRPCMessage(body) + if err != nil { + return err + } + } + + sig := v.GetSignature() + if sig == nil { + r.signature = nil + } else { + if r.signature == nil { + r.signature = new(Signature) + } + err = r.signature.FromGRPCMessage(sig) + } + + return err +} + +func (r *HealthcheckResponseBody) ToGRPCMessage() grpc.Message { + return new(tree.HealthcheckResponse_Body) +} + +func (r *HealthcheckResponseBody) FromGRPCMessage(grpc.Message) error { + return nil +} + +func (r *GetSubTreeResponse) ToGRPCMessage() grpc.Message { + var m *tree.GetSubTreeResponse + + if r != nil { + m = new(tree.GetSubTreeResponse) + + m.Body = r.body.ToGRPCMessage().(*tree.GetSubTreeResponse_Body) + m.SetSignature(r.signature.ToGRPCMessage().(*tree.Signature)) + } + + return m +} + +func (r *GetSubTreeResponse) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.GetSubTreeResponse) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + + var err error + + body := v.GetBody() + if body == nil { + r.body = nil + } else { + if r.body == nil { + r.body = new(GetSubTreeResponseBody) + } + + err = r.body.FromGRPCMessage(body) + if err != nil { + return err + } + } + + sig := v.GetSignature() + if sig == nil { + r.signature = nil + } else { + if r.signature == nil { + r.signature = new(Signature) + } + err = r.signature.FromGRPCMessage(sig) + } + + return err +} + +func (r *GetOpLogRequest) ToGRPCMessage() grpc.Message { + var m *tree.GetOpLogRequest + + if r != nil { + m = new(tree.GetOpLogRequest) + + m.Body = r.body.ToGRPCMessage().(*tree.GetOpLogRequest_Body) + m.SetSignature(r.signature.ToGRPCMessage().(*tree.Signature)) + } + + return m +} + +func (r *GetOpLogRequest) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.GetOpLogRequest) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + + var err error + + body := v.GetBody() + if body == nil { + r.body = nil + } else { + if r.body == nil { + r.body = new(GetOpLogRequestBody) + } + + err = r.body.FromGRPCMessage(body) + if err != nil { + return err + } + } + + sig := v.GetSignature() + if sig == nil { + r.signature = nil + } else { + if r.signature == nil { + r.signature = new(Signature) + } + err = r.signature.FromGRPCMessage(sig) + } + + return err +} + +func (r *GetOpLogRequestBody) ToGRPCMessage() grpc.Message { + var m *tree.GetOpLogRequest_Body + + if r != nil { + m = new(tree.GetOpLogRequest_Body) + + m.TreeId = r.treeID + m.Count = r.count + m.ContainerId = r.containerID + m.Height = r.height + } + + return m +} + +func (r *GetOpLogRequestBody) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.GetOpLogRequest_Body) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + + var err error + + r.count = v.GetCount() + r.containerID = v.GetContainerId() + r.height = v.GetHeight() + r.treeID = v.GetTreeId() + + return err +} + +func (r *GetOpLogResponse) ToGRPCMessage() grpc.Message { + var m *tree.GetOpLogResponse + + if r != nil { + m = new(tree.GetOpLogResponse) + + m.Body = r.body.ToGRPCMessage().(*tree.GetOpLogResponse_Body) + m.SetSignature(r.signature.ToGRPCMessage().(*tree.Signature)) + } + + return m +} + +func (r *GetOpLogResponse) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.GetOpLogResponse) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + + var err error + + body := v.GetBody() + if body == nil { + r.body = nil + } else { + if r.body == nil { + r.body = new(GetOpLogResponseBody) + } + + err = r.body.FromGRPCMessage(body) + if err != nil { + return err + } + } + + sig := v.GetSignature() + if sig == nil { + r.signature = nil + } else { + if r.signature == nil { + r.signature = new(Signature) + } + err = r.signature.FromGRPCMessage(sig) + } + + return err +} + +func (r *GetOpLogResponseBody) ToGRPCMessage() grpc.Message { + var m *tree.GetOpLogResponse_Body + + if r != nil { + m = new(tree.GetOpLogResponse_Body) + + m.Operation = r.operation.ToGRPCMessage().(*tree.LogMove) + } + + return m +} + +func (r *GetOpLogResponseBody) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.GetOpLogResponse_Body) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + + var err error + + oper := v.GetOperation() + if oper == nil { + r.operation = nil + } else { + if r.operation == nil { + r.operation = new(LogMove) + } + + err = r.operation.FromGRPCMessage(oper) + } + + return err +} + +func (s *Signature) ToGRPCMessage() grpc.Message { + var m *tree.Signature + + if s != nil { + m = new(tree.Signature) + + m.Sign = s.sign + m.Key = s.key + } + + return m +} + +func (s *Signature) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.Signature) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + + s.sign = v.GetSign() + s.key = v.GetKey() + + return nil +} + +func (k *KeyValue) ToGRPCMessage() grpc.Message { + var m *tree.KeyValue + + if k != nil { + m = new(tree.KeyValue) + + m.Key = k.key + m.Value = k.value + } + + return m +} + +func (k *KeyValue) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.KeyValue) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + k.key = v.GetKey() + k.value = v.GetValue() + + return nil +} + +func (g *LogMove) ToGRPCMessage() grpc.Message { + var m *tree.LogMove + + if g != nil { + m = new(tree.LogMove) + + m.Meta = g.meta + m.ParentId = g.parentID + m.ChildId = g.childID + } + + return m +} + +func (g *LogMove) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.LogMove) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + g.meta = v.GetMeta() + g.parentID = v.GetParentId() + g.childID = v.GetChildId() + + return nil +} + +func (r *GetSubTreeRequest) ToGRPCMessage() grpc.Message { + var m *tree.GetSubTreeRequest + + if r != nil { + m = new(tree.GetSubTreeRequest) + + m.Body = r.body.ToGRPCMessage().(*tree.GetSubTreeRequest_Body) + m.SetSignature(r.signature.ToGRPCMessage().(*tree.Signature)) + } + + return m +} + +func (r *GetSubTreeRequest) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.GetSubTreeRequest) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + + var err error + + body := v.GetBody() + if body == nil { + r.body = nil + } else { + if r.body == nil { + r.body = new(GetSubTreeRequestBody) + } + + err = r.body.FromGRPCMessage(body) + if err != nil { + return err + } + } + + sig := v.GetSignature() + if sig == nil { + r.signature = nil + } else { + if r.signature == nil { + r.signature = new(Signature) + } + err = r.signature.FromGRPCMessage(sig) + } + + return err +} + +func (r *GetSubTreeRequestBody) ToGRPCMessage() grpc.Message { + var m *tree.GetSubTreeRequest_Body + + if r != nil { + m = new(tree.GetSubTreeRequest_Body) + + m.TreeId = r.treeID + m.BearerToken = r.bearerToken + m.Depth = r.depth + m.RootId = r.rootID + m.ContainerId = r.containerID + m.OrderBy = r.orderBy.ToGRPCMessage().(*tree.GetSubTreeRequest_Body_Order) + } + + return m +} + +func (r *GetSubTreeRequestBody) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.GetSubTreeRequest_Body) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + + r.rootID = v.GetRootId() + r.depth = v.GetDepth() + r.containerID = v.GetContainerId() + r.bearerToken = v.GetBearerToken() + r.treeID = v.GetTreeId() + + var err error + order := v.GetOrderBy() + if order == nil { + r.orderBy = nil + } else { + if r.orderBy == nil { + r.orderBy = new(GetSubTreeRequestBodyOrder) + } + + err = r.orderBy.FromGRPCMessage(order) + if err != nil { + return err + } + } + + return err +} +func (r *GetSubTreeRequestBodyOrder) ToGRPCMessage() grpc.Message { + var m *tree.GetSubTreeRequest_Body_Order + + if r != nil { + m = new(tree.GetSubTreeRequest_Body_Order) + + m.Direction = tree.GetSubTreeRequest_Body_Order_Direction(r.direction) + } + + return m +} + +func (r *GetSubTreeRequestBodyOrder) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.GetSubTreeRequest_Body_Order) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + + r.direction = GetSubTreeRequestBodyOrderDirection(v.GetDirection()) + + return nil +} + +func (r *GetSubTreeResponseBody) ToGRPCMessage() grpc.Message { + var m *tree.GetSubTreeResponse_Body + + if r != nil { + m = new(tree.GetSubTreeResponse_Body) + + m.Meta = metaToGRPC(r.meta) + m.ParentId = r.parentID + m.NodeId = r.nodeID + m.Timestamp = r.timestamp + } + + return m +} + +func (r *GetSubTreeResponseBody) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*tree.GetSubTreeResponse_Body) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + var err error + + r.nodeID = v.GetNodeId() + r.timestamp = v.GetTimestamp() + r.parentID = v.GetParentId() + r.meta, err = metaFromGRPC(v.GetMeta()) + + return err +} diff --git a/api/tree/types.go b/api/tree/types.go new file mode 100644 index 0000000..d20f946 --- /dev/null +++ b/api/tree/types.go @@ -0,0 +1,953 @@ +package tree + +import ( + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/session" + tree "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service" +) + +type AddRequest struct { + body *AddRequestBody + signature *Signature +} + +func (r *AddRequest) GetBody() *AddRequestBody { + if r != nil { + return r.body + } + + return nil +} + +func (r *AddRequest) SetBody(v *AddRequestBody) { + r.body = v +} + +func (r *AddRequest) GetSignature() *tree.Signature { + return r.signature.ToGRPCMessage().(*tree.Signature) +} + +func (r *AddRequest) SetSignature(signature *tree.Signature) error { + if r.signature == nil { + r.signature = new(Signature) + } + + return r.signature.FromGRPCMessage(signature) +} + +func (r *AddRequest) SignedDataSize() int { + return r.body.ToGRPCMessage().(*tree.AddRequest_Body).StableSize() +} + +func (r *AddRequest) ReadSignedData(buf []byte) ([]byte, error) { + return r.body.ToGRPCMessage().(*tree.AddRequest_Body).StableMarshal(buf), nil +} + +type AddRequestBody struct { + containerID []byte + bearerToken []byte + treeID string + parentID uint64 + meta []*KeyValue +} + +func (r *AddRequestBody) SetContainerID(v []byte) { + r.containerID = v +} + +func (r *AddRequestBody) SetBearerToken(v []byte) { + r.bearerToken = v +} + +func (r *AddRequestBody) SetTreeID(v string) { + r.treeID = v +} + +func (r *AddRequestBody) SetParentID(v uint64) { + r.parentID = v +} + +func (r *AddRequestBody) SetMeta(v []*KeyValue) { + r.meta = v +} + +type AddResponse struct { + body *AddResponseBody + signature *Signature +} + +func (r *AddResponse) GetBody() *AddResponseBody { + if r != nil { + return r.body + } + + return nil +} + +type AddResponseBody struct { + nodeID uint64 +} + +func (r *AddResponseBody) GetNodeID() uint64 { + if r != nil { + return r.nodeID + } + + return 0 +} + +type AddByPathRequest struct { + body *AddByPathRequestBody + signature *Signature +} + +func (r *AddByPathRequest) SetBody(v *AddByPathRequestBody) { + r.body = v +} + +func (r *AddByPathRequest) GetSignature() *tree.Signature { + return r.signature.ToGRPCMessage().(*tree.Signature) +} + +func (r *AddByPathRequest) SetSignature(signature *tree.Signature) error { + if r.signature == nil { + r.signature = new(Signature) + } + + return r.signature.FromGRPCMessage(signature) +} + +func (r *AddByPathRequest) SignedDataSize() int { + return r.body.ToGRPCMessage().(*tree.AddByPathRequest_Body).StableSize() +} + +func (r *AddByPathRequest) ReadSignedData(bytes []byte) ([]byte, error) { + return r.body.ToGRPCMessage().(*tree.AddByPathRequest_Body).StableMarshal(bytes), nil +} + +type AddByPathRequestBody struct { + containerID []byte + treeID string + pathAttribute string + path []string + meta []*KeyValue + bearerToken []byte +} + +func (r *AddByPathRequestBody) SetContainerID(v []byte) { + r.containerID = v +} + +func (r *AddByPathRequestBody) SetBearerToken(v []byte) { + r.bearerToken = v +} + +func (r *AddByPathRequestBody) SetTreeID(v string) { + r.treeID = v +} + +func (r *AddByPathRequestBody) SetPathAttribute(v string) { + r.pathAttribute = v +} + +func (r *AddByPathRequestBody) SetPath(v []string) { + r.path = v +} + +func (r *AddByPathRequestBody) SetMeta(v []*KeyValue) { + r.meta = v +} + +type AddByPathResponse struct { + body *AddByPathResponseBody + signature *Signature +} + +func (r *AddByPathResponse) GetBody() *AddByPathResponseBody { + if r != nil { + return r.body + } + + return nil +} + +func (r *AddByPathResponse) GetSignature() *Signature { + if r != nil { + return r.signature + } + + return nil +} + +type AddByPathResponseBody struct { + nodes []uint64 + parentID uint64 +} + +func (r *AddByPathResponseBody) GetNodes() []uint64 { + if r != nil { + return r.nodes + } + + return nil +} + +func (r *AddByPathResponseBody) GetParentID() uint64 { + if r != nil { + return r.parentID + } + + return 0 +} + +type RemoveRequest struct { + body *RemoveRequestBody + signature *Signature +} + +func (r *RemoveRequest) SetBody(v *RemoveRequestBody) { + r.body = v +} + +func (r *RemoveRequest) GetSignature() *tree.Signature { + return r.signature.ToGRPCMessage().(*tree.Signature) +} + +func (r *RemoveRequest) SetSignature(signature *tree.Signature) error { + if r.signature == nil { + r.signature = new(Signature) + } + + return r.signature.FromGRPCMessage(signature) +} + +func (r *RemoveRequest) SignedDataSize() int { + return r.body.ToGRPCMessage().(*tree.RemoveRequest_Body).StableSize() +} + +func (r *RemoveRequest) ReadSignedData(bytes []byte) ([]byte, error) { + return r.body.ToGRPCMessage().(*tree.RemoveRequest_Body).StableMarshal(bytes), nil +} + +type RemoveRequestBody struct { + containerID []byte + treeID string + nodeID uint64 + bearerToken []byte +} + +func (r *RemoveRequestBody) SetContainerID(v []byte) { + r.containerID = v +} + +func (r *RemoveRequestBody) SetBearerToken(v []byte) { + r.bearerToken = v +} + +func (r *RemoveRequestBody) SetTreeID(v string) { + r.treeID = v +} + +func (r *RemoveRequestBody) SetNodeID(v uint64) { + r.nodeID = v +} + +type RemoveResponse struct { + body *RemoveResponseBody + signature *Signature +} + +func (r *RemoveResponse) GetBody() *RemoveResponseBody { + if r != nil { + return r.body + } + return nil +} + +func (r *RemoveResponse) GetSignature() *Signature { + if r != nil { + return r.signature + } + return nil +} + +type RemoveResponseBody struct{} + +type MoveRequest struct { + body *MoveRequestBody + signature *Signature +} + +func (r *MoveRequest) SetBody(v *MoveRequestBody) { + r.body = v +} + +func (r *MoveRequest) GetSignature() *tree.Signature { + return r.signature.ToGRPCMessage().(*tree.Signature) +} + +func (r *MoveRequest) SetSignature(signature *tree.Signature) error { + if r.signature == nil { + r.signature = new(Signature) + } + + return r.signature.FromGRPCMessage(signature) +} + +func (r *MoveRequest) SignedDataSize() int { + return r.body.ToGRPCMessage().(*tree.MoveRequest_Body).StableSize() +} + +func (r *MoveRequest) ReadSignedData(buf []byte) ([]byte, error) { + return r.body.ToGRPCMessage().(*tree.MoveRequest_Body).StableMarshal(buf), nil +} + +type MoveRequestBody struct { + containerID []byte + treeID string + parentID uint64 + nodeID uint64 + meta []*KeyValue + bearerToken []byte +} + +func (r *MoveRequestBody) SetContainerID(v []byte) { + r.containerID = v +} + +func (r *MoveRequestBody) SetBearerToken(v []byte) { + r.bearerToken = v +} + +func (r *MoveRequestBody) SetTreeID(v string) { + r.treeID = v +} + +func (r *MoveRequestBody) SetNodeID(v uint64) { + r.nodeID = v +} + +func (r *MoveRequestBody) SetMeta(v []*KeyValue) { + r.meta = v +} + +func (r *MoveRequestBody) SetParentID(v uint64) { + r.parentID = v +} + +type MoveResponse struct { + body *MoveResponseBody + signature *Signature +} + +func (r *MoveResponse) GetBody() *MoveResponseBody { + if r != nil { + return r.body + } + return nil +} + +func (r *MoveResponse) GetSignature() *Signature { + if r != nil { + return r.signature + } + + return nil +} + +type MoveResponseBody struct{} + +type GetNodeByPathRequest struct { + body *GetNodeByPathRequestBody + signature *Signature +} + +func (r *GetNodeByPathRequest) SetBody(v *GetNodeByPathRequestBody) { + r.body = v +} + +func (r *GetNodeByPathRequest) GetSignature() *tree.Signature { + return r.signature.ToGRPCMessage().(*tree.Signature) +} + +func (r *GetNodeByPathRequest) SetSignature(signature *tree.Signature) error { + if r.signature == nil { + r.signature = new(Signature) + } + return r.signature.FromGRPCMessage(signature) +} + +func (r *GetNodeByPathRequest) SignedDataSize() int { + return r.body.ToGRPCMessage().(*tree.GetNodeByPathRequest_Body).StableSize() +} + +func (r *GetNodeByPathRequest) ReadSignedData(buf []byte) ([]byte, error) { + return r.body.ToGRPCMessage().(*tree.GetNodeByPathRequest_Body).StableMarshal(buf), nil +} + +type GetNodeByPathRequestBody struct { + containerID []byte + treeID string + pathAttribute string + path []string + attributes []string + latestOnly bool + allAttributes bool + bearerToken []byte +} + +func (r *GetNodeByPathRequestBody) SetContainerID(v [32]byte) { + r.containerID = v[:] +} + +func (r *GetNodeByPathRequestBody) SetBearerToken(v []byte) { + r.bearerToken = v +} + +func (r *GetNodeByPathRequestBody) SetTreeID(v string) { + r.treeID = v +} + +func (r *GetNodeByPathRequestBody) SetPathAttribute(v string) { + r.pathAttribute = v +} + +func (r *GetNodeByPathRequestBody) SetParentID(v string) { + r.pathAttribute = v +} + +func (r *GetNodeByPathRequestBody) SetPath(v []string) { + r.path = v +} + +func (r *GetNodeByPathRequestBody) SetAttributes(v []string) { + r.attributes = v +} + +func (r *GetNodeByPathRequestBody) SetAllAttributes(v bool) { + r.allAttributes = v +} + +func (r *GetNodeByPathRequestBody) SetLatestOnly(v bool) { + r.latestOnly = v +} + +type GetNodeByPathResponse struct { + body *GetNodeByPathResponseBody + signature *Signature +} + +func (r *GetNodeByPathResponse) GetBody() *GetNodeByPathResponseBody { + if r != nil { + return r.body + } + return nil +} + +func (r *GetNodeByPathResponse) GetSignature() *Signature { + if r != nil { + return r.signature + } + + return nil +} + +type GetNodeByPathResponseBody struct { + nodes []*GetNodeByPathResponseInfo +} + +func (r *GetNodeByPathResponseBody) GetNodes() []*GetNodeByPathResponseInfo { + if r != nil { + return r.nodes + } + return nil +} + +type GetNodeByPathResponseInfo struct { + nodeID uint64 + timestamp uint64 + meta []*KeyValue + parentID uint64 +} + +func (r *GetNodeByPathResponseInfo) GetNodeID() uint64 { + if r != nil { + return r.nodeID + } + + return 0 +} + +func (r *GetNodeByPathResponseInfo) GetTimestamp() uint64 { + if r != nil { + return r.timestamp + } + + return 0 +} + +func (r *GetNodeByPathResponseInfo) GetParentID() uint64 { + if r != nil { + return r.parentID + } + + return 0 +} + +func (r *GetNodeByPathResponseInfo) GetMeta() []*KeyValue { + if r != nil { + return r.meta + } + + return nil +} + +type ListRequest struct { + body *ListRequestBody + signature *Signature +} + +func (r *ListRequest) SetBody(v *ListRequestBody) { + r.body = v +} + +type ListRequestBody struct { + containerID []byte +} + +func (r *ListRequestBody) SetContainerID(v []byte) { + r.containerID = v +} + +type ListResponse struct { + body *ListResponseBody + signature *Signature +} + +func (r *ListResponse) GetBody() *ListResponseBody { + if r != nil { + return r.body + } + return nil +} + +func (r *ListResponse) GetSignature() *Signature { + if r != nil { + return r.signature + } + + return nil +} + +type ListResponseBody struct { + ids []string +} + +func (r *ListResponseBody) GetIDs() []string { + if r != nil { + return r.ids + } + return nil +} + +type GetSubTreeRequest struct { + body *GetSubTreeRequestBody + signature *Signature +} + +func (r *GetSubTreeRequest) SetBody(v *GetSubTreeRequestBody) { + r.body = v +} + +func (r *GetSubTreeRequest) GetSignature() *tree.Signature { + return r.signature.ToGRPCMessage().(*tree.Signature) +} + +func (r *GetSubTreeRequest) SetSignature(signature *tree.Signature) error { + if r.signature == nil { + r.signature = new(Signature) + } + + return r.signature.FromGRPCMessage(signature) +} + +func (r *GetSubTreeRequest) SignedDataSize() int { + return r.body.ToGRPCMessage().(*tree.GetSubTreeRequest_Body).StableSize() +} + +func (r *GetSubTreeRequest) ReadSignedData(buf []byte) ([]byte, error) { + return r.body.ToGRPCMessage().(*tree.GetSubTreeRequest_Body).StableMarshal(buf), nil +} + +type GetSubTreeRequestBody struct { + containerID []byte + treeID string + rootID []uint64 + depth uint32 + bearerToken []byte + orderBy *GetSubTreeRequestBodyOrder +} + +func (r *GetSubTreeRequestBody) SetContainerID(v []byte) { + r.containerID = v +} + +func (r *GetSubTreeRequestBody) SetBearerToken(v []byte) { + r.bearerToken = v +} + +func (r *GetSubTreeRequestBody) SetTreeID(v string) { + r.treeID = v +} + +func (r *GetSubTreeRequestBody) SetRootID(v []uint64) { + r.rootID = v +} + +func (r *GetSubTreeRequestBody) SetDepth(v uint32) { + r.depth = v +} + +func (r *GetSubTreeRequestBody) SetOrderBy(v *GetSubTreeRequestBodyOrder) { + r.orderBy = v +} + +type GetSubTreeRequestBodyOrder struct { + direction GetSubTreeRequestBodyOrderDirection +} + +func (r *GetSubTreeRequestBodyOrder) SetDirection(v GetSubTreeRequestBodyOrderDirection) { + r.direction = v +} + +const ( + GetSubTreeRequestBodyOrderNone = GetSubTreeRequestBodyOrderDirection(tree.GetSubTreeRequest_Body_Order_None) + GetSubTreeRequestBodyOrderAsc = GetSubTreeRequestBodyOrderDirection(tree.GetSubTreeRequest_Body_Order_Asc) +) + +type GetSubTreeRequestBodyOrderDirection int32 + +type GetSubTreeResponse struct { + body *GetSubTreeResponseBody + signature *Signature +} + +func (r *GetSubTreeResponse) GetBody() *GetSubTreeResponseBody { + if r != nil { + return r.body + } + return nil +} + +func (r *GetSubTreeResponse) GetSignature() *Signature { + if r != nil { + return r.signature + } + + return nil +} + +type GetSubTreeResponseBody struct { + nodeID []uint64 + parentID []uint64 + timestamp []uint64 + meta []*KeyValue +} + +func (r *GetSubTreeResponseBody) GetNodeID() []uint64 { + if r != nil { + return r.nodeID + } + + return nil +} + +func (r *GetSubTreeResponseBody) GetParentID() []uint64 { + if r != nil { + return r.parentID + } + + return nil +} + +func (r *GetSubTreeResponseBody) GetTimestamp() []uint64 { + if r != nil { + return r.timestamp + } + + return nil +} + +func (r *GetSubTreeResponseBody) GetMeta() []*KeyValue { + if r != nil { + return r.meta + } + + return nil +} + +type ApplyRequest struct { + body *ApplyRequestBody + signature *Signature +} + +func (r *ApplyRequest) SetBody(v *ApplyRequestBody) { + r.body = v +} + +type ApplyRequestBody struct { + containerID []byte + treeID string + operation *LogMove +} + +func (r *ApplyRequestBody) SetContainerID(v []byte) { + r.containerID = v +} + +func (r *ApplyRequestBody) SetTreeID(v string) { + r.treeID = v +} + +func (r *ApplyRequestBody) SetOperation(v *LogMove) { + r.operation = v +} + +type ApplyResponse struct { + body *ApplyResponseBody + signature *Signature +} + +func (r *ApplyResponse) GetBody() *ApplyResponseBody { + if r != nil { + return r.body + } + return nil +} + +func (r *ApplyResponse) GetSignature() *Signature { + if r != nil { + return r.signature + } + + return nil +} + +type ApplyResponseBody struct{} + +type GetOpLogRequest struct { + body *GetOpLogRequestBody + signature *Signature +} + +func (r *GetOpLogRequest) SetBody(v *GetOpLogRequestBody) { + r.body = v +} + +func (r *GetOpLogRequest) GetSignature() *tree.Signature { + return r.signature.ToGRPCMessage().(*tree.Signature) +} + +func (r *GetOpLogRequest) SetSignature(signature *tree.Signature) error { + if r.signature == nil { + r.signature = new(Signature) + } + + return r.signature.FromGRPCMessage(signature) +} + +func (r *GetOpLogRequest) SignedDataSize() int { + return r.body.ToGRPCMessage().(*tree.GetSubTreeRequest_Body).StableSize() +} + +func (r *GetOpLogRequest) ReadSignedData(buf []byte) ([]byte, error) { + return r.body.ToGRPCMessage().(*tree.GetSubTreeRequest_Body).StableMarshal(buf), nil +} + +type GetOpLogRequestBody struct { + containerID []byte + treeID string + height uint64 + count uint64 +} + +func (r *GetOpLogRequestBody) SetTreeID(v string) { + r.treeID = v +} + +func (r *GetOpLogRequestBody) SetContainerID(v []byte) { + r.containerID = v +} + +func (r *GetOpLogRequestBody) SetHeight(v uint64) { + r.height = v +} + +func (r *GetOpLogRequestBody) SetCount(v uint64) { + r.count = v +} + +type GetOpLogResponse struct { + body *GetOpLogResponseBody + signature *Signature +} + +func (r *GetOpLogResponse) GetBody() *GetOpLogResponseBody { + if r != nil { + return r.body + } + return nil +} + +func (r *GetOpLogResponse) GetSignature() *Signature { + if r != nil { + return r.signature + } + + return nil +} + +type GetOpLogResponseBody struct { + operation *LogMove +} + +func (r *GetOpLogResponseBody) GetOperation() *LogMove { + if r != nil { + return r.operation + } + + return nil +} + +type HealthcheckRequest struct { + body *HealthcheckRequestBody + signature *Signature + + session.RequestHeaders +} + +func (r *HealthcheckRequest) SetBody(v *HealthcheckRequestBody) { + r.body = v +} + +type HealthcheckRequestBody struct{} + +type HealthcheckResponse struct { + body *HealthcheckResponseBody + signature *Signature +} + +func (r *HealthcheckResponse) GetBody() *HealthcheckResponseBody { + if r != nil { + return r.body + } + return nil +} + +func (r *HealthcheckResponse) GetSignature() *Signature { + if r != nil { + return r.signature + } + + return nil +} + +type HealthcheckResponseBody struct{} + +type LogMove struct { + parentID uint64 + meta []byte + childID uint64 +} + +func (g *LogMove) GetParentID() uint64 { + if g != nil { + return g.parentID + } + + return 0 +} + +func (g *LogMove) SetParentID(v uint64) { + g.parentID = v +} + +func (g *LogMove) GetMeta() []byte { + if g != nil { + return g.meta + } + + return nil +} + +func (g *LogMove) SetMeta(v []byte) { + g.meta = v +} + +func (g *LogMove) GetChildID() []byte { + if g != nil { + return g.meta + } + + return nil +} + +func (g *LogMove) SetChildID(v []byte) { + g.meta = v +} + +type Signature struct { + key []byte + sign []byte +} + +func (s *Signature) GetKey() []byte { + if s != nil { + return s.key + } + + return nil +} + +func (s *Signature) SetKey(v []byte) { + s.key = v +} + +func (s *Signature) GetSign() []byte { + if s != nil { + return s.sign + } + + return nil +} + +func (s *Signature) SetSign(v []byte) { + s.sign = v +} + +type KeyValue struct { + key string + value []byte +} + +func (k *KeyValue) GetKey() string { + if k != nil { + return k.key + } + + return "" +} + +func (k *KeyValue) SetKey(v string) { + k.key = v +} + +func (k *KeyValue) GetValue() []byte { + if k != nil { + return k.value + } + return nil +} + +func (k *KeyValue) SetValue(v []byte) { + k.value = v +} diff --git a/pool/tree/client.go b/pool/tree/client.go index 78a1610..b93b5e6 100644 --- a/pool/tree/client.go +++ b/pool/tree/client.go @@ -6,20 +6,22 @@ import ( "errors" "fmt" "sync" + "time" - apiClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client" - grpcService "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service" + rpcapi "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc" + rpcclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/tree" "google.golang.org/grpc" - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/credentials/insecure" ) type treeClient struct { - mu sync.RWMutex - address string - opts []grpc.DialOption - conn *grpc.ClientConn - service grpcService.TreeServiceClient + mu sync.RWMutex + address string + opts []grpc.DialOption + client *rpcclient.Client + nodeDialTimeout time.Duration + streamTimeout time.Duration + healthy bool } @@ -27,10 +29,12 @@ type treeClient struct { var ErrUnhealthyEndpoint = errors.New("unhealthy endpoint") // newTreeClient creates new tree client with auto dial. -func newTreeClient(addr string, opts ...grpc.DialOption) *treeClient { +func newTreeClient(addr string, opts []grpc.DialOption, dialTimeout time.Duration, streamTimeout time.Duration) *treeClient { return &treeClient{ - address: addr, - opts: opts, + address: addr, + opts: opts, + nodeDialTimeout: dialTimeout, + streamTimeout: streamTimeout, } } @@ -38,16 +42,17 @@ func (c *treeClient) dial(ctx context.Context) error { c.mu.Lock() defer c.mu.Unlock() - if c.conn != nil { + if c.client != nil { return fmt.Errorf("couldn't dial '%s': connection already established", c.address) } var err error - if c.conn, c.service, err = createClient(c.address, c.opts...); err != nil { + c.client, err = c.createClient() + if err != nil { return err } - if _, err = c.service.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil { + if _, err = rpcapi.Healthcheck(c.client, &tree.HealthcheckRequest{}, rpcclient.WithContext(ctx)); err != nil { return fmt.Errorf("healthcheck tree service: %w", err) } @@ -60,14 +65,14 @@ func (c *treeClient) redialIfNecessary(ctx context.Context) (healthHasChanged bo c.mu.Lock() defer c.mu.Unlock() - if c.conn == nil { - if c.conn, c.service, err = createClient(c.address, c.opts...); err != nil { + if c.client == nil { + if c.client, err = c.createClient(); err != nil { return false, err } } wasHealthy := c.healthy - if _, err = c.service.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil { + if _, err = rpcapi.Healthcheck(c.client, &tree.HealthcheckRequest{}, rpcclient.WithContext(ctx)); err != nil { c.healthy = false return wasHealthy, fmt.Errorf("healthcheck tree service: %w", err) } @@ -77,39 +82,26 @@ func (c *treeClient) redialIfNecessary(ctx context.Context) (healthHasChanged bo return !wasHealthy, nil } -func createClient(addr string, clientOptions ...grpc.DialOption) (*grpc.ClientConn, grpcService.TreeServiceClient, error) { - host, tlsEnable, err := apiClient.ParseURI(addr) - if err != nil { - return nil, nil, fmt.Errorf("parse address: %w", err) - } +func (c *treeClient) createClient() (*rpcclient.Client, error) { + cli := rpcclient.New(append( + rpcclient.WithNetworkURIAddress(c.address, &tls.Config{}), + rpcclient.WithDialTimeout(c.nodeDialTimeout), + rpcclient.WithRWTimeout(c.streamTimeout), + rpcclient.WithGRPCDialOptions(c.opts), + )...) - creds := insecure.NewCredentials() - if tlsEnable { - creds = credentials.NewTLS(&tls.Config{}) - } - - options := []grpc.DialOption{grpc.WithTransportCredentials(creds)} - - // the order is matter, we want client to be able to overwrite options. - opts := append(options, clientOptions...) - - conn, err := grpc.NewClient(host, opts...) - if err != nil { - return nil, nil, fmt.Errorf("grpc create node tree service: %w", err) - } - - return conn, grpcService.NewTreeServiceClient(conn), nil + return cli, nil } -func (c *treeClient) serviceClient() (grpcService.TreeServiceClient, error) { +func (c *treeClient) serviceClient() (*rpcclient.Client, error) { c.mu.RLock() defer c.mu.RUnlock() - if c.conn == nil || !c.healthy { + if c.client == nil || !c.healthy { return nil, fmt.Errorf("%w: '%s'", ErrUnhealthyEndpoint, c.address) } - return c.service, nil + return c.client, nil } func (c *treeClient) endpoint() string { @@ -132,9 +124,8 @@ func (c *treeClient) close() error { c.mu.Lock() defer c.mu.Unlock() - if c.conn == nil { + if c.client == nil || c.client.Conn() == nil { return nil } - - return c.conn.Close() + return c.client.Conn().Close() } diff --git a/pool/tree/pool.go b/pool/tree/pool.go index 15d4c68..f4f9bf8 100644 --- a/pool/tree/pool.go +++ b/pool/tree/pool.go @@ -10,9 +10,11 @@ import ( "sync" "time" + rpcapi "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc" + rpcclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/tree" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" - grpcService "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -51,7 +53,7 @@ var ( // This interface is expected to have exactly one production implementation - treeClient. // Others are expected to be for test purposes only. type client interface { - serviceClient() (grpcService.TreeServiceClient, error) + serviceClient() (*rpcclient.Client, error) endpoint() string isHealthy() bool setHealthy(bool) @@ -92,6 +94,7 @@ type Pool struct { maxRequestAttempts int streamTimeout time.Duration + nodeDialTimeout time.Duration startIndicesMtx sync.RWMutex // startIndices points to the client from which the next request will be executed. @@ -254,7 +257,7 @@ func (p *Pool) Dial(ctx context.Context) error { for i, nodes := range p.rebalanceParams.nodesGroup { clients := make([]client, len(nodes)) for j, node := range nodes { - clients[j] = newTreeClient(node.Address(), p.dialOptions...) + clients[j] = newTreeClient(node.Address(), p.dialOptions, p.nodeDialTimeout, p.streamTimeout) if err := clients[j].dial(ctx); err != nil { p.log(zap.WarnLevel, "failed to dial tree client", zap.String("address", node.Address()), zap.Error(err)) continue @@ -336,30 +339,28 @@ func (x *InitParameters) SetMaxRequestAttempts(maxAttempts int) { // Can return predefined errors: // * ErrNodeNotFound // * ErrNodeAccessDenied. -func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService.GetNodeByPathResponse_Info, error) { - request := &grpcService.GetNodeByPathRequest{ - Body: &grpcService.GetNodeByPathRequest_Body{ - ContainerId: prm.CID[:], - TreeId: prm.TreeID, - Path: prm.Path, - Attributes: prm.Meta, - PathAttribute: prm.PathAttribute, - LatestOnly: prm.LatestOnly, - AllAttributes: prm.AllAttrs, - BearerToken: prm.BearerToken, - }, - } +func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*tree.GetNodeByPathResponseInfo, error) { + body := new(tree.GetNodeByPathRequestBody) + body.SetContainerID(prm.CID) + body.SetTreeID(prm.TreeID) + body.SetPath(prm.Path) + body.SetAttributes(prm.Meta) + body.SetPathAttribute(prm.PathAttribute) + body.SetAllAttributes(prm.AllAttrs) + body.SetLatestOnly(prm.LatestOnly) + body.SetBearerToken(prm.BearerToken) + + request := new(tree.GetNodeByPathRequest) + request.SetBody(body) start := time.Now() if err := p.signRequest(request); err != nil { return nil, err } - var resp *grpcService.GetNodeByPathResponse - err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { - reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout) - defer cancel() - resp, inErr = client.GetNodeByPath(reqCtx, request) + var resp *tree.GetNodeByPathResponse + err := p.requestWithRetry(ctx, func(client *rpcclient.Client) (inErr error) { + resp, inErr = rpcapi.GetNodeByPath(client, request, rpcclient.WithContext(ctx)) // Pool wants to do retry 'GetNodeByPath' request if result is empty. // Empty result is expected due to delayed tree service sync. // Return an error there to trigger retry and ignore it after, @@ -381,13 +382,14 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService // // Must be initialized using Pool.GetSubTree, any other usage is unsafe. type SubTreeReader struct { - cli grpcService.TreeService_GetSubTreeClient + cli *rpcapi.GetSubTreeResponseReader } // Read reads another list of the subtree nodes. -func (x *SubTreeReader) Read(buf []*grpcService.GetSubTreeResponse_Body) (int, error) { +func (x *SubTreeReader) Read(buf []*tree.GetSubTreeResponseBody) (int, error) { for i := range buf { - resp, err := x.cli.Recv() + var resp tree.GetSubTreeResponse + err := x.cli.Read(&resp) if err == io.EOF { return i, io.EOF } else if err != nil { @@ -400,10 +402,11 @@ func (x *SubTreeReader) Read(buf []*grpcService.GetSubTreeResponse_Body) (int, e } // ReadAll reads all nodes subtree nodes. -func (x *SubTreeReader) ReadAll() ([]*grpcService.GetSubTreeResponse_Body, error) { - var res []*grpcService.GetSubTreeResponse_Body +func (x *SubTreeReader) ReadAll() ([]*tree.GetSubTreeResponseBody, error) { + var res []*tree.GetSubTreeResponseBody for { - resp, err := x.cli.Recv() + var resp tree.GetSubTreeResponse + err := x.cli.Read(&resp) if err == io.EOF { break } else if err != nil { @@ -416,8 +419,9 @@ func (x *SubTreeReader) ReadAll() ([]*grpcService.GetSubTreeResponse_Body, error } // Next gets the next node from subtree. -func (x *SubTreeReader) Next() (*grpcService.GetSubTreeResponse_Body, error) { - resp, err := x.cli.Recv() +func (x *SubTreeReader) Next() (*tree.GetSubTreeResponseBody, error) { + var resp tree.GetSubTreeResponse + err := x.cli.Read(&resp) if err == io.EOF { return nil, io.EOF } @@ -434,32 +438,33 @@ func (x *SubTreeReader) Next() (*grpcService.GetSubTreeResponse_Body, error) { // * ErrNodeNotFound // * ErrNodeAccessDenied. func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeReader, error) { - request := &grpcService.GetSubTreeRequest{ - Body: &grpcService.GetSubTreeRequest_Body{ - ContainerId: prm.CID[:], - TreeId: prm.TreeID, - RootId: prm.RootID, - Depth: prm.Depth, - BearerToken: prm.BearerToken, - OrderBy: new(grpcService.GetSubTreeRequest_Body_Order), - }, - } + body := new(tree.GetSubTreeRequestBody) + body.SetContainerID(prm.CID[:]) + body.SetTreeID(prm.TreeID) + body.SetBearerToken(prm.BearerToken) + body.SetDepth(prm.Depth) + body.SetRootID(prm.RootID) + orderBy := new(tree.GetSubTreeRequestBodyOrder) switch prm.Order { case AscendingOrder: - request.Body.OrderBy.Direction = grpcService.GetSubTreeRequest_Body_Order_Asc + orderBy.SetDirection(tree.GetSubTreeRequestBodyOrderAsc) default: - request.Body.OrderBy.Direction = grpcService.GetSubTreeRequest_Body_Order_None + orderBy.SetDirection(tree.GetSubTreeRequestBodyOrderNone) } + body.SetOrderBy(orderBy) + + request := new(tree.GetSubTreeRequest) + request.SetBody(body) start := time.Now() if err := p.signRequest(request); err != nil { return nil, err } - var cli grpcService.TreeService_GetSubTreeClient - err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { - cli, inErr = client.GetSubTree(ctx, request) + var cli *rpcapi.GetSubTreeResponseReader + err := p.requestWithRetry(ctx, func(client *rpcclient.Client) (inErr error) { + cli, inErr = rpcapi.GetSubTree(client, request, rpcclient.WithContext(ctx)) return handleError("failed to get sub tree client", inErr) }) p.methods[methodGetSubTree].IncRequests(time.Since(start)) @@ -476,26 +481,24 @@ func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeRe // * ErrNodeNotFound // * ErrNodeAccessDenied. func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) { - request := &grpcService.AddRequest{ - Body: &grpcService.AddRequest_Body{ - ContainerId: prm.CID[:], - TreeId: prm.TreeID, - ParentId: prm.Parent, - Meta: metaToKV(prm.Meta), - BearerToken: prm.BearerToken, - }, - } + body := new(tree.AddRequestBody) + body.SetTreeID(prm.TreeID) + body.SetBearerToken(prm.BearerToken) + body.SetContainerID(prm.CID[:]) + body.SetMeta(metaToKV(prm.Meta)) + body.SetParentID(prm.Parent) + + request := new(tree.AddRequest) + request.SetBody(body) start := time.Now() if err := p.signRequest(request); err != nil { return 0, err } - var resp *grpcService.AddResponse - err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { - reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout) - defer cancel() - resp, inErr = client.Add(reqCtx, request) + var resp *tree.AddResponse + err := p.requestWithRetry(ctx, func(client *rpcclient.Client) (inErr error) { + resp, inErr = rpcapi.Add(client, request, rpcclient.WithContext(ctx)) return handleError("failed to add node", inErr) }) p.methods[methodAddNode].IncRequests(time.Since(start)) @@ -503,7 +506,7 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) { return 0, err } - return resp.GetBody().GetNodeId(), nil + return resp.GetBody().GetNodeID(), nil } // AddNodeByPath invokes eponymous method from TreeServiceClient. @@ -512,27 +515,25 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) { // * ErrNodeNotFound // * ErrNodeAccessDenied. func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint64, error) { - request := &grpcService.AddByPathRequest{ - Body: &grpcService.AddByPathRequest_Body{ - ContainerId: prm.CID[:], - TreeId: prm.TreeID, - Path: prm.Path, - Meta: metaToKV(prm.Meta), - PathAttribute: prm.PathAttribute, - BearerToken: prm.BearerToken, - }, - } + body := new(tree.AddByPathRequestBody) + body.SetTreeID(prm.TreeID) + body.SetBearerToken(prm.BearerToken) + body.SetContainerID(prm.CID[:]) + body.SetMeta(metaToKV(prm.Meta)) + body.SetPathAttribute(prm.PathAttribute) + body.SetPath(prm.Path) + + request := new(tree.AddByPathRequest) + request.SetBody(body) start := time.Now() if err := p.signRequest(request); err != nil { return 0, err } - var resp *grpcService.AddByPathResponse - err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { - reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout) - defer cancel() - resp, inErr = client.AddByPath(reqCtx, request) + var resp *tree.AddByPathResponse + err := p.requestWithRetry(ctx, func(client *rpcclient.Client) (inErr error) { + resp, inErr = rpcapi.AddByPath(client, request, rpcclient.WithContext(ctx)) return handleError("failed to add node by path", inErr) }) p.methods[methodAddNodeByPath].IncRequests(time.Since(start)) @@ -540,15 +541,15 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint return 0, err } - body := resp.GetBody() - if body == nil { + respBody := resp.GetBody() + if respBody == nil { return 0, errors.New("nil body in tree service response") - } else if len(body.GetNodes()) == 0 { + } else if len(respBody.GetNodes()) == 0 { return 0, errors.New("empty list of added nodes in tree service response") } // The first node is the leaf that we add, according to tree service docs. - return body.GetNodes()[0], nil + return respBody.GetNodes()[0], nil } // MoveNode invokes eponymous method from TreeServiceClient. @@ -557,26 +558,24 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint // * ErrNodeNotFound // * ErrNodeAccessDenied. func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error { - request := &grpcService.MoveRequest{ - Body: &grpcService.MoveRequest_Body{ - ContainerId: prm.CID[:], - TreeId: prm.TreeID, - NodeId: prm.NodeID, - ParentId: prm.ParentID, - Meta: metaToKV(prm.Meta), - BearerToken: prm.BearerToken, - }, - } + body := new(tree.MoveRequestBody) + body.SetTreeID(prm.TreeID) + body.SetBearerToken(prm.BearerToken) + body.SetContainerID(prm.CID[:]) + body.SetMeta(metaToKV(prm.Meta)) + body.SetNodeID(prm.NodeID) + body.SetParentID(prm.ParentID) + + request := new(tree.MoveRequest) + request.SetBody(body) start := time.Now() if err := p.signRequest(request); err != nil { return err } - err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error { - reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout) - defer cancel() - if _, err := client.Move(reqCtx, request); err != nil { + err := p.requestWithRetry(ctx, func(client *rpcclient.Client) error { + if _, err := rpcapi.Move(client, request, rpcclient.WithContext(ctx)); err != nil { return handleError("failed to move node", err) } return nil @@ -592,24 +591,22 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error { // * ErrNodeNotFound // * ErrNodeAccessDenied. func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error { - request := &grpcService.RemoveRequest{ - Body: &grpcService.RemoveRequest_Body{ - ContainerId: prm.CID[:], - TreeId: prm.TreeID, - NodeId: prm.NodeID, - BearerToken: prm.BearerToken, - }, - } + body := new(tree.RemoveRequestBody) + body.SetTreeID(prm.TreeID) + body.SetBearerToken(prm.BearerToken) + body.SetContainerID(prm.CID[:]) + body.SetNodeID(prm.NodeID) + + request := new(tree.RemoveRequest) + request.SetBody(body) start := time.Now() if err := p.signRequest(request); err != nil { return err } - err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error { - reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout) - defer cancel() - if _, err := client.Remove(reqCtx, request); err != nil { + err := p.requestWithRetry(ctx, func(client *rpcclient.Client) error { + if _, err := rpcapi.Remove(client, request, rpcclient.WithContext(ctx)); err != nil { return handleError("failed to remove node", err) } return nil @@ -661,11 +658,14 @@ func handleError(msg string, err error) error { return fmt.Errorf("%s: %w", msg, err) } -func metaToKV(meta map[string]string) []*grpcService.KeyValue { - result := make([]*grpcService.KeyValue, 0, len(meta)) +func metaToKV(meta map[string]string) []*tree.KeyValue { + result := make([]*tree.KeyValue, 0, len(meta)) for key, value := range meta { - result = append(result, &grpcService.KeyValue{Key: key, Value: []byte(value)}) + kv := new(tree.KeyValue) + kv.SetKey(key) + kv.SetValue([]byte(value)) + result = append(result, kv) } return result @@ -814,10 +814,10 @@ func (p *Pool) setStartIndices(i, j int) { p.startIndicesMtx.Unlock() } -func (p *Pool) requestWithRetry(ctx context.Context, fn func(client grpcService.TreeServiceClient) error) error { +func (p *Pool) requestWithRetry(ctx context.Context, fn func(client *rpcclient.Client) error) error { var ( err, finErr error - cl grpcService.TreeServiceClient + cl *rpcclient.Client ) reqID := GetRequestID(ctx) diff --git a/pool/tree/pool_signature.go b/pool/tree/pool_signature.go index 5a8def1..f0d890a 100644 --- a/pool/tree/pool_signature.go +++ b/pool/tree/pool_signature.go @@ -9,7 +9,7 @@ type message interface { SignedDataSize() int ReadSignedData([]byte) ([]byte, error) GetSignature() *tree.Signature - SetSignature(*tree.Signature) + SetSignature(*tree.Signature) error } // signMessage uses the pool key and signs any protobuf @@ -29,10 +29,8 @@ func (p *Pool) signRequest(m message) error { rawPub := make([]byte, keySDK.Public().MaxEncodedSize()) rawPub = rawPub[:keySDK.Public().Encode(rawPub)] - m.SetSignature(&tree.Signature{ + return m.SetSignature(&tree.Signature{ Key: rawPub, Sign: data, }) - - return nil } diff --git a/pool/tree/pool_test.go b/pool/tree/pool_test.go index c2e50eb..f9f4142 100644 --- a/pool/tree/pool_test.go +++ b/pool/tree/pool_test.go @@ -5,9 +5,9 @@ import ( "errors" "testing" + rpcClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" - grpcService "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service" "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" ) @@ -17,7 +17,7 @@ type treeClientMock struct { err bool } -func (t *treeClientMock) serviceClient() (grpcService.TreeServiceClient, error) { +func (t *treeClientMock) serviceClient() (*rpcClient.Client, error) { if t.err { return nil, errors.New("serviceClient() mock error") } @@ -99,7 +99,7 @@ func TestRetry(t *testing.T) { maxRequestAttempts: lenNodes, } - makeFn := func(client grpcService.TreeServiceClient) error { + makeFn := func(client *rpcClient.Client) error { return nil } @@ -171,7 +171,7 @@ func TestRetry(t *testing.T) { t.Run("error empty result", func(t *testing.T) { errNodes, index := 2, 0 - err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error { + err := p.requestWithRetry(ctx, func(client *rpcClient.Client) error { if index < errNodes { index++ return errNodeEmptyResult @@ -184,7 +184,7 @@ func TestRetry(t *testing.T) { t.Run("error not found", func(t *testing.T) { errNodes, index := 2, 0 - err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error { + err := p.requestWithRetry(ctx, func(client *rpcClient.Client) error { if index < errNodes { index++ return ErrNodeNotFound @@ -197,7 +197,7 @@ func TestRetry(t *testing.T) { t.Run("error access denied", func(t *testing.T) { var index int - err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error { + err := p.requestWithRetry(ctx, func(client *rpcClient.Client) error { index++ return ErrNodeAccessDenied })