forked from TrueCloudLab/frostfs-sdk-go
[#185] Implement rpc/client for tree service
Signed-off-by: Nikita Zinkevich <n.zinkevich@yadro.com>
This commit is contained in:
parent
afdc2d8340
commit
f34736b59a
7 changed files with 2876 additions and 164 deletions
178
api/rpc/tree.go
Normal file
178
api/rpc/tree.go
Normal file
|
@ -0,0 +1,178 @@
|
||||||
|
package rpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/common"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/tree"
|
||||||
|
)
|
||||||
|
|
||||||
|
const serviceTree = "tree.TreeService"
|
||||||
|
|
||||||
|
const (
|
||||||
|
rpcTreeAdd = "Add"
|
||||||
|
rpcTreeAddByPath = "AddByPath"
|
||||||
|
rpcTreeRemove = "Remove"
|
||||||
|
rpcTreeMove = "Move"
|
||||||
|
rpcTreeGetNodeByPath = "GetNodeByPath"
|
||||||
|
rpcTreeGetSubTree = "GetSubTree"
|
||||||
|
rpcTreeList = "TreeList"
|
||||||
|
rpcTreeApply = "Apply"
|
||||||
|
rpcTreeGetOpLog = "GetOpLog"
|
||||||
|
rpcTreeHealthcheck = "Healthcheck"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Add(
|
||||||
|
cli *client.Client,
|
||||||
|
req *tree.AddRequest,
|
||||||
|
opts ...client.CallOption,
|
||||||
|
) (*tree.AddResponse, error) {
|
||||||
|
resp := new(tree.AddResponse)
|
||||||
|
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeAdd), req, resp, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func AddByPath(
|
||||||
|
cli *client.Client,
|
||||||
|
req *tree.AddByPathRequest,
|
||||||
|
opts ...client.CallOption,
|
||||||
|
) (*tree.AddByPathResponse, error) {
|
||||||
|
resp := new(tree.AddByPathResponse)
|
||||||
|
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeAddByPath), req, resp, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func Remove(cli *client.Client,
|
||||||
|
req *tree.RemoveRequest,
|
||||||
|
opts ...client.CallOption,
|
||||||
|
) (*tree.RemoveResponse, error) {
|
||||||
|
resp := new(tree.RemoveResponse)
|
||||||
|
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeRemove), req, resp, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func Move(cli *client.Client,
|
||||||
|
req *tree.MoveRequest,
|
||||||
|
opts ...client.CallOption,
|
||||||
|
) (*tree.MoveResponse, error) {
|
||||||
|
resp := new(tree.MoveResponse)
|
||||||
|
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeMove), req, resp, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetNodeByPath(cli *client.Client,
|
||||||
|
req *tree.GetNodeByPathRequest,
|
||||||
|
opts ...client.CallOption,
|
||||||
|
) (*tree.GetNodeByPathResponse, error) {
|
||||||
|
resp := new(tree.GetNodeByPathResponse)
|
||||||
|
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeGetNodeByPath), req, resp, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type GetSubTreeResponseReader struct {
|
||||||
|
r client.MessageReader
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read reads response from the stream.
|
||||||
|
//
|
||||||
|
// Returns io.EOF of streaming is finished.
|
||||||
|
func (r *GetSubTreeResponseReader) Read(resp *tree.GetSubTreeResponse) error {
|
||||||
|
return r.r.ReadMessage(resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetSubTree(cli *client.Client,
|
||||||
|
req *tree.GetSubTreeRequest,
|
||||||
|
opts ...client.CallOption,
|
||||||
|
) (*GetSubTreeResponseReader, error) {
|
||||||
|
wc, err := client.OpenServerStream(cli, common.CallMethodInfoServerStream(serviceTree, rpcTreeGetSubTree), req, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &GetSubTreeResponseReader{
|
||||||
|
r: wc,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TreeList(cli *client.Client,
|
||||||
|
req *tree.ListRequest,
|
||||||
|
opts ...client.CallOption,
|
||||||
|
) (*tree.ListResponse, error) {
|
||||||
|
resp := new(tree.ListResponse)
|
||||||
|
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeList), req, resp, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func Apply(cli *client.Client,
|
||||||
|
req *tree.ApplyRequest,
|
||||||
|
opts ...client.CallOption,
|
||||||
|
) (*tree.ApplyResponse, error) {
|
||||||
|
resp := new(tree.ApplyResponse)
|
||||||
|
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeApply), req, resp, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type TreeServiceGetOpLogResponseReader struct {
|
||||||
|
r client.MessageReader
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read reads response from the stream.
|
||||||
|
//
|
||||||
|
// Returns io.EOF of streaming is finished.
|
||||||
|
func (r *TreeServiceGetOpLogResponseReader) Read(resp *tree.GetOpLogResponse) error {
|
||||||
|
return r.r.ReadMessage(resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetOpLog(cli *client.Client,
|
||||||
|
req *tree.GetOpLogRequest,
|
||||||
|
opts ...client.CallOption,
|
||||||
|
) (*TreeServiceGetOpLogResponseReader, error) {
|
||||||
|
wc, err := client.OpenServerStream(cli, common.CallMethodInfoServerStream(serviceTree, rpcTreeGetOpLog), req, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &TreeServiceGetOpLogResponseReader{
|
||||||
|
r: wc,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func Healthcheck(cli *client.Client,
|
||||||
|
req *tree.HealthcheckRequest,
|
||||||
|
opts ...client.CallOption,
|
||||||
|
) (*tree.HealthcheckResponse, error) {
|
||||||
|
resp := new(tree.HealthcheckResponse)
|
||||||
|
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeHealthcheck), req, resp, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp, nil
|
||||||
|
}
|
1591
api/tree/convert.go
Normal file
1591
api/tree/convert.go
Normal file
File diff suppressed because it is too large
Load diff
953
api/tree/types.go
Normal file
953
api/tree/types.go
Normal file
|
@ -0,0 +1,953 @@
|
||||||
|
package tree
|
||||||
|
|
||||||
|
import (
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/session"
|
||||||
|
tree "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service"
|
||||||
|
)
|
||||||
|
|
||||||
|
type AddRequest struct {
|
||||||
|
body *AddRequestBody
|
||||||
|
signature *Signature
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AddRequest) GetBody() *AddRequestBody {
|
||||||
|
if r != nil {
|
||||||
|
return r.body
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AddRequest) SetBody(v *AddRequestBody) {
|
||||||
|
r.body = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AddRequest) GetSignature() *tree.Signature {
|
||||||
|
return r.signature.ToGRPCMessage().(*tree.Signature)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AddRequest) SetSignature(signature *tree.Signature) error {
|
||||||
|
if r.signature == nil {
|
||||||
|
r.signature = new(Signature)
|
||||||
|
}
|
||||||
|
|
||||||
|
return r.signature.FromGRPCMessage(signature)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AddRequest) SignedDataSize() int {
|
||||||
|
return r.body.ToGRPCMessage().(*tree.AddRequest_Body).StableSize()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AddRequest) ReadSignedData(buf []byte) ([]byte, error) {
|
||||||
|
return r.body.ToGRPCMessage().(*tree.AddRequest_Body).StableMarshal(buf), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type AddRequestBody struct {
|
||||||
|
containerID []byte
|
||||||
|
bearerToken []byte
|
||||||
|
treeID string
|
||||||
|
parentID uint64
|
||||||
|
meta []*KeyValue
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AddRequestBody) SetContainerID(v []byte) {
|
||||||
|
r.containerID = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AddRequestBody) SetBearerToken(v []byte) {
|
||||||
|
r.bearerToken = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AddRequestBody) SetTreeID(v string) {
|
||||||
|
r.treeID = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AddRequestBody) SetParentID(v uint64) {
|
||||||
|
r.parentID = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AddRequestBody) SetMeta(v []*KeyValue) {
|
||||||
|
r.meta = v
|
||||||
|
}
|
||||||
|
|
||||||
|
type AddResponse struct {
|
||||||
|
body *AddResponseBody
|
||||||
|
signature *Signature
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AddResponse) GetBody() *AddResponseBody {
|
||||||
|
if r != nil {
|
||||||
|
return r.body
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type AddResponseBody struct {
|
||||||
|
nodeID uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AddResponseBody) GetNodeID() uint64 {
|
||||||
|
if r != nil {
|
||||||
|
return r.nodeID
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
type AddByPathRequest struct {
|
||||||
|
body *AddByPathRequestBody
|
||||||
|
signature *Signature
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AddByPathRequest) SetBody(v *AddByPathRequestBody) {
|
||||||
|
r.body = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AddByPathRequest) GetSignature() *tree.Signature {
|
||||||
|
return r.signature.ToGRPCMessage().(*tree.Signature)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AddByPathRequest) SetSignature(signature *tree.Signature) error {
|
||||||
|
if r.signature == nil {
|
||||||
|
r.signature = new(Signature)
|
||||||
|
}
|
||||||
|
|
||||||
|
return r.signature.FromGRPCMessage(signature)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AddByPathRequest) SignedDataSize() int {
|
||||||
|
return r.body.ToGRPCMessage().(*tree.AddByPathRequest_Body).StableSize()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AddByPathRequest) ReadSignedData(bytes []byte) ([]byte, error) {
|
||||||
|
return r.body.ToGRPCMessage().(*tree.AddByPathRequest_Body).StableMarshal(bytes), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type AddByPathRequestBody struct {
|
||||||
|
containerID []byte
|
||||||
|
treeID string
|
||||||
|
pathAttribute string
|
||||||
|
path []string
|
||||||
|
meta []*KeyValue
|
||||||
|
bearerToken []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AddByPathRequestBody) SetContainerID(v []byte) {
|
||||||
|
r.containerID = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AddByPathRequestBody) SetBearerToken(v []byte) {
|
||||||
|
r.bearerToken = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AddByPathRequestBody) SetTreeID(v string) {
|
||||||
|
r.treeID = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AddByPathRequestBody) SetPathAttribute(v string) {
|
||||||
|
r.pathAttribute = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AddByPathRequestBody) SetPath(v []string) {
|
||||||
|
r.path = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AddByPathRequestBody) SetMeta(v []*KeyValue) {
|
||||||
|
r.meta = v
|
||||||
|
}
|
||||||
|
|
||||||
|
type AddByPathResponse struct {
|
||||||
|
body *AddByPathResponseBody
|
||||||
|
signature *Signature
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AddByPathResponse) GetBody() *AddByPathResponseBody {
|
||||||
|
if r != nil {
|
||||||
|
return r.body
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AddByPathResponse) GetSignature() *Signature {
|
||||||
|
if r != nil {
|
||||||
|
return r.signature
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type AddByPathResponseBody struct {
|
||||||
|
nodes []uint64
|
||||||
|
parentID uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AddByPathResponseBody) GetNodes() []uint64 {
|
||||||
|
if r != nil {
|
||||||
|
return r.nodes
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AddByPathResponseBody) GetParentID() uint64 {
|
||||||
|
if r != nil {
|
||||||
|
return r.parentID
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
type RemoveRequest struct {
|
||||||
|
body *RemoveRequestBody
|
||||||
|
signature *Signature
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RemoveRequest) SetBody(v *RemoveRequestBody) {
|
||||||
|
r.body = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RemoveRequest) GetSignature() *tree.Signature {
|
||||||
|
return r.signature.ToGRPCMessage().(*tree.Signature)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RemoveRequest) SetSignature(signature *tree.Signature) error {
|
||||||
|
if r.signature == nil {
|
||||||
|
r.signature = new(Signature)
|
||||||
|
}
|
||||||
|
|
||||||
|
return r.signature.FromGRPCMessage(signature)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RemoveRequest) SignedDataSize() int {
|
||||||
|
return r.body.ToGRPCMessage().(*tree.RemoveRequest_Body).StableSize()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RemoveRequest) ReadSignedData(bytes []byte) ([]byte, error) {
|
||||||
|
return r.body.ToGRPCMessage().(*tree.RemoveRequest_Body).StableMarshal(bytes), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type RemoveRequestBody struct {
|
||||||
|
containerID []byte
|
||||||
|
treeID string
|
||||||
|
nodeID uint64
|
||||||
|
bearerToken []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RemoveRequestBody) SetContainerID(v []byte) {
|
||||||
|
r.containerID = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RemoveRequestBody) SetBearerToken(v []byte) {
|
||||||
|
r.bearerToken = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RemoveRequestBody) SetTreeID(v string) {
|
||||||
|
r.treeID = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RemoveRequestBody) SetNodeID(v uint64) {
|
||||||
|
r.nodeID = v
|
||||||
|
}
|
||||||
|
|
||||||
|
type RemoveResponse struct {
|
||||||
|
body *RemoveResponseBody
|
||||||
|
signature *Signature
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RemoveResponse) GetBody() *RemoveResponseBody {
|
||||||
|
if r != nil {
|
||||||
|
return r.body
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RemoveResponse) GetSignature() *Signature {
|
||||||
|
if r != nil {
|
||||||
|
return r.signature
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type RemoveResponseBody struct{}
|
||||||
|
|
||||||
|
type MoveRequest struct {
|
||||||
|
body *MoveRequestBody
|
||||||
|
signature *Signature
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *MoveRequest) SetBody(v *MoveRequestBody) {
|
||||||
|
r.body = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *MoveRequest) GetSignature() *tree.Signature {
|
||||||
|
return r.signature.ToGRPCMessage().(*tree.Signature)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *MoveRequest) SetSignature(signature *tree.Signature) error {
|
||||||
|
if r.signature == nil {
|
||||||
|
r.signature = new(Signature)
|
||||||
|
}
|
||||||
|
|
||||||
|
return r.signature.FromGRPCMessage(signature)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *MoveRequest) SignedDataSize() int {
|
||||||
|
return r.body.ToGRPCMessage().(*tree.MoveRequest_Body).StableSize()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *MoveRequest) ReadSignedData(buf []byte) ([]byte, error) {
|
||||||
|
return r.body.ToGRPCMessage().(*tree.MoveRequest_Body).StableMarshal(buf), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type MoveRequestBody struct {
|
||||||
|
containerID []byte
|
||||||
|
treeID string
|
||||||
|
parentID uint64
|
||||||
|
nodeID uint64
|
||||||
|
meta []*KeyValue
|
||||||
|
bearerToken []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *MoveRequestBody) SetContainerID(v []byte) {
|
||||||
|
r.containerID = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *MoveRequestBody) SetBearerToken(v []byte) {
|
||||||
|
r.bearerToken = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *MoveRequestBody) SetTreeID(v string) {
|
||||||
|
r.treeID = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *MoveRequestBody) SetNodeID(v uint64) {
|
||||||
|
r.nodeID = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *MoveRequestBody) SetMeta(v []*KeyValue) {
|
||||||
|
r.meta = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *MoveRequestBody) SetParentID(v uint64) {
|
||||||
|
r.parentID = v
|
||||||
|
}
|
||||||
|
|
||||||
|
type MoveResponse struct {
|
||||||
|
body *MoveResponseBody
|
||||||
|
signature *Signature
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *MoveResponse) GetBody() *MoveResponseBody {
|
||||||
|
if r != nil {
|
||||||
|
return r.body
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *MoveResponse) GetSignature() *Signature {
|
||||||
|
if r != nil {
|
||||||
|
return r.signature
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type MoveResponseBody struct{}
|
||||||
|
|
||||||
|
type GetNodeByPathRequest struct {
|
||||||
|
body *GetNodeByPathRequestBody
|
||||||
|
signature *Signature
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetNodeByPathRequest) SetBody(v *GetNodeByPathRequestBody) {
|
||||||
|
r.body = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetNodeByPathRequest) GetSignature() *tree.Signature {
|
||||||
|
return r.signature.ToGRPCMessage().(*tree.Signature)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetNodeByPathRequest) SetSignature(signature *tree.Signature) error {
|
||||||
|
if r.signature == nil {
|
||||||
|
r.signature = new(Signature)
|
||||||
|
}
|
||||||
|
return r.signature.FromGRPCMessage(signature)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetNodeByPathRequest) SignedDataSize() int {
|
||||||
|
return r.body.ToGRPCMessage().(*tree.GetNodeByPathRequest_Body).StableSize()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetNodeByPathRequest) ReadSignedData(buf []byte) ([]byte, error) {
|
||||||
|
return r.body.ToGRPCMessage().(*tree.GetNodeByPathRequest_Body).StableMarshal(buf), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type GetNodeByPathRequestBody struct {
|
||||||
|
containerID []byte
|
||||||
|
treeID string
|
||||||
|
pathAttribute string
|
||||||
|
path []string
|
||||||
|
attributes []string
|
||||||
|
latestOnly bool
|
||||||
|
allAttributes bool
|
||||||
|
bearerToken []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetNodeByPathRequestBody) SetContainerID(v []byte) {
|
||||||
|
r.containerID = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetNodeByPathRequestBody) SetBearerToken(v []byte) {
|
||||||
|
r.bearerToken = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetNodeByPathRequestBody) SetTreeID(v string) {
|
||||||
|
r.treeID = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetNodeByPathRequestBody) SetPathAttribute(v string) {
|
||||||
|
r.pathAttribute = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetNodeByPathRequestBody) SetParentID(v string) {
|
||||||
|
r.pathAttribute = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetNodeByPathRequestBody) SetPath(v []string) {
|
||||||
|
r.path = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetNodeByPathRequestBody) SetAttributes(v []string) {
|
||||||
|
r.attributes = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetNodeByPathRequestBody) SetAllAttributes(v bool) {
|
||||||
|
r.allAttributes = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetNodeByPathRequestBody) SetLatestOnly(v bool) {
|
||||||
|
r.latestOnly = v
|
||||||
|
}
|
||||||
|
|
||||||
|
type GetNodeByPathResponse struct {
|
||||||
|
body *GetNodeByPathResponseBody
|
||||||
|
signature *Signature
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetNodeByPathResponse) GetBody() *GetNodeByPathResponseBody {
|
||||||
|
if r != nil {
|
||||||
|
return r.body
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetNodeByPathResponse) GetSignature() *Signature {
|
||||||
|
if r != nil {
|
||||||
|
return r.signature
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type GetNodeByPathResponseBody struct {
|
||||||
|
nodes []*GetNodeByPathResponseInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetNodeByPathResponseBody) GetNodes() []*GetNodeByPathResponseInfo {
|
||||||
|
if r != nil {
|
||||||
|
return r.nodes
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type GetNodeByPathResponseInfo struct {
|
||||||
|
nodeID uint64
|
||||||
|
timestamp uint64
|
||||||
|
meta []*KeyValue
|
||||||
|
parentID uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetNodeByPathResponseInfo) GetNodeID() uint64 {
|
||||||
|
if r != nil {
|
||||||
|
return r.nodeID
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetNodeByPathResponseInfo) GetTimestamp() uint64 {
|
||||||
|
if r != nil {
|
||||||
|
return r.timestamp
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetNodeByPathResponseInfo) GetParentID() uint64 {
|
||||||
|
if r != nil {
|
||||||
|
return r.parentID
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetNodeByPathResponseInfo) GetMeta() []*KeyValue {
|
||||||
|
if r != nil {
|
||||||
|
return r.meta
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type ListRequest struct {
|
||||||
|
body *ListRequestBody
|
||||||
|
signature *Signature
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *ListRequest) SetBody(v *ListRequestBody) {
|
||||||
|
r.body = v
|
||||||
|
}
|
||||||
|
|
||||||
|
type ListRequestBody struct {
|
||||||
|
containerID []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *ListRequestBody) SetContainerID(v []byte) {
|
||||||
|
r.containerID = v
|
||||||
|
}
|
||||||
|
|
||||||
|
type ListResponse struct {
|
||||||
|
body *ListResponseBody
|
||||||
|
signature *Signature
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *ListResponse) GetBody() *ListResponseBody {
|
||||||
|
if r != nil {
|
||||||
|
return r.body
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *ListResponse) GetSignature() *Signature {
|
||||||
|
if r != nil {
|
||||||
|
return r.signature
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type ListResponseBody struct {
|
||||||
|
ids []string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *ListResponseBody) GetIDs() []string {
|
||||||
|
if r != nil {
|
||||||
|
return r.ids
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type GetSubTreeRequest struct {
|
||||||
|
body *GetSubTreeRequestBody
|
||||||
|
signature *Signature
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetSubTreeRequest) SetBody(v *GetSubTreeRequestBody) {
|
||||||
|
r.body = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetSubTreeRequest) GetSignature() *tree.Signature {
|
||||||
|
return r.signature.ToGRPCMessage().(*tree.Signature)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetSubTreeRequest) SetSignature(signature *tree.Signature) error {
|
||||||
|
if r.signature == nil {
|
||||||
|
r.signature = new(Signature)
|
||||||
|
}
|
||||||
|
|
||||||
|
return r.signature.FromGRPCMessage(signature)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetSubTreeRequest) SignedDataSize() int {
|
||||||
|
return r.body.ToGRPCMessage().(*tree.GetSubTreeRequest_Body).StableSize()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetSubTreeRequest) ReadSignedData(buf []byte) ([]byte, error) {
|
||||||
|
return r.body.ToGRPCMessage().(*tree.GetSubTreeRequest_Body).StableMarshal(buf), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type GetSubTreeRequestBody struct {
|
||||||
|
containerID []byte
|
||||||
|
treeID string
|
||||||
|
rootID []uint64
|
||||||
|
depth uint32
|
||||||
|
bearerToken []byte
|
||||||
|
orderBy *GetSubTreeRequestBodyOrder
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetSubTreeRequestBody) SetContainerID(v []byte) {
|
||||||
|
r.containerID = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetSubTreeRequestBody) SetBearerToken(v []byte) {
|
||||||
|
r.bearerToken = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetSubTreeRequestBody) SetTreeID(v string) {
|
||||||
|
r.treeID = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetSubTreeRequestBody) SetRootID(v []uint64) {
|
||||||
|
r.rootID = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetSubTreeRequestBody) SetDepth(v uint32) {
|
||||||
|
r.depth = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetSubTreeRequestBody) SetOrderBy(v *GetSubTreeRequestBodyOrder) {
|
||||||
|
r.orderBy = v
|
||||||
|
}
|
||||||
|
|
||||||
|
type GetSubTreeRequestBodyOrder struct {
|
||||||
|
direction GetSubTreeRequestBodyOrderDirection
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetSubTreeRequestBodyOrder) SetDirection(v GetSubTreeRequestBodyOrderDirection) {
|
||||||
|
r.direction = v
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
GetSubTreeRequestBodyOrderNone = GetSubTreeRequestBodyOrderDirection(tree.GetSubTreeRequest_Body_Order_None)
|
||||||
|
GetSubTreeRequestBodyOrderAsc = GetSubTreeRequestBodyOrderDirection(tree.GetSubTreeRequest_Body_Order_Asc)
|
||||||
|
)
|
||||||
|
|
||||||
|
type GetSubTreeRequestBodyOrderDirection int32
|
||||||
|
|
||||||
|
type GetSubTreeResponse struct {
|
||||||
|
body *GetSubTreeResponseBody
|
||||||
|
signature *Signature
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetSubTreeResponse) GetBody() *GetSubTreeResponseBody {
|
||||||
|
if r != nil {
|
||||||
|
return r.body
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetSubTreeResponse) GetSignature() *Signature {
|
||||||
|
if r != nil {
|
||||||
|
return r.signature
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type GetSubTreeResponseBody struct {
|
||||||
|
nodeID []uint64
|
||||||
|
parentID []uint64
|
||||||
|
timestamp []uint64
|
||||||
|
meta []*KeyValue
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetSubTreeResponseBody) GetNodeID() []uint64 {
|
||||||
|
if r != nil {
|
||||||
|
return r.nodeID
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetSubTreeResponseBody) GetParentID() []uint64 {
|
||||||
|
if r != nil {
|
||||||
|
return r.parentID
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetSubTreeResponseBody) GetTimestamp() []uint64 {
|
||||||
|
if r != nil {
|
||||||
|
return r.timestamp
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetSubTreeResponseBody) GetMeta() []*KeyValue {
|
||||||
|
if r != nil {
|
||||||
|
return r.meta
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type ApplyRequest struct {
|
||||||
|
body *ApplyRequestBody
|
||||||
|
signature *Signature
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *ApplyRequest) SetBody(v *ApplyRequestBody) {
|
||||||
|
r.body = v
|
||||||
|
}
|
||||||
|
|
||||||
|
type ApplyRequestBody struct {
|
||||||
|
containerID []byte
|
||||||
|
treeID string
|
||||||
|
operation *LogMove
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *ApplyRequestBody) SetContainerID(v []byte) {
|
||||||
|
r.containerID = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *ApplyRequestBody) SetTreeID(v string) {
|
||||||
|
r.treeID = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *ApplyRequestBody) SetOperation(v *LogMove) {
|
||||||
|
r.operation = v
|
||||||
|
}
|
||||||
|
|
||||||
|
type ApplyResponse struct {
|
||||||
|
body *ApplyResponseBody
|
||||||
|
signature *Signature
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *ApplyResponse) GetBody() *ApplyResponseBody {
|
||||||
|
if r != nil {
|
||||||
|
return r.body
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *ApplyResponse) GetSignature() *Signature {
|
||||||
|
if r != nil {
|
||||||
|
return r.signature
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type ApplyResponseBody struct{}
|
||||||
|
|
||||||
|
type GetOpLogRequest struct {
|
||||||
|
body *GetOpLogRequestBody
|
||||||
|
signature *Signature
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetOpLogRequest) SetBody(v *GetOpLogRequestBody) {
|
||||||
|
r.body = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetOpLogRequest) GetSignature() *tree.Signature {
|
||||||
|
return r.signature.ToGRPCMessage().(*tree.Signature)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetOpLogRequest) SetSignature(signature *tree.Signature) error {
|
||||||
|
if r.signature == nil {
|
||||||
|
r.signature = new(Signature)
|
||||||
|
}
|
||||||
|
|
||||||
|
return r.signature.FromGRPCMessage(signature)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetOpLogRequest) SignedDataSize() int {
|
||||||
|
return r.body.ToGRPCMessage().(*tree.GetSubTreeRequest_Body).StableSize()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetOpLogRequest) ReadSignedData(buf []byte) ([]byte, error) {
|
||||||
|
return r.body.ToGRPCMessage().(*tree.GetSubTreeRequest_Body).StableMarshal(buf), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type GetOpLogRequestBody struct {
|
||||||
|
containerID []byte
|
||||||
|
treeID string
|
||||||
|
height uint64
|
||||||
|
count uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetOpLogRequestBody) SetTreeID(v string) {
|
||||||
|
r.treeID = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetOpLogRequestBody) SetContainerID(v []byte) {
|
||||||
|
r.containerID = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetOpLogRequestBody) SetHeight(v uint64) {
|
||||||
|
r.height = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetOpLogRequestBody) SetCount(v uint64) {
|
||||||
|
r.count = v
|
||||||
|
}
|
||||||
|
|
||||||
|
type GetOpLogResponse struct {
|
||||||
|
body *GetOpLogResponseBody
|
||||||
|
signature *Signature
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetOpLogResponse) GetBody() *GetOpLogResponseBody {
|
||||||
|
if r != nil {
|
||||||
|
return r.body
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetOpLogResponse) GetSignature() *Signature {
|
||||||
|
if r != nil {
|
||||||
|
return r.signature
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type GetOpLogResponseBody struct {
|
||||||
|
operation *LogMove
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GetOpLogResponseBody) GetOperation() *LogMove {
|
||||||
|
if r != nil {
|
||||||
|
return r.operation
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type HealthcheckRequest struct {
|
||||||
|
body *HealthcheckRequestBody
|
||||||
|
signature *Signature
|
||||||
|
|
||||||
|
session.RequestHeaders
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *HealthcheckRequest) SetBody(v *HealthcheckRequestBody) {
|
||||||
|
r.body = v
|
||||||
|
}
|
||||||
|
|
||||||
|
type HealthcheckRequestBody struct{}
|
||||||
|
|
||||||
|
type HealthcheckResponse struct {
|
||||||
|
body *HealthcheckResponseBody
|
||||||
|
signature *Signature
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *HealthcheckResponse) GetBody() *HealthcheckResponseBody {
|
||||||
|
if r != nil {
|
||||||
|
return r.body
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *HealthcheckResponse) GetSignature() *Signature {
|
||||||
|
if r != nil {
|
||||||
|
return r.signature
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type HealthcheckResponseBody struct{}
|
||||||
|
|
||||||
|
type LogMove struct {
|
||||||
|
parentID uint64
|
||||||
|
meta []byte
|
||||||
|
childID uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *LogMove) GetParentID() uint64 {
|
||||||
|
if g != nil {
|
||||||
|
return g.parentID
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *LogMove) SetParentID(v uint64) {
|
||||||
|
g.parentID = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *LogMove) GetMeta() []byte {
|
||||||
|
if g != nil {
|
||||||
|
return g.meta
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *LogMove) SetMeta(v []byte) {
|
||||||
|
g.meta = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *LogMove) GetChildID() []byte {
|
||||||
|
if g != nil {
|
||||||
|
return g.meta
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *LogMove) SetChildID(v []byte) {
|
||||||
|
g.meta = v
|
||||||
|
}
|
||||||
|
|
||||||
|
type Signature struct {
|
||||||
|
key []byte
|
||||||
|
sign []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Signature) GetKey() []byte {
|
||||||
|
if s != nil {
|
||||||
|
return s.key
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Signature) SetKey(v []byte) {
|
||||||
|
s.key = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Signature) GetSign() []byte {
|
||||||
|
if s != nil {
|
||||||
|
return s.sign
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Signature) SetSign(v []byte) {
|
||||||
|
s.sign = v
|
||||||
|
}
|
||||||
|
|
||||||
|
type KeyValue struct {
|
||||||
|
key string
|
||||||
|
value []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (k *KeyValue) GetKey() string {
|
||||||
|
if k != nil {
|
||||||
|
return k.key
|
||||||
|
}
|
||||||
|
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func (k *KeyValue) SetKey(v string) {
|
||||||
|
k.key = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (k *KeyValue) GetValue() []byte {
|
||||||
|
if k != nil {
|
||||||
|
return k.value
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (k *KeyValue) SetValue(v []byte) {
|
||||||
|
k.value = v
|
||||||
|
}
|
|
@ -6,20 +6,22 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
apiClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
|
rpcapi "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc"
|
||||||
grpcService "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service"
|
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/tree"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/credentials"
|
|
||||||
"google.golang.org/grpc/credentials/insecure"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type treeClient struct {
|
type treeClient struct {
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
address string
|
address string
|
||||||
opts []grpc.DialOption
|
opts []grpc.DialOption
|
||||||
conn *grpc.ClientConn
|
client *rpcclient.Client
|
||||||
service grpcService.TreeServiceClient
|
nodeDialTimeout time.Duration
|
||||||
|
streamTimeout time.Duration
|
||||||
|
|
||||||
healthy bool
|
healthy bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -27,10 +29,12 @@ type treeClient struct {
|
||||||
var ErrUnhealthyEndpoint = errors.New("unhealthy endpoint")
|
var ErrUnhealthyEndpoint = errors.New("unhealthy endpoint")
|
||||||
|
|
||||||
// newTreeClient creates new tree client with auto dial.
|
// newTreeClient creates new tree client with auto dial.
|
||||||
func newTreeClient(addr string, opts ...grpc.DialOption) *treeClient {
|
func newTreeClient(addr string, opts []grpc.DialOption, dialTimeout time.Duration, streamTimeout time.Duration) *treeClient {
|
||||||
return &treeClient{
|
return &treeClient{
|
||||||
address: addr,
|
address: addr,
|
||||||
opts: opts,
|
opts: opts,
|
||||||
|
nodeDialTimeout: dialTimeout,
|
||||||
|
streamTimeout: streamTimeout,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -38,16 +42,17 @@ func (c *treeClient) dial(ctx context.Context) error {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
if c.conn != nil {
|
if c.client != nil {
|
||||||
return fmt.Errorf("couldn't dial '%s': connection already established", c.address)
|
return fmt.Errorf("couldn't dial '%s': connection already established", c.address)
|
||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
if c.conn, c.service, err = createClient(c.address, c.opts...); err != nil {
|
c.client, err = c.createClient()
|
||||||
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err = c.service.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil {
|
if _, err = rpcapi.Healthcheck(c.client, &tree.HealthcheckRequest{}, rpcclient.WithContext(ctx)); err != nil {
|
||||||
return fmt.Errorf("healthcheck tree service: %w", err)
|
return fmt.Errorf("healthcheck tree service: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -60,14 +65,14 @@ func (c *treeClient) redialIfNecessary(ctx context.Context) (healthHasChanged bo
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
if c.conn == nil {
|
if c.client == nil {
|
||||||
if c.conn, c.service, err = createClient(c.address, c.opts...); err != nil {
|
if c.client, err = c.createClient(); err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
wasHealthy := c.healthy
|
wasHealthy := c.healthy
|
||||||
if _, err = c.service.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil {
|
if _, err = rpcapi.Healthcheck(c.client, &tree.HealthcheckRequest{}, rpcclient.WithContext(ctx)); err != nil {
|
||||||
c.healthy = false
|
c.healthy = false
|
||||||
return wasHealthy, fmt.Errorf("healthcheck tree service: %w", err)
|
return wasHealthy, fmt.Errorf("healthcheck tree service: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -77,39 +82,26 @@ func (c *treeClient) redialIfNecessary(ctx context.Context) (healthHasChanged bo
|
||||||
return !wasHealthy, nil
|
return !wasHealthy, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func createClient(addr string, clientOptions ...grpc.DialOption) (*grpc.ClientConn, grpcService.TreeServiceClient, error) {
|
func (c *treeClient) createClient() (*rpcclient.Client, error) {
|
||||||
host, tlsEnable, err := apiClient.ParseURI(addr)
|
cli := rpcclient.New(append(
|
||||||
if err != nil {
|
rpcclient.WithNetworkURIAddress(c.address, &tls.Config{}),
|
||||||
return nil, nil, fmt.Errorf("parse address: %w", err)
|
rpcclient.WithDialTimeout(c.nodeDialTimeout),
|
||||||
|
rpcclient.WithRWTimeout(c.streamTimeout),
|
||||||
|
rpcclient.WithGRPCDialOptions(c.opts),
|
||||||
|
)...)
|
||||||
|
|
||||||
|
return cli, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
creds := insecure.NewCredentials()
|
func (c *treeClient) serviceClient() (*rpcclient.Client, error) {
|
||||||
if tlsEnable {
|
|
||||||
creds = credentials.NewTLS(&tls.Config{})
|
|
||||||
}
|
|
||||||
|
|
||||||
options := []grpc.DialOption{grpc.WithTransportCredentials(creds)}
|
|
||||||
|
|
||||||
// the order is matter, we want client to be able to overwrite options.
|
|
||||||
opts := append(options, clientOptions...)
|
|
||||||
|
|
||||||
conn, err := grpc.NewClient(host, opts...)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, fmt.Errorf("grpc create node tree service: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return conn, grpcService.NewTreeServiceClient(conn), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *treeClient) serviceClient() (grpcService.TreeServiceClient, error) {
|
|
||||||
c.mu.RLock()
|
c.mu.RLock()
|
||||||
defer c.mu.RUnlock()
|
defer c.mu.RUnlock()
|
||||||
|
|
||||||
if c.conn == nil || !c.healthy {
|
if c.client == nil || !c.healthy {
|
||||||
return nil, fmt.Errorf("%w: '%s'", ErrUnhealthyEndpoint, c.address)
|
return nil, fmt.Errorf("%w: '%s'", ErrUnhealthyEndpoint, c.address)
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.service, nil
|
return c.client, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *treeClient) endpoint() string {
|
func (c *treeClient) endpoint() string {
|
||||||
|
@ -132,9 +124,9 @@ func (c *treeClient) close() error {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
if c.conn == nil {
|
conn := c.client.Conn()
|
||||||
|
if c.client == nil || conn == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
return conn.Close()
|
||||||
return c.conn.Close()
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,9 +10,11 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
rpcapi "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc"
|
||||||
|
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/tree"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||||
grpcService "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service"
|
|
||||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"go.uber.org/zap/zapcore"
|
"go.uber.org/zap/zapcore"
|
||||||
|
@ -51,7 +53,7 @@ var (
|
||||||
// This interface is expected to have exactly one production implementation - treeClient.
|
// This interface is expected to have exactly one production implementation - treeClient.
|
||||||
// Others are expected to be for test purposes only.
|
// Others are expected to be for test purposes only.
|
||||||
type client interface {
|
type client interface {
|
||||||
serviceClient() (grpcService.TreeServiceClient, error)
|
serviceClient() (*rpcclient.Client, error)
|
||||||
endpoint() string
|
endpoint() string
|
||||||
isHealthy() bool
|
isHealthy() bool
|
||||||
setHealthy(bool)
|
setHealthy(bool)
|
||||||
|
@ -92,6 +94,7 @@ type Pool struct {
|
||||||
|
|
||||||
maxRequestAttempts int
|
maxRequestAttempts int
|
||||||
streamTimeout time.Duration
|
streamTimeout time.Duration
|
||||||
|
nodeDialTimeout time.Duration
|
||||||
|
|
||||||
startIndicesMtx sync.RWMutex
|
startIndicesMtx sync.RWMutex
|
||||||
// startIndices points to the client from which the next request will be executed.
|
// startIndices points to the client from which the next request will be executed.
|
||||||
|
@ -254,7 +257,7 @@ func (p *Pool) Dial(ctx context.Context) error {
|
||||||
for i, nodes := range p.rebalanceParams.nodesGroup {
|
for i, nodes := range p.rebalanceParams.nodesGroup {
|
||||||
clients := make([]client, len(nodes))
|
clients := make([]client, len(nodes))
|
||||||
for j, node := range nodes {
|
for j, node := range nodes {
|
||||||
clients[j] = newTreeClient(node.Address(), p.dialOptions...)
|
clients[j] = newTreeClient(node.Address(), p.dialOptions, p.nodeDialTimeout, p.streamTimeout)
|
||||||
if err := clients[j].dial(ctx); err != nil {
|
if err := clients[j].dial(ctx); err != nil {
|
||||||
p.log(zap.WarnLevel, "failed to dial tree client", zap.String("address", node.Address()), zap.Error(err))
|
p.log(zap.WarnLevel, "failed to dial tree client", zap.String("address", node.Address()), zap.Error(err))
|
||||||
continue
|
continue
|
||||||
|
@ -336,30 +339,28 @@ func (x *InitParameters) SetMaxRequestAttempts(maxAttempts int) {
|
||||||
// Can return predefined errors:
|
// Can return predefined errors:
|
||||||
// * ErrNodeNotFound
|
// * ErrNodeNotFound
|
||||||
// * ErrNodeAccessDenied.
|
// * ErrNodeAccessDenied.
|
||||||
func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService.GetNodeByPathResponse_Info, error) {
|
func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*tree.GetNodeByPathResponseInfo, error) {
|
||||||
request := &grpcService.GetNodeByPathRequest{
|
body := new(tree.GetNodeByPathRequestBody)
|
||||||
Body: &grpcService.GetNodeByPathRequest_Body{
|
body.SetContainerID(prm.CID[:])
|
||||||
ContainerId: prm.CID[:],
|
body.SetTreeID(prm.TreeID)
|
||||||
TreeId: prm.TreeID,
|
body.SetPath(prm.Path)
|
||||||
Path: prm.Path,
|
body.SetAttributes(prm.Meta)
|
||||||
Attributes: prm.Meta,
|
body.SetPathAttribute(prm.PathAttribute)
|
||||||
PathAttribute: prm.PathAttribute,
|
body.SetAllAttributes(prm.AllAttrs)
|
||||||
LatestOnly: prm.LatestOnly,
|
body.SetLatestOnly(prm.LatestOnly)
|
||||||
AllAttributes: prm.AllAttrs,
|
body.SetBearerToken(prm.BearerToken)
|
||||||
BearerToken: prm.BearerToken,
|
|
||||||
},
|
request := new(tree.GetNodeByPathRequest)
|
||||||
}
|
request.SetBody(body)
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
if err := p.signRequest(request); err != nil {
|
if err := p.signRequest(request); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var resp *grpcService.GetNodeByPathResponse
|
var resp *tree.GetNodeByPathResponse
|
||||||
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
err := p.requestWithRetry(ctx, func(client *rpcclient.Client) (inErr error) {
|
||||||
reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout)
|
resp, inErr = rpcapi.GetNodeByPath(client, request, rpcclient.WithContext(ctx))
|
||||||
defer cancel()
|
|
||||||
resp, inErr = client.GetNodeByPath(reqCtx, request)
|
|
||||||
// Pool wants to do retry 'GetNodeByPath' request if result is empty.
|
// Pool wants to do retry 'GetNodeByPath' request if result is empty.
|
||||||
// Empty result is expected due to delayed tree service sync.
|
// Empty result is expected due to delayed tree service sync.
|
||||||
// Return an error there to trigger retry and ignore it after,
|
// Return an error there to trigger retry and ignore it after,
|
||||||
|
@ -381,13 +382,14 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
|
||||||
//
|
//
|
||||||
// Must be initialized using Pool.GetSubTree, any other usage is unsafe.
|
// Must be initialized using Pool.GetSubTree, any other usage is unsafe.
|
||||||
type SubTreeReader struct {
|
type SubTreeReader struct {
|
||||||
cli grpcService.TreeService_GetSubTreeClient
|
cli *rpcapi.GetSubTreeResponseReader
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read reads another list of the subtree nodes.
|
// Read reads another list of the subtree nodes.
|
||||||
func (x *SubTreeReader) Read(buf []*grpcService.GetSubTreeResponse_Body) (int, error) {
|
func (x *SubTreeReader) Read(buf []*tree.GetSubTreeResponseBody) (int, error) {
|
||||||
for i := range buf {
|
for i := range buf {
|
||||||
resp, err := x.cli.Recv()
|
var resp tree.GetSubTreeResponse
|
||||||
|
err := x.cli.Read(&resp)
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
return i, io.EOF
|
return i, io.EOF
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
|
@ -400,10 +402,11 @@ func (x *SubTreeReader) Read(buf []*grpcService.GetSubTreeResponse_Body) (int, e
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadAll reads all nodes subtree nodes.
|
// ReadAll reads all nodes subtree nodes.
|
||||||
func (x *SubTreeReader) ReadAll() ([]*grpcService.GetSubTreeResponse_Body, error) {
|
func (x *SubTreeReader) ReadAll() ([]*tree.GetSubTreeResponseBody, error) {
|
||||||
var res []*grpcService.GetSubTreeResponse_Body
|
var res []*tree.GetSubTreeResponseBody
|
||||||
for {
|
for {
|
||||||
resp, err := x.cli.Recv()
|
var resp tree.GetSubTreeResponse
|
||||||
|
err := x.cli.Read(&resp)
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
break
|
break
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
|
@ -416,8 +419,9 @@ func (x *SubTreeReader) ReadAll() ([]*grpcService.GetSubTreeResponse_Body, error
|
||||||
}
|
}
|
||||||
|
|
||||||
// Next gets the next node from subtree.
|
// Next gets the next node from subtree.
|
||||||
func (x *SubTreeReader) Next() (*grpcService.GetSubTreeResponse_Body, error) {
|
func (x *SubTreeReader) Next() (*tree.GetSubTreeResponseBody, error) {
|
||||||
resp, err := x.cli.Recv()
|
var resp tree.GetSubTreeResponse
|
||||||
|
err := x.cli.Read(&resp)
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
return nil, io.EOF
|
return nil, io.EOF
|
||||||
}
|
}
|
||||||
|
@ -434,32 +438,33 @@ func (x *SubTreeReader) Next() (*grpcService.GetSubTreeResponse_Body, error) {
|
||||||
// * ErrNodeNotFound
|
// * ErrNodeNotFound
|
||||||
// * ErrNodeAccessDenied.
|
// * ErrNodeAccessDenied.
|
||||||
func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeReader, error) {
|
func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeReader, error) {
|
||||||
request := &grpcService.GetSubTreeRequest{
|
body := new(tree.GetSubTreeRequestBody)
|
||||||
Body: &grpcService.GetSubTreeRequest_Body{
|
body.SetContainerID(prm.CID[:])
|
||||||
ContainerId: prm.CID[:],
|
body.SetTreeID(prm.TreeID)
|
||||||
TreeId: prm.TreeID,
|
body.SetBearerToken(prm.BearerToken)
|
||||||
RootId: prm.RootID,
|
body.SetDepth(prm.Depth)
|
||||||
Depth: prm.Depth,
|
body.SetRootID(prm.RootID)
|
||||||
BearerToken: prm.BearerToken,
|
|
||||||
OrderBy: new(grpcService.GetSubTreeRequest_Body_Order),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
|
orderBy := new(tree.GetSubTreeRequestBodyOrder)
|
||||||
switch prm.Order {
|
switch prm.Order {
|
||||||
case AscendingOrder:
|
case AscendingOrder:
|
||||||
request.Body.OrderBy.Direction = grpcService.GetSubTreeRequest_Body_Order_Asc
|
orderBy.SetDirection(tree.GetSubTreeRequestBodyOrderAsc)
|
||||||
default:
|
default:
|
||||||
request.Body.OrderBy.Direction = grpcService.GetSubTreeRequest_Body_Order_None
|
orderBy.SetDirection(tree.GetSubTreeRequestBodyOrderNone)
|
||||||
}
|
}
|
||||||
|
body.SetOrderBy(orderBy)
|
||||||
|
|
||||||
|
request := new(tree.GetSubTreeRequest)
|
||||||
|
request.SetBody(body)
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
if err := p.signRequest(request); err != nil {
|
if err := p.signRequest(request); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var cli grpcService.TreeService_GetSubTreeClient
|
var cli *rpcapi.GetSubTreeResponseReader
|
||||||
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
err := p.requestWithRetry(ctx, func(client *rpcclient.Client) (inErr error) {
|
||||||
cli, inErr = client.GetSubTree(ctx, request)
|
cli, inErr = rpcapi.GetSubTree(client, request, rpcclient.WithContext(ctx))
|
||||||
return handleError("failed to get sub tree client", inErr)
|
return handleError("failed to get sub tree client", inErr)
|
||||||
})
|
})
|
||||||
p.methods[methodGetSubTree].IncRequests(time.Since(start))
|
p.methods[methodGetSubTree].IncRequests(time.Since(start))
|
||||||
|
@ -476,26 +481,24 @@ func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeRe
|
||||||
// * ErrNodeNotFound
|
// * ErrNodeNotFound
|
||||||
// * ErrNodeAccessDenied.
|
// * ErrNodeAccessDenied.
|
||||||
func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
|
func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
|
||||||
request := &grpcService.AddRequest{
|
body := new(tree.AddRequestBody)
|
||||||
Body: &grpcService.AddRequest_Body{
|
body.SetTreeID(prm.TreeID)
|
||||||
ContainerId: prm.CID[:],
|
body.SetBearerToken(prm.BearerToken)
|
||||||
TreeId: prm.TreeID,
|
body.SetContainerID(prm.CID[:])
|
||||||
ParentId: prm.Parent,
|
body.SetMeta(metaToKV(prm.Meta))
|
||||||
Meta: metaToKV(prm.Meta),
|
body.SetParentID(prm.Parent)
|
||||||
BearerToken: prm.BearerToken,
|
|
||||||
},
|
request := new(tree.AddRequest)
|
||||||
}
|
request.SetBody(body)
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
if err := p.signRequest(request); err != nil {
|
if err := p.signRequest(request); err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var resp *grpcService.AddResponse
|
var resp *tree.AddResponse
|
||||||
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
err := p.requestWithRetry(ctx, func(client *rpcclient.Client) (inErr error) {
|
||||||
reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout)
|
resp, inErr = rpcapi.Add(client, request, rpcclient.WithContext(ctx))
|
||||||
defer cancel()
|
|
||||||
resp, inErr = client.Add(reqCtx, request)
|
|
||||||
return handleError("failed to add node", inErr)
|
return handleError("failed to add node", inErr)
|
||||||
})
|
})
|
||||||
p.methods[methodAddNode].IncRequests(time.Since(start))
|
p.methods[methodAddNode].IncRequests(time.Since(start))
|
||||||
|
@ -503,7 +506,7 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return resp.GetBody().GetNodeId(), nil
|
return resp.GetBody().GetNodeID(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddNodeByPath invokes eponymous method from TreeServiceClient.
|
// AddNodeByPath invokes eponymous method from TreeServiceClient.
|
||||||
|
@ -512,27 +515,25 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
|
||||||
// * ErrNodeNotFound
|
// * ErrNodeNotFound
|
||||||
// * ErrNodeAccessDenied.
|
// * ErrNodeAccessDenied.
|
||||||
func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint64, error) {
|
func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint64, error) {
|
||||||
request := &grpcService.AddByPathRequest{
|
body := new(tree.AddByPathRequestBody)
|
||||||
Body: &grpcService.AddByPathRequest_Body{
|
body.SetTreeID(prm.TreeID)
|
||||||
ContainerId: prm.CID[:],
|
body.SetBearerToken(prm.BearerToken)
|
||||||
TreeId: prm.TreeID,
|
body.SetContainerID(prm.CID[:])
|
||||||
Path: prm.Path,
|
body.SetMeta(metaToKV(prm.Meta))
|
||||||
Meta: metaToKV(prm.Meta),
|
body.SetPathAttribute(prm.PathAttribute)
|
||||||
PathAttribute: prm.PathAttribute,
|
body.SetPath(prm.Path)
|
||||||
BearerToken: prm.BearerToken,
|
|
||||||
},
|
request := new(tree.AddByPathRequest)
|
||||||
}
|
request.SetBody(body)
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
if err := p.signRequest(request); err != nil {
|
if err := p.signRequest(request); err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var resp *grpcService.AddByPathResponse
|
var resp *tree.AddByPathResponse
|
||||||
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
|
err := p.requestWithRetry(ctx, func(client *rpcclient.Client) (inErr error) {
|
||||||
reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout)
|
resp, inErr = rpcapi.AddByPath(client, request, rpcclient.WithContext(ctx))
|
||||||
defer cancel()
|
|
||||||
resp, inErr = client.AddByPath(reqCtx, request)
|
|
||||||
return handleError("failed to add node by path", inErr)
|
return handleError("failed to add node by path", inErr)
|
||||||
})
|
})
|
||||||
p.methods[methodAddNodeByPath].IncRequests(time.Since(start))
|
p.methods[methodAddNodeByPath].IncRequests(time.Since(start))
|
||||||
|
@ -540,15 +541,15 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
body := resp.GetBody()
|
respBody := resp.GetBody()
|
||||||
if body == nil {
|
if respBody == nil {
|
||||||
return 0, errors.New("nil body in tree service response")
|
return 0, errors.New("nil body in tree service response")
|
||||||
} else if len(body.GetNodes()) == 0 {
|
} else if len(respBody.GetNodes()) == 0 {
|
||||||
return 0, errors.New("empty list of added nodes in tree service response")
|
return 0, errors.New("empty list of added nodes in tree service response")
|
||||||
}
|
}
|
||||||
|
|
||||||
// The first node is the leaf that we add, according to tree service docs.
|
// The first node is the leaf that we add, according to tree service docs.
|
||||||
return body.GetNodes()[0], nil
|
return respBody.GetNodes()[0], nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// MoveNode invokes eponymous method from TreeServiceClient.
|
// MoveNode invokes eponymous method from TreeServiceClient.
|
||||||
|
@ -557,26 +558,24 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint
|
||||||
// * ErrNodeNotFound
|
// * ErrNodeNotFound
|
||||||
// * ErrNodeAccessDenied.
|
// * ErrNodeAccessDenied.
|
||||||
func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
|
func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
|
||||||
request := &grpcService.MoveRequest{
|
body := new(tree.MoveRequestBody)
|
||||||
Body: &grpcService.MoveRequest_Body{
|
body.SetTreeID(prm.TreeID)
|
||||||
ContainerId: prm.CID[:],
|
body.SetBearerToken(prm.BearerToken)
|
||||||
TreeId: prm.TreeID,
|
body.SetContainerID(prm.CID[:])
|
||||||
NodeId: prm.NodeID,
|
body.SetMeta(metaToKV(prm.Meta))
|
||||||
ParentId: prm.ParentID,
|
body.SetNodeID(prm.NodeID)
|
||||||
Meta: metaToKV(prm.Meta),
|
body.SetParentID(prm.ParentID)
|
||||||
BearerToken: prm.BearerToken,
|
|
||||||
},
|
request := new(tree.MoveRequest)
|
||||||
}
|
request.SetBody(body)
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
if err := p.signRequest(request); err != nil {
|
if err := p.signRequest(request); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
|
err := p.requestWithRetry(ctx, func(client *rpcclient.Client) error {
|
||||||
reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout)
|
if _, err := rpcapi.Move(client, request, rpcclient.WithContext(ctx)); err != nil {
|
||||||
defer cancel()
|
|
||||||
if _, err := client.Move(reqCtx, request); err != nil {
|
|
||||||
return handleError("failed to move node", err)
|
return handleError("failed to move node", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -592,24 +591,22 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
|
||||||
// * ErrNodeNotFound
|
// * ErrNodeNotFound
|
||||||
// * ErrNodeAccessDenied.
|
// * ErrNodeAccessDenied.
|
||||||
func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
|
func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
|
||||||
request := &grpcService.RemoveRequest{
|
body := new(tree.RemoveRequestBody)
|
||||||
Body: &grpcService.RemoveRequest_Body{
|
body.SetTreeID(prm.TreeID)
|
||||||
ContainerId: prm.CID[:],
|
body.SetBearerToken(prm.BearerToken)
|
||||||
TreeId: prm.TreeID,
|
body.SetContainerID(prm.CID[:])
|
||||||
NodeId: prm.NodeID,
|
body.SetNodeID(prm.NodeID)
|
||||||
BearerToken: prm.BearerToken,
|
|
||||||
},
|
request := new(tree.RemoveRequest)
|
||||||
}
|
request.SetBody(body)
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
if err := p.signRequest(request); err != nil {
|
if err := p.signRequest(request); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
|
err := p.requestWithRetry(ctx, func(client *rpcclient.Client) error {
|
||||||
reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout)
|
if _, err := rpcapi.Remove(client, request, rpcclient.WithContext(ctx)); err != nil {
|
||||||
defer cancel()
|
|
||||||
if _, err := client.Remove(reqCtx, request); err != nil {
|
|
||||||
return handleError("failed to remove node", err)
|
return handleError("failed to remove node", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -661,11 +658,14 @@ func handleError(msg string, err error) error {
|
||||||
return fmt.Errorf("%s: %w", msg, err)
|
return fmt.Errorf("%s: %w", msg, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func metaToKV(meta map[string]string) []*grpcService.KeyValue {
|
func metaToKV(meta map[string]string) []*tree.KeyValue {
|
||||||
result := make([]*grpcService.KeyValue, 0, len(meta))
|
result := make([]*tree.KeyValue, 0, len(meta))
|
||||||
|
|
||||||
for key, value := range meta {
|
for key, value := range meta {
|
||||||
result = append(result, &grpcService.KeyValue{Key: key, Value: []byte(value)})
|
kv := new(tree.KeyValue)
|
||||||
|
kv.SetKey(key)
|
||||||
|
kv.SetValue([]byte(value))
|
||||||
|
result = append(result, kv)
|
||||||
}
|
}
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
@ -814,10 +814,10 @@ func (p *Pool) setStartIndices(i, j int) {
|
||||||
p.startIndicesMtx.Unlock()
|
p.startIndicesMtx.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Pool) requestWithRetry(ctx context.Context, fn func(client grpcService.TreeServiceClient) error) error {
|
func (p *Pool) requestWithRetry(ctx context.Context, fn func(client *rpcclient.Client) error) error {
|
||||||
var (
|
var (
|
||||||
err, finErr error
|
err, finErr error
|
||||||
cl grpcService.TreeServiceClient
|
cl *rpcclient.Client
|
||||||
)
|
)
|
||||||
|
|
||||||
reqID := GetRequestID(ctx)
|
reqID := GetRequestID(ctx)
|
||||||
|
|
|
@ -9,7 +9,7 @@ type message interface {
|
||||||
SignedDataSize() int
|
SignedDataSize() int
|
||||||
ReadSignedData([]byte) ([]byte, error)
|
ReadSignedData([]byte) ([]byte, error)
|
||||||
GetSignature() *tree.Signature
|
GetSignature() *tree.Signature
|
||||||
SetSignature(*tree.Signature)
|
SetSignature(*tree.Signature) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// signMessage uses the pool key and signs any protobuf
|
// signMessage uses the pool key and signs any protobuf
|
||||||
|
@ -29,10 +29,8 @@ func (p *Pool) signRequest(m message) error {
|
||||||
|
|
||||||
rawPub := make([]byte, keySDK.Public().MaxEncodedSize())
|
rawPub := make([]byte, keySDK.Public().MaxEncodedSize())
|
||||||
rawPub = rawPub[:keySDK.Public().Encode(rawPub)]
|
rawPub = rawPub[:keySDK.Public().Encode(rawPub)]
|
||||||
m.SetSignature(&tree.Signature{
|
return m.SetSignature(&tree.Signature{
|
||||||
Key: rawPub,
|
Key: rawPub,
|
||||||
Sign: data,
|
Sign: data,
|
||||||
})
|
})
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,9 +5,9 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
rpcClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
|
||||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||||
grpcService "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service"
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"go.uber.org/zap/zaptest"
|
"go.uber.org/zap/zaptest"
|
||||||
)
|
)
|
||||||
|
@ -17,7 +17,7 @@ type treeClientMock struct {
|
||||||
err bool
|
err bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *treeClientMock) serviceClient() (grpcService.TreeServiceClient, error) {
|
func (t *treeClientMock) serviceClient() (*rpcClient.Client, error) {
|
||||||
if t.err {
|
if t.err {
|
||||||
return nil, errors.New("serviceClient() mock error")
|
return nil, errors.New("serviceClient() mock error")
|
||||||
}
|
}
|
||||||
|
@ -99,7 +99,7 @@ func TestRetry(t *testing.T) {
|
||||||
maxRequestAttempts: lenNodes,
|
maxRequestAttempts: lenNodes,
|
||||||
}
|
}
|
||||||
|
|
||||||
makeFn := func(client grpcService.TreeServiceClient) error {
|
makeFn := func(client *rpcClient.Client) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -171,7 +171,7 @@ func TestRetry(t *testing.T) {
|
||||||
|
|
||||||
t.Run("error empty result", func(t *testing.T) {
|
t.Run("error empty result", func(t *testing.T) {
|
||||||
errNodes, index := 2, 0
|
errNodes, index := 2, 0
|
||||||
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
|
err := p.requestWithRetry(ctx, func(client *rpcClient.Client) error {
|
||||||
if index < errNodes {
|
if index < errNodes {
|
||||||
index++
|
index++
|
||||||
return errNodeEmptyResult
|
return errNodeEmptyResult
|
||||||
|
@ -184,7 +184,7 @@ func TestRetry(t *testing.T) {
|
||||||
|
|
||||||
t.Run("error not found", func(t *testing.T) {
|
t.Run("error not found", func(t *testing.T) {
|
||||||
errNodes, index := 2, 0
|
errNodes, index := 2, 0
|
||||||
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
|
err := p.requestWithRetry(ctx, func(client *rpcClient.Client) error {
|
||||||
if index < errNodes {
|
if index < errNodes {
|
||||||
index++
|
index++
|
||||||
return ErrNodeNotFound
|
return ErrNodeNotFound
|
||||||
|
@ -197,7 +197,7 @@ func TestRetry(t *testing.T) {
|
||||||
|
|
||||||
t.Run("error access denied", func(t *testing.T) {
|
t.Run("error access denied", func(t *testing.T) {
|
||||||
var index int
|
var index int
|
||||||
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
|
err := p.requestWithRetry(ctx, func(client *rpcClient.Client) error {
|
||||||
index++
|
index++
|
||||||
return ErrNodeAccessDenied
|
return ErrNodeAccessDenied
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in a new issue