[#185] Implement rpc/client for tree service
All checks were successful
DCO / DCO (pull_request) Successful in 1m4s
Tests and linters / Tests (pull_request) Successful in 1m24s
Tests and linters / Lint (pull_request) Successful in 2m5s

Signed-off-by: Nikita Zinkevich <n.zinkevich@yadro.com>
This commit is contained in:
Nikita Zinkevich 2024-11-07 14:29:55 +03:00
parent afdc2d8340
commit 4a9a3572fa
Signed by: nzinkevich
GPG key ID: 748EA1D0B2E6420A
7 changed files with 2288 additions and 120 deletions

173
api/rpc/tree.go Normal file
View file

@ -0,0 +1,173 @@
package rpc
import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/common"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/tree"
)
const serviceTree = "tree.TreeService"
const (
rpcTreeAdd = "Add"
rpcTreeAddByPath = "AddByPath"
rpcTreeRemove = "Remove"
rpcTreeMove = "Move"
rpcTreeGetNodeByPath = "GetNodeByPath"
rpcTreeGetSubTree = "GetSubTree"
rpcTreeList = "TreeList"
rpcTreeApply = "Apply"
rpcTreeGetOpLog = "GetOpLog"
rpcTreeHealthcheck = "Healthcheck"
)
func Add(
cli *client.Client,
req *tree.AddRequest,
opts ...client.CallOption,
) (*tree.AddResponse, error) {
resp := new(tree.AddResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeAdd), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}
func AddByPath(
cli *client.Client,
req *tree.AddByPathRequest,
opts ...client.CallOption,
) (*tree.AddByPathResponse, error) {
resp := new(tree.AddByPathResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeAddByPath), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}
func Remove(cli *client.Client,
req *tree.RemoveRequest,
opts ...client.CallOption,
) (*tree.RemoveResponse, error) {
resp := new(tree.RemoveResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeRemove), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}
func Move(cli *client.Client,
req *tree.MoveRequest,
opts ...client.CallOption,
) (*tree.MoveResponse, error) {
resp := new(tree.MoveResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeMove), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}
func GetNodeByPath(cli *client.Client,
req *tree.GetNodeByPathRequest,
opts ...client.CallOption,
) (*tree.GetNodeByPathResponse, error) {
resp := new(tree.GetNodeByPathResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeGetNodeByPath), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}
type GetSubTreeResponseReader struct {
r client.MessageReader
}
// Read reads response from the stream.
//
// Returns io.EOF of streaming is finished.
func (r *GetSubTreeResponseReader) Read(resp *tree.GetSubTreeResponse) error {
return r.r.ReadMessage(resp)
}
func GetSubTree(cli *client.Client,
req *tree.GetSubTreeRequest,
opts ...client.CallOption) (*GetSubTreeResponseReader, error) {
wc, err := client.OpenServerStream(cli, common.CallMethodInfoServerStream(serviceTree, rpcTreeGetSubTree), req, opts...)
if err != nil {
return nil, err
}
return &GetSubTreeResponseReader{
r: wc,
}, nil
}
func TreeList(cli *client.Client,
req *tree.ListRequest,
opts ...client.CallOption) (*tree.ListResponse, error) {
resp := new(tree.ListResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeList), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}
func Apply(cli *client.Client,
req *tree.ApplyRequest,
opts ...client.CallOption) (*tree.ApplyResponse, error) {
resp := new(tree.ApplyResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeApply), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}
type TreeServiceGetOpLogResponseReader struct {
r client.MessageReader
}
// Read reads response from the stream.
//
// Returns io.EOF of streaming is finished.
func (r *TreeServiceGetOpLogResponseReader) Read(resp *tree.GetOpLogResponse) error {
return r.r.ReadMessage(resp)
}
func GetOpLog(cli *client.Client,
req *tree.GetOpLogRequest,
opts ...client.CallOption) (*TreeServiceGetOpLogResponseReader, error) {
wc, err := client.OpenServerStream(cli, common.CallMethodInfoServerStream(serviceTree, rpcTreeGetOpLog), req, opts...)
if err != nil {
return nil, err
}
return &TreeServiceGetOpLogResponseReader{
r: wc,
}, nil
}
func Healthcheck(cli *client.Client,
req *tree.HealthcheckRequest,
opts ...client.CallOption) (*tree.HealthcheckResponse, error) {
resp := new(tree.HealthcheckResponse)
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceTree, rpcTreeHealthcheck), req, resp, opts...)
if err != nil {
return nil, err
}
return resp, nil
}

1590
api/tree/convert.go Normal file

File diff suppressed because it is too large Load diff

393
api/tree/types.go Normal file
View file

@ -0,0 +1,393 @@
package tree
import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/session"
tree "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service"
)
type AddRequest struct {
Body *AddRequestBody
signature *Signature
}
func (r *AddRequest) SignedDataSize() int {
return r.Body.ToGRPCMessage().(*tree.AddRequest_Body).StableSize()
}
func (r *AddRequest) ReadSignedData(buf []byte) ([]byte, error) {
return r.Body.ToGRPCMessage().(*tree.AddRequest_Body).StableMarshal(buf), nil
}
func (r *AddRequest) GetSignature() *tree.Signature {
return r.signature.ToGRPCMessage().(*tree.Signature)
}
func (r *AddRequest) SetSignature(signature *tree.Signature) error {
if r.signature == nil {
r.signature = new(Signature)
}
return r.signature.FromGRPCMessage(signature)
}
type AddRequestBody struct {
ContainerID []byte
BearerToken []byte
TreeID string
ParentID uint64
Meta []*KeyValue
}
type AddResponse struct {
Body *AddResponseBody
signature *Signature
}
type AddResponseBody struct {
NodeID uint64
}
type AddByPathRequest struct {
Body *AddByPathRequestBody
signature *Signature
}
func (r *AddByPathRequest) SignedDataSize() int {
return r.Body.ToGRPCMessage().(*tree.AddByPathRequest_Body).StableSize()
}
func (r *AddByPathRequest) ReadSignedData(bytes []byte) ([]byte, error) {
return r.Body.ToGRPCMessage().(*tree.AddByPathRequest_Body).StableMarshal(bytes), nil
}
func (r *AddByPathRequest) GetSignature() *tree.Signature {
return r.signature.ToGRPCMessage().(*tree.Signature)
}
func (r *AddByPathRequest) SetSignature(signature *tree.Signature) error {
if r.signature == nil {
r.signature = new(Signature)
}
return r.signature.FromGRPCMessage(signature)
}
type AddByPathRequestBody struct {
ContainerID []byte
TreeID string
PathAttribute string
Path []string
Meta []*KeyValue
BearerToken []byte
}
type AddByPathResponse struct {
Body *AddByPathResponseBody
signature *Signature
}
type AddByPathResponseBody struct {
Nodes []uint64
ParentID uint64
}
type RemoveRequest struct {
Body *RemoveRequestBody
signature *Signature
}
func (r *RemoveRequest) SignedDataSize() int {
return r.Body.ToGRPCMessage().(*tree.RemoveRequest_Body).StableSize()
}
func (r *RemoveRequest) ReadSignedData(bytes []byte) ([]byte, error) {
return r.Body.ToGRPCMessage().(*tree.RemoveRequest_Body).StableMarshal(bytes), nil
}
func (r *RemoveRequest) GetSignature() *tree.Signature {
return r.signature.ToGRPCMessage().(*tree.Signature)
}
func (r *RemoveRequest) SetSignature(signature *tree.Signature) error {
if r.signature == nil {
r.signature = new(Signature)
}
return r.signature.FromGRPCMessage(signature)
}
type RemoveRequestBody struct {
ContainerID []byte
TreeID string
NodeID uint64
BearerToken []byte
}
type RemoveResponse struct {
Body *RemoveResponseBody
signature *Signature
}
type RemoveResponseBody struct {
}
type MoveRequest struct {
Body *MoveRequestBody
signature *Signature
}
func (r *MoveRequest) SignedDataSize() int {
return r.Body.ToGRPCMessage().(*tree.MoveRequest_Body).StableSize()
}
func (r *MoveRequest) ReadSignedData(buf []byte) ([]byte, error) {
return r.Body.ToGRPCMessage().(*tree.MoveRequest_Body).StableMarshal(buf), nil
}
func (r *MoveRequest) GetSignature() *tree.Signature {
return r.signature.ToGRPCMessage().(*tree.Signature)
}
func (r *MoveRequest) SetSignature(signature *tree.Signature) error {
if r.signature == nil {
r.signature = new(Signature)
}
return r.signature.FromGRPCMessage(signature)
}
type MoveRequestBody struct {
ContainerID []byte
TreeID string
ParentID uint64
NodeID uint64
Meta []*KeyValue
BearerToken []byte
}
type MoveResponse struct {
Body *MoveResponseBody
signature *Signature
}
type MoveResponseBody struct {
}
type GetNodeByPathRequest struct {
Body *GetNodeByPathRequestBody
signature *Signature
}
func (r *GetNodeByPathRequest) SignedDataSize() int {
return r.Body.ToGRPCMessage().(*tree.GetNodeByPathRequest_Body).StableSize()
}
func (r *GetNodeByPathRequest) ReadSignedData(buf []byte) ([]byte, error) {
return r.Body.ToGRPCMessage().(*tree.GetNodeByPathRequest_Body).StableMarshal(buf), nil
}
func (r *GetNodeByPathRequest) GetSignature() *tree.Signature {
return r.signature.ToGRPCMessage().(*tree.Signature)
}
func (r *GetNodeByPathRequest) SetSignature(signature *tree.Signature) error {
if r.signature == nil {
r.signature = new(Signature)
}
return r.signature.FromGRPCMessage(signature)
}
type GetNodeByPathRequestBody struct {
ContainerID []byte
TreeID string
PathAttribute string
Path []string
Attributes []string
LatestOnly bool
AllAttributes bool
BearerToken []byte
}
type GetNodeByPathResponse struct {
Body *GetNodeByPathResponseBody
signature *Signature
}
type GetNodeByPathResponseBody struct {
Nodes []GetNodeByPathResponseInfo
}
type GetNodeByPathResponseInfo struct {
NodeID uint64
Timestamp uint64
Meta []*KeyValue
ParentID uint64
}
type ListRequest struct {
Body *ListRequestBody
signature *Signature
}
type ListRequestBody struct {
ContainerID []byte
}
type ListResponse struct {
Body *ListResponseBody
signature *Signature
}
type ListResponseBody struct {
IDs []string
}
type GetSubTreeRequest struct {
Body *GetSubTreeRequestBody
signature *Signature
}
func (r *GetSubTreeRequest) SignedDataSize() int {
return r.Body.ToGRPCMessage().(*tree.GetSubTreeRequest_Body).StableSize()
}
func (r *GetSubTreeRequest) ReadSignedData(buf []byte) ([]byte, error) {
return r.Body.ToGRPCMessage().(*tree.GetSubTreeRequest_Body).StableMarshal(buf), nil
}
func (r *GetSubTreeRequest) GetSignature() *tree.Signature {
return r.signature.ToGRPCMessage().(*tree.Signature)
}
func (r *GetSubTreeRequest) SetSignature(signature *tree.Signature) error {
if r.signature == nil {
r.signature = new(Signature)
}
return r.signature.FromGRPCMessage(signature)
}
type GetSubTreeRequestBody struct {
ContainerID []byte
TreeID string
RootID []uint64
Depth uint32
BearerToken []byte
OrderBy *GetSubTreeRequestBodyOrder
}
type GetSubTreeRequestBodyOrder struct {
Direction GetSubTreeRequestBodyOrderDirection
}
const (
GetSubTreeRequestBodyOrderNone = GetSubTreeRequestBodyOrderDirection(tree.GetSubTreeRequest_Body_Order_None)
GetSubTreeRequestBodyOrderAsc = GetSubTreeRequestBodyOrderDirection(tree.GetSubTreeRequest_Body_Order_Asc)
)
type GetSubTreeRequestBodyOrderDirection int32
type GetSubTreeResponse struct {
Body *GetSubTreeResponseBody
signature *Signature
}
type GetSubTreeResponseBody struct {
NodeID []uint64
ParentID []uint64
Timestamp []uint64
Meta []*KeyValue
}
type ApplyRequest struct {
Body *ApplyRequestBody
signature *Signature
}
type ApplyRequestBody struct {
ContainerID []byte
TreeID string
Operation *LogMove
}
type ApplyResponse struct {
Body *ApplyResponseBody
signature *Signature
}
type ApplyResponseBody struct {
}
type GetOpLogRequest struct {
Body *GetOpLogRequestBody
signature *Signature
}
type GetOpLogRequestBody struct {
ContainerID []byte
TreeID string
Height uint64
Count uint64
}
type GetOpLogResponse struct {
Body *GetOpLogResponseBody
signature *Signature
}
type GetOpLogResponseBody struct {
Operation *LogMove
}
type HealthcheckRequest struct {
body *HealthcheckRequestBody
signature *Signature
session.RequestHeaders
}
func (r *HealthcheckRequest) GetBody() *HealthcheckRequestBody {
if r != nil {
return r.body
}
return new(HealthcheckRequestBody)
}
func (r *HealthcheckRequest) SetBody(v *HealthcheckRequestBody) {
r.body = v
}
type HealthcheckRequestBody struct{}
type HealthcheckResponse struct {
Body *HealthcheckResponseBody
signature *Signature
}
type HealthcheckResponseBody struct{}
type LogMove struct {
ParentID uint64
Meta []byte
ChildID uint64
}
type Signature struct {
Key []byte
Sign []byte
}
type KeyValue struct {
Key string
Value []byte
}
func (k *KeyValue) GetKey() string {
return k.Key
}
func (k *KeyValue) GetValue() []byte {
return k.Value
}

View file

@ -5,10 +5,13 @@ import (
"crypto/tls"
"errors"
"fmt"
"io"
"sync"
"time"
apiClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
grpcService "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service"
rpcapi "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc"
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/tree"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
@ -18,8 +21,11 @@ type treeClient struct {
mu sync.RWMutex
address string
opts []grpc.DialOption
conn *grpc.ClientConn
service grpcService.TreeServiceClient
conn io.Closer
client *rpcclient.Client
nodeDialTimeout time.Duration
streamTimeout time.Duration
healthy bool
}
@ -27,10 +33,12 @@ type treeClient struct {
var ErrUnhealthyEndpoint = errors.New("unhealthy endpoint")
// newTreeClient creates new tree client with auto dial.
func newTreeClient(addr string, opts ...grpc.DialOption) *treeClient {
func newTreeClient(addr string, opts []grpc.DialOption, dialTimeout time.Duration, streamTimeout time.Duration) *treeClient {
return &treeClient{
address: addr,
opts: opts,
nodeDialTimeout: dialTimeout,
streamTimeout: streamTimeout,
}
}
@ -43,14 +51,16 @@ func (c *treeClient) dial(ctx context.Context) error {
}
var err error
if c.conn, c.service, err = createClient(c.address, c.opts...); err != nil {
c.client, err = c.createClient()
if err != nil {
return err
}
if _, err = c.service.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil {
if _, err = rpcapi.Healthcheck(c.client, &tree.HealthcheckRequest{}, rpcclient.WithContext(ctx)); err != nil {
return fmt.Errorf("healthcheck tree service: %w", err)
}
c.conn = c.client.Conn()
c.healthy = true
return nil
@ -61,13 +71,13 @@ func (c *treeClient) redialIfNecessary(ctx context.Context) (healthHasChanged bo
defer c.mu.Unlock()
if c.conn == nil {
if c.conn, c.service, err = createClient(c.address, c.opts...); err != nil {
if c.client, err = c.createClient(); err != nil {
return false, err
}
}
wasHealthy := c.healthy
if _, err = c.service.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil {
if _, err = rpcapi.Healthcheck(c.client, &tree.HealthcheckRequest{}, rpcclient.WithContext(ctx)); err != nil {
c.healthy = false
return wasHealthy, fmt.Errorf("healthcheck tree service: %w", err)
}
@ -77,10 +87,10 @@ func (c *treeClient) redialIfNecessary(ctx context.Context) (healthHasChanged bo
return !wasHealthy, nil
}
func createClient(addr string, clientOptions ...grpc.DialOption) (*grpc.ClientConn, grpcService.TreeServiceClient, error) {
host, tlsEnable, err := apiClient.ParseURI(addr)
func (c *treeClient) createClient() (*rpcclient.Client, error) {
_, tlsEnable, err := rpcclient.ParseURI(c.address)
if err != nil {
return nil, nil, fmt.Errorf("parse address: %w", err)
return nil, fmt.Errorf("parse address: %w", err)
}
creds := insecure.NewCredentials()
@ -91,17 +101,19 @@ func createClient(addr string, clientOptions ...grpc.DialOption) (*grpc.ClientCo
options := []grpc.DialOption{grpc.WithTransportCredentials(creds)}
// the order is matter, we want client to be able to overwrite options.
opts := append(options, clientOptions...)
opts := append(options, c.opts...)
conn, err := grpc.NewClient(host, opts...)
if err != nil {
return nil, nil, fmt.Errorf("grpc create node tree service: %w", err)
}
cli := rpcclient.New(append(
rpcclient.WithNetworkURIAddress(c.address, &tls.Config{}),
rpcclient.WithDialTimeout(c.nodeDialTimeout),
rpcclient.WithRWTimeout(c.streamTimeout),
rpcclient.WithGRPCDialOptions(opts),
)...)
return conn, grpcService.NewTreeServiceClient(conn), nil
return cli, nil
}
func (c *treeClient) serviceClient() (grpcService.TreeServiceClient, error) {
func (c *treeClient) serviceClient() (*rpcclient.Client, error) {
c.mu.RLock()
defer c.mu.RUnlock()
@ -109,7 +121,7 @@ func (c *treeClient) serviceClient() (grpcService.TreeServiceClient, error) {
return nil, fmt.Errorf("%w: '%s'", ErrUnhealthyEndpoint, c.address)
}
return c.service, nil
return c.client, nil
}
func (c *treeClient) endpoint() string {
@ -136,5 +148,11 @@ func (c *treeClient) close() error {
return nil
}
return c.conn.Close()
err := c.conn.Close()
if err != nil {
return err
}
c.conn = nil
return nil
}

View file

@ -10,9 +10,11 @@ import (
"sync"
"time"
rpcapi "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc"
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/tree"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
grpcService "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
@ -51,7 +53,7 @@ var (
// This interface is expected to have exactly one production implementation - treeClient.
// Others are expected to be for test purposes only.
type client interface {
serviceClient() (grpcService.TreeServiceClient, error)
serviceClient() (*rpcclient.Client, error)
endpoint() string
isHealthy() bool
setHealthy(bool)
@ -92,6 +94,7 @@ type Pool struct {
maxRequestAttempts int
streamTimeout time.Duration
nodeDialTimeout time.Duration
startIndicesMtx sync.RWMutex
// startIndices points to the client from which the next request will be executed.
@ -254,7 +257,7 @@ func (p *Pool) Dial(ctx context.Context) error {
for i, nodes := range p.rebalanceParams.nodesGroup {
clients := make([]client, len(nodes))
for j, node := range nodes {
clients[j] = newTreeClient(node.Address(), p.dialOptions...)
clients[j] = newTreeClient(node.Address(), p.dialOptions, p.nodeDialTimeout, p.streamTimeout)
if err := clients[j].dial(ctx); err != nil {
p.log(zap.WarnLevel, "failed to dial tree client", zap.String("address", node.Address()), zap.Error(err))
continue
@ -336,11 +339,11 @@ func (x *InitParameters) SetMaxRequestAttempts(maxAttempts int) {
// Can return predefined errors:
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService.GetNodeByPathResponse_Info, error) {
request := &grpcService.GetNodeByPathRequest{
Body: &grpcService.GetNodeByPathRequest_Body{
ContainerId: prm.CID[:],
TreeId: prm.TreeID,
func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]tree.GetNodeByPathResponseInfo, error) {
request := &tree.GetNodeByPathRequest{
Body: &tree.GetNodeByPathRequestBody{
ContainerID: prm.CID[:],
TreeID: prm.TreeID,
Path: prm.Path,
Attributes: prm.Meta,
PathAttribute: prm.PathAttribute,
@ -355,16 +358,14 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
return nil, err
}
var resp *grpcService.GetNodeByPathResponse
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout)
defer cancel()
resp, inErr = client.GetNodeByPath(reqCtx, request)
var resp *tree.GetNodeByPathResponse
err := p.requestWithRetry(ctx, func(client *rpcclient.Client) (inErr error) {
resp, inErr = rpcapi.GetNodeByPath(client, request, rpcclient.WithContext(ctx))
// Pool wants to do retry 'GetNodeByPath' request if result is empty.
// Empty result is expected due to delayed tree service sync.
// Return an error there to trigger retry and ignore it after,
// to keep compatibility with 'GetNodeByPath' implementation.
if inErr == nil && len(resp.GetBody().GetNodes()) == 0 {
if inErr == nil && len(resp.Body.Nodes) == 0 {
return errNodeEmptyResult
}
return handleError("failed to get node by path", inErr)
@ -374,50 +375,53 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
return nil, err
}
return resp.GetBody().GetNodes(), nil
return resp.Body.Nodes, nil
}
// SubTreeReader is designed to read list of subtree nodes FrostFS tree service.
//
// Must be initialized using Pool.GetSubTree, any other usage is unsafe.
type SubTreeReader struct {
cli grpcService.TreeService_GetSubTreeClient
cli *rpcapi.GetSubTreeResponseReader
}
// Read reads another list of the subtree nodes.
func (x *SubTreeReader) Read(buf []*grpcService.GetSubTreeResponse_Body) (int, error) {
func (x *SubTreeReader) Read(buf []*tree.GetSubTreeResponseBody) (int, error) {
for i := range buf {
resp, err := x.cli.Recv()
var resp tree.GetSubTreeResponse
err := x.cli.Read(&resp)
if err == io.EOF {
return i, io.EOF
} else if err != nil {
return i, handleError("failed to get sub tree", err)
}
buf[i] = resp.GetBody()
buf[i] = resp.Body
}
return len(buf), nil
}
// ReadAll reads all nodes subtree nodes.
func (x *SubTreeReader) ReadAll() ([]*grpcService.GetSubTreeResponse_Body, error) {
var res []*grpcService.GetSubTreeResponse_Body
func (x *SubTreeReader) ReadAll() ([]*tree.GetSubTreeResponseBody, error) {
var res []*tree.GetSubTreeResponseBody
for {
resp, err := x.cli.Recv()
var resp tree.GetSubTreeResponse
err := x.cli.Read(&resp)
if err == io.EOF {
break
} else if err != nil {
return nil, handleError("failed to get sub tree", err)
}
res = append(res, resp.GetBody())
res = append(res, resp.Body)
}
return res, nil
}
// Next gets the next node from subtree.
func (x *SubTreeReader) Next() (*grpcService.GetSubTreeResponse_Body, error) {
resp, err := x.cli.Recv()
func (x *SubTreeReader) Next() (*tree.GetSubTreeResponseBody, error) {
var resp tree.GetSubTreeResponse
err := x.cli.Read(&resp)
if err == io.EOF {
return nil, io.EOF
}
@ -425,7 +429,7 @@ func (x *SubTreeReader) Next() (*grpcService.GetSubTreeResponse_Body, error) {
return nil, handleError("failed to get sub tree", err)
}
return resp.GetBody(), nil
return resp.Body, nil
}
// GetSubTree invokes eponymous method from TreeServiceClient.
@ -434,22 +438,22 @@ func (x *SubTreeReader) Next() (*grpcService.GetSubTreeResponse_Body, error) {
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeReader, error) {
request := &grpcService.GetSubTreeRequest{
Body: &grpcService.GetSubTreeRequest_Body{
ContainerId: prm.CID[:],
TreeId: prm.TreeID,
RootId: prm.RootID,
request := &tree.GetSubTreeRequest{
Body: &tree.GetSubTreeRequestBody{
ContainerID: prm.CID[:],
TreeID: prm.TreeID,
RootID: prm.RootID,
Depth: prm.Depth,
BearerToken: prm.BearerToken,
OrderBy: new(grpcService.GetSubTreeRequest_Body_Order),
OrderBy: new(tree.GetSubTreeRequestBodyOrder),
},
}
switch prm.Order {
case AscendingOrder:
request.Body.OrderBy.Direction = grpcService.GetSubTreeRequest_Body_Order_Asc
request.Body.OrderBy.Direction = tree.GetSubTreeRequestBodyOrderAsc
default:
request.Body.OrderBy.Direction = grpcService.GetSubTreeRequest_Body_Order_None
request.Body.OrderBy.Direction = tree.GetSubTreeRequestBodyOrderNone
}
start := time.Now()
@ -457,9 +461,9 @@ func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeRe
return nil, err
}
var cli grpcService.TreeService_GetSubTreeClient
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
cli, inErr = client.GetSubTree(ctx, request)
var cli *rpcapi.GetSubTreeResponseReader
err := p.requestWithRetry(ctx, func(client *rpcclient.Client) (inErr error) {
cli, inErr = rpcapi.GetSubTree(client, request, rpcclient.WithContext(ctx))
return handleError("failed to get sub tree client", inErr)
})
p.methods[methodGetSubTree].IncRequests(time.Since(start))
@ -476,11 +480,11 @@ func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeRe
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
request := &grpcService.AddRequest{
Body: &grpcService.AddRequest_Body{
ContainerId: prm.CID[:],
TreeId: prm.TreeID,
ParentId: prm.Parent,
request := &tree.AddRequest{
Body: &tree.AddRequestBody{
ContainerID: prm.CID[:],
TreeID: prm.TreeID,
ParentID: prm.Parent,
Meta: metaToKV(prm.Meta),
BearerToken: prm.BearerToken,
},
@ -491,11 +495,9 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
return 0, err
}
var resp *grpcService.AddResponse
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout)
defer cancel()
resp, inErr = client.Add(reqCtx, request)
var resp *tree.AddResponse
err := p.requestWithRetry(ctx, func(client *rpcclient.Client) (inErr error) {
resp, inErr = rpcapi.Add(client, request, rpcclient.WithContext(ctx))
return handleError("failed to add node", inErr)
})
p.methods[methodAddNode].IncRequests(time.Since(start))
@ -503,7 +505,7 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
return 0, err
}
return resp.GetBody().GetNodeId(), nil
return resp.Body.NodeID, nil
}
// AddNodeByPath invokes eponymous method from TreeServiceClient.
@ -512,10 +514,10 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint64, error) {
request := &grpcService.AddByPathRequest{
Body: &grpcService.AddByPathRequest_Body{
ContainerId: prm.CID[:],
TreeId: prm.TreeID,
request := &tree.AddByPathRequest{
Body: &tree.AddByPathRequestBody{
ContainerID: prm.CID[:],
TreeID: prm.TreeID,
Path: prm.Path,
Meta: metaToKV(prm.Meta),
PathAttribute: prm.PathAttribute,
@ -528,11 +530,9 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint
return 0, err
}
var resp *grpcService.AddByPathResponse
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout)
defer cancel()
resp, inErr = client.AddByPath(reqCtx, request)
var resp *tree.AddByPathResponse
err := p.requestWithRetry(ctx, func(client *rpcclient.Client) (inErr error) {
resp, inErr = rpcapi.AddByPath(client, request, rpcclient.WithContext(ctx))
return handleError("failed to add node by path", inErr)
})
p.methods[methodAddNodeByPath].IncRequests(time.Since(start))
@ -540,15 +540,15 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint
return 0, err
}
body := resp.GetBody()
body := resp.Body
if body == nil {
return 0, errors.New("nil body in tree service response")
} else if len(body.GetNodes()) == 0 {
} else if len(body.Nodes) == 0 {
return 0, errors.New("empty list of added nodes in tree service response")
}
// The first node is the leaf that we add, according to tree service docs.
return body.GetNodes()[0], nil
return body.Nodes[0], nil
}
// MoveNode invokes eponymous method from TreeServiceClient.
@ -557,12 +557,12 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
request := &grpcService.MoveRequest{
Body: &grpcService.MoveRequest_Body{
ContainerId: prm.CID[:],
TreeId: prm.TreeID,
NodeId: prm.NodeID,
ParentId: prm.ParentID,
request := &tree.MoveRequest{
Body: &tree.MoveRequestBody{
ContainerID: prm.CID[:],
TreeID: prm.TreeID,
NodeID: prm.NodeID,
ParentID: prm.ParentID,
Meta: metaToKV(prm.Meta),
BearerToken: prm.BearerToken,
},
@ -573,10 +573,8 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
return err
}
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout)
defer cancel()
if _, err := client.Move(reqCtx, request); err != nil {
err := p.requestWithRetry(ctx, func(client *rpcclient.Client) error {
if _, err := rpcapi.Move(client, request, rpcclient.WithContext(ctx)); err != nil {
return handleError("failed to move node", err)
}
return nil
@ -592,11 +590,11 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
// * ErrNodeNotFound
// * ErrNodeAccessDenied.
func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
request := &grpcService.RemoveRequest{
Body: &grpcService.RemoveRequest_Body{
ContainerId: prm.CID[:],
TreeId: prm.TreeID,
NodeId: prm.NodeID,
request := &tree.RemoveRequest{
Body: &tree.RemoveRequestBody{
ContainerID: prm.CID[:],
TreeID: prm.TreeID,
NodeID: prm.NodeID,
BearerToken: prm.BearerToken,
},
}
@ -606,10 +604,8 @@ func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
return err
}
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
reqCtx, cancel := context.WithTimeout(ctx, p.streamTimeout)
defer cancel()
if _, err := client.Remove(reqCtx, request); err != nil {
err := p.requestWithRetry(ctx, func(client *rpcclient.Client) error {
if _, err := rpcapi.Remove(client, request, rpcclient.WithContext(ctx)); err != nil {
return handleError("failed to remove node", err)
}
return nil
@ -661,11 +657,11 @@ func handleError(msg string, err error) error {
return fmt.Errorf("%s: %w", msg, err)
}
func metaToKV(meta map[string]string) []*grpcService.KeyValue {
result := make([]*grpcService.KeyValue, 0, len(meta))
func metaToKV(meta map[string]string) []*tree.KeyValue {
result := make([]*tree.KeyValue, 0, len(meta))
for key, value := range meta {
result = append(result, &grpcService.KeyValue{Key: key, Value: []byte(value)})
result = append(result, &tree.KeyValue{Key: key, Value: []byte(value)})
}
return result
@ -814,10 +810,10 @@ func (p *Pool) setStartIndices(i, j int) {
p.startIndicesMtx.Unlock()
}
func (p *Pool) requestWithRetry(ctx context.Context, fn func(client grpcService.TreeServiceClient) error) error {
func (p *Pool) requestWithRetry(ctx context.Context, fn func(client *rpcclient.Client) error) error {
var (
err, finErr error
cl grpcService.TreeServiceClient
cl *rpcclient.Client
)
reqID := GetRequestID(ctx)

View file

@ -9,7 +9,7 @@ type message interface {
SignedDataSize() int
ReadSignedData([]byte) ([]byte, error)
GetSignature() *tree.Signature
SetSignature(*tree.Signature)
SetSignature(*tree.Signature) error
}
// signMessage uses the pool key and signs any protobuf
@ -29,10 +29,8 @@ func (p *Pool) signRequest(m message) error {
rawPub := make([]byte, keySDK.Public().MaxEncodedSize())
rawPub = rawPub[:keySDK.Public().Encode(rawPub)]
m.SetSignature(&tree.Signature{
return m.SetSignature(&tree.Signature{
Key: rawPub,
Sign: data,
})
return nil
}

View file

@ -5,9 +5,9 @@ import (
"errors"
"testing"
rpcClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
grpcService "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)
@ -17,7 +17,7 @@ type treeClientMock struct {
err bool
}
func (t *treeClientMock) serviceClient() (grpcService.TreeServiceClient, error) {
func (t *treeClientMock) serviceClient() (*rpcClient.Client, error) {
if t.err {
return nil, errors.New("serviceClient() mock error")
}
@ -99,7 +99,7 @@ func TestRetry(t *testing.T) {
maxRequestAttempts: lenNodes,
}
makeFn := func(client grpcService.TreeServiceClient) error {
makeFn := func(client *rpcClient.Client) error {
return nil
}
@ -171,7 +171,7 @@ func TestRetry(t *testing.T) {
t.Run("error empty result", func(t *testing.T) {
errNodes, index := 2, 0
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
err := p.requestWithRetry(ctx, func(client *rpcClient.Client) error {
if index < errNodes {
index++
return errNodeEmptyResult
@ -184,7 +184,7 @@ func TestRetry(t *testing.T) {
t.Run("error not found", func(t *testing.T) {
errNodes, index := 2, 0
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
err := p.requestWithRetry(ctx, func(client *rpcClient.Client) error {
if index < errNodes {
index++
return ErrNodeNotFound
@ -197,7 +197,7 @@ func TestRetry(t *testing.T) {
t.Run("error access denied", func(t *testing.T) {
var index int
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
err := p.requestWithRetry(ctx, func(client *rpcClient.Client) error {
index++
return ErrNodeAccessDenied
})