[#185] Implement rpc/client for tree service #299

Merged
alexvanin merged 1 commit from nzinkevich/frostfs-sdk-go:feat/tree_service_rpc_client into master 2024-12-02 14:58:07 +00:00
7 changed files with 2875 additions and 164 deletions

178
api/rpc/tree.go Normal file
View file

@ -0,0 +1,178 @@
package rpc
import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/common"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/tree"
)
const serviceTree = "tree.TreeService"
const (
rpcTreeAdd = "Add"
rpcTreeAddByPath = "AddByPath"
rpcTreeRemove = "Remove"
rpcTreeMove = "Move"
rpcTreeGetNodeByPath = "GetNodeByPath"
rpcTreeGetSubTree = "GetSubTree"
rpcTreeList = "TreeList"
rpcTreeApply = "Apply"
rpcTreeGetOpLog = "GetOpLog"
rpcTreeHealthcheck = "Healthcheck"
)
func Add(
cli *client.Client,
req *tree.AddRequest,
opts ...client.CallOption,
) (*tree.AddResponse, error) {
resp := new(tree.AddResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeAdd), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}
func AddByPath(
cli *client.Client,
req *tree.AddByPathRequest,
opts ...client.CallOption,
) (*tree.AddByPathResponse, error) {
resp := new(tree.AddByPathResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeAddByPath), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}
func Remove(cli *client.Client,
req *tree.RemoveRequest,
opts ...client.CallOption,
) (*tree.RemoveResponse, error) {
resp := new(tree.RemoveResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeRemove), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}
func Move(cli *client.Client,
req *tree.MoveRequest,
opts ...client.CallOption,
) (*tree.MoveResponse, error) {
resp := new(tree.MoveResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeMove), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}
func GetNodeByPath(cli *client.Client,
req *tree.GetNodeByPathRequest,
opts ...client.CallOption,
) (*tree.GetNodeByPathResponse, error) {
resp := new(tree.GetNodeByPathResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeGetNodeByPath), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}
type GetSubTreeResponseReader struct {
r client.MessageReader
}
// Read reads response from the stream.
//
// Returns io.EOF of streaming is finished.
func (r *GetSubTreeResponseReader) Read(resp *tree.GetSubTreeResponse) error {
return r.r.ReadMessage(resp)
}
func GetSubTree(cli *client.Client,
req *tree.GetSubTreeRequest,
opts ...client.CallOption,
dkirillov marked this conversation as resolved Outdated

Can we use the same format across all file?
I mean write

func GetSubTree(cli *client.Client,
	req *tree.GetSubTreeRequest,
	opts ...client.CallOption,
) (*GetSubTreeResponseReader, error) {

instead of

	func GetSubTree(cli *client.Client,
	req *tree.GetSubTreeRequest,
	opts ...client.CallOption) (*GetSubTreeResponseReader, error) {
Can we use the same format across all file? I mean write ```golang func GetSubTree(cli *client.Client, req *tree.GetSubTreeRequest, opts ...client.CallOption, ) (*GetSubTreeResponseReader, error) { ``` instead of ```golang func GetSubTree(cli *client.Client, req *tree.GetSubTreeRequest, opts ...client.CallOption) (*GetSubTreeResponseReader, error) { ```
) (*GetSubTreeResponseReader, error) {
wc, err := client.OpenServerStream(cli, common.CallMethodInfoServerStream(serviceTree, rpcTreeGetSubTree), req, opts...)
if err != nil {
return nil, err
}
return &GetSubTreeResponseReader{
r: wc,
}, nil
}
func TreeList(cli *client.Client,
req *tree.ListRequest,
opts ...client.CallOption,
) (*tree.ListResponse, error) {
resp := new(tree.ListResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeList), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}
func Apply(cli *client.Client,
req *tree.ApplyRequest,
opts ...client.CallOption,
) (*tree.ApplyResponse, error) {
resp := new(tree.ApplyResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeApply), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}
type TreeServiceGetOpLogResponseReader struct {
r client.MessageReader
}
// Read reads response from the stream.
//
// Returns io.EOF of streaming is finished.
func (r *TreeServiceGetOpLogResponseReader) Read(resp *tree.GetOpLogResponse) error {
return r.r.ReadMessage(resp)
}
func GetOpLog(cli *client.Client,
req *tree.GetOpLogRequest,
opts ...client.CallOption,
) (*TreeServiceGetOpLogResponseReader, error) {
wc, err := client.OpenServerStream(cli, common.CallMethodInfoServerStream(serviceTree, rpcTreeGetOpLog), req, opts...)
if err != nil {
return nil, err
}
return &TreeServiceGetOpLogResponseReader{
r: wc,
}, nil
}
func Healthcheck(cli *client.Client,
req *tree.HealthcheckRequest,
opts ...client.CallOption,
) (*tree.HealthcheckResponse, error) {
resp := new(tree.HealthcheckResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeHealthcheck), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}

1591
api/tree/convert.go Normal file

File diff suppressed because it is too large Load diff

953
api/tree/types.go Normal file
View file

@ -0,0 +1,953 @@
package tree
import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/session"
tree "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service"
)
type AddRequest struct {
body *AddRequestBody
signature *Signature
}
func (r *AddRequest) GetBody() *AddRequestBody {
if r != nil {
return r.body
}
return nil
}
func (r *AddRequest) SetBody(v *AddRequestBody) {
r.body = v
}
func (r *AddRequest) GetSignature() *tree.Signature {
return r.signature.ToGRPCMessage().(*tree.Signature)
}
func (r *AddRequest) SetSignature(signature *tree.Signature) error {
if r.signature == nil {
r.signature = new(Signature)
}
return r.signature.FromGRPCMessage(signature)
}
func (r *AddRequest) SignedDataSize() int {
return r.body.ToGRPCMessage().(*tree.AddRequest_Body).StableSize()
}
func (r *AddRequest) ReadSignedData(buf []byte) ([]byte, error) {
return r.body.ToGRPCMessage().(*tree.AddRequest_Body).StableMarshal(buf), nil
}
type AddRequestBody struct {
containerID []byte
bearerToken []byte
treeID string
parentID uint64
meta []*KeyValue
}
func (r *AddRequestBody) SetContainerID(v []byte) {
r.containerID = v
}
func (r *AddRequestBody) SetBearerToken(v []byte) {
r.bearerToken = v
}
func (r *AddRequestBody) SetTreeID(v string) {
r.treeID = v
}
func (r *AddRequestBody) SetParentID(v uint64) {
r.parentID = v
}
func (r *AddRequestBody) SetMeta(v []*KeyValue) {
r.meta = v
}
type AddResponse struct {
body *AddResponseBody
signature *Signature
}
func (r *AddResponse) GetBody() *AddResponseBody {
if r != nil {
return r.body
}
return nil
}
type AddResponseBody struct {
nodeID uint64
}
func (r *AddResponseBody) GetNodeID() uint64 {
if r != nil {
return r.nodeID
}
return 0
}
type AddByPathRequest struct {
body *AddByPathRequestBody
signature *Signature
}
func (r *AddByPathRequest) SetBody(v *AddByPathRequestBody) {
r.body = v
}
func (r *AddByPathRequest) GetSignature() *tree.Signature {
return r.signature.ToGRPCMessage().(*tree.Signature)
}
func (r *AddByPathRequest) SetSignature(signature *tree.Signature) error {
if r.signature == nil {
r.signature = new(Signature)
}
return r.signature.FromGRPCMessage(signature)
}
func (r *AddByPathRequest) SignedDataSize() int {
return r.body.ToGRPCMessage().(*tree.AddByPathRequest_Body).StableSize()
}
func (r *AddByPathRequest) ReadSignedData(bytes []byte) ([]byte, error) {
return r.body.ToGRPCMessage().(*tree.AddByPathRequest_Body).StableMarshal(bytes), nil
}
type AddByPathRequestBody struct {
containerID []byte
treeID string
pathAttribute string
path []string
meta []*KeyValue
bearerToken []byte
}
func (r *AddByPathRequestBody) SetContainerID(v []byte) {
r.containerID = v
}
func (r *AddByPathRequestBody) SetBearerToken(v []byte) {
r.bearerToken = v
}
func (r *AddByPathRequestBody) SetTreeID(v string) {
r.treeID = v
}
func (r *AddByPathRequestBody) SetPathAttribute(v string) {
r.pathAttribute = v
}
func (r *AddByPathRequestBody) SetPath(v []string) {
r.path = v
}
func (r *AddByPathRequestBody) SetMeta(v []*KeyValue) {
r.meta = v
}
type AddByPathResponse struct {
body *AddByPathResponseBody
signature *Signature
}
func (r *AddByPathResponse) GetBody() *AddByPathResponseBody {
if r != nil {
return r.body
}
return nil
}
func (r *AddByPathResponse) GetSignature() *Signature {
if r != nil {
return r.signature
}
return nil
}
type AddByPathResponseBody struct {
nodes []uint64
parentID uint64
}
func (r *AddByPathResponseBody) GetNodes() []uint64 {
if r != nil {
return r.nodes
}
return nil
}
func (r *AddByPathResponseBody) GetParentID() uint64 {
if r != nil {
return r.parentID
}
return 0
}
type RemoveRequest struct {
body *RemoveRequestBody
signature *Signature
}
func (r *RemoveRequest) SetBody(v *RemoveRequestBody) {
r.body = v
}
func (r *RemoveRequest) GetSignature() *tree.Signature {
return r.signature.ToGRPCMessage().(*tree.Signature)
}
func (r *RemoveRequest) SetSignature(signature *tree.Signature) error {
if r.signature == nil {
r.signature = new(Signature)
}
return r.signature.FromGRPCMessage(signature)
}
func (r *RemoveRequest) SignedDataSize() int {
return r.body.ToGRPCMessage().(*tree.RemoveRequest_Body).StableSize()
}
func (r *RemoveRequest) ReadSignedData(bytes []byte) ([]byte, error) {
return r.body.ToGRPCMessage().(*tree.RemoveRequest_Body).StableMarshal(bytes), nil
}
type RemoveRequestBody struct {
containerID []byte
treeID string
nodeID uint64
bearerToken []byte
}
func (r *RemoveRequestBody) SetContainerID(v []byte) {
r.containerID = v
}
func (r *RemoveRequestBody) SetBearerToken(v []byte) {
r.bearerToken = v
}
func (r *RemoveRequestBody) SetTreeID(v string) {
r.treeID = v
}
func (r *RemoveRequestBody) SetNodeID(v uint64) {
r.nodeID = v
}
type RemoveResponse struct {
body *RemoveResponseBody
signature *Signature
}
func (r *RemoveResponse) GetBody() *RemoveResponseBody {
if r != nil {
return r.body
}
return nil
}
func (r *RemoveResponse) GetSignature() *Signature {
if r != nil {
return r.signature
}
return nil
}
type RemoveResponseBody struct{}
type MoveRequest struct {
body *MoveRequestBody
signature *Signature
}
func (r *MoveRequest) SetBody(v *MoveRequestBody) {
r.body = v
}
func (r *MoveRequest) GetSignature() *tree.Signature {
return r.signature.ToGRPCMessage().(*tree.Signature)
}
func (r *MoveRequest) SetSignature(signature *tree.Signature) error {
if r.signature == nil {
r.signature = new(Signature)
}
return r.signature.FromGRPCMessage(signature)
}
func (r *MoveRequest) SignedDataSize() int {
return r.body.ToGRPCMessage().(*tree.MoveRequest_Body).StableSize()
}
func (r *MoveRequest) ReadSignedData(buf []byte) ([]byte, error) {
return r.body.ToGRPCMessage().(*tree.MoveRequest_Body).StableMarshal(buf), nil
}
type MoveRequestBody struct {
containerID []byte
treeID string
parentID uint64
nodeID uint64
meta []*KeyValue
bearerToken []byte
}
aarifullin marked this conversation as resolved Outdated

Even this field (not only this, but also the rest in this file) is directly accessible from this struct instance, you don't provide any getters to make the usage safe. If a client code directly uses Operation, then it may get big-boom if it will access LogMove fields when it's actually nil.

Don't you consider to make all fields private as we do for all api-go related types? And provide Get/Set methods?

Even this field (not only this, but also the rest in this file) is directly accessible from this struct instance, you don't provide any getters to make the usage safe. If a client code directly uses `Operation`, then it may get big-boom if it will access `LogMove` fields when it's actually `nil`. Don't you consider to make all fields private as we do for all api-go related types? And provide `Get/Set` methods?

Probably we can write something like in autogenerated code: we have exported fields but also setters with checking for nil. I believe it can be useful when we create new *Request variable

Probably we can write something like in autogenerated code: we have exported fields but also setters with checking for nil. I believe it can be useful when we create new `*Request` variable

For the record, I (subjectively) prefer having public fields in the structs of API message definitions.

However all structures in api package defined with Get/Set methods, so it makes sense to keep it for tree as well.

For the record, I (subjectively) prefer having public fields in the structs of API message definitions. However all structures in `api` package defined with `Get/Set` methods, so it makes sense to keep it for tree as well.
func (r *MoveRequestBody) SetContainerID(v []byte) {
r.containerID = v
}
func (r *MoveRequestBody) SetBearerToken(v []byte) {
r.bearerToken = v
}
func (r *MoveRequestBody) SetTreeID(v string) {
r.treeID = v
}
func (r *MoveRequestBody) SetNodeID(v uint64) {
r.nodeID = v
}
func (r *MoveRequestBody) SetMeta(v []*KeyValue) {
r.meta = v
}
func (r *MoveRequestBody) SetParentID(v uint64) {
r.parentID = v
}
type MoveResponse struct {
body *MoveResponseBody
signature *Signature
}
func (r *MoveResponse) GetBody() *MoveResponseBody {
if r != nil {
return r.body
}
return nil
}
func (r *MoveResponse) GetSignature() *Signature {
if r != nil {
return r.signature
}
return nil
}
aarifullin marked this conversation as resolved Outdated

Why does this method return dummy value instead nil?

Why does this method return dummy value instead `nil`?
type MoveResponseBody struct{}
type GetNodeByPathRequest struct {
body *GetNodeByPathRequestBody
signature *Signature
}
func (r *GetNodeByPathRequest) SetBody(v *GetNodeByPathRequestBody) {
r.body = v
}
func (r *GetNodeByPathRequest) GetSignature() *tree.Signature {
return r.signature.ToGRPCMessage().(*tree.Signature)
}
func (r *GetNodeByPathRequest) SetSignature(signature *tree.Signature) error {
if r.signature == nil {
r.signature = new(Signature)
}
return r.signature.FromGRPCMessage(signature)
}
func (r *GetNodeByPathRequest) SignedDataSize() int {
return r.body.ToGRPCMessage().(*tree.GetNodeByPathRequest_Body).StableSize()
}
func (r *GetNodeByPathRequest) ReadSignedData(buf []byte) ([]byte, error) {
return r.body.ToGRPCMessage().(*tree.GetNodeByPathRequest_Body).StableMarshal(buf), nil
}
type GetNodeByPathRequestBody struct {
containerID []byte
treeID string
pathAttribute string
path []string
attributes []string
latestOnly bool
allAttributes bool
bearerToken []byte
}
func (r *GetNodeByPathRequestBody) SetContainerID(v [32]byte) {
r.containerID = v[:]
}
func (r *GetNodeByPathRequestBody) SetBearerToken(v []byte) {
r.bearerToken = v
}
func (r *GetNodeByPathRequestBody) SetTreeID(v string) {
r.treeID = v
}
func (r *GetNodeByPathRequestBody) SetPathAttribute(v string) {
r.pathAttribute = v
}
func (r *GetNodeByPathRequestBody) SetParentID(v string) {
r.pathAttribute = v
}
func (r *GetNodeByPathRequestBody) SetPath(v []string) {
r.path = v
}
func (r *GetNodeByPathRequestBody) SetAttributes(v []string) {
r.attributes = v
}
func (r *GetNodeByPathRequestBody) SetAllAttributes(v bool) {
r.allAttributes = v
}
func (r *GetNodeByPathRequestBody) SetLatestOnly(v bool) {
r.latestOnly = v
}
type GetNodeByPathResponse struct {
body *GetNodeByPathResponseBody
signature *Signature
}
func (r *GetNodeByPathResponse) GetBody() *GetNodeByPathResponseBody {
if r != nil {
return r.body
}
return nil
}
func (r *GetNodeByPathResponse) GetSignature() *Signature {
if r != nil {
return r.signature
}
return nil
}
type GetNodeByPathResponseBody struct {
nodes []*GetNodeByPathResponseInfo
}
func (r *GetNodeByPathResponseBody) GetNodes() []*GetNodeByPathResponseInfo {
if r != nil {
return r.nodes
}
return nil
}
type GetNodeByPathResponseInfo struct {
nodeID uint64
timestamp uint64
meta []*KeyValue
parentID uint64
}
func (r *GetNodeByPathResponseInfo) GetNodeID() uint64 {
if r != nil {
return r.nodeID
}
return 0
}
func (r *GetNodeByPathResponseInfo) GetTimestamp() uint64 {
if r != nil {
return r.timestamp
}
return 0
}
func (r *GetNodeByPathResponseInfo) GetParentID() uint64 {
if r != nil {
return r.parentID
}
return 0
}
func (r *GetNodeByPathResponseInfo) GetMeta() []*KeyValue {
if r != nil {
return r.meta
}
return nil
}
type ListRequest struct {
body *ListRequestBody
signature *Signature
}
func (r *ListRequest) SetBody(v *ListRequestBody) {
r.body = v
}
type ListRequestBody struct {
containerID []byte
}
func (r *ListRequestBody) SetContainerID(v []byte) {
r.containerID = v
}
type ListResponse struct {
body *ListResponseBody
signature *Signature
}
func (r *ListResponse) GetBody() *ListResponseBody {
if r != nil {
return r.body
}
return nil
}
func (r *ListResponse) GetSignature() *Signature {
if r != nil {
return r.signature
}
return nil
}
type ListResponseBody struct {
ids []string
}
func (r *ListResponseBody) GetIDs() []string {
if r != nil {
return r.ids
}
return nil
}
type GetSubTreeRequest struct {
body *GetSubTreeRequestBody
signature *Signature
}
func (r *GetSubTreeRequest) SetBody(v *GetSubTreeRequestBody) {
r.body = v
}
func (r *GetSubTreeRequest) GetSignature() *tree.Signature {
return r.signature.ToGRPCMessage().(*tree.Signature)
}
func (r *GetSubTreeRequest) SetSignature(signature *tree.Signature) error {
if r.signature == nil {
r.signature = new(Signature)
}
return r.signature.FromGRPCMessage(signature)
}
func (r *GetSubTreeRequest) SignedDataSize() int {
return r.body.ToGRPCMessage().(*tree.GetSubTreeRequest_Body).StableSize()
}
func (r *GetSubTreeRequest) ReadSignedData(buf []byte) ([]byte, error) {
return r.body.ToGRPCMessage().(*tree.GetSubTreeRequest_Body).StableMarshal(buf), nil
}
type GetSubTreeRequestBody struct {
containerID []byte
treeID string
rootID []uint64
depth uint32
bearerToken []byte
orderBy *GetSubTreeRequestBodyOrder
}
func (r *GetSubTreeRequestBody) SetContainerID(v []byte) {
r.containerID = v
}
func (r *GetSubTreeRequestBody) SetBearerToken(v []byte) {
r.bearerToken = v
}
func (r *GetSubTreeRequestBody) SetTreeID(v string) {
r.treeID = v
}
func (r *GetSubTreeRequestBody) SetRootID(v []uint64) {
r.rootID = v
}
func (r *GetSubTreeRequestBody) SetDepth(v uint32) {
r.depth = v
}
func (r *GetSubTreeRequestBody) SetOrderBy(v *GetSubTreeRequestBodyOrder) {
r.orderBy = v
}
type GetSubTreeRequestBodyOrder struct {
direction GetSubTreeRequestBodyOrderDirection
}
func (r *GetSubTreeRequestBodyOrder) SetDirection(v GetSubTreeRequestBodyOrderDirection) {
r.direction = v
}
const (
GetSubTreeRequestBodyOrderNone = GetSubTreeRequestBodyOrderDirection(tree.GetSubTreeRequest_Body_Order_None)
GetSubTreeRequestBodyOrderAsc = GetSubTreeRequestBodyOrderDirection(tree.GetSubTreeRequest_Body_Order_Asc)
)
type GetSubTreeRequestBodyOrderDirection int32
type GetSubTreeResponse struct {
body *GetSubTreeResponseBody
signature *Signature
}
func (r *GetSubTreeResponse) GetBody() *GetSubTreeResponseBody {
if r != nil {
return r.body
}
return nil
}
func (r *GetSubTreeResponse) GetSignature() *Signature {
if r != nil {
return r.signature
}
return nil
}
type GetSubTreeResponseBody struct {
nodeID []uint64
parentID []uint64
timestamp []uint64
meta []*KeyValue
}
func (r *GetSubTreeResponseBody) GetNodeID() []uint64 {
if r != nil {
return r.nodeID
}
return nil
}
func (r *GetSubTreeResponseBody) GetParentID() []uint64 {
if r != nil {
return r.parentID
}
return nil
}
func (r *GetSubTreeResponseBody) GetTimestamp() []uint64 {
if r != nil {
return r.timestamp
}
return nil
}
func (r *GetSubTreeResponseBody) GetMeta() []*KeyValue {
if r != nil {
return r.meta
}
return nil
}
type ApplyRequest struct {
body *ApplyRequestBody
signature *Signature
}
func (r *ApplyRequest) SetBody(v *ApplyRequestBody) {
r.body = v
}
type ApplyRequestBody struct {
containerID []byte
treeID string
operation *LogMove
}
func (r *ApplyRequestBody) SetContainerID(v []byte) {
r.containerID = v
}
func (r *ApplyRequestBody) SetTreeID(v string) {
r.treeID = v
}
func (r *ApplyRequestBody) SetOperation(v *LogMove) {
r.operation = v
}
type ApplyResponse struct {
body *ApplyResponseBody
signature *Signature
}
func (r *ApplyResponse) GetBody() *ApplyResponseBody {
if r != nil {
return r.body
}
return nil
}
func (r *ApplyResponse) GetSignature() *Signature {
if r != nil {
return r.signature
}
return nil
}
type ApplyResponseBody struct{}
type GetOpLogRequest struct {
body *GetOpLogRequestBody
signature *Signature
}
func (r *GetOpLogRequest) SetBody(v *GetOpLogRequestBody) {
r.body = v
}
func (r *GetOpLogRequest) GetSignature() *tree.Signature {
return r.signature.ToGRPCMessage().(*tree.Signature)
}
func (r *GetOpLogRequest) SetSignature(signature *tree.Signature) error {
if r.signature == nil {
r.signature = new(Signature)
}
return r.signature.FromGRPCMessage(signature)
}
func (r *GetOpLogRequest) SignedDataSize() int {
return r.body.ToGRPCMessage().(*tree.GetSubTreeRequest_Body).StableSize()
}
func (r *GetOpLogRequest) ReadSignedData(buf []byte) ([]byte, error) {
return r.body.ToGRPCMessage().(*tree.GetSubTreeRequest_Body).StableMarshal(buf), nil
}
type GetOpLogRequestBody struct {
containerID []byte
treeID string
height uint64
count uint64
}
func (r *GetOpLogRequestBody) SetTreeID(v string) {
r.treeID = v
}
func (r *GetOpLogRequestBody) SetContainerID(v []byte) {
r.containerID = v
}
func (r *GetOpLogRequestBody) SetHeight(v uint64) {
r.height = v
}
func (r *GetOpLogRequestBody) SetCount(v uint64) {
r.count = v
}
type GetOpLogResponse struct {
body *GetOpLogResponseBody
signature *Signature
}
func (r *GetOpLogResponse) GetBody() *GetOpLogResponseBody {
if r != nil {
return r.body
}
return nil
}
func (r *GetOpLogResponse) GetSignature() *Signature {
if r != nil {
return r.signature
}
return nil
}
type GetOpLogResponseBody struct {
operation *LogMove
}
func (r *GetOpLogResponseBody) GetOperation() *LogMove {
if r != nil {
return r.operation
}
return nil
}
type HealthcheckRequest struct {
body *HealthcheckRequestBody
signature *Signature
session.RequestHeaders
}
func (r *HealthcheckRequest) SetBody(v *HealthcheckRequestBody) {
r.body = v
}
type HealthcheckRequestBody struct{}
type HealthcheckResponse struct {
body *HealthcheckResponseBody
signature *Signature
}
func (r *HealthcheckResponse) GetBody() *HealthcheckResponseBody {
if r != nil {
return r.body
}
return nil
}
func (r *HealthcheckResponse) GetSignature() *Signature {
if r != nil {
return r.signature
}
return nil
}
type HealthcheckResponseBody struct{}
type LogMove struct {
parentID uint64
meta []byte
childID uint64
}
func (g *LogMove) GetParentID() uint64 {
if g != nil {
return g.parentID
}
return 0
}
func (g *LogMove) SetParentID(v uint64) {
g.parentID = v
}
func (g *LogMove) GetMeta() []byte {
if g != nil {
return g.meta
}
return nil
}
func (g *LogMove) SetMeta(v []byte) {
g.meta = v
}
func (g *LogMove) GetChildID() []byte {
if g != nil {
return g.meta
}
return nil
}
func (g *LogMove) SetChildID(v []byte) {
g.meta = v
}
type Signature struct {
key []byte
sign []byte
}
func (s *Signature) GetKey() []byte {
if s != nil {
return s.key
}
return nil
}
func (s *Signature) SetKey(v []byte) {
s.key = v
}
func (s *Signature) GetSign() []byte {
if s != nil {
return s.sign
}
return nil
}
func (s *Signature) SetSign(v []byte) {
s.sign = v
}
type KeyValue struct {
key string
value []byte
}
func (k *KeyValue) GetKey() string {
if k != nil {
return k.key
}
return ""
}
func (k *KeyValue) SetKey(v string) {
k.key = v
}
func (k *KeyValue) GetValue() []byte {
if k != nil {
return k.value
}
return nil
}
func (k *KeyValue) SetValue(v []byte) {
k.value = v
}

View file

@ -6,20 +6,22 @@ import (
"errors" "errors"
"fmt" "fmt"
"sync" "sync"
"time"
apiClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client" rpcapi "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc"
grpcService "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service" rpcclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/tree"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
) )
type treeClient struct { type treeClient struct {
mu sync.RWMutex mu sync.RWMutex
address string address string
opts []grpc.DialOption opts []grpc.DialOption
conn *grpc.ClientConn client *rpcclient.Client
service grpcService.TreeServiceClient nodeDialTimeout time.Duration
streamTimeout time.Duration
dkirillov marked this conversation as resolved Outdated

Why do we need separate field? Is not enough having client *rpcclient.Client?

Why do we need separate field? Is not enough having `client *rpcclient.Client`?
healthy bool healthy bool
} }
@ -27,10 +29,12 @@ type treeClient struct {
var ErrUnhealthyEndpoint = errors.New("unhealthy endpoint") var ErrUnhealthyEndpoint = errors.New("unhealthy endpoint")
// newTreeClient creates new tree client with auto dial. // newTreeClient creates new tree client with auto dial.
func newTreeClient(addr string, opts ...grpc.DialOption) *treeClient { func newTreeClient(addr string, opts []grpc.DialOption, dialTimeout time.Duration, streamTimeout time.Duration) *treeClient {
return &treeClient{ return &treeClient{
address: addr, address: addr,
opts: opts, opts: opts,
nodeDialTimeout: dialTimeout,
streamTimeout: streamTimeout,
} }
} }
@ -38,16 +42,17 @@ func (c *treeClient) dial(ctx context.Context) error {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
if c.conn != nil { if c.client != nil {
return fmt.Errorf("couldn't dial '%s': connection already established", c.address) return fmt.Errorf("couldn't dial '%s': connection already established", c.address)
} }
var err error var err error
if c.conn, c.service, err = createClient(c.address, c.opts...); err != nil { c.client, err = c.createClient()
if err != nil {
return err return err
} }
if _, err = c.service.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil { if _, err = rpcapi.Healthcheck(c.client, &tree.HealthcheckRequest{}, rpcclient.WithContext(ctx)); err != nil {
return fmt.Errorf("healthcheck tree service: %w", err) return fmt.Errorf("healthcheck tree service: %w", err)
} }
@ -60,14 +65,14 @@ func (c *treeClient) redialIfNecessary(ctx context.Context) (healthHasChanged bo
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
if c.conn == nil { if c.client == nil {
if c.conn, c.service, err = createClient(c.address, c.opts...); err != nil { if c.client, err = c.createClient(); err != nil {
return false, err return false, err
} }
} }
dkirillov marked this conversation as resolved
Review

I would expect checking for if c.client == nil

I would expect checking for `if c.client == nil`
wasHealthy := c.healthy wasHealthy := c.healthy
if _, err = c.service.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil { if _, err = rpcapi.Healthcheck(c.client, &tree.HealthcheckRequest{}, rpcclient.WithContext(ctx)); err != nil {
c.healthy = false c.healthy = false
return wasHealthy, fmt.Errorf("healthcheck tree service: %w", err) return wasHealthy, fmt.Errorf("healthcheck tree service: %w", err)
} }
@ -77,39 +82,26 @@ func (c *treeClient) redialIfNecessary(ctx context.Context) (healthHasChanged bo
return !wasHealthy, nil return !wasHealthy, nil
} }
func createClient(addr string, clientOptions ...grpc.DialOption) (*grpc.ClientConn, grpcService.TreeServiceClient, error) { func (c *treeClient) createClient() (*rpcclient.Client, error) {
host, tlsEnable, err := apiClient.ParseURI(addr) cli := rpcclient.New(append(
if err != nil { rpcclient.WithNetworkURIAddress(c.address, &tls.Config{}),
return nil, nil, fmt.Errorf("parse address: %w", err) rpcclient.WithDialTimeout(c.nodeDialTimeout),
} rpcclient.WithRWTimeout(c.streamTimeout),
rpcclient.WithGRPCDialOptions(c.opts),
)...)
creds := insecure.NewCredentials() return cli, nil
if tlsEnable {
creds = credentials.NewTLS(&tls.Config{})
}
options := []grpc.DialOption{grpc.WithTransportCredentials(creds)}
// the order is matter, we want client to be able to overwrite options.
opts := append(options, clientOptions...)
conn, err := grpc.NewClient(host, opts...)
if err != nil {
return nil, nil, fmt.Errorf("grpc create node tree service: %w", err)
}
return conn, grpcService.NewTreeServiceClient(conn), nil
} }
func (c *treeClient) serviceClient() (grpcService.TreeServiceClient, error) { func (c *treeClient) serviceClient() (*rpcclient.Client, error) {
c.mu.RLock() c.mu.RLock()
defer c.mu.RUnlock() defer c.mu.RUnlock()
if c.conn == nil || !c.healthy { if c.client == nil || !c.healthy {
return nil, fmt.Errorf("%w: '%s'", ErrUnhealthyEndpoint, c.address) return nil, fmt.Errorf("%w: '%s'", ErrUnhealthyEndpoint, c.address)
} }
return c.service, nil return c.client, nil
} }
func (c *treeClient) endpoint() string { func (c *treeClient) endpoint() string {
dkirillov marked this conversation as resolved Outdated

If we use this options, than we don't need previously defined options

If we use this options, than we don't need previously defined `options`
@ -132,9 +124,8 @@ func (c *treeClient) close() error {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
if c.conn == nil { if c.client == nil || c.client.Conn() == nil {
return nil return nil
dkirillov marked this conversation as resolved Outdated

If we check c.client for nil then conn := c.client.Conn() above can cause panic. (Probably this will never happen but code logically incorrect)

If we check `c.client` for `nil` then `conn := c.client.Conn()` above can cause panic. (Probably this will never happen but code logically incorrect)
} }
return c.client.Conn().Close()
return c.conn.Close()
} }

View file

@ -10,9 +10,11 @@ import (
"sync" "sync"
"time" "time"
rpcapi "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc"
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/tree"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
grpcService "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"go.uber.org/zap" "go.uber.org/zap"
"go.uber.org/zap/zapcore" "go.uber.org/zap/zapcore"
@ -51,7 +53,7 @@ var (
// This interface is expected to have exactly one production implementation - treeClient. // This interface is expected to have exactly one production implementation - treeClient.
// Others are expected to be for test purposes only. // Others are expected to be for test purposes only.
type client interface { type client interface {
serviceClient() (grpcService.TreeServiceClient, error) serviceClient() (*rpcclient.Client, error)
endpoint() string endpoint() string
isHealthy() bool isHealthy() bool
setHealthy(bool) setHealthy(bool)
@ -92,6 +94,7 @@ type Pool struct {
maxRequestAttempts int maxRequestAttempts int
streamTimeout time.Duration streamTimeout time.Duration
nodeDialTimeout time.Duration
startIndicesMtx sync.RWMutex startIndicesMtx sync.RWMutex
// startIndices points to the client from which the next request will be executed. // startIndices points to the client from which the next request will be executed.
@ -254,7 +257,7 @@ func (p *Pool) Dial(ctx context.Context) error {
for i, nodes := range p.rebalanceParams.nodesGroup { for i, nodes := range p.rebalanceParams.nodesGroup {
clients := make([]client, len(nodes)) clients := make([]client, len(nodes))
for j, node := range nodes { for j, node := range nodes {
clients[j] = newTreeClient(node.Address(), p.dialOptions...) clients[j] = newTreeClient(node.Address(), p.dialOptions, p.nodeDialTimeout, p.streamTimeout)
if err := clients[j].dial(ctx); err != nil { if err := clients[j].dial(ctx); err != nil {
p.log(zap.WarnLevel, "failed to dial tree client", zap.String("address", node.Address()), zap.Error(err)) p.log(zap.WarnLevel, "failed to dial tree client", zap.String("address", node.Address()), zap.Error(err))
continue continue
@ -336,30 +339,28 @@ func (x *InitParameters) SetMaxRequestAttempts(maxAttempts int) {
// Can return predefined errors: // Can return predefined errors:
// * ErrNodeNotFound // * ErrNodeNotFound
// * ErrNodeAccessDenied. // * ErrNodeAccessDenied.
func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService.GetNodeByPathResponse_Info, error) { func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*tree.GetNodeByPathResponseInfo, error) {
request := &grpcService.GetNodeByPathRequest{ body := new(tree.GetNodeByPathRequestBody)
Body: &grpcService.GetNodeByPathRequest_Body{ body.SetContainerID(prm.CID)
ContainerId: prm.CID[:], body.SetTreeID(prm.TreeID)
TreeId: prm.TreeID, body.SetPath(prm.Path)
Path: prm.Path, body.SetAttributes(prm.Meta)
Attributes: prm.Meta, body.SetPathAttribute(prm.PathAttribute)
PathAttribute: prm.PathAttribute, body.SetAllAttributes(prm.AllAttrs)
LatestOnly: prm.LatestOnly, body.SetLatestOnly(prm.LatestOnly)
AllAttributes: prm.AllAttrs, body.SetBearerToken(prm.BearerToken)
BearerToken: prm.BearerToken,
}, request := new(tree.GetNodeByPathRequest)
} request.SetBody(body)
start := time.Now() start := time.Now()
if err := p.signRequest(request); err != nil { if err := p.signRequest(request); err != nil {
return nil, err return nil, err
} }
var resp *grpcService.GetNodeByPathResponse var resp *tree.GetNodeByPathResponse
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { err := p.requestWithRetry(ctx, func(client *rpcclient.Client) (inErr error) {
reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout) resp, inErr = rpcapi.GetNodeByPath(client, request, rpcclient.WithContext(ctx))
defer cancel()
resp, inErr = client.GetNodeByPath(reqCtx, request)
// Pool wants to do retry 'GetNodeByPath' request if result is empty. // Pool wants to do retry 'GetNodeByPath' request if result is empty.
// Empty result is expected due to delayed tree service sync. // Empty result is expected due to delayed tree service sync.
// Return an error there to trigger retry and ignore it after, // Return an error there to trigger retry and ignore it after,
@ -381,13 +382,14 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
// //
// Must be initialized using Pool.GetSubTree, any other usage is unsafe. // Must be initialized using Pool.GetSubTree, any other usage is unsafe.
type SubTreeReader struct { type SubTreeReader struct {
cli grpcService.TreeService_GetSubTreeClient cli *rpcapi.GetSubTreeResponseReader
} }
// Read reads another list of the subtree nodes. // Read reads another list of the subtree nodes.
func (x *SubTreeReader) Read(buf []*grpcService.GetSubTreeResponse_Body) (int, error) { func (x *SubTreeReader) Read(buf []*tree.GetSubTreeResponseBody) (int, error) {
for i := range buf { for i := range buf {
resp, err := x.cli.Recv() var resp tree.GetSubTreeResponse
err := x.cli.Read(&resp)
if err == io.EOF { if err == io.EOF {
return i, io.EOF return i, io.EOF
} else if err != nil { } else if err != nil {
@ -400,10 +402,11 @@ func (x *SubTreeReader) Read(buf []*grpcService.GetSubTreeResponse_Body) (int, e
} }
// ReadAll reads all nodes subtree nodes. // ReadAll reads all nodes subtree nodes.
func (x *SubTreeReader) ReadAll() ([]*grpcService.GetSubTreeResponse_Body, error) { func (x *SubTreeReader) ReadAll() ([]*tree.GetSubTreeResponseBody, error) {
var res []*grpcService.GetSubTreeResponse_Body var res []*tree.GetSubTreeResponseBody
for { for {
resp, err := x.cli.Recv() var resp tree.GetSubTreeResponse
err := x.cli.Read(&resp)
if err == io.EOF { if err == io.EOF {
break break
} else if err != nil { } else if err != nil {
@ -416,8 +419,9 @@ func (x *SubTreeReader) ReadAll() ([]*grpcService.GetSubTreeResponse_Body, error
} }
// Next gets the next node from subtree. // Next gets the next node from subtree.
func (x *SubTreeReader) Next() (*grpcService.GetSubTreeResponse_Body, error) { func (x *SubTreeReader) Next() (*tree.GetSubTreeResponseBody, error) {
resp, err := x.cli.Recv() var resp tree.GetSubTreeResponse
err := x.cli.Read(&resp)
if err == io.EOF { if err == io.EOF {
return nil, io.EOF return nil, io.EOF
} }
@ -434,32 +438,33 @@ func (x *SubTreeReader) Next() (*grpcService.GetSubTreeResponse_Body, error) {
// * ErrNodeNotFound // * ErrNodeNotFound
// * ErrNodeAccessDenied. // * ErrNodeAccessDenied.
func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeReader, error) { func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeReader, error) {
request := &grpcService.GetSubTreeRequest{ body := new(tree.GetSubTreeRequestBody)
Body: &grpcService.GetSubTreeRequest_Body{ body.SetContainerID(prm.CID[:])

minor: Probably we can pass prm.CID to method and inside transform to bytes

minor: Probably we can pass `prm.CID` to method and inside transform to bytes

I meant accept cid.ID as parameter type, but ok

I meant accept `cid.ID` as parameter type, but ok
ContainerId: prm.CID[:], body.SetTreeID(prm.TreeID)
TreeId: prm.TreeID, body.SetBearerToken(prm.BearerToken)
RootId: prm.RootID, body.SetDepth(prm.Depth)
Depth: prm.Depth, body.SetRootID(prm.RootID)
BearerToken: prm.BearerToken,
OrderBy: new(grpcService.GetSubTreeRequest_Body_Order),
},
}
orderBy := new(tree.GetSubTreeRequestBodyOrder)
switch prm.Order { switch prm.Order {
case AscendingOrder: case AscendingOrder:
request.Body.OrderBy.Direction = grpcService.GetSubTreeRequest_Body_Order_Asc orderBy.SetDirection(tree.GetSubTreeRequestBodyOrderAsc)
default: default:
request.Body.OrderBy.Direction = grpcService.GetSubTreeRequest_Body_Order_None orderBy.SetDirection(tree.GetSubTreeRequestBodyOrderNone)
} }
body.SetOrderBy(orderBy)
request := new(tree.GetSubTreeRequest)
request.SetBody(body)
start := time.Now() start := time.Now()
if err := p.signRequest(request); err != nil { if err := p.signRequest(request); err != nil {
return nil, err return nil, err
} }
var cli grpcService.TreeService_GetSubTreeClient var cli *rpcapi.GetSubTreeResponseReader
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { err := p.requestWithRetry(ctx, func(client *rpcclient.Client) (inErr error) {
cli, inErr = client.GetSubTree(ctx, request) cli, inErr = rpcapi.GetSubTree(client, request, rpcclient.WithContext(ctx))
return handleError("failed to get sub tree client", inErr) return handleError("failed to get sub tree client", inErr)
}) })
p.methods[methodGetSubTree].IncRequests(time.Since(start)) p.methods[methodGetSubTree].IncRequests(time.Since(start))
@ -476,26 +481,24 @@ func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeRe
// * ErrNodeNotFound // * ErrNodeNotFound
// * ErrNodeAccessDenied. // * ErrNodeAccessDenied.
func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) { func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
request := &grpcService.AddRequest{ body := new(tree.AddRequestBody)
Body: &grpcService.AddRequest_Body{ body.SetTreeID(prm.TreeID)
ContainerId: prm.CID[:], body.SetBearerToken(prm.BearerToken)
TreeId: prm.TreeID, body.SetContainerID(prm.CID[:])
ParentId: prm.Parent, body.SetMeta(metaToKV(prm.Meta))
Meta: metaToKV(prm.Meta), body.SetParentID(prm.Parent)
BearerToken: prm.BearerToken,
}, request := new(tree.AddRequest)
} request.SetBody(body)
start := time.Now() start := time.Now()
if err := p.signRequest(request); err != nil { if err := p.signRequest(request); err != nil {
return 0, err return 0, err
} }
var resp *grpcService.AddResponse var resp *tree.AddResponse
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { err := p.requestWithRetry(ctx, func(client *rpcclient.Client) (inErr error) {
reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout) resp, inErr = rpcapi.Add(client, request, rpcclient.WithContext(ctx))
defer cancel()
resp, inErr = client.Add(reqCtx, request)
return handleError("failed to add node", inErr) return handleError("failed to add node", inErr)
}) })
p.methods[methodAddNode].IncRequests(time.Since(start)) p.methods[methodAddNode].IncRequests(time.Since(start))
@ -503,7 +506,7 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
return 0, err return 0, err
} }
return resp.GetBody().GetNodeId(), nil return resp.GetBody().GetNodeID(), nil
} }
// AddNodeByPath invokes eponymous method from TreeServiceClient. // AddNodeByPath invokes eponymous method from TreeServiceClient.
@ -512,27 +515,25 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
// * ErrNodeNotFound // * ErrNodeNotFound
// * ErrNodeAccessDenied. // * ErrNodeAccessDenied.
func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint64, error) { func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint64, error) {
request := &grpcService.AddByPathRequest{ body := new(tree.AddByPathRequestBody)
Body: &grpcService.AddByPathRequest_Body{ body.SetTreeID(prm.TreeID)
ContainerId: prm.CID[:], body.SetBearerToken(prm.BearerToken)
TreeId: prm.TreeID, body.SetContainerID(prm.CID[:])
Path: prm.Path, body.SetMeta(metaToKV(prm.Meta))
Meta: metaToKV(prm.Meta), body.SetPathAttribute(prm.PathAttribute)
PathAttribute: prm.PathAttribute, body.SetPath(prm.Path)
BearerToken: prm.BearerToken,
}, request := new(tree.AddByPathRequest)
} request.SetBody(body)
start := time.Now() start := time.Now()
if err := p.signRequest(request); err != nil { if err := p.signRequest(request); err != nil {
return 0, err return 0, err
} }
var resp *grpcService.AddByPathResponse var resp *tree.AddByPathResponse
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) { err := p.requestWithRetry(ctx, func(client *rpcclient.Client) (inErr error) {
reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout) resp, inErr = rpcapi.AddByPath(client, request, rpcclient.WithContext(ctx))
defer cancel()
resp, inErr = client.AddByPath(reqCtx, request)
return handleError("failed to add node by path", inErr) return handleError("failed to add node by path", inErr)
}) })
p.methods[methodAddNodeByPath].IncRequests(time.Since(start)) p.methods[methodAddNodeByPath].IncRequests(time.Since(start))
@ -540,15 +541,15 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint
return 0, err return 0, err
} }
body := resp.GetBody() respBody := resp.GetBody()
if body == nil { if respBody == nil {
return 0, errors.New("nil body in tree service response") return 0, errors.New("nil body in tree service response")
} else if len(body.GetNodes()) == 0 { } else if len(respBody.GetNodes()) == 0 {
return 0, errors.New("empty list of added nodes in tree service response") return 0, errors.New("empty list of added nodes in tree service response")
} }
// The first node is the leaf that we add, according to tree service docs. // The first node is the leaf that we add, according to tree service docs.
return body.GetNodes()[0], nil return respBody.GetNodes()[0], nil
} }
// MoveNode invokes eponymous method from TreeServiceClient. // MoveNode invokes eponymous method from TreeServiceClient.
@ -557,26 +558,24 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint
// * ErrNodeNotFound // * ErrNodeNotFound
// * ErrNodeAccessDenied. // * ErrNodeAccessDenied.
func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error { func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
request := &grpcService.MoveRequest{ body := new(tree.MoveRequestBody)
Body: &grpcService.MoveRequest_Body{ body.SetTreeID(prm.TreeID)
ContainerId: prm.CID[:], body.SetBearerToken(prm.BearerToken)
TreeId: prm.TreeID, body.SetContainerID(prm.CID[:])
NodeId: prm.NodeID, body.SetMeta(metaToKV(prm.Meta))
ParentId: prm.ParentID, body.SetNodeID(prm.NodeID)
Meta: metaToKV(prm.Meta), body.SetParentID(prm.ParentID)
BearerToken: prm.BearerToken,
}, request := new(tree.MoveRequest)
} request.SetBody(body)
start := time.Now() start := time.Now()
if err := p.signRequest(request); err != nil { if err := p.signRequest(request); err != nil {
return err return err
} }
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error { err := p.requestWithRetry(ctx, func(client *rpcclient.Client) error {
reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout) if _, err := rpcapi.Move(client, request, rpcclient.WithContext(ctx)); err != nil {
defer cancel()
if _, err := client.Move(reqCtx, request); err != nil {
return handleError("failed to move node", err) return handleError("failed to move node", err)
} }
return nil return nil
@ -592,24 +591,22 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
// * ErrNodeNotFound // * ErrNodeNotFound
// * ErrNodeAccessDenied. // * ErrNodeAccessDenied.
func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error { func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
request := &grpcService.RemoveRequest{ body := new(tree.RemoveRequestBody)
Body: &grpcService.RemoveRequest_Body{ body.SetTreeID(prm.TreeID)
ContainerId: prm.CID[:], body.SetBearerToken(prm.BearerToken)
TreeId: prm.TreeID, body.SetContainerID(prm.CID[:])
NodeId: prm.NodeID, body.SetNodeID(prm.NodeID)
BearerToken: prm.BearerToken,
}, request := new(tree.RemoveRequest)
} request.SetBody(body)
start := time.Now() start := time.Now()
if err := p.signRequest(request); err != nil { if err := p.signRequest(request); err != nil {
return err return err
} }
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error { err := p.requestWithRetry(ctx, func(client *rpcclient.Client) error {
reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout) if _, err := rpcapi.Remove(client, request, rpcclient.WithContext(ctx)); err != nil {
defer cancel()
if _, err := client.Remove(reqCtx, request); err != nil {
return handleError("failed to remove node", err) return handleError("failed to remove node", err)
} }
return nil return nil
@ -661,11 +658,14 @@ func handleError(msg string, err error) error {
return fmt.Errorf("%s: %w", msg, err) return fmt.Errorf("%s: %w", msg, err)
} }
func metaToKV(meta map[string]string) []*grpcService.KeyValue { func metaToKV(meta map[string]string) []*tree.KeyValue {
result := make([]*grpcService.KeyValue, 0, len(meta)) result := make([]*tree.KeyValue, 0, len(meta))
for key, value := range meta { for key, value := range meta {
result = append(result, &grpcService.KeyValue{Key: key, Value: []byte(value)}) kv := new(tree.KeyValue)
kv.SetKey(key)
kv.SetValue([]byte(value))
result = append(result, kv)
} }
return result return result
@ -814,10 +814,10 @@ func (p *Pool) setStartIndices(i, j int) {
p.startIndicesMtx.Unlock() p.startIndicesMtx.Unlock()
} }
func (p *Pool) requestWithRetry(ctx context.Context, fn func(client grpcService.TreeServiceClient) error) error { func (p *Pool) requestWithRetry(ctx context.Context, fn func(client *rpcclient.Client) error) error {
var ( var (
err, finErr error err, finErr error
cl grpcService.TreeServiceClient cl *rpcclient.Client
) )
reqID := GetRequestID(ctx) reqID := GetRequestID(ctx)

View file

@ -9,7 +9,7 @@ type message interface {
SignedDataSize() int SignedDataSize() int
ReadSignedData([]byte) ([]byte, error) ReadSignedData([]byte) ([]byte, error)
GetSignature() *tree.Signature GetSignature() *tree.Signature
SetSignature(*tree.Signature) SetSignature(*tree.Signature) error
} }
// signMessage uses the pool key and signs any protobuf // signMessage uses the pool key and signs any protobuf
@ -29,10 +29,8 @@ func (p *Pool) signRequest(m message) error {
rawPub := make([]byte, keySDK.Public().MaxEncodedSize()) rawPub := make([]byte, keySDK.Public().MaxEncodedSize())
rawPub = rawPub[:keySDK.Public().Encode(rawPub)] rawPub = rawPub[:keySDK.Public().Encode(rawPub)]
m.SetSignature(&tree.Signature{ return m.SetSignature(&tree.Signature{
Key: rawPub, Key: rawPub,
Sign: data, Sign: data,
}) })
return nil
} }

View file

@ -5,9 +5,9 @@ import (
"errors" "errors"
"testing" "testing"
rpcClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
grpcService "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest" "go.uber.org/zap/zaptest"
) )
@ -17,7 +17,7 @@ type treeClientMock struct {
err bool err bool
} }
func (t *treeClientMock) serviceClient() (grpcService.TreeServiceClient, error) { func (t *treeClientMock) serviceClient() (*rpcClient.Client, error) {
if t.err { if t.err {
return nil, errors.New("serviceClient() mock error") return nil, errors.New("serviceClient() mock error")
} }
@ -99,7 +99,7 @@ func TestRetry(t *testing.T) {
maxRequestAttempts: lenNodes, maxRequestAttempts: lenNodes,
} }
makeFn := func(client grpcService.TreeServiceClient) error { makeFn := func(client *rpcClient.Client) error {
return nil return nil
} }
@ -171,7 +171,7 @@ func TestRetry(t *testing.T) {
t.Run("error empty result", func(t *testing.T) { t.Run("error empty result", func(t *testing.T) {
errNodes, index := 2, 0 errNodes, index := 2, 0
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error { err := p.requestWithRetry(ctx, func(client *rpcClient.Client) error {
if index < errNodes { if index < errNodes {
index++ index++
return errNodeEmptyResult return errNodeEmptyResult
@ -184,7 +184,7 @@ func TestRetry(t *testing.T) {
t.Run("error not found", func(t *testing.T) { t.Run("error not found", func(t *testing.T) {
errNodes, index := 2, 0 errNodes, index := 2, 0
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error { err := p.requestWithRetry(ctx, func(client *rpcClient.Client) error {
if index < errNodes { if index < errNodes {
index++ index++
return ErrNodeNotFound return ErrNodeNotFound
@ -197,7 +197,7 @@ func TestRetry(t *testing.T) {
t.Run("error access denied", func(t *testing.T) { t.Run("error access denied", func(t *testing.T) {
var index int var index int
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error { err := p.requestWithRetry(ctx, func(client *rpcClient.Client) error {
index++ index++
return ErrNodeAccessDenied return ErrNodeAccessDenied
}) })