[#185] Implement rpc/client for tree service #299

Merged
alexvanin merged 1 commit from nzinkevich/frostfs-sdk-go:feat/tree_service_rpc_client into master 2024-12-02 14:58:07 +00:00
7 changed files with 2875 additions and 164 deletions

178
api/rpc/tree.go Normal file
View file

@ -0,0 +1,178 @@
package rpc
import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/common"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/tree"
)
const serviceTree = "tree.TreeService"
const (
rpcTreeAdd = "Add"
rpcTreeAddByPath = "AddByPath"
rpcTreeRemove = "Remove"
rpcTreeMove = "Move"
rpcTreeGetNodeByPath = "GetNodeByPath"
rpcTreeGetSubTree = "GetSubTree"
rpcTreeList = "TreeList"
rpcTreeApply = "Apply"
rpcTreeGetOpLog = "GetOpLog"
rpcTreeHealthcheck = "Healthcheck"
)
func Add(
cli *client.Client,
req *tree.AddRequest,
opts ...client.CallOption,
) (*tree.AddResponse, error) {
resp := new(tree.AddResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeAdd), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}
func AddByPath(
cli *client.Client,
req *tree.AddByPathRequest,
opts ...client.CallOption,
) (*tree.AddByPathResponse, error) {
resp := new(tree.AddByPathResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeAddByPath), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}
func Remove(cli *client.Client,
req *tree.RemoveRequest,
opts ...client.CallOption,
) (*tree.RemoveResponse, error) {
resp := new(tree.RemoveResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeRemove), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}
func Move(cli *client.Client,
req *tree.MoveRequest,
opts ...client.CallOption,
) (*tree.MoveResponse, error) {
resp := new(tree.MoveResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeMove), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}
func GetNodeByPath(cli *client.Client,
req *tree.GetNodeByPathRequest,
opts ...client.CallOption,
) (*tree.GetNodeByPathResponse, error) {
resp := new(tree.GetNodeByPathResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeGetNodeByPath), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}
type GetSubTreeResponseReader struct {
r client.MessageReader
}
// Read reads response from the stream.
//
// Returns io.EOF of streaming is finished.
func (r *GetSubTreeResponseReader) Read(resp *tree.GetSubTreeResponse) error {
return r.r.ReadMessage(resp)
}
func GetSubTree(cli *client.Client,
req *tree.GetSubTreeRequest,
opts ...client.CallOption,
dkirillov marked this conversation as resolved Outdated

Can we use the same format across all file?
I mean write

func GetSubTree(cli *client.Client,
	req *tree.GetSubTreeRequest,
	opts ...client.CallOption,
) (*GetSubTreeResponseReader, error) {

instead of

	func GetSubTree(cli *client.Client,
	req *tree.GetSubTreeRequest,
	opts ...client.CallOption) (*GetSubTreeResponseReader, error) {
Can we use the same format across all file? I mean write ```golang func GetSubTree(cli *client.Client, req *tree.GetSubTreeRequest, opts ...client.CallOption, ) (*GetSubTreeResponseReader, error) { ``` instead of ```golang func GetSubTree(cli *client.Client, req *tree.GetSubTreeRequest, opts ...client.CallOption) (*GetSubTreeResponseReader, error) { ```
) (*GetSubTreeResponseReader, error) {
wc, err := client.OpenServerStream(cli, common.CallMethodInfoServerStream(serviceTree, rpcTreeGetSubTree), req, opts...)
if err != nil {
return nil, err
}
return &GetSubTreeResponseReader{
r: wc,
}, nil
}
func TreeList(cli *client.Client,
req *tree.ListRequest,
opts ...client.CallOption,
) (*tree.ListResponse, error) {
resp := new(tree.ListResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeList), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}
func Apply(cli *client.Client,
req *tree.ApplyRequest,
opts ...client.CallOption,
) (*tree.ApplyResponse, error) {
resp := new(tree.ApplyResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeApply), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}
type TreeServiceGetOpLogResponseReader struct {
r client.MessageReader
}
// Read reads response from the stream.
//
// Returns io.EOF of streaming is finished.
func (r *TreeServiceGetOpLogResponseReader) Read(resp *tree.GetOpLogResponse) error {
return r.r.ReadMessage(resp)
}
func GetOpLog(cli *client.Client,
req *tree.GetOpLogRequest,
opts ...client.CallOption,
) (*TreeServiceGetOpLogResponseReader, error) {
wc, err := client.OpenServerStream(cli, common.CallMethodInfoServerStream(serviceTree, rpcTreeGetOpLog), req, opts...)
if err != nil {
return nil, err
}
return &TreeServiceGetOpLogResponseReader{
r: wc,
}, nil
}
func Healthcheck(cli *client.Client,
req *tree.HealthcheckRequest,
opts ...client.CallOption,
) (*tree.HealthcheckResponse, error) {
resp := new(tree.HealthcheckResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeHealthcheck), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}

1591
api/tree/convert.go Normal file

File diff suppressed because it is too large Load diff

953
api/tree/types.go Normal file
View file

@ -0,0 +1,953 @@
package tree
import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/session"
tree "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service"
)
type AddRequest struct {
body *AddRequestBody
signature *Signature
}
func (r *AddRequest) GetBody() *AddRequestBody {
if r != nil {
return r.body
}
return nil
}
func (r *AddRequest) SetBody(v *AddRequestBody) {
r.body = v
}
func (r *AddRequest) GetSignature() *tree.Signature {
return r.signature.ToGRPCMessage().(*tree.Signature)
}
func (r *AddRequest) SetSignature(signature *tree.Signature) error {
if r.signature == nil {
r.signature = new(Signature)
}
return r.signature.FromGRPCMessage(signature)
}
func (r *AddRequest) SignedDataSize() int {
return r.body.ToGRPCMessage().(*tree.AddRequest_Body).StableSize()
}
func (r *AddRequest) ReadSignedData(buf []byte) ([]byte, error) {
return r.body.ToGRPCMessage().(*tree.AddRequest_Body).StableMarshal(buf), nil
}
type AddRequestBody struct {
containerID []byte
bearerToken []byte
treeID string
parentID uint64
meta []*KeyValue
}
func (r *AddRequestBody) SetContainerID(v []byte) {
r.containerID = v
}
func (r *AddRequestBody) SetBearerToken(v []byte) {
r.bearerToken = v
}
func (r *AddRequestBody) SetTreeID(v string) {
r.treeID = v
}
func (r *AddRequestBody) SetParentID(v uint64) {
r.parentID = v
}
func (r *AddRequestBody) SetMeta(v []*KeyValue) {
r.meta = v
}
type AddResponse struct {
body *AddResponseBody
signature *Signature
}
func (r *AddResponse) GetBody() *AddResponseBody {
if r != nil {
return r.body
}
return nil
}
type AddResponseBody struct {
nodeID uint64
}
func (r *AddResponseBody) GetNodeID() uint64 {
if r != nil {
return r.nodeID
}
return 0
}
type AddByPathRequest struct {
body *AddByPathRequestBody
signature *Signature
}
func (r *AddByPathRequest) SetBody(v *AddByPathRequestBody) {
r.body = v
}
func (r *AddByPathRequest) GetSignature() *tree.Signature {
return r.signature.ToGRPCMessage().(*tree.Signature)
}
func (r *AddByPathRequest) SetSignature(signature *tree.Signature) error {
if r.signature == nil {
r.signature = new(Signature)
}
return r.signature.FromGRPCMessage(signature)
}
func (r *AddByPathRequest) SignedDataSize() int {
return r.body.ToGRPCMessage().(*tree.AddByPathRequest_Body).StableSize()
}
func (r *AddByPathRequest) ReadSignedData(bytes []byte) ([]byte, error) {
return r.body.ToGRPCMessage().(*tree.AddByPathRequest_Body).StableMarshal(bytes), nil
}
type AddByPathRequestBody struct {
containerID []byte
treeID string
pathAttribute string
path []string
meta []*KeyValue
bearerToken []byte
}
func (r *AddByPathRequestBody) SetContainerID(v []byte) {
r.containerID = v
}
func (r *AddByPathRequestBody) SetBearerToken(v []byte) {
r.bearerToken = v
}
func (r *AddByPathRequestBody) SetTreeID(v string) {
r.treeID = v
}
func (r *AddByPathRequestBody) SetPathAttribute(v string) {
r.pathAttribute = v
}
func (r *AddByPathRequestBody) SetPath(v []string) {
r.path = v
}
func (r *AddByPathRequestBody) SetMeta(v []*KeyValue) {
r.meta = v
}
type AddByPathResponse struct {
body *AddByPathResponseBody
signature *Signature
}
func (r *AddByPathResponse) GetBody() *AddByPathResponseBody {
if r != nil {
return r.body
}
return nil
}
func (r *AddByPathResponse) GetSignature() *Signature {
if r != nil {
return r.signature
}
return nil
}
type AddByPathResponseBody struct {
nodes []uint64
parentID uint64
}
func (r *AddByPathResponseBody) GetNodes() []uint64 {
if r != nil {
return r.nodes
}
return nil
}
func (r *AddByPathResponseBody) GetParentID() uint64 {
if r != nil {
return r.parentID
}
return 0
}
type RemoveRequest struct {
body *RemoveRequestBody
signature *Signature
}
func (r *RemoveRequest) SetBody(v *RemoveRequestBody) {
r.body = v
}
func (r *RemoveRequest) GetSignature() *tree.Signature {
return r.signature.ToGRPCMessage().(*tree.Signature)
}
func (r *RemoveRequest) SetSignature(signature *tree.Signature) error {
if r.signature == nil {
r.signature = new(Signature)
}
return r.signature.FromGRPCMessage(signature)
}
func (r *RemoveRequest) SignedDataSize() int {
return r.body.ToGRPCMessage().(*tree.RemoveRequest_Body).StableSize()
}
func (r *RemoveRequest) ReadSignedData(bytes []byte) ([]byte, error) {
return r.body.ToGRPCMessage().(*tree.RemoveRequest_Body).StableMarshal(bytes), nil
}
type RemoveRequestBody struct {
containerID []byte
treeID string
nodeID uint64
bearerToken []byte
}
func (r *RemoveRequestBody) SetContainerID(v []byte) {
r.containerID = v
}
func (r *RemoveRequestBody) SetBearerToken(v []byte) {
r.bearerToken = v
}
func (r *RemoveRequestBody) SetTreeID(v string) {
r.treeID = v
}
func (r *RemoveRequestBody) SetNodeID(v uint64) {
r.nodeID = v
}
type RemoveResponse struct {
body *RemoveResponseBody
signature *Signature
}
func (r *RemoveResponse) GetBody() *RemoveResponseBody {
if r != nil {
return r.body
}
return nil
}
func (r *RemoveResponse) GetSignature() *Signature {
if r != nil {
return r.signature
}
return nil
}
type RemoveResponseBody struct{}
type MoveRequest struct {
body *MoveRequestBody
signature *Signature
}
func (r *MoveRequest) SetBody(v *MoveRequestBody) {
r.body = v
}
func (r *MoveRequest) GetSignature() *tree.Signature {
return r.signature.ToGRPCMessage().(*tree.Signature)
}
func (r *MoveRequest) SetSignature(signature *tree.Signature) error {
if r.signature == nil {
r.signature = new(Signature)
}
return r.signature.FromGRPCMessage(signature)
}
func (r *MoveRequest) SignedDataSize() int {
return r.body.ToGRPCMessage().(*tree.MoveRequest_Body).StableSize()
}
func (r *MoveRequest) ReadSignedData(buf []byte) ([]byte, error) {
return r.body.ToGRPCMessage().(*tree.MoveRequest_Body).StableMarshal(buf), nil
}
type MoveRequestBody struct {
containerID []byte
treeID string
parentID uint64
nodeID uint64
meta []*KeyValue
bearerToken []byte
}
aarifullin marked this conversation as resolved Outdated

Even this field (not only this, but also the rest in this file) is directly accessible from this struct instance, you don't provide any getters to make the usage safe. If a client code directly uses Operation, then it may get big-boom if it will access LogMove fields when it's actually nil.

Don't you consider to make all fields private as we do for all api-go related types? And provide Get/Set methods?

Even this field (not only this, but also the rest in this file) is directly accessible from this struct instance, you don't provide any getters to make the usage safe. If a client code directly uses `Operation`, then it may get big-boom if it will access `LogMove` fields when it's actually `nil`. Don't you consider to make all fields private as we do for all api-go related types? And provide `Get/Set` methods?

Probably we can write something like in autogenerated code: we have exported fields but also setters with checking for nil. I believe it can be useful when we create new *Request variable

Probably we can write something like in autogenerated code: we have exported fields but also setters with checking for nil. I believe it can be useful when we create new `*Request` variable

For the record, I (subjectively) prefer having public fields in the structs of API message definitions.

However all structures in api package defined with Get/Set methods, so it makes sense to keep it for tree as well.

For the record, I (subjectively) prefer having public fields in the structs of API message definitions. However all structures in `api` package defined with `Get/Set` methods, so it makes sense to keep it for tree as well.
func (r *MoveRequestBody) SetContainerID(v []byte) {
r.containerID = v
}
func (r *MoveRequestBody) SetBearerToken(v []byte) {
r.bearerToken = v
}
func (r *MoveRequestBody) SetTreeID(v string) {
r.treeID = v
}
func (r *MoveRequestBody) SetNodeID(v uint64) {
r.nodeID = v
}
func (r *MoveRequestBody) SetMeta(v []*KeyValue) {
r.meta = v
}
func (r *MoveRequestBody) SetParentID(v uint64) {
r.parentID = v
}
type MoveResponse struct {
body *MoveResponseBody
signature *Signature
}
func (r *MoveResponse) GetBody() *MoveResponseBody {
if r != nil {
return r.body
}
return nil
}
func (r *MoveResponse) GetSignature() *Signature {
if r != nil {
return r.signature
}
return nil
}
aarifullin marked this conversation as resolved Outdated

Why does this method return dummy value instead nil?

Why does this method return dummy value instead `nil`?
type MoveResponseBody struct{}
type GetNodeByPathRequest struct {
body *GetNodeByPathRequestBody
signature *Signature
}
func (r *GetNodeByPathRequest) SetBody(v *GetNodeByPathRequestBody) {
r.body = v
}
func (r *GetNodeByPathRequest) GetSignature() *tree.Signature {
return r.signature.ToGRPCMessage().(*tree.Signature)
}
func (r *GetNodeByPathRequest) SetSignature(signature *tree.Signature) error {
if r.signature == nil {
r.signature = new(Signature)
}
return r.signature.FromGRPCMessage(signature)
}
func (r *GetNodeByPathRequest) SignedDataSize() int {
return r.body.ToGRPCMessage().(*tree.GetNodeByPathRequest_Body).StableSize()
}
func (r *GetNodeByPathRequest) ReadSignedData(buf []byte) ([]byte, error) {
return r.body.ToGRPCMessage().(*tree.GetNodeByPathRequest_Body).StableMarshal(buf), nil
}
type GetNodeByPathRequestBody struct {
containerID []byte
treeID string
pathAttribute string
path []string
attributes []string
latestOnly bool
allAttributes bool
bearerToken []byte
}
func (r *GetNodeByPathRequestBody) SetContainerID(v [32]byte) {
r.containerID = v[:]
}
func (r *GetNodeByPathRequestBody) SetBearerToken(v []byte) {
r.bearerToken = v
}
func (r *GetNodeByPathRequestBody) SetTreeID(v string) {
r.treeID = v
}
func (r *GetNodeByPathRequestBody) SetPathAttribute(v string) {
r.pathAttribute = v
}
func (r *GetNodeByPathRequestBody) SetParentID(v string) {
r.pathAttribute = v
}
func (r *GetNodeByPathRequestBody) SetPath(v []string) {
r.path = v
}
func (r *GetNodeByPathRequestBody) SetAttributes(v []string) {
r.attributes = v
}
func (r *GetNodeByPathRequestBody) SetAllAttributes(v bool) {
r.allAttributes = v
}
func (r *GetNodeByPathRequestBody) SetLatestOnly(v bool) {
r.latestOnly = v
}
type GetNodeByPathResponse struct {
body *GetNodeByPathResponseBody
signature *Signature
}
func (r *GetNodeByPathResponse) GetBody() *GetNodeByPathResponseBody {
if r != nil {
return r.body
}
return nil
}
func (r *GetNodeByPathResponse) GetSignature() *Signature {
if r != nil {
return r.signature
}
return nil
}
type GetNodeByPathResponseBody struct {
nodes []*GetNodeByPathResponseInfo
}
func (r *GetNodeByPathResponseBody) GetNodes() []*GetNodeByPathResponseInfo {
if r != nil {
return r.nodes
}
return nil
}
type GetNodeByPathResponseInfo struct {
nodeID uint64
timestamp uint64
meta []*KeyValue
parentID uint64
}
func (r *GetNodeByPathResponseInfo) GetNodeID() uint64 {
if r != nil {
return r.nodeID
}
return 0
}
func (r *GetNodeByPathResponseInfo) GetTimestamp() uint64 {
if r != nil {
return r.timestamp
}
return 0
}
func (r *GetNodeByPathResponseInfo) GetParentID() uint64 {
if r != nil {
return r.parentID
}
return 0
}
func (r *GetNodeByPathResponseInfo) GetMeta() []*KeyValue {
if r != nil {
return r.meta
}
return nil
}
type ListRequest struct {
body *ListRequestBody
signature *Signature
}
func (r *ListRequest) SetBody(v *ListRequestBody) {
r.body = v
}
type ListRequestBody struct {
containerID []byte
}
func (r *ListRequestBody) SetContainerID(v []byte) {
r.containerID = v
}
type ListResponse struct {
body *ListResponseBody
signature *Signature
}
func (r *ListResponse) GetBody() *ListResponseBody {
if r != nil {
return r.body
}
return nil
}
func (r *ListResponse) GetSignature() *Signature {
if r != nil {
return r.signature
}
return nil
}
type ListResponseBody struct {
ids []string
}
func (r *ListResponseBody) GetIDs() []string {
if r != nil {
return r.ids
}
return nil
}
type GetSubTreeRequest struct {
body *GetSubTreeRequestBody
signature *Signature
}
func (r *GetSubTreeRequest) SetBody(v *GetSubTreeRequestBody) {
r.body = v
}
func (r *GetSubTreeRequest) GetSignature() *tree.Signature {
return r.signature.ToGRPCMessage().(*tree.Signature)
}
func (r *GetSubTreeRequest) SetSignature(signature *tree.Signature) error {
if r.signature == nil {
r.signature = new(Signature)
}
return r.signature.FromGRPCMessage(signature)
}
func (r *GetSubTreeRequest) SignedDataSize() int {
return r.body.ToGRPCMessage().(*tree.GetSubTreeRequest_Body).StableSize()
}
func (r *GetSubTreeRequest) ReadSignedData(buf []byte) ([]byte, error) {
return r.body.ToGRPCMessage().(*tree.GetSubTreeRequest_Body).StableMarshal(buf), nil
}
type GetSubTreeRequestBody struct {
containerID []byte
treeID string
rootID []uint64
depth uint32
bearerToken []byte
orderBy *GetSubTreeRequestBodyOrder
}
func (r *GetSubTreeRequestBody) SetContainerID(v []byte) {
r.containerID = v
}
func (r *GetSubTreeRequestBody) SetBearerToken(v []byte) {
r.bearerToken = v
}
func (r *GetSubTreeRequestBody) SetTreeID(v string) {
r.treeID = v
}
func (r *GetSubTreeRequestBody) SetRootID(v []uint64) {
r.rootID = v
}
func (r *GetSubTreeRequestBody) SetDepth(v uint32) {
r.depth = v
}
func (r *GetSubTreeRequestBody) SetOrderBy(v *GetSubTreeRequestBodyOrder) {
r.orderBy = v
}
type GetSubTreeRequestBodyOrder struct {
direction GetSubTreeRequestBodyOrderDirection
}
func (r *GetSubTreeRequestBodyOrder) SetDirection(v GetSubTreeRequestBodyOrderDirection) {
r.direction = v
}
const (
GetSubTreeRequestBodyOrderNone = GetSubTreeRequestBodyOrderDirection(tree.GetSubTreeRequest_Body_Order_None)
GetSubTreeRequestBodyOrderAsc = GetSubTreeRequestBodyOrderDirection(tree.GetSubTreeRequest_Body_Order_Asc)
)
type GetSubTreeRequestBodyOrderDirection int32
type GetSubTreeResponse struct {
body *GetSubTreeResponseBody
signature *Signature
}
func (r *GetSubTreeResponse) GetBody() *GetSubTreeResponseBody {
if r != nil {
return r.body
}
return nil
}
func (r *GetSubTreeResponse) GetSignature() *Signature {
if r != nil {
return r.signature
}
return nil
}
type GetSubTreeResponseBody struct {
nodeID []uint64
parentID []uint64
timestamp []uint64
meta []*KeyValue
}
func (r *GetSubTreeResponseBody) GetNodeID() []uint64 {
if r != nil {
return r.nodeID
}
return nil
}
func (r *GetSubTreeResponseBody) GetParentID() []uint64 {
if r != nil {
return r.parentID
}
return nil
}
func (r *GetSubTreeResponseBody) GetTimestamp() []uint64 {
if r != nil {
return r.timestamp
}
return nil
}
func (r *GetSubTreeResponseBody) GetMeta() []*KeyValue {
if r != nil {
return r.meta
}
return nil
}
type ApplyRequest struct {
body *ApplyRequestBody
signature *Signature
}
func (r *ApplyRequest) SetBody(v *ApplyRequestBody) {
r.body = v
}
type ApplyRequestBody struct {
containerID []byte
treeID string
operation *LogMove
}
func (r *ApplyRequestBody) SetContainerID(v []byte) {
r.containerID = v
}
func (r *ApplyRequestBody) SetTreeID(v string) {
r.treeID = v
}
func (r *ApplyRequestBody) SetOperation(v *LogMove) {
r.operation = v
}
type ApplyResponse struct {
body *ApplyResponseBody
signature *Signature
}
func (r *ApplyResponse) GetBody() *ApplyResponseBody {
if r != nil {
return r.body
}
return nil
}
func (r *ApplyResponse) GetSignature() *Signature {
if r != nil {
return r.signature
}
return nil
}
type ApplyResponseBody struct{}
type GetOpLogRequest struct {
body *GetOpLogRequestBody
signature *Signature
}
func (r *GetOpLogRequest) SetBody(v *GetOpLogRequestBody) {
r.body = v
}
func (r *GetOpLogRequest) GetSignature() *tree.Signature {
return r.signature.ToGRPCMessage().(*tree.Signature)
}
func (r *GetOpLogRequest) SetSignature(signature *tree.Signature) error {
if r.signature == nil {
r.signature = new(Signature)
}
return r.signature.FromGRPCMessage(signature)
}
func (r *GetOpLogRequest) SignedDataSize() int {
return r.body.ToGRPCMessage().(*tree.GetSubTreeRequest_Body).StableSize()
}
func (r *GetOpLogRequest) ReadSignedData(buf []byte) ([]byte, error) {
return r.body.ToGRPCMessage().(*tree.GetSubTreeRequest_Body).StableMarshal(buf), nil
}
type GetOpLogRequestBody struct {
containerID []byte
treeID string
height uint64
count uint64
}
func (r *GetOpLogRequestBody) SetTreeID(v string) {
r.treeID = v
}
func (r *GetOpLogRequestBody) SetContainerID(v []byte) {
r.containerID = v
}
func (r *GetOpLogRequestBody) SetHeight(v uint64) {
r.height = v
}
func (r *GetOpLogRequestBody) SetCount(v uint64) {
r.count = v
}
type GetOpLogResponse struct {
body *GetOpLogResponseBody
signature *Signature
}
func (r *GetOpLogResponse) GetBody() *GetOpLogResponseBody {
if r != nil {
return r.body
}
return nil
}
func (r *GetOpLogResponse) GetSignature() *Signature {
if r != nil {
return r.signature
}
return nil
}
type GetOpLogResponseBody struct {
operation *LogMove
}
func (r *GetOpLogResponseBody) GetOperation() *LogMove {
if r != nil {
return r.operation
}
return nil
}
type HealthcheckRequest struct {
body *HealthcheckRequestBody
signature *Signature
session.RequestHeaders
}
func (r *HealthcheckRequest) SetBody(v *HealthcheckRequestBody) {
r.body = v
}
type HealthcheckRequestBody struct{}
type HealthcheckResponse struct {
body *HealthcheckResponseBody
signature *Signature
}
func (r *HealthcheckResponse) GetBody() *HealthcheckResponseBody {
if r != nil {
return r.body
}
return nil
}
func (r *HealthcheckResponse) GetSignature() *Signature {
if r != nil {
return r.signature
}
return nil
}
type HealthcheckResponseBody struct{}
type LogMove struct {
parentID uint64
meta []byte
childID uint64
}
func (g *LogMove) GetParentID() uint64 {
if g != nil {
return g.parentID
}
return 0
}
func (g *LogMove) SetParentID(v uint64) {
g.parentID = v
}
func (g *LogMove) GetMeta() []byte {
if g != nil {
return g.meta
}
return nil
}
func (g *LogMove) SetMeta(v []byte) {
g.meta = v
}
func (g *LogMove) GetChildID() []byte {
if g != nil {
return g.meta
}
return nil
}
func (g *LogMove) SetChildID(v []byte) {
g.meta = v
}
type Signature struct {
key []byte
sign []byte
}
func (s *Signature) GetKey() []byte {
if s != nil {
return s.key
}
return nil
}
func (s *Signature) SetKey(v []byte) {
s.key = v
}
func (s *Signature) GetSign() []byte {
if s != nil {
return s.sign
}
return nil
}
func (s *Signature) SetSign(v []byte) {
s.sign = v
}
type KeyValue struct {
key string
value []byte
}
func (k *KeyValue) GetKey() string {
if k != nil {
return k.key
}
return ""
}
func (k *KeyValue) SetKey(v string) {
k.key = v
}
func (k *KeyValue) GetValue() []byte {
if k != nil {
return k.value
}
return nil
}
func (k *KeyValue) SetValue(v []byte) {
k.value = v
}

View file

@ -6,20 +6,22 @@ import (
"errors"
"fmt"
"sync"
"time"
apiClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
grpcService "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service"
rpcapi "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc"
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/tree"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
)
type treeClient struct {
mu sync.RWMutex
address string
opts []grpc.DialOption
conn *grpc.ClientConn
service grpcService.TreeServiceClient
mu sync.RWMutex
address string
opts []grpc.DialOption
client *rpcclient.Client
nodeDialTimeout time.Duration
streamTimeout time.Duration
dkirillov marked this conversation as resolved Outdated

Why do we need separate field? Is not enough having client *rpcclient.Client?

Why do we need separate field? Is not enough having `client *rpcclient.Client`?
healthy bool
}
@ -27,10 +29,12 @@ type treeClient struct {
var ErrUnhealthyEndpoint = errors.New("unhealthy endpoint")
// newTreeClient creates new tree client with auto dial.
func newTreeClient(addr string, opts ...grpc.DialOption) *treeClient {
func newTreeClient(addr string, opts []grpc.DialOption, dialTimeout time.Duration, streamTimeout time.Duration) *treeClient {
return &treeClient{
address: addr,
opts: opts,
address: addr,
opts: opts,
nodeDialTimeout: dialTimeout,
streamTimeout: streamTimeout,
}
}
@ -38,16 +42,17 @@ func (c *treeClient) dial(ctx context.Context) error {
c.mu.Lock()
defer c.mu.Unlock()
if c.conn != nil {
if c.client != nil {
return fmt.Errorf("couldn't dial '%s': connection already established", c.address)
}
var err error
if c.conn, c.service, err = createClient(c.address, c.opts...); err != nil {
c.client, err = c.createClient()
if err != nil {
return err
}
if _, err = c.service.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil {
if _, err = rpcapi.Healthcheck(c.client, &tree.HealthcheckRequest{}, rpcclient.WithContext(ctx)); err != nil {
return fmt.Errorf("healthcheck tree service: %w", err)
}
@ -60,14 +65,14 @@ func (c *treeClient) redialIfNecessary(ctx context.Context) (healthHasChanged bo
c.mu.Lock()
defer c.mu.Unlock()
if c.conn == nil {
if c.conn, c.service, err = createClient(c.address, c.opts...); err != nil {
if c.client == nil {
if c.client, err = c.createClient(); err != nil {
return false, err
}
}
dkirillov marked this conversation as resolved
Review

I would expect checking for if c.client == nil

I would expect checking for `if c.client == nil`
wasHealthy := c.healthy
if _, err = c.service.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil {
if _, err = rpcapi.Healthcheck(c.client, &tree.HealthcheckRequest{}, rpcclient.WithContext(ctx)); err != nil {
c.healthy = false
return wasHealthy, fmt.Errorf("healthcheck tree service: %w", err)
}
@ -77,39 +82,26 @@ func (c *treeClient) redialIfNecessary(ctx context.Context) (healthHasChanged bo
return !wasHealthy, nil
}
func createClient(addr string, clientOptions ...grpc.DialOption) (*grpc.ClientConn, grpcService.TreeServiceClient, error) {
host, tlsEnable, err := apiClient.ParseURI(addr)
if err != nil {
return nil, nil, fmt.Errorf("parse address: %w", err)
}
func (c *treeClient) createClient() (*rpcclient.Client, error) {
cli := rpcclient.New(append(
rpcclient.WithNetworkURIAddress(c.address, &tls.Config{}),
rpcclient.WithDialTimeout(c.nodeDialTimeout),
rpcclient.WithRWTimeout(c.streamTimeout),
rpcclient.WithGRPCDialOptions(c.opts),
)...)
creds := insecure.NewCredentials()
if tlsEnable {
creds = credentials.NewTLS(&tls.Config{})
}
options := []grpc.DialOption{grpc.WithTransportCredentials(creds)}
// the order is matter, we want client to be able to overwrite options.
opts := append(options, clientOptions...)
conn, err := grpc.NewClient(host, opts...)
if err != nil {
return nil, nil, fmt.Errorf("grpc create node tree service: %w", err)
}
return conn, grpcService.NewTreeServiceClient(conn), nil
return cli, nil
}
func (c *treeClient) serviceClient() (grpcService.TreeServiceClient, error) {
func (c *treeClient) serviceClient() (*rpcclient.Client, error) {
c.mu.RLock()
defer c.mu.RUnlock()
if c.conn == nil || !c.healthy {
if c.client == nil || !c.healthy {
return nil, fmt.Errorf("%w: '%s'", ErrUnhealthyEndpoint, c.address)
}
return c.service, nil
return c.client, nil
}
func (c *treeClient) endpoint() string {
dkirillov marked this conversation as resolved Outdated

If we use this options, than we don't need previously defined options

If we use this options, than we don't need previously defined `options`
@ -132,9 +124,8 @@ func (c *treeClient) close() error {
c.mu.Lock()
defer c.mu.Unlock()
if c.conn == nil {
if c.client == nil || c.client.Conn() == nil {
return nil
dkirillov marked this conversation as resolved Outdated

If we check c.client for nil then conn := c.client.Conn() above can cause panic. (Probably this will never happen but code logically incorrect)

If we check `c.client` for `nil` then `conn := c.client.Conn()` above can cause panic. (Probably this will never happen but code logically incorrect)
}
return c.conn.Close()
return c.client.Conn().Close()
}

View file

@ -10,9 +10,11 @@ import (
"sync"
"time"
rpcapi "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc"
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/tree"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
grpcService "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
@ -51,7 +53,7 @@ var (
// This interface is expected to have exactly one production implementation - treeClient.
// Others are expected to be for test purposes only.
type client interface {
serviceClient() (grpcService.TreeServiceClient, error)
serviceClient() (*rpcclient.Client, error)
endpoint() string
isHealthy() bool
setHealthy(bool)
@ -92,6 +94,7 @@ type Pool struct {
maxRequestAttempts int
streamTimeout time.Duration
nodeDialTimeout time.Duration
startIndicesMtx sync.RWMutex
// startIndices points to the client from which the next request will be executed.
@ -254,7 +257,7 @@ func (p *Pool) Dial(ctx context.Context) error {
for i, nodes := range p.rebalanceParams.nodesGroup {
clients := make([]client, len(nodes))
for j, node := range nodes {
clients[j] = newTreeClient(node.Address(), p.dialOptions...)
clients[j] = newTreeClient(node.Address(), p.dialOptions, p.nodeDialTimeout, p.streamTimeout)
if err := clients[j].dial(ctx); err != nil {
p.log(zap.WarnLevel, "failed to dial tree client", zap.String("address", node.Address()), zap.Error(err))
continue
@ -336,30 +339,28 @@ func (x *InitParameters) SetMaxRequestAttempts(maxAttempts int) {
// Can return predefined errors:
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService.GetNodeByPathResponse_Info, error) {
request := &grpcService.GetNodeByPathRequest{
Body: &grpcService.GetNodeByPathRequest_Body{
ContainerId: prm.CID[:],
TreeId: prm.TreeID,
Path: prm.Path,
Attributes: prm.Meta,
PathAttribute: prm.PathAttribute,
LatestOnly: prm.LatestOnly,
AllAttributes: prm.AllAttrs,
BearerToken: prm.BearerToken,
},
}
func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*tree.GetNodeByPathResponseInfo, error) {
body := new(tree.GetNodeByPathRequestBody)
body.SetContainerID(prm.CID)
body.SetTreeID(prm.TreeID)
body.SetPath(prm.Path)
body.SetAttributes(prm.Meta)
body.SetPathAttribute(prm.PathAttribute)
body.SetAllAttributes(prm.AllAttrs)
body.SetLatestOnly(prm.LatestOnly)
body.SetBearerToken(prm.BearerToken)
request := new(tree.GetNodeByPathRequest)
request.SetBody(body)
start := time.Now()
if err := p.signRequest(request); err != nil {
return nil, err
}
var resp *grpcService.GetNodeByPathResponse
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout)
defer cancel()
resp, inErr = client.GetNodeByPath(reqCtx, request)
var resp *tree.GetNodeByPathResponse
err := p.requestWithRetry(ctx, func(client *rpcclient.Client) (inErr error) {
resp, inErr = rpcapi.GetNodeByPath(client, request, rpcclient.WithContext(ctx))
// Pool wants to do retry 'GetNodeByPath' request if result is empty.
// Empty result is expected due to delayed tree service sync.
// Return an error there to trigger retry and ignore it after,
@ -381,13 +382,14 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
//
// Must be initialized using Pool.GetSubTree, any other usage is unsafe.
type SubTreeReader struct {
cli grpcService.TreeService_GetSubTreeClient
cli *rpcapi.GetSubTreeResponseReader
}
// Read reads another list of the subtree nodes.
func (x *SubTreeReader) Read(buf []*grpcService.GetSubTreeResponse_Body) (int, error) {
func (x *SubTreeReader) Read(buf []*tree.GetSubTreeResponseBody) (int, error) {
for i := range buf {
resp, err := x.cli.Recv()
var resp tree.GetSubTreeResponse
err := x.cli.Read(&resp)
if err == io.EOF {
return i, io.EOF
} else if err != nil {
@ -400,10 +402,11 @@ func (x *SubTreeReader) Read(buf []*grpcService.GetSubTreeResponse_Body) (int, e
}
// ReadAll reads all nodes subtree nodes.
func (x *SubTreeReader) ReadAll() ([]*grpcService.GetSubTreeResponse_Body, error) {
var res []*grpcService.GetSubTreeResponse_Body
func (x *SubTreeReader) ReadAll() ([]*tree.GetSubTreeResponseBody, error) {
var res []*tree.GetSubTreeResponseBody
for {
resp, err := x.cli.Recv()
var resp tree.GetSubTreeResponse
err := x.cli.Read(&resp)
if err == io.EOF {
break
} else if err != nil {
@ -416,8 +419,9 @@ func (x *SubTreeReader) ReadAll() ([]*grpcService.GetSubTreeResponse_Body, error
}
// Next gets the next node from subtree.
func (x *SubTreeReader) Next() (*grpcService.GetSubTreeResponse_Body, error) {
resp, err := x.cli.Recv()
func (x *SubTreeReader) Next() (*tree.GetSubTreeResponseBody, error) {
var resp tree.GetSubTreeResponse
err := x.cli.Read(&resp)
if err == io.EOF {
return nil, io.EOF
}
@ -434,32 +438,33 @@ func (x *SubTreeReader) Next() (*grpcService.GetSubTreeResponse_Body, error) {
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeReader, error) {
request := &grpcService.GetSubTreeRequest{
Body: &grpcService.GetSubTreeRequest_Body{
ContainerId: prm.CID[:],
TreeId: prm.TreeID,
RootId: prm.RootID,
Depth: prm.Depth,
BearerToken: prm.BearerToken,
OrderBy: new(grpcService.GetSubTreeRequest_Body_Order),
},
}
body := new(tree.GetSubTreeRequestBody)
body.SetContainerID(prm.CID[:])

minor: Probably we can pass prm.CID to method and inside transform to bytes

minor: Probably we can pass `prm.CID` to method and inside transform to bytes

I meant accept cid.ID as parameter type, but ok

I meant accept `cid.ID` as parameter type, but ok
body.SetTreeID(prm.TreeID)
body.SetBearerToken(prm.BearerToken)
body.SetDepth(prm.Depth)
body.SetRootID(prm.RootID)
orderBy := new(tree.GetSubTreeRequestBodyOrder)
switch prm.Order {
case AscendingOrder:
request.Body.OrderBy.Direction = grpcService.GetSubTreeRequest_Body_Order_Asc
orderBy.SetDirection(tree.GetSubTreeRequestBodyOrderAsc)
default:
request.Body.OrderBy.Direction = grpcService.GetSubTreeRequest_Body_Order_None
orderBy.SetDirection(tree.GetSubTreeRequestBodyOrderNone)
}
body.SetOrderBy(orderBy)
request := new(tree.GetSubTreeRequest)
request.SetBody(body)
start := time.Now()
if err := p.signRequest(request); err != nil {
return nil, err
}
var cli grpcService.TreeService_GetSubTreeClient
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
cli, inErr = client.GetSubTree(ctx, request)
var cli *rpcapi.GetSubTreeResponseReader
err := p.requestWithRetry(ctx, func(client *rpcclient.Client) (inErr error) {
cli, inErr = rpcapi.GetSubTree(client, request, rpcclient.WithContext(ctx))
return handleError("failed to get sub tree client", inErr)
})
p.methods[methodGetSubTree].IncRequests(time.Since(start))
@ -476,26 +481,24 @@ func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeRe
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
request := &grpcService.AddRequest{
Body: &grpcService.AddRequest_Body{
ContainerId: prm.CID[:],
TreeId: prm.TreeID,
ParentId: prm.Parent,
Meta: metaToKV(prm.Meta),
BearerToken: prm.BearerToken,
},
}
body := new(tree.AddRequestBody)
body.SetTreeID(prm.TreeID)
body.SetBearerToken(prm.BearerToken)
body.SetContainerID(prm.CID[:])
body.SetMeta(metaToKV(prm.Meta))
body.SetParentID(prm.Parent)
request := new(tree.AddRequest)
request.SetBody(body)
start := time.Now()
if err := p.signRequest(request); err != nil {
return 0, err
}
var resp *grpcService.AddResponse
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout)
defer cancel()
resp, inErr = client.Add(reqCtx, request)
var resp *tree.AddResponse
err := p.requestWithRetry(ctx, func(client *rpcclient.Client) (inErr error) {
resp, inErr = rpcapi.Add(client, request, rpcclient.WithContext(ctx))
return handleError("failed to add node", inErr)
})
p.methods[methodAddNode].IncRequests(time.Since(start))
@ -503,7 +506,7 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
return 0, err
}
return resp.GetBody().GetNodeId(), nil
return resp.GetBody().GetNodeID(), nil
}
// AddNodeByPath invokes eponymous method from TreeServiceClient.
@ -512,27 +515,25 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint64, error) {
request := &grpcService.AddByPathRequest{
Body: &grpcService.AddByPathRequest_Body{
ContainerId: prm.CID[:],
TreeId: prm.TreeID,
Path: prm.Path,
Meta: metaToKV(prm.Meta),
PathAttribute: prm.PathAttribute,
BearerToken: prm.BearerToken,
},
}
body := new(tree.AddByPathRequestBody)
body.SetTreeID(prm.TreeID)
body.SetBearerToken(prm.BearerToken)
body.SetContainerID(prm.CID[:])
body.SetMeta(metaToKV(prm.Meta))
body.SetPathAttribute(prm.PathAttribute)
body.SetPath(prm.Path)
request := new(tree.AddByPathRequest)
request.SetBody(body)
start := time.Now()
if err := p.signRequest(request); err != nil {
return 0, err
}
var resp *grpcService.AddByPathResponse
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout)
defer cancel()
resp, inErr = client.AddByPath(reqCtx, request)
var resp *tree.AddByPathResponse
err := p.requestWithRetry(ctx, func(client *rpcclient.Client) (inErr error) {
resp, inErr = rpcapi.AddByPath(client, request, rpcclient.WithContext(ctx))
return handleError("failed to add node by path", inErr)
})
p.methods[methodAddNodeByPath].IncRequests(time.Since(start))
@ -540,15 +541,15 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint
return 0, err
}
body := resp.GetBody()
if body == nil {
respBody := resp.GetBody()
if respBody == nil {
return 0, errors.New("nil body in tree service response")
} else if len(body.GetNodes()) == 0 {
} else if len(respBody.GetNodes()) == 0 {
return 0, errors.New("empty list of added nodes in tree service response")
}
// The first node is the leaf that we add, according to tree service docs.
return body.GetNodes()[0], nil
return respBody.GetNodes()[0], nil
}
// MoveNode invokes eponymous method from TreeServiceClient.
@ -557,26 +558,24 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
request := &grpcService.MoveRequest{
Body: &grpcService.MoveRequest_Body{
ContainerId: prm.CID[:],
TreeId: prm.TreeID,
NodeId: prm.NodeID,
ParentId: prm.ParentID,
Meta: metaToKV(prm.Meta),
BearerToken: prm.BearerToken,
},
}
body := new(tree.MoveRequestBody)
body.SetTreeID(prm.TreeID)
body.SetBearerToken(prm.BearerToken)
body.SetContainerID(prm.CID[:])
body.SetMeta(metaToKV(prm.Meta))
body.SetNodeID(prm.NodeID)
body.SetParentID(prm.ParentID)
request := new(tree.MoveRequest)
request.SetBody(body)
start := time.Now()
if err := p.signRequest(request); err != nil {
return err
}
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout)
defer cancel()
if _, err := client.Move(reqCtx, request); err != nil {
err := p.requestWithRetry(ctx, func(client *rpcclient.Client) error {
if _, err := rpcapi.Move(client, request, rpcclient.WithContext(ctx)); err != nil {
return handleError("failed to move node", err)
}
return nil
@ -592,24 +591,22 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
request := &grpcService.RemoveRequest{
Body: &grpcService.RemoveRequest_Body{
ContainerId: prm.CID[:],
TreeId: prm.TreeID,
NodeId: prm.NodeID,
BearerToken: prm.BearerToken,
},
}
body := new(tree.RemoveRequestBody)
body.SetTreeID(prm.TreeID)
body.SetBearerToken(prm.BearerToken)
body.SetContainerID(prm.CID[:])
body.SetNodeID(prm.NodeID)
request := new(tree.RemoveRequest)
request.SetBody(body)
start := time.Now()
if err := p.signRequest(request); err != nil {
return err
}
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout)
defer cancel()
if _, err := client.Remove(reqCtx, request); err != nil {
err := p.requestWithRetry(ctx, func(client *rpcclient.Client) error {
if _, err := rpcapi.Remove(client, request, rpcclient.WithContext(ctx)); err != nil {
return handleError("failed to remove node", err)
}
return nil
@ -661,11 +658,14 @@ func handleError(msg string, err error) error {
return fmt.Errorf("%s: %w", msg, err)
}
func metaToKV(meta map[string]string) []*grpcService.KeyValue {
result := make([]*grpcService.KeyValue, 0, len(meta))
func metaToKV(meta map[string]string) []*tree.KeyValue {
result := make([]*tree.KeyValue, 0, len(meta))
for key, value := range meta {
result = append(result, &grpcService.KeyValue{Key: key, Value: []byte(value)})
kv := new(tree.KeyValue)
kv.SetKey(key)
kv.SetValue([]byte(value))
result = append(result, kv)
}
return result
@ -814,10 +814,10 @@ func (p *Pool) setStartIndices(i, j int) {
p.startIndicesMtx.Unlock()
}
func (p *Pool) requestWithRetry(ctx context.Context, fn func(client grpcService.TreeServiceClient) error) error {
func (p *Pool) requestWithRetry(ctx context.Context, fn func(client *rpcclient.Client) error) error {
var (
err, finErr error
cl grpcService.TreeServiceClient
cl *rpcclient.Client
)
reqID := GetRequestID(ctx)

View file

@ -9,7 +9,7 @@ type message interface {
SignedDataSize() int
ReadSignedData([]byte) ([]byte, error)
GetSignature() *tree.Signature
SetSignature(*tree.Signature)
SetSignature(*tree.Signature) error
}
// signMessage uses the pool key and signs any protobuf
@ -29,10 +29,8 @@ func (p *Pool) signRequest(m message) error {
rawPub := make([]byte, keySDK.Public().MaxEncodedSize())
rawPub = rawPub[:keySDK.Public().Encode(rawPub)]
m.SetSignature(&tree.Signature{
return m.SetSignature(&tree.Signature{
Key: rawPub,
Sign: data,
})
return nil
}

View file

@ -5,9 +5,9 @@ import (
"errors"
"testing"
rpcClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
grpcService "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)
@ -17,7 +17,7 @@ type treeClientMock struct {
err bool
}
func (t *treeClientMock) serviceClient() (grpcService.TreeServiceClient, error) {
func (t *treeClientMock) serviceClient() (*rpcClient.Client, error) {
if t.err {
return nil, errors.New("serviceClient() mock error")
}
@ -99,7 +99,7 @@ func TestRetry(t *testing.T) {
maxRequestAttempts: lenNodes,
}
makeFn := func(client grpcService.TreeServiceClient) error {
makeFn := func(client *rpcClient.Client) error {
return nil
}
@ -171,7 +171,7 @@ func TestRetry(t *testing.T) {
t.Run("error empty result", func(t *testing.T) {
errNodes, index := 2, 0
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
err := p.requestWithRetry(ctx, func(client *rpcClient.Client) error {
if index < errNodes {
index++
return errNodeEmptyResult
@ -184,7 +184,7 @@ func TestRetry(t *testing.T) {
t.Run("error not found", func(t *testing.T) {
errNodes, index := 2, 0
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
err := p.requestWithRetry(ctx, func(client *rpcClient.Client) error {
if index < errNodes {
index++
return ErrNodeNotFound
@ -197,7 +197,7 @@ func TestRetry(t *testing.T) {
t.Run("error access denied", func(t *testing.T) {
var index int
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
err := p.requestWithRetry(ctx, func(client *rpcClient.Client) error {
index++
return ErrNodeAccessDenied
})