[#185] Implement rpc/client for tree service #299
178
api/rpc/tree.go
Normal file
|
@ -0,0 +1,178 @@
|
|||
package rpc
|
||||
|
||||
import (
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/tree"
|
||||
)
|
||||
|
||||
const serviceTree = "tree.TreeService"
|
||||
|
||||
const (
|
||||
rpcTreeAdd = "Add"
|
||||
rpcTreeAddByPath = "AddByPath"
|
||||
rpcTreeRemove = "Remove"
|
||||
rpcTreeMove = "Move"
|
||||
rpcTreeGetNodeByPath = "GetNodeByPath"
|
||||
rpcTreeGetSubTree = "GetSubTree"
|
||||
rpcTreeList = "TreeList"
|
||||
rpcTreeApply = "Apply"
|
||||
rpcTreeGetOpLog = "GetOpLog"
|
||||
rpcTreeHealthcheck = "Healthcheck"
|
||||
)
|
||||
|
||||
func Add(
|
||||
cli *client.Client,
|
||||
req *tree.AddRequest,
|
||||
opts ...client.CallOption,
|
||||
) (*tree.AddResponse, error) {
|
||||
resp := new(tree.AddResponse)
|
||||
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeAdd), req, resp, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func AddByPath(
|
||||
cli *client.Client,
|
||||
req *tree.AddByPathRequest,
|
||||
opts ...client.CallOption,
|
||||
) (*tree.AddByPathResponse, error) {
|
||||
resp := new(tree.AddByPathResponse)
|
||||
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeAddByPath), req, resp, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func Remove(cli *client.Client,
|
||||
req *tree.RemoveRequest,
|
||||
opts ...client.CallOption,
|
||||
) (*tree.RemoveResponse, error) {
|
||||
resp := new(tree.RemoveResponse)
|
||||
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeRemove), req, resp, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func Move(cli *client.Client,
|
||||
req *tree.MoveRequest,
|
||||
opts ...client.CallOption,
|
||||
) (*tree.MoveResponse, error) {
|
||||
resp := new(tree.MoveResponse)
|
||||
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeMove), req, resp, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func GetNodeByPath(cli *client.Client,
|
||||
req *tree.GetNodeByPathRequest,
|
||||
opts ...client.CallOption,
|
||||
) (*tree.GetNodeByPathResponse, error) {
|
||||
resp := new(tree.GetNodeByPathResponse)
|
||||
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeGetNodeByPath), req, resp, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
type GetSubTreeResponseReader struct {
|
||||
r client.MessageReader
|
||||
}
|
||||
|
||||
// Read reads response from the stream.
|
||||
//
|
||||
// Returns io.EOF of streaming is finished.
|
||||
func (r *GetSubTreeResponseReader) Read(resp *tree.GetSubTreeResponse) error {
|
||||
return r.r.ReadMessage(resp)
|
||||
}
|
||||
|
||||
func GetSubTree(cli *client.Client,
|
||||
req *tree.GetSubTreeRequest,
|
||||
opts ...client.CallOption,
|
||||
dkirillov marked this conversation as resolved
Outdated
|
||||
) (*GetSubTreeResponseReader, error) {
|
||||
wc, err := client.OpenServerStream(cli, common.CallMethodInfoServerStream(serviceTree, rpcTreeGetSubTree), req, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &GetSubTreeResponseReader{
|
||||
r: wc,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func TreeList(cli *client.Client,
|
||||
req *tree.ListRequest,
|
||||
opts ...client.CallOption,
|
||||
) (*tree.ListResponse, error) {
|
||||
resp := new(tree.ListResponse)
|
||||
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeList), req, resp, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func Apply(cli *client.Client,
|
||||
req *tree.ApplyRequest,
|
||||
opts ...client.CallOption,
|
||||
) (*tree.ApplyResponse, error) {
|
||||
resp := new(tree.ApplyResponse)
|
||||
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeApply), req, resp, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
type TreeServiceGetOpLogResponseReader struct {
|
||||
r client.MessageReader
|
||||
}
|
||||
|
||||
// Read reads response from the stream.
|
||||
//
|
||||
// Returns io.EOF of streaming is finished.
|
||||
func (r *TreeServiceGetOpLogResponseReader) Read(resp *tree.GetOpLogResponse) error {
|
||||
return r.r.ReadMessage(resp)
|
||||
}
|
||||
|
||||
func GetOpLog(cli *client.Client,
|
||||
req *tree.GetOpLogRequest,
|
||||
opts ...client.CallOption,
|
||||
) (*TreeServiceGetOpLogResponseReader, error) {
|
||||
wc, err := client.OpenServerStream(cli, common.CallMethodInfoServerStream(serviceTree, rpcTreeGetOpLog), req, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &TreeServiceGetOpLogResponseReader{
|
||||
r: wc,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func Healthcheck(cli *client.Client,
|
||||
req *tree.HealthcheckRequest,
|
||||
opts ...client.CallOption,
|
||||
) (*tree.HealthcheckResponse, error) {
|
||||
resp := new(tree.HealthcheckResponse)
|
||||
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeHealthcheck), req, resp, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
1591
api/tree/convert.go
Normal file
953
api/tree/types.go
Normal file
|
@ -0,0 +1,953 @@
|
|||
package tree
|
||||
|
||||
import (
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/session"
|
||||
tree "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service"
|
||||
)
|
||||
|
||||
type AddRequest struct {
|
||||
body *AddRequestBody
|
||||
signature *Signature
|
||||
}
|
||||
|
||||
func (r *AddRequest) GetBody() *AddRequestBody {
|
||||
if r != nil {
|
||||
return r.body
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *AddRequest) SetBody(v *AddRequestBody) {
|
||||
r.body = v
|
||||
}
|
||||
|
||||
func (r *AddRequest) GetSignature() *tree.Signature {
|
||||
return r.signature.ToGRPCMessage().(*tree.Signature)
|
||||
}
|
||||
|
||||
func (r *AddRequest) SetSignature(signature *tree.Signature) error {
|
||||
if r.signature == nil {
|
||||
r.signature = new(Signature)
|
||||
}
|
||||
|
||||
return r.signature.FromGRPCMessage(signature)
|
||||
}
|
||||
|
||||
func (r *AddRequest) SignedDataSize() int {
|
||||
return r.body.ToGRPCMessage().(*tree.AddRequest_Body).StableSize()
|
||||
}
|
||||
|
||||
func (r *AddRequest) ReadSignedData(buf []byte) ([]byte, error) {
|
||||
return r.body.ToGRPCMessage().(*tree.AddRequest_Body).StableMarshal(buf), nil
|
||||
}
|
||||
|
||||
type AddRequestBody struct {
|
||||
containerID []byte
|
||||
bearerToken []byte
|
||||
treeID string
|
||||
parentID uint64
|
||||
meta []*KeyValue
|
||||
}
|
||||
|
||||
func (r *AddRequestBody) SetContainerID(v []byte) {
|
||||
r.containerID = v
|
||||
}
|
||||
|
||||
func (r *AddRequestBody) SetBearerToken(v []byte) {
|
||||
r.bearerToken = v
|
||||
}
|
||||
|
||||
func (r *AddRequestBody) SetTreeID(v string) {
|
||||
r.treeID = v
|
||||
}
|
||||
|
||||
func (r *AddRequestBody) SetParentID(v uint64) {
|
||||
r.parentID = v
|
||||
}
|
||||
|
||||
func (r *AddRequestBody) SetMeta(v []*KeyValue) {
|
||||
r.meta = v
|
||||
}
|
||||
|
||||
type AddResponse struct {
|
||||
body *AddResponseBody
|
||||
signature *Signature
|
||||
}
|
||||
|
||||
func (r *AddResponse) GetBody() *AddResponseBody {
|
||||
if r != nil {
|
||||
return r.body
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type AddResponseBody struct {
|
||||
nodeID uint64
|
||||
}
|
||||
|
||||
func (r *AddResponseBody) GetNodeID() uint64 {
|
||||
if r != nil {
|
||||
return r.nodeID
|
||||
}
|
||||
|
||||
return 0
|
||||
}
|
||||
|
||||
type AddByPathRequest struct {
|
||||
body *AddByPathRequestBody
|
||||
signature *Signature
|
||||
}
|
||||
|
||||
func (r *AddByPathRequest) SetBody(v *AddByPathRequestBody) {
|
||||
r.body = v
|
||||
}
|
||||
|
||||
func (r *AddByPathRequest) GetSignature() *tree.Signature {
|
||||
return r.signature.ToGRPCMessage().(*tree.Signature)
|
||||
}
|
||||
|
||||
func (r *AddByPathRequest) SetSignature(signature *tree.Signature) error {
|
||||
if r.signature == nil {
|
||||
r.signature = new(Signature)
|
||||
}
|
||||
|
||||
return r.signature.FromGRPCMessage(signature)
|
||||
}
|
||||
|
||||
func (r *AddByPathRequest) SignedDataSize() int {
|
||||
return r.body.ToGRPCMessage().(*tree.AddByPathRequest_Body).StableSize()
|
||||
}
|
||||
|
||||
func (r *AddByPathRequest) ReadSignedData(bytes []byte) ([]byte, error) {
|
||||
return r.body.ToGRPCMessage().(*tree.AddByPathRequest_Body).StableMarshal(bytes), nil
|
||||
}
|
||||
|
||||
type AddByPathRequestBody struct {
|
||||
containerID []byte
|
||||
treeID string
|
||||
pathAttribute string
|
||||
path []string
|
||||
meta []*KeyValue
|
||||
bearerToken []byte
|
||||
}
|
||||
|
||||
func (r *AddByPathRequestBody) SetContainerID(v []byte) {
|
||||
r.containerID = v
|
||||
}
|
||||
|
||||
func (r *AddByPathRequestBody) SetBearerToken(v []byte) {
|
||||
r.bearerToken = v
|
||||
}
|
||||
|
||||
func (r *AddByPathRequestBody) SetTreeID(v string) {
|
||||
r.treeID = v
|
||||
}
|
||||
|
||||
func (r *AddByPathRequestBody) SetPathAttribute(v string) {
|
||||
r.pathAttribute = v
|
||||
}
|
||||
|
||||
func (r *AddByPathRequestBody) SetPath(v []string) {
|
||||
r.path = v
|
||||
}
|
||||
|
||||
func (r *AddByPathRequestBody) SetMeta(v []*KeyValue) {
|
||||
r.meta = v
|
||||
}
|
||||
|
||||
type AddByPathResponse struct {
|
||||
body *AddByPathResponseBody
|
||||
signature *Signature
|
||||
}
|
||||
|
||||
func (r *AddByPathResponse) GetBody() *AddByPathResponseBody {
|
||||
if r != nil {
|
||||
return r.body
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *AddByPathResponse) GetSignature() *Signature {
|
||||
if r != nil {
|
||||
return r.signature
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type AddByPathResponseBody struct {
|
||||
nodes []uint64
|
||||
parentID uint64
|
||||
}
|
||||
|
||||
func (r *AddByPathResponseBody) GetNodes() []uint64 {
|
||||
if r != nil {
|
||||
return r.nodes
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *AddByPathResponseBody) GetParentID() uint64 {
|
||||
if r != nil {
|
||||
return r.parentID
|
||||
}
|
||||
|
||||
return 0
|
||||
}
|
||||
|
||||
type RemoveRequest struct {
|
||||
body *RemoveRequestBody
|
||||
signature *Signature
|
||||
}
|
||||
|
||||
func (r *RemoveRequest) SetBody(v *RemoveRequestBody) {
|
||||
r.body = v
|
||||
}
|
||||
|
||||
func (r *RemoveRequest) GetSignature() *tree.Signature {
|
||||
return r.signature.ToGRPCMessage().(*tree.Signature)
|
||||
}
|
||||
|
||||
func (r *RemoveRequest) SetSignature(signature *tree.Signature) error {
|
||||
if r.signature == nil {
|
||||
r.signature = new(Signature)
|
||||
}
|
||||
|
||||
return r.signature.FromGRPCMessage(signature)
|
||||
}
|
||||
|
||||
func (r *RemoveRequest) SignedDataSize() int {
|
||||
return r.body.ToGRPCMessage().(*tree.RemoveRequest_Body).StableSize()
|
||||
}
|
||||
|
||||
func (r *RemoveRequest) ReadSignedData(bytes []byte) ([]byte, error) {
|
||||
return r.body.ToGRPCMessage().(*tree.RemoveRequest_Body).StableMarshal(bytes), nil
|
||||
}
|
||||
|
||||
type RemoveRequestBody struct {
|
||||
containerID []byte
|
||||
treeID string
|
||||
nodeID uint64
|
||||
bearerToken []byte
|
||||
}
|
||||
|
||||
func (r *RemoveRequestBody) SetContainerID(v []byte) {
|
||||
r.containerID = v
|
||||
}
|
||||
|
||||
func (r *RemoveRequestBody) SetBearerToken(v []byte) {
|
||||
r.bearerToken = v
|
||||
}
|
||||
|
||||
func (r *RemoveRequestBody) SetTreeID(v string) {
|
||||
r.treeID = v
|
||||
}
|
||||
|
||||
func (r *RemoveRequestBody) SetNodeID(v uint64) {
|
||||
r.nodeID = v
|
||||
}
|
||||
|
||||
type RemoveResponse struct {
|
||||
body *RemoveResponseBody
|
||||
signature *Signature
|
||||
}
|
||||
|
||||
func (r *RemoveResponse) GetBody() *RemoveResponseBody {
|
||||
if r != nil {
|
||||
return r.body
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *RemoveResponse) GetSignature() *Signature {
|
||||
if r != nil {
|
||||
return r.signature
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type RemoveResponseBody struct{}
|
||||
|
||||
type MoveRequest struct {
|
||||
body *MoveRequestBody
|
||||
signature *Signature
|
||||
}
|
||||
|
||||
func (r *MoveRequest) SetBody(v *MoveRequestBody) {
|
||||
r.body = v
|
||||
}
|
||||
|
||||
func (r *MoveRequest) GetSignature() *tree.Signature {
|
||||
return r.signature.ToGRPCMessage().(*tree.Signature)
|
||||
}
|
||||
|
||||
func (r *MoveRequest) SetSignature(signature *tree.Signature) error {
|
||||
if r.signature == nil {
|
||||
r.signature = new(Signature)
|
||||
}
|
||||
|
||||
return r.signature.FromGRPCMessage(signature)
|
||||
}
|
||||
|
||||
func (r *MoveRequest) SignedDataSize() int {
|
||||
return r.body.ToGRPCMessage().(*tree.MoveRequest_Body).StableSize()
|
||||
}
|
||||
|
||||
func (r *MoveRequest) ReadSignedData(buf []byte) ([]byte, error) {
|
||||
return r.body.ToGRPCMessage().(*tree.MoveRequest_Body).StableMarshal(buf), nil
|
||||
}
|
||||
|
||||
type MoveRequestBody struct {
|
||||
containerID []byte
|
||||
treeID string
|
||||
parentID uint64
|
||||
nodeID uint64
|
||||
meta []*KeyValue
|
||||
bearerToken []byte
|
||||
}
|
||||
aarifullin marked this conversation as resolved
Outdated
aarifullin
commented
Even this field (not only this, but also the rest in this file) is directly accessible from this struct instance, you don't provide any getters to make the usage safe. If a client code directly uses Don't you consider to make all fields private as we do for all api-go related types? And provide Even this field (not only this, but also the rest in this file) is directly accessible from this struct instance, you don't provide any getters to make the usage safe. If a client code directly uses `Operation`, then it may get big-boom if it will access `LogMove` fields when it's actually `nil`.
Don't you consider to make all fields private as we do for all api-go related types? And provide `Get/Set` methods?
dkirillov
commented
Probably we can write something like in autogenerated code: we have exported fields but also setters with checking for nil. I believe it can be useful when we create new Probably we can write something like in autogenerated code: we have exported fields but also setters with checking for nil. I believe it can be useful when we create new `*Request` variable
alexvanin
commented
For the record, I (subjectively) prefer having public fields in the structs of API message definitions. However all structures in For the record, I (subjectively) prefer having public fields in the structs of API message definitions.
However all structures in `api` package defined with `Get/Set` methods, so it makes sense to keep it for tree as well.
|
||||
|
||||
func (r *MoveRequestBody) SetContainerID(v []byte) {
|
||||
r.containerID = v
|
||||
}
|
||||
|
||||
func (r *MoveRequestBody) SetBearerToken(v []byte) {
|
||||
r.bearerToken = v
|
||||
}
|
||||
|
||||
func (r *MoveRequestBody) SetTreeID(v string) {
|
||||
r.treeID = v
|
||||
}
|
||||
|
||||
func (r *MoveRequestBody) SetNodeID(v uint64) {
|
||||
r.nodeID = v
|
||||
}
|
||||
|
||||
func (r *MoveRequestBody) SetMeta(v []*KeyValue) {
|
||||
r.meta = v
|
||||
}
|
||||
|
||||
func (r *MoveRequestBody) SetParentID(v uint64) {
|
||||
r.parentID = v
|
||||
}
|
||||
|
||||
type MoveResponse struct {
|
||||
body *MoveResponseBody
|
||||
signature *Signature
|
||||
}
|
||||
|
||||
func (r *MoveResponse) GetBody() *MoveResponseBody {
|
||||
if r != nil {
|
||||
return r.body
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *MoveResponse) GetSignature() *Signature {
|
||||
if r != nil {
|
||||
return r.signature
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
aarifullin marked this conversation as resolved
Outdated
aarifullin
commented
Why does this method return dummy value instead Why does this method return dummy value instead `nil`?
|
||||
|
||||
type MoveResponseBody struct{}
|
||||
|
||||
type GetNodeByPathRequest struct {
|
||||
body *GetNodeByPathRequestBody
|
||||
signature *Signature
|
||||
}
|
||||
|
||||
func (r *GetNodeByPathRequest) SetBody(v *GetNodeByPathRequestBody) {
|
||||
r.body = v
|
||||
}
|
||||
|
||||
func (r *GetNodeByPathRequest) GetSignature() *tree.Signature {
|
||||
return r.signature.ToGRPCMessage().(*tree.Signature)
|
||||
}
|
||||
|
||||
func (r *GetNodeByPathRequest) SetSignature(signature *tree.Signature) error {
|
||||
if r.signature == nil {
|
||||
r.signature = new(Signature)
|
||||
}
|
||||
return r.signature.FromGRPCMessage(signature)
|
||||
}
|
||||
|
||||
func (r *GetNodeByPathRequest) SignedDataSize() int {
|
||||
return r.body.ToGRPCMessage().(*tree.GetNodeByPathRequest_Body).StableSize()
|
||||
}
|
||||
|
||||
func (r *GetNodeByPathRequest) ReadSignedData(buf []byte) ([]byte, error) {
|
||||
return r.body.ToGRPCMessage().(*tree.GetNodeByPathRequest_Body).StableMarshal(buf), nil
|
||||
}
|
||||
|
||||
type GetNodeByPathRequestBody struct {
|
||||
containerID []byte
|
||||
treeID string
|
||||
pathAttribute string
|
||||
path []string
|
||||
attributes []string
|
||||
latestOnly bool
|
||||
allAttributes bool
|
||||
bearerToken []byte
|
||||
}
|
||||
|
||||
func (r *GetNodeByPathRequestBody) SetContainerID(v [32]byte) {
|
||||
r.containerID = v[:]
|
||||
}
|
||||
|
||||
func (r *GetNodeByPathRequestBody) SetBearerToken(v []byte) {
|
||||
r.bearerToken = v
|
||||
}
|
||||
|
||||
func (r *GetNodeByPathRequestBody) SetTreeID(v string) {
|
||||
r.treeID = v
|
||||
}
|
||||
|
||||
func (r *GetNodeByPathRequestBody) SetPathAttribute(v string) {
|
||||
r.pathAttribute = v
|
||||
}
|
||||
|
||||
func (r *GetNodeByPathRequestBody) SetParentID(v string) {
|
||||
r.pathAttribute = v
|
||||
}
|
||||
|
||||
func (r *GetNodeByPathRequestBody) SetPath(v []string) {
|
||||
r.path = v
|
||||
}
|
||||
|
||||
func (r *GetNodeByPathRequestBody) SetAttributes(v []string) {
|
||||
r.attributes = v
|
||||
}
|
||||
|
||||
func (r *GetNodeByPathRequestBody) SetAllAttributes(v bool) {
|
||||
r.allAttributes = v
|
||||
}
|
||||
|
||||
func (r *GetNodeByPathRequestBody) SetLatestOnly(v bool) {
|
||||
r.latestOnly = v
|
||||
}
|
||||
|
||||
type GetNodeByPathResponse struct {
|
||||
body *GetNodeByPathResponseBody
|
||||
signature *Signature
|
||||
}
|
||||
|
||||
func (r *GetNodeByPathResponse) GetBody() *GetNodeByPathResponseBody {
|
||||
if r != nil {
|
||||
return r.body
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *GetNodeByPathResponse) GetSignature() *Signature {
|
||||
if r != nil {
|
||||
return r.signature
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type GetNodeByPathResponseBody struct {
|
||||
nodes []*GetNodeByPathResponseInfo
|
||||
}
|
||||
|
||||
func (r *GetNodeByPathResponseBody) GetNodes() []*GetNodeByPathResponseInfo {
|
||||
if r != nil {
|
||||
return r.nodes
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type GetNodeByPathResponseInfo struct {
|
||||
nodeID uint64
|
||||
timestamp uint64
|
||||
meta []*KeyValue
|
||||
parentID uint64
|
||||
}
|
||||
|
||||
func (r *GetNodeByPathResponseInfo) GetNodeID() uint64 {
|
||||
if r != nil {
|
||||
return r.nodeID
|
||||
}
|
||||
|
||||
return 0
|
||||
}
|
||||
|
||||
func (r *GetNodeByPathResponseInfo) GetTimestamp() uint64 {
|
||||
if r != nil {
|
||||
return r.timestamp
|
||||
}
|
||||
|
||||
return 0
|
||||
}
|
||||
|
||||
func (r *GetNodeByPathResponseInfo) GetParentID() uint64 {
|
||||
if r != nil {
|
||||
return r.parentID
|
||||
}
|
||||
|
||||
return 0
|
||||
}
|
||||
|
||||
func (r *GetNodeByPathResponseInfo) GetMeta() []*KeyValue {
|
||||
if r != nil {
|
||||
return r.meta
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type ListRequest struct {
|
||||
body *ListRequestBody
|
||||
signature *Signature
|
||||
}
|
||||
|
||||
func (r *ListRequest) SetBody(v *ListRequestBody) {
|
||||
r.body = v
|
||||
}
|
||||
|
||||
type ListRequestBody struct {
|
||||
containerID []byte
|
||||
}
|
||||
|
||||
func (r *ListRequestBody) SetContainerID(v []byte) {
|
||||
r.containerID = v
|
||||
}
|
||||
|
||||
type ListResponse struct {
|
||||
body *ListResponseBody
|
||||
signature *Signature
|
||||
}
|
||||
|
||||
func (r *ListResponse) GetBody() *ListResponseBody {
|
||||
if r != nil {
|
||||
return r.body
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *ListResponse) GetSignature() *Signature {
|
||||
if r != nil {
|
||||
return r.signature
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type ListResponseBody struct {
|
||||
ids []string
|
||||
}
|
||||
|
||||
func (r *ListResponseBody) GetIDs() []string {
|
||||
if r != nil {
|
||||
return r.ids
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type GetSubTreeRequest struct {
|
||||
body *GetSubTreeRequestBody
|
||||
signature *Signature
|
||||
}
|
||||
|
||||
func (r *GetSubTreeRequest) SetBody(v *GetSubTreeRequestBody) {
|
||||
r.body = v
|
||||
}
|
||||
|
||||
func (r *GetSubTreeRequest) GetSignature() *tree.Signature {
|
||||
return r.signature.ToGRPCMessage().(*tree.Signature)
|
||||
}
|
||||
|
||||
func (r *GetSubTreeRequest) SetSignature(signature *tree.Signature) error {
|
||||
if r.signature == nil {
|
||||
r.signature = new(Signature)
|
||||
}
|
||||
|
||||
return r.signature.FromGRPCMessage(signature)
|
||||
}
|
||||
|
||||
func (r *GetSubTreeRequest) SignedDataSize() int {
|
||||
return r.body.ToGRPCMessage().(*tree.GetSubTreeRequest_Body).StableSize()
|
||||
}
|
||||
|
||||
func (r *GetSubTreeRequest) ReadSignedData(buf []byte) ([]byte, error) {
|
||||
return r.body.ToGRPCMessage().(*tree.GetSubTreeRequest_Body).StableMarshal(buf), nil
|
||||
}
|
||||
|
||||
type GetSubTreeRequestBody struct {
|
||||
containerID []byte
|
||||
treeID string
|
||||
rootID []uint64
|
||||
depth uint32
|
||||
bearerToken []byte
|
||||
orderBy *GetSubTreeRequestBodyOrder
|
||||
}
|
||||
|
||||
func (r *GetSubTreeRequestBody) SetContainerID(v []byte) {
|
||||
r.containerID = v
|
||||
}
|
||||
|
||||
func (r *GetSubTreeRequestBody) SetBearerToken(v []byte) {
|
||||
r.bearerToken = v
|
||||
}
|
||||
|
||||
func (r *GetSubTreeRequestBody) SetTreeID(v string) {
|
||||
r.treeID = v
|
||||
}
|
||||
|
||||
func (r *GetSubTreeRequestBody) SetRootID(v []uint64) {
|
||||
r.rootID = v
|
||||
}
|
||||
|
||||
func (r *GetSubTreeRequestBody) SetDepth(v uint32) {
|
||||
r.depth = v
|
||||
}
|
||||
|
||||
func (r *GetSubTreeRequestBody) SetOrderBy(v *GetSubTreeRequestBodyOrder) {
|
||||
r.orderBy = v
|
||||
}
|
||||
|
||||
type GetSubTreeRequestBodyOrder struct {
|
||||
direction GetSubTreeRequestBodyOrderDirection
|
||||
}
|
||||
|
||||
func (r *GetSubTreeRequestBodyOrder) SetDirection(v GetSubTreeRequestBodyOrderDirection) {
|
||||
r.direction = v
|
||||
}
|
||||
|
||||
const (
|
||||
GetSubTreeRequestBodyOrderNone = GetSubTreeRequestBodyOrderDirection(tree.GetSubTreeRequest_Body_Order_None)
|
||||
GetSubTreeRequestBodyOrderAsc = GetSubTreeRequestBodyOrderDirection(tree.GetSubTreeRequest_Body_Order_Asc)
|
||||
)
|
||||
|
||||
type GetSubTreeRequestBodyOrderDirection int32
|
||||
|
||||
type GetSubTreeResponse struct {
|
||||
body *GetSubTreeResponseBody
|
||||
signature *Signature
|
||||
}
|
||||
|
||||
func (r *GetSubTreeResponse) GetBody() *GetSubTreeResponseBody {
|
||||
if r != nil {
|
||||
return r.body
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *GetSubTreeResponse) GetSignature() *Signature {
|
||||
if r != nil {
|
||||
return r.signature
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type GetSubTreeResponseBody struct {
|
||||
nodeID []uint64
|
||||
parentID []uint64
|
||||
timestamp []uint64
|
||||
meta []*KeyValue
|
||||
}
|
||||
|
||||
func (r *GetSubTreeResponseBody) GetNodeID() []uint64 {
|
||||
if r != nil {
|
||||
return r.nodeID
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *GetSubTreeResponseBody) GetParentID() []uint64 {
|
||||
if r != nil {
|
||||
return r.parentID
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *GetSubTreeResponseBody) GetTimestamp() []uint64 {
|
||||
if r != nil {
|
||||
return r.timestamp
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *GetSubTreeResponseBody) GetMeta() []*KeyValue {
|
||||
if r != nil {
|
||||
return r.meta
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type ApplyRequest struct {
|
||||
body *ApplyRequestBody
|
||||
signature *Signature
|
||||
}
|
||||
|
||||
func (r *ApplyRequest) SetBody(v *ApplyRequestBody) {
|
||||
r.body = v
|
||||
}
|
||||
|
||||
type ApplyRequestBody struct {
|
||||
containerID []byte
|
||||
treeID string
|
||||
operation *LogMove
|
||||
}
|
||||
|
||||
func (r *ApplyRequestBody) SetContainerID(v []byte) {
|
||||
r.containerID = v
|
||||
}
|
||||
|
||||
func (r *ApplyRequestBody) SetTreeID(v string) {
|
||||
r.treeID = v
|
||||
}
|
||||
|
||||
func (r *ApplyRequestBody) SetOperation(v *LogMove) {
|
||||
r.operation = v
|
||||
}
|
||||
|
||||
type ApplyResponse struct {
|
||||
body *ApplyResponseBody
|
||||
signature *Signature
|
||||
}
|
||||
|
||||
func (r *ApplyResponse) GetBody() *ApplyResponseBody {
|
||||
if r != nil {
|
||||
return r.body
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *ApplyResponse) GetSignature() *Signature {
|
||||
if r != nil {
|
||||
return r.signature
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type ApplyResponseBody struct{}
|
||||
|
||||
type GetOpLogRequest struct {
|
||||
body *GetOpLogRequestBody
|
||||
signature *Signature
|
||||
}
|
||||
|
||||
func (r *GetOpLogRequest) SetBody(v *GetOpLogRequestBody) {
|
||||
r.body = v
|
||||
}
|
||||
|
||||
func (r *GetOpLogRequest) GetSignature() *tree.Signature {
|
||||
return r.signature.ToGRPCMessage().(*tree.Signature)
|
||||
}
|
||||
|
||||
func (r *GetOpLogRequest) SetSignature(signature *tree.Signature) error {
|
||||
if r.signature == nil {
|
||||
r.signature = new(Signature)
|
||||
}
|
||||
|
||||
return r.signature.FromGRPCMessage(signature)
|
||||
}
|
||||
|
||||
func (r *GetOpLogRequest) SignedDataSize() int {
|
||||
return r.body.ToGRPCMessage().(*tree.GetSubTreeRequest_Body).StableSize()
|
||||
}
|
||||
|
||||
func (r *GetOpLogRequest) ReadSignedData(buf []byte) ([]byte, error) {
|
||||
return r.body.ToGRPCMessage().(*tree.GetSubTreeRequest_Body).StableMarshal(buf), nil
|
||||
}
|
||||
|
||||
type GetOpLogRequestBody struct {
|
||||
containerID []byte
|
||||
treeID string
|
||||
height uint64
|
||||
count uint64
|
||||
}
|
||||
|
||||
func (r *GetOpLogRequestBody) SetTreeID(v string) {
|
||||
r.treeID = v
|
||||
}
|
||||
|
||||
func (r *GetOpLogRequestBody) SetContainerID(v []byte) {
|
||||
r.containerID = v
|
||||
}
|
||||
|
||||
func (r *GetOpLogRequestBody) SetHeight(v uint64) {
|
||||
r.height = v
|
||||
}
|
||||
|
||||
func (r *GetOpLogRequestBody) SetCount(v uint64) {
|
||||
r.count = v
|
||||
}
|
||||
|
||||
type GetOpLogResponse struct {
|
||||
body *GetOpLogResponseBody
|
||||
signature *Signature
|
||||
}
|
||||
|
||||
func (r *GetOpLogResponse) GetBody() *GetOpLogResponseBody {
|
||||
if r != nil {
|
||||
return r.body
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *GetOpLogResponse) GetSignature() *Signature {
|
||||
if r != nil {
|
||||
return r.signature
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type GetOpLogResponseBody struct {
|
||||
operation *LogMove
|
||||
}
|
||||
|
||||
func (r *GetOpLogResponseBody) GetOperation() *LogMove {
|
||||
if r != nil {
|
||||
return r.operation
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type HealthcheckRequest struct {
|
||||
body *HealthcheckRequestBody
|
||||
signature *Signature
|
||||
|
||||
session.RequestHeaders
|
||||
}
|
||||
|
||||
func (r *HealthcheckRequest) SetBody(v *HealthcheckRequestBody) {
|
||||
r.body = v
|
||||
}
|
||||
|
||||
type HealthcheckRequestBody struct{}
|
||||
|
||||
type HealthcheckResponse struct {
|
||||
body *HealthcheckResponseBody
|
||||
signature *Signature
|
||||
}
|
||||
|
||||
func (r *HealthcheckResponse) GetBody() *HealthcheckResponseBody {
|
||||
if r != nil {
|
||||
return r.body
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *HealthcheckResponse) GetSignature() *Signature {
|
||||
if r != nil {
|
||||
return r.signature
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type HealthcheckResponseBody struct{}
|
||||
|
||||
type LogMove struct {
|
||||
parentID uint64
|
||||
meta []byte
|
||||
childID uint64
|
||||
}
|
||||
|
||||
func (g *LogMove) GetParentID() uint64 {
|
||||
if g != nil {
|
||||
return g.parentID
|
||||
}
|
||||
|
||||
return 0
|
||||
}
|
||||
|
||||
func (g *LogMove) SetParentID(v uint64) {
|
||||
g.parentID = v
|
||||
}
|
||||
|
||||
func (g *LogMove) GetMeta() []byte {
|
||||
if g != nil {
|
||||
return g.meta
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *LogMove) SetMeta(v []byte) {
|
||||
g.meta = v
|
||||
}
|
||||
|
||||
func (g *LogMove) GetChildID() []byte {
|
||||
if g != nil {
|
||||
return g.meta
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *LogMove) SetChildID(v []byte) {
|
||||
g.meta = v
|
||||
}
|
||||
|
||||
type Signature struct {
|
||||
key []byte
|
||||
sign []byte
|
||||
}
|
||||
|
||||
func (s *Signature) GetKey() []byte {
|
||||
if s != nil {
|
||||
return s.key
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Signature) SetKey(v []byte) {
|
||||
s.key = v
|
||||
}
|
||||
|
||||
func (s *Signature) GetSign() []byte {
|
||||
if s != nil {
|
||||
return s.sign
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Signature) SetSign(v []byte) {
|
||||
s.sign = v
|
||||
}
|
||||
|
||||
type KeyValue struct {
|
||||
key string
|
||||
value []byte
|
||||
}
|
||||
|
||||
func (k *KeyValue) GetKey() string {
|
||||
if k != nil {
|
||||
return k.key
|
||||
}
|
||||
|
||||
return ""
|
||||
}
|
||||
|
||||
func (k *KeyValue) SetKey(v string) {
|
||||
k.key = v
|
||||
}
|
||||
|
||||
func (k *KeyValue) GetValue() []byte {
|
||||
if k != nil {
|
||||
return k.value
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (k *KeyValue) SetValue(v []byte) {
|
||||
k.value = v
|
||||
}
|
|
@ -6,20 +6,22 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
apiClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
|
||||
grpcService "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service"
|
||||
rpcapi "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc"
|
||||
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/tree"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
)
|
||||
|
||||
type treeClient struct {
|
||||
mu sync.RWMutex
|
||||
address string
|
||||
opts []grpc.DialOption
|
||||
conn *grpc.ClientConn
|
||||
service grpcService.TreeServiceClient
|
||||
mu sync.RWMutex
|
||||
address string
|
||||
opts []grpc.DialOption
|
||||
client *rpcclient.Client
|
||||
nodeDialTimeout time.Duration
|
||||
streamTimeout time.Duration
|
||||
|
||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Why do we need separate field? Is not enough having Why do we need separate field? Is not enough having `client *rpcclient.Client`?
|
||||
healthy bool
|
||||
}
|
||||
|
||||
|
@ -27,10 +29,12 @@ type treeClient struct {
|
|||
var ErrUnhealthyEndpoint = errors.New("unhealthy endpoint")
|
||||
|
||||
// newTreeClient creates new tree client with auto dial.
|
||||
func newTreeClient(addr string, opts ...grpc.DialOption) *treeClient {
|
||||
func newTreeClient(addr string, opts []grpc.DialOption, dialTimeout time.Duration, streamTimeout time.Duration) *treeClient {
|
||||
return &treeClient{
|
||||
address: addr,
|
||||
opts: opts,
|
||||
address: addr,
|
||||
opts: opts,
|
||||
nodeDialTimeout: dialTimeout,
|
||||
streamTimeout: streamTimeout,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -38,16 +42,17 @@ func (c *treeClient) dial(ctx context.Context) error {
|
|||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
if c.conn != nil {
|
||||
if c.client != nil {
|
||||
return fmt.Errorf("couldn't dial '%s': connection already established", c.address)
|
||||
}
|
||||
|
||||
var err error
|
||||
if c.conn, c.service, err = createClient(c.address, c.opts...); err != nil {
|
||||
c.client, err = c.createClient()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err = c.service.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil {
|
||||
if _, err = rpcapi.Healthcheck(c.client, &tree.HealthcheckRequest{}, rpcclient.WithContext(ctx)); err != nil {
|
||||
return fmt.Errorf("healthcheck tree service: %w", err)
|
||||
}
|
||||
|
||||
|
@ -60,14 +65,14 @@ func (c *treeClient) redialIfNecessary(ctx context.Context) (healthHasChanged bo
|
|||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
if c.conn == nil {
|
||||
if c.conn, c.service, err = createClient(c.address, c.opts...); err != nil {
|
||||
if c.client == nil {
|
||||
if c.client, err = c.createClient(); err != nil {
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
|
||||
dkirillov marked this conversation as resolved
dkirillov
commented
I would expect checking for I would expect checking for `if c.client == nil`
|
||||
wasHealthy := c.healthy
|
||||
if _, err = c.service.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil {
|
||||
if _, err = rpcapi.Healthcheck(c.client, &tree.HealthcheckRequest{}, rpcclient.WithContext(ctx)); err != nil {
|
||||
c.healthy = false
|
||||
return wasHealthy, fmt.Errorf("healthcheck tree service: %w", err)
|
||||
}
|
||||
|
@ -77,39 +82,26 @@ func (c *treeClient) redialIfNecessary(ctx context.Context) (healthHasChanged bo
|
|||
return !wasHealthy, nil
|
||||
}
|
||||
|
||||
func createClient(addr string, clientOptions ...grpc.DialOption) (*grpc.ClientConn, grpcService.TreeServiceClient, error) {
|
||||
host, tlsEnable, err := apiClient.ParseURI(addr)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("parse address: %w", err)
|
||||
}
|
||||
func (c *treeClient) createClient() (*rpcclient.Client, error) {
|
||||
cli := rpcclient.New(append(
|
||||
rpcclient.WithNetworkURIAddress(c.address, &tls.Config{}),
|
||||
rpcclient.WithDialTimeout(c.nodeDialTimeout),
|
||||
rpcclient.WithRWTimeout(c.streamTimeout),
|
||||
rpcclient.WithGRPCDialOptions(c.opts),
|
||||
)...)
|
||||
|
||||
creds := insecure.NewCredentials()
|
||||
if tlsEnable {
|
||||
creds = credentials.NewTLS(&tls.Config{})
|
||||
}
|
||||
|
||||
options := []grpc.DialOption{grpc.WithTransportCredentials(creds)}
|
||||
|
||||
// the order is matter, we want client to be able to overwrite options.
|
||||
opts := append(options, clientOptions...)
|
||||
|
||||
conn, err := grpc.NewClient(host, opts...)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("grpc create node tree service: %w", err)
|
||||
}
|
||||
|
||||
return conn, grpcService.NewTreeServiceClient(conn), nil
|
||||
return cli, nil
|
||||
}
|
||||
|
||||
func (c *treeClient) serviceClient() (grpcService.TreeServiceClient, error) {
|
||||
func (c *treeClient) serviceClient() (*rpcclient.Client, error) {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
|
||||
if c.conn == nil || !c.healthy {
|
||||
if c.client == nil || !c.healthy {
|
||||
return nil, fmt.Errorf("%w: '%s'", ErrUnhealthyEndpoint, c.address)
|
||||
}
|
||||
|
||||
return c.service, nil
|
||||
return c.client, nil
|
||||
}
|
||||
|
||||
func (c *treeClient) endpoint() string {
|
||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
If we use this options, than we don't need previously defined If we use this options, than we don't need previously defined `options`
|
||||
|
@ -132,9 +124,8 @@ func (c *treeClient) close() error {
|
|||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
if c.conn == nil {
|
||||
if c.client == nil || c.client.Conn() == nil {
|
||||
return nil
|
||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
If we check If we check `c.client` for `nil` then `conn := c.client.Conn()` above can cause panic. (Probably this will never happen but code logically incorrect)
|
||||
}
|
||||
|
||||
return c.conn.Close()
|
||||
return c.client.Conn().Close()
|
||||
}
|
||||
|
|
|
@ -10,9 +10,11 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
rpcapi "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc"
|
||||
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/tree"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||
grpcService "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
|
@ -51,7 +53,7 @@ var (
|
|||
// This interface is expected to have exactly one production implementation - treeClient.
|
||||
// Others are expected to be for test purposes only.
|
||||
type client interface {
|
||||
serviceClient() (grpcService.TreeServiceClient, error)
|
||||
serviceClient() (*rpcclient.Client, error)
|
||||
endpoint() string
|
||||
isHealthy() bool
|
||||
setHealthy(bool)
|
||||
|
@ -92,6 +94,7 @@ type Pool struct {
|
|||
|
||||
maxRequestAttempts int
|
||||
streamTimeout time.Duration
|
||||
nodeDialTimeout time.Duration
|
||||
|
||||
startIndicesMtx sync.RWMutex
|
||||
// startIndices points to the client from which the next request will be executed.
|
||||
|
@ -254,7 +257,7 @@ func (p *Pool) Dial(ctx context.Context) error {
|
|||
for i, nodes := range p.rebalanceParams.nodesGroup {
|
||||
clients := make([]client, len(nodes))
|
||||
for j, node := range nodes {
|
||||
clients[j] = newTreeClient(node.Address(), p.dialOptions...)
|
||||
clients[j] = newTreeClient(node.Address(), p.dialOptions, p.nodeDialTimeout, p.streamTimeout)
|
||||
if err := clients[j].dial(ctx); err != nil {
|
||||
p.log(zap.WarnLevel, "failed to dial tree client", zap.String("address", node.Address()), zap.Error(err))
|
||||
continue
|
||||
|
@ -336,30 +339,28 @@ func (x *InitParameters) SetMaxRequestAttempts(maxAttempts int) {
|
|||
// Can return predefined errors:
|
||||
// * ErrNodeNotFound
|
||||
// * ErrNodeAccessDenied.
|
||||
func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService.GetNodeByPathResponse_Info, error) {
|
||||
request := &grpcService.GetNodeByPathRequest{
|
||||
Body: &grpcService.GetNodeByPathRequest_Body{
|
||||
ContainerId: prm.CID[:],
|
||||
TreeId: prm.TreeID,
|
||||
Path: prm.Path,
|
||||
Attributes: prm.Meta,
|
||||
PathAttribute: prm.PathAttribute,
|
||||
LatestOnly: prm.LatestOnly,
|
||||
AllAttributes: prm.AllAttrs,
|
||||
BearerToken: prm.BearerToken,
|
||||
},
|
||||
}
|
||||
func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*tree.GetNodeByPathResponseInfo, error) {
|
||||
body := new(tree.GetNodeByPathRequestBody)
|
||||
body.SetContainerID(prm.CID)
|
||||
body.SetTreeID(prm.TreeID)
|
||||
body.SetPath(prm.Path)
|
||||
body.SetAttributes(prm.Meta)
|
||||
body.SetPathAttribute(prm.PathAttribute)
|
||||
body.SetAllAttributes(prm.AllAttrs)
|
||||
body.SetLatestOnly(prm.LatestOnly)
|
||||
body.SetBearerToken(prm.BearerToken)
|
||||
|
||||
request := new(tree.GetNodeByPathRequest)
|
||||
request.SetBody(body)
|
||||
|
||||
start := time.Now()
|
||||
if err := p.signRequest(request); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var resp *grpcService.GetNodeByPathResponse
|
||||
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||
reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout)
|
||||
defer cancel()
|
||||
resp, inErr = client.GetNodeByPath(reqCtx, request)
|
||||
var resp *tree.GetNodeByPathResponse
|
||||
err := p.requestWithRetry(ctx, func(client *rpcclient.Client) (inErr error) {
|
||||
resp, inErr = rpcapi.GetNodeByPath(client, request, rpcclient.WithContext(ctx))
|
||||
// Pool wants to do retry 'GetNodeByPath' request if result is empty.
|
||||
// Empty result is expected due to delayed tree service sync.
|
||||
// Return an error there to trigger retry and ignore it after,
|
||||
|
@ -381,13 +382,14 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
|
|||
//
|
||||
// Must be initialized using Pool.GetSubTree, any other usage is unsafe.
|
||||
type SubTreeReader struct {
|
||||
cli grpcService.TreeService_GetSubTreeClient
|
||||
cli *rpcapi.GetSubTreeResponseReader
|
||||
}
|
||||
|
||||
// Read reads another list of the subtree nodes.
|
||||
func (x *SubTreeReader) Read(buf []*grpcService.GetSubTreeResponse_Body) (int, error) {
|
||||
func (x *SubTreeReader) Read(buf []*tree.GetSubTreeResponseBody) (int, error) {
|
||||
for i := range buf {
|
||||
resp, err := x.cli.Recv()
|
||||
var resp tree.GetSubTreeResponse
|
||||
err := x.cli.Read(&resp)
|
||||
if err == io.EOF {
|
||||
return i, io.EOF
|
||||
} else if err != nil {
|
||||
|
@ -400,10 +402,11 @@ func (x *SubTreeReader) Read(buf []*grpcService.GetSubTreeResponse_Body) (int, e
|
|||
}
|
||||
|
||||
// ReadAll reads all nodes subtree nodes.
|
||||
func (x *SubTreeReader) ReadAll() ([]*grpcService.GetSubTreeResponse_Body, error) {
|
||||
var res []*grpcService.GetSubTreeResponse_Body
|
||||
func (x *SubTreeReader) ReadAll() ([]*tree.GetSubTreeResponseBody, error) {
|
||||
var res []*tree.GetSubTreeResponseBody
|
||||
for {
|
||||
resp, err := x.cli.Recv()
|
||||
var resp tree.GetSubTreeResponse
|
||||
err := x.cli.Read(&resp)
|
||||
if err == io.EOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
|
@ -416,8 +419,9 @@ func (x *SubTreeReader) ReadAll() ([]*grpcService.GetSubTreeResponse_Body, error
|
|||
}
|
||||
|
||||
// Next gets the next node from subtree.
|
||||
func (x *SubTreeReader) Next() (*grpcService.GetSubTreeResponse_Body, error) {
|
||||
resp, err := x.cli.Recv()
|
||||
func (x *SubTreeReader) Next() (*tree.GetSubTreeResponseBody, error) {
|
||||
var resp tree.GetSubTreeResponse
|
||||
err := x.cli.Read(&resp)
|
||||
if err == io.EOF {
|
||||
return nil, io.EOF
|
||||
}
|
||||
|
@ -434,32 +438,33 @@ func (x *SubTreeReader) Next() (*grpcService.GetSubTreeResponse_Body, error) {
|
|||
// * ErrNodeNotFound
|
||||
// * ErrNodeAccessDenied.
|
||||
func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeReader, error) {
|
||||
request := &grpcService.GetSubTreeRequest{
|
||||
Body: &grpcService.GetSubTreeRequest_Body{
|
||||
ContainerId: prm.CID[:],
|
||||
TreeId: prm.TreeID,
|
||||
RootId: prm.RootID,
|
||||
Depth: prm.Depth,
|
||||
BearerToken: prm.BearerToken,
|
||||
OrderBy: new(grpcService.GetSubTreeRequest_Body_Order),
|
||||
},
|
||||
}
|
||||
body := new(tree.GetSubTreeRequestBody)
|
||||
body.SetContainerID(prm.CID[:])
|
||||
dkirillov
commented
minor: Probably we can pass minor: Probably we can pass `prm.CID` to method and inside transform to bytes
dkirillov
commented
I meant accept I meant accept `cid.ID` as parameter type, but ok
|
||||
body.SetTreeID(prm.TreeID)
|
||||
body.SetBearerToken(prm.BearerToken)
|
||||
body.SetDepth(prm.Depth)
|
||||
body.SetRootID(prm.RootID)
|
||||
|
||||
orderBy := new(tree.GetSubTreeRequestBodyOrder)
|
||||
switch prm.Order {
|
||||
case AscendingOrder:
|
||||
request.Body.OrderBy.Direction = grpcService.GetSubTreeRequest_Body_Order_Asc
|
||||
orderBy.SetDirection(tree.GetSubTreeRequestBodyOrderAsc)
|
||||
default:
|
||||
request.Body.OrderBy.Direction = grpcService.GetSubTreeRequest_Body_Order_None
|
||||
orderBy.SetDirection(tree.GetSubTreeRequestBodyOrderNone)
|
||||
}
|
||||
body.SetOrderBy(orderBy)
|
||||
|
||||
request := new(tree.GetSubTreeRequest)
|
||||
request.SetBody(body)
|
||||
|
||||
start := time.Now()
|
||||
if err := p.signRequest(request); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var cli grpcService.TreeService_GetSubTreeClient
|
||||
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||
cli, inErr = client.GetSubTree(ctx, request)
|
||||
var cli *rpcapi.GetSubTreeResponseReader
|
||||
err := p.requestWithRetry(ctx, func(client *rpcclient.Client) (inErr error) {
|
||||
cli, inErr = rpcapi.GetSubTree(client, request, rpcclient.WithContext(ctx))
|
||||
return handleError("failed to get sub tree client", inErr)
|
||||
})
|
||||
p.methods[methodGetSubTree].IncRequests(time.Since(start))
|
||||
|
@ -476,26 +481,24 @@ func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeRe
|
|||
// * ErrNodeNotFound
|
||||
// * ErrNodeAccessDenied.
|
||||
func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
|
||||
request := &grpcService.AddRequest{
|
||||
Body: &grpcService.AddRequest_Body{
|
||||
ContainerId: prm.CID[:],
|
||||
TreeId: prm.TreeID,
|
||||
ParentId: prm.Parent,
|
||||
Meta: metaToKV(prm.Meta),
|
||||
BearerToken: prm.BearerToken,
|
||||
},
|
||||
}
|
||||
body := new(tree.AddRequestBody)
|
||||
body.SetTreeID(prm.TreeID)
|
||||
body.SetBearerToken(prm.BearerToken)
|
||||
body.SetContainerID(prm.CID[:])
|
||||
body.SetMeta(metaToKV(prm.Meta))
|
||||
body.SetParentID(prm.Parent)
|
||||
|
||||
request := new(tree.AddRequest)
|
||||
request.SetBody(body)
|
||||
|
||||
start := time.Now()
|
||||
if err := p.signRequest(request); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
var resp *grpcService.AddResponse
|
||||
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||
reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout)
|
||||
defer cancel()
|
||||
resp, inErr = client.Add(reqCtx, request)
|
||||
var resp *tree.AddResponse
|
||||
err := p.requestWithRetry(ctx, func(client *rpcclient.Client) (inErr error) {
|
||||
resp, inErr = rpcapi.Add(client, request, rpcclient.WithContext(ctx))
|
||||
return handleError("failed to add node", inErr)
|
||||
})
|
||||
p.methods[methodAddNode].IncRequests(time.Since(start))
|
||||
|
@ -503,7 +506,7 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
|
|||
return 0, err
|
||||
}
|
||||
|
||||
return resp.GetBody().GetNodeId(), nil
|
||||
return resp.GetBody().GetNodeID(), nil
|
||||
}
|
||||
|
||||
// AddNodeByPath invokes eponymous method from TreeServiceClient.
|
||||
|
@ -512,27 +515,25 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
|
|||
// * ErrNodeNotFound
|
||||
// * ErrNodeAccessDenied.
|
||||
func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint64, error) {
|
||||
request := &grpcService.AddByPathRequest{
|
||||
Body: &grpcService.AddByPathRequest_Body{
|
||||
ContainerId: prm.CID[:],
|
||||
TreeId: prm.TreeID,
|
||||
Path: prm.Path,
|
||||
Meta: metaToKV(prm.Meta),
|
||||
PathAttribute: prm.PathAttribute,
|
||||
BearerToken: prm.BearerToken,
|
||||
},
|
||||
}
|
||||
body := new(tree.AddByPathRequestBody)
|
||||
body.SetTreeID(prm.TreeID)
|
||||
body.SetBearerToken(prm.BearerToken)
|
||||
body.SetContainerID(prm.CID[:])
|
||||
body.SetMeta(metaToKV(prm.Meta))
|
||||
body.SetPathAttribute(prm.PathAttribute)
|
||||
body.SetPath(prm.Path)
|
||||
|
||||
request := new(tree.AddByPathRequest)
|
||||
request.SetBody(body)
|
||||
|
||||
start := time.Now()
|
||||
if err := p.signRequest(request); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
var resp *grpcService.AddByPathResponse
|
||||
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||
reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout)
|
||||
defer cancel()
|
||||
resp, inErr = client.AddByPath(reqCtx, request)
|
||||
var resp *tree.AddByPathResponse
|
||||
err := p.requestWithRetry(ctx, func(client *rpcclient.Client) (inErr error) {
|
||||
resp, inErr = rpcapi.AddByPath(client, request, rpcclient.WithContext(ctx))
|
||||
return handleError("failed to add node by path", inErr)
|
||||
})
|
||||
p.methods[methodAddNodeByPath].IncRequests(time.Since(start))
|
||||
|
@ -540,15 +541,15 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint
|
|||
return 0, err
|
||||
}
|
||||
|
||||
body := resp.GetBody()
|
||||
if body == nil {
|
||||
respBody := resp.GetBody()
|
||||
if respBody == nil {
|
||||
return 0, errors.New("nil body in tree service response")
|
||||
} else if len(body.GetNodes()) == 0 {
|
||||
} else if len(respBody.GetNodes()) == 0 {
|
||||
return 0, errors.New("empty list of added nodes in tree service response")
|
||||
}
|
||||
|
||||
// The first node is the leaf that we add, according to tree service docs.
|
||||
return body.GetNodes()[0], nil
|
||||
return respBody.GetNodes()[0], nil
|
||||
}
|
||||
|
||||
// MoveNode invokes eponymous method from TreeServiceClient.
|
||||
|
@ -557,26 +558,24 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint
|
|||
// * ErrNodeNotFound
|
||||
// * ErrNodeAccessDenied.
|
||||
func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
|
||||
request := &grpcService.MoveRequest{
|
||||
Body: &grpcService.MoveRequest_Body{
|
||||
ContainerId: prm.CID[:],
|
||||
TreeId: prm.TreeID,
|
||||
NodeId: prm.NodeID,
|
||||
ParentId: prm.ParentID,
|
||||
Meta: metaToKV(prm.Meta),
|
||||
BearerToken: prm.BearerToken,
|
||||
},
|
||||
}
|
||||
body := new(tree.MoveRequestBody)
|
||||
body.SetTreeID(prm.TreeID)
|
||||
body.SetBearerToken(prm.BearerToken)
|
||||
body.SetContainerID(prm.CID[:])
|
||||
body.SetMeta(metaToKV(prm.Meta))
|
||||
body.SetNodeID(prm.NodeID)
|
||||
body.SetParentID(prm.ParentID)
|
||||
|
||||
request := new(tree.MoveRequest)
|
||||
request.SetBody(body)
|
||||
|
||||
start := time.Now()
|
||||
if err := p.signRequest(request); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
|
||||
reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout)
|
||||
defer cancel()
|
||||
if _, err := client.Move(reqCtx, request); err != nil {
|
||||
err := p.requestWithRetry(ctx, func(client *rpcclient.Client) error {
|
||||
if _, err := rpcapi.Move(client, request, rpcclient.WithContext(ctx)); err != nil {
|
||||
return handleError("failed to move node", err)
|
||||
}
|
||||
return nil
|
||||
|
@ -592,24 +591,22 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
|
|||
// * ErrNodeNotFound
|
||||
// * ErrNodeAccessDenied.
|
||||
func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
|
||||
request := &grpcService.RemoveRequest{
|
||||
Body: &grpcService.RemoveRequest_Body{
|
||||
ContainerId: prm.CID[:],
|
||||
TreeId: prm.TreeID,
|
||||
NodeId: prm.NodeID,
|
||||
BearerToken: prm.BearerToken,
|
||||
},
|
||||
}
|
||||
body := new(tree.RemoveRequestBody)
|
||||
body.SetTreeID(prm.TreeID)
|
||||
body.SetBearerToken(prm.BearerToken)
|
||||
body.SetContainerID(prm.CID[:])
|
||||
body.SetNodeID(prm.NodeID)
|
||||
|
||||
request := new(tree.RemoveRequest)
|
||||
request.SetBody(body)
|
||||
|
||||
start := time.Now()
|
||||
if err := p.signRequest(request); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
|
||||
reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout)
|
||||
defer cancel()
|
||||
if _, err := client.Remove(reqCtx, request); err != nil {
|
||||
err := p.requestWithRetry(ctx, func(client *rpcclient.Client) error {
|
||||
if _, err := rpcapi.Remove(client, request, rpcclient.WithContext(ctx)); err != nil {
|
||||
return handleError("failed to remove node", err)
|
||||
}
|
||||
return nil
|
||||
|
@ -661,11 +658,14 @@ func handleError(msg string, err error) error {
|
|||
return fmt.Errorf("%s: %w", msg, err)
|
||||
}
|
||||
|
||||
func metaToKV(meta map[string]string) []*grpcService.KeyValue {
|
||||
result := make([]*grpcService.KeyValue, 0, len(meta))
|
||||
func metaToKV(meta map[string]string) []*tree.KeyValue {
|
||||
result := make([]*tree.KeyValue, 0, len(meta))
|
||||
|
||||
for key, value := range meta {
|
||||
result = append(result, &grpcService.KeyValue{Key: key, Value: []byte(value)})
|
||||
kv := new(tree.KeyValue)
|
||||
kv.SetKey(key)
|
||||
kv.SetValue([]byte(value))
|
||||
result = append(result, kv)
|
||||
}
|
||||
|
||||
return result
|
||||
|
@ -814,10 +814,10 @@ func (p *Pool) setStartIndices(i, j int) {
|
|||
p.startIndicesMtx.Unlock()
|
||||
}
|
||||
|
||||
func (p *Pool) requestWithRetry(ctx context.Context, fn func(client grpcService.TreeServiceClient) error) error {
|
||||
func (p *Pool) requestWithRetry(ctx context.Context, fn func(client *rpcclient.Client) error) error {
|
||||
var (
|
||||
err, finErr error
|
||||
cl grpcService.TreeServiceClient
|
||||
cl *rpcclient.Client
|
||||
)
|
||||
|
||||
reqID := GetRequestID(ctx)
|
||||
|
|
|
@ -9,7 +9,7 @@ type message interface {
|
|||
SignedDataSize() int
|
||||
ReadSignedData([]byte) ([]byte, error)
|
||||
GetSignature() *tree.Signature
|
||||
SetSignature(*tree.Signature)
|
||||
SetSignature(*tree.Signature) error
|
||||
}
|
||||
|
||||
// signMessage uses the pool key and signs any protobuf
|
||||
|
@ -29,10 +29,8 @@ func (p *Pool) signRequest(m message) error {
|
|||
|
||||
rawPub := make([]byte, keySDK.Public().MaxEncodedSize())
|
||||
rawPub = rawPub[:keySDK.Public().Encode(rawPub)]
|
||||
m.SetSignature(&tree.Signature{
|
||||
return m.SetSignature(&tree.Signature{
|
||||
Key: rawPub,
|
||||
Sign: data,
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -5,9 +5,9 @@ import (
|
|||
"errors"
|
||||
"testing"
|
||||
|
||||
rpcClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||
grpcService "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
@ -17,7 +17,7 @@ type treeClientMock struct {
|
|||
err bool
|
||||
}
|
||||
|
||||
func (t *treeClientMock) serviceClient() (grpcService.TreeServiceClient, error) {
|
||||
func (t *treeClientMock) serviceClient() (*rpcClient.Client, error) {
|
||||
if t.err {
|
||||
return nil, errors.New("serviceClient() mock error")
|
||||
}
|
||||
|
@ -99,7 +99,7 @@ func TestRetry(t *testing.T) {
|
|||
maxRequestAttempts: lenNodes,
|
||||
}
|
||||
|
||||
makeFn := func(client grpcService.TreeServiceClient) error {
|
||||
makeFn := func(client *rpcClient.Client) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -171,7 +171,7 @@ func TestRetry(t *testing.T) {
|
|||
|
||||
t.Run("error empty result", func(t *testing.T) {
|
||||
errNodes, index := 2, 0
|
||||
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
|
||||
err := p.requestWithRetry(ctx, func(client *rpcClient.Client) error {
|
||||
if index < errNodes {
|
||||
index++
|
||||
return errNodeEmptyResult
|
||||
|
@ -184,7 +184,7 @@ func TestRetry(t *testing.T) {
|
|||
|
||||
t.Run("error not found", func(t *testing.T) {
|
||||
errNodes, index := 2, 0
|
||||
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
|
||||
err := p.requestWithRetry(ctx, func(client *rpcClient.Client) error {
|
||||
if index < errNodes {
|
||||
index++
|
||||
return ErrNodeNotFound
|
||||
|
@ -197,7 +197,7 @@ func TestRetry(t *testing.T) {
|
|||
|
||||
t.Run("error access denied", func(t *testing.T) {
|
||||
var index int
|
||||
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
|
||||
err := p.requestWithRetry(ctx, func(client *rpcClient.Client) error {
|
||||
index++
|
||||
return ErrNodeAccessDenied
|
||||
})
|
||||
|
|
Can we use the same format across all file?
I mean write
instead of