frostfs-s3-gw/internal/frostfs/services/tree_client_grpc.go
Denis Kirillov 0c1e17dca4 [#74] Add round tree retry
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-04-21 12:02:42 +03:00

405 lines
11 KiB
Go

package services
import (
"context"
"errors"
"fmt"
"io"
"strings"
"sync/atomic"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
grpcService "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/services/tree"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/tree"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// GetNodeByPathResponseInfoWrapper wraps a single node info item of a
// GetNodeByPath response and exposes its fields through getter methods.
type GetNodeByPathResponseInfoWrapper struct {
	response *grpcService.GetNodeByPathResponse_Info
}

// GetNodeID returns the node ID of the wrapped node info.
func (n GetNodeByPathResponseInfoWrapper) GetNodeID() uint64 {
	return n.response.GetNodeId()
}

// GetParentID returns the parent node ID of the wrapped node info.
func (n GetNodeByPathResponseInfoWrapper) GetParentID() uint64 {
	return n.response.GetParentId()
}

// GetTimestamp returns the timestamp of the wrapped node info.
func (n GetNodeByPathResponseInfoWrapper) GetTimestamp() uint64 {
	return n.response.GetTimestamp()
}

// GetMeta returns the metadata entries of the wrapped node info converted to
// a slice of tree.Meta. The result is always a non-nil slice.
func (n GetNodeByPathResponseInfoWrapper) GetMeta() []tree.Meta {
	// Go through the generated getter (like the other methods of this type do)
	// instead of reading the field directly, so that a nil wrapped response
	// produces an empty result rather than a nil pointer dereference.
	meta := n.response.GetMeta()
	res := make([]tree.Meta, len(meta))
	for i, value := range meta {
		res[i] = value
	}
	return res
}
// GetSubTreeResponseBodyWrapper wraps the body of a single GetSubTree stream
// message and exposes its fields through getter methods.
type GetSubTreeResponseBodyWrapper struct {
	response *grpcService.GetSubTreeResponse_Body
}

// GetNodeID returns the node ID of the wrapped response body.
func (n GetSubTreeResponseBodyWrapper) GetNodeID() uint64 {
	return n.response.GetNodeId()
}

// GetParentID returns the parent node ID of the wrapped response body.
func (n GetSubTreeResponseBodyWrapper) GetParentID() uint64 {
	return n.response.GetParentId()
}

// GetTimestamp returns the timestamp of the wrapped response body.
func (n GetSubTreeResponseBodyWrapper) GetTimestamp() uint64 {
	return n.response.GetTimestamp()
}

// GetMeta returns the metadata entries of the wrapped response body converted
// to a slice of tree.Meta. The result is always a non-nil slice.
func (n GetSubTreeResponseBodyWrapper) GetMeta() []tree.Meta {
	// Go through the generated getter (like the other methods of this type do)
	// instead of reading the field directly, so that a nil wrapped body
	// produces an empty result rather than a nil pointer dereference.
	meta := n.response.GetMeta()
	res := make([]tree.Meta, len(meta))
	for i, value := range meta {
		res[i] = value
	}
	return res
}
// ServiceClientGRPC is a tree service client that dispatches requests to a set
// of gRPC tree clients.
type ServiceClientGRPC struct {
	// key is the private key supplied to the constructor.
	// NOTE(review): presumably used by signRequest to sign request bodies —
	// confirm against its definition.
	key *keys.PrivateKey
	// log is used for debug messages about failed tree requests.
	log *zap.Logger
	// clients is the list of tree clients that requests can be sent to.
	clients []treeClient
	// startIndex is the index in clients from which requestWithRetry starts
	// iterating.
	startIndex atomic.Int32
}
// treeClient bundles everything related to one tree service endpoint.
type treeClient struct {
	// address is the endpoint address the connection was dialed to.
	address string
	// conn is the gRPC connection to the endpoint.
	conn *grpc.ClientConn
	// service is the tree service client created on top of conn.
	service grpcService.TreeServiceClient
}
// Endpoints returns the addresses of all tree clients held by c, in the same
// order as the clients are stored.
func (c *ServiceClientGRPC) Endpoints() []string {
	addresses := make([]string, 0, len(c.clients))
	for idx := range c.clients {
		addresses = append(addresses, c.clients[idx].address)
	}
	return addresses
}
// NewTreeServiceClientGRPC creates a tree service client for the given
// endpoints.
//
// Every endpoint is dialed with grpcOpts and then probed with a Healthcheck
// request using ctx. Endpoints that cannot be dialed or fail the healthcheck
// are logged with a warning and skipped. An error is returned if no endpoint
// passes both steps.
func NewTreeServiceClientGRPC(ctx context.Context, endpoints []string, key *keys.PrivateKey, log *zap.Logger, grpcOpts ...grpc.DialOption) (*ServiceClientGRPC, error) {
	res := &ServiceClientGRPC{
		key: key,
		log: log,
	}

	for _, addr := range endpoints {
		conn, err := grpc.Dial(addr, grpcOpts...)
		if err != nil {
			log.Warn("dial node tree service", zap.String("address", addr), zap.Error(err))
			continue
		}

		c := grpcService.NewTreeServiceClient(conn)
		if _, err = c.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil {
			log.Warn("healthcheck tree service", zap.String("address", addr), zap.Error(err))
			// The connection is not going to be used, so it must be closed
			// here; otherwise it would be leaked.
			if closeErr := conn.Close(); closeErr != nil {
				log.Warn("close tree service connection", zap.String("address", addr), zap.Error(closeErr))
			}
			continue
		}

		res.clients = append(res.clients, treeClient{
			address: addr,
			conn:    conn,
			service: c,
		})
	}

	if len(res.clients) == 0 {
		return nil, errors.New("no healthy tree grpc client")
	}

	return res, nil
}
// GetNodes looks up tree nodes by path.
//
// It builds a GetNodeByPath request from p (tree.FileNameKey is used as the
// path attribute), signs it, sends it through requestWithRetry and wraps each
// returned node info into GetNodeByPathResponseInfoWrapper. Errors of the
// remote call are converted by handleError.
func (c *ServiceClientGRPC) GetNodes(ctx context.Context, p *tree.GetNodesParams) ([]tree.NodeResponse, error) {
	reqBody := &grpcService.GetNodeByPathRequest_Body{
		ContainerId:   p.BktInfo.CID[:],
		TreeId:        p.TreeID,
		Path:          p.Path,
		Attributes:    p.Meta,
		PathAttribute: tree.FileNameKey,
		LatestOnly:    p.LatestOnly,
		AllAttributes: p.AllAttrs,
		BearerToken:   getBearer(ctx, p.BktInfo),
	}
	req := &grpcService.GetNodeByPathRequest{Body: reqBody}

	// Stores the produced signature in the request.
	attachSignature := func(key, sign []byte) {
		req.Signature = &grpcService.Signature{Key: key, Sign: sign}
	}
	if err := c.signRequest(reqBody, attachSignature); err != nil {
		return nil, err
	}

	// Selects the RPC method to invoke on whichever client is being tried.
	pickMethod := func(client treeClient) grpcFunc[*grpcService.GetNodeByPathRequest, *grpcService.GetNodeByPathResponse] {
		return client.service.GetNodeByPath
	}
	resp, err := requestWithRetry(ctx, req, c, pickMethod)
	if err != nil {
		return nil, handleError("failed to get node by path", err)
	}

	nodes := resp.GetBody().GetNodes()
	wrapped := make([]tree.NodeResponse, 0, len(nodes))
	for _, node := range nodes {
		wrapped = append(wrapped, GetNodeByPathResponseInfoWrapper{response: node})
	}
	return wrapped, nil
}
// GetSubTree fetches the subtree of the tree treeID in the bucket container,
// starting at rootID and limited by depth.
//
// The signed request is sent through requestWithRetry, which yields a stream
// client. The stream is read until io.EOF and every message body is wrapped
// into GetSubTreeResponseBodyWrapper. Errors of opening the stream or of
// receiving from it are converted by handleError.
func (c *ServiceClientGRPC) GetSubTree(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID uint64, depth uint32) ([]tree.NodeResponse, error) {
	request := &grpcService.GetSubTreeRequest{
		Body: &grpcService.GetSubTreeRequest_Body{
			ContainerId: bktInfo.CID[:],
			TreeId:      treeID,
			RootId:      rootID,
			Depth:       depth,
			BearerToken: getBearer(ctx, bktInfo),
		},
	}

	if err := c.signRequest(request.Body, func(key, sign []byte) {
		request.Signature = &grpcService.Signature{
			Key:  key,
			Sign: sign,
		}
	}); err != nil {
		return nil, err
	}

	cli, err := requestWithRetry(ctx, request, c, func(client treeClient) grpcFunc[*grpcService.GetSubTreeRequest, grpcService.TreeService_GetSubTreeClient] {
		return client.service.GetSubTree
	})
	if err != nil {
		return nil, handleError("failed to get sub tree client", err)
	}

	var subtree []tree.NodeResponse
	for {
		resp, err := cli.Recv()
		// errors.Is also recognizes io.EOF if it ever arrives wrapped.
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			return nil, handleError("failed to get sub tree", err)
		}
		subtree = append(subtree, GetSubTreeResponseBodyWrapper{resp.Body})
	}

	return subtree, nil
}
// AddNode adds a node with the given meta under the parent node of the tree
// treeID in the bucket container and returns the ID of the new node taken
// from the response body.
//
// The request is signed and sent through requestWithRetry; errors of the
// remote call are converted by handleError.
func (c *ServiceClientGRPC) AddNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, parent uint64, meta map[string]string) (uint64, error) {
	reqBody := &grpcService.AddRequest_Body{
		ContainerId: bktInfo.CID[:],
		TreeId:      treeID,
		ParentId:    parent,
		Meta:        metaToKV(meta),
		BearerToken: getBearer(ctx, bktInfo),
	}
	req := &grpcService.AddRequest{Body: reqBody}

	// Stores the produced signature in the request.
	attachSignature := func(key, sign []byte) {
		req.Signature = &grpcService.Signature{Key: key, Sign: sign}
	}
	if err := c.signRequest(reqBody, attachSignature); err != nil {
		return 0, err
	}

	// Selects the RPC method to invoke on whichever client is being tried.
	pickMethod := func(client treeClient) grpcFunc[*grpcService.AddRequest, *grpcService.AddResponse] {
		return client.service.Add
	}
	resp, err := requestWithRetry(ctx, req, c, pickMethod)
	if err != nil {
		return 0, handleError("failed to add node", err)
	}

	return resp.GetBody().GetNodeId(), nil
}
// AddNodeByPath adds a node with the given meta at path in the tree treeID of
// the bucket container, using tree.FileNameKey as the path attribute, and
// returns the ID of the added leaf node.
//
// The request is signed and sent through requestWithRetry; errors of the
// remote call are converted by handleError. An error is also returned when
// the response has no body or an empty list of added nodes.
func (c *ServiceClientGRPC) AddNodeByPath(ctx context.Context, bktInfo *data.BucketInfo, treeID string, path []string, meta map[string]string) (uint64, error) {
	reqBody := &grpcService.AddByPathRequest_Body{
		ContainerId:   bktInfo.CID[:],
		TreeId:        treeID,
		Path:          path,
		Meta:          metaToKV(meta),
		PathAttribute: tree.FileNameKey,
		BearerToken:   getBearer(ctx, bktInfo),
	}
	req := &grpcService.AddByPathRequest{Body: reqBody}

	// Stores the produced signature in the request.
	attachSignature := func(key, sign []byte) {
		req.Signature = &grpcService.Signature{Key: key, Sign: sign}
	}
	if err := c.signRequest(reqBody, attachSignature); err != nil {
		return 0, err
	}

	// Selects the RPC method to invoke on whichever client is being tried.
	pickMethod := func(client treeClient) grpcFunc[*grpcService.AddByPathRequest, *grpcService.AddByPathResponse] {
		return client.service.AddByPath
	}
	resp, err := requestWithRetry(ctx, req, c, pickMethod)
	if err != nil {
		return 0, handleError("failed to add node by path", err)
	}

	respBody := resp.GetBody()
	switch {
	case respBody == nil:
		return 0, errors.New("nil body in tree service response")
	case len(respBody.Nodes) == 0:
		return 0, errors.New("empty list of added nodes in tree service response")
	}

	// According to the tree service docs, the leaf that was added comes first.
	return respBody.Nodes[0], nil
}
// MoveNode sends a Move request for node nodeID of the tree treeID in the
// bucket container, passing parentID as the parent and meta as the node meta.
//
// The request is signed and sent through requestWithRetry; errors of the
// remote call are converted by handleError.
func (c *ServiceClientGRPC) MoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID, parentID uint64, meta map[string]string) error {
	reqBody := &grpcService.MoveRequest_Body{
		ContainerId: bktInfo.CID[:],
		TreeId:      treeID,
		NodeId:      nodeID,
		ParentId:    parentID,
		Meta:        metaToKV(meta),
		BearerToken: getBearer(ctx, bktInfo),
	}
	req := &grpcService.MoveRequest{Body: reqBody}

	// Stores the produced signature in the request.
	attachSignature := func(key, sign []byte) {
		req.Signature = &grpcService.Signature{Key: key, Sign: sign}
	}
	if err := c.signRequest(reqBody, attachSignature); err != nil {
		return err
	}

	// Selects the RPC method to invoke on whichever client is being tried.
	pickMethod := func(client treeClient) grpcFunc[*grpcService.MoveRequest, *grpcService.MoveResponse] {
		return client.service.Move
	}
	if _, err := requestWithRetry(ctx, req, c, pickMethod); err != nil {
		return handleError("failed to move node", err)
	}

	return nil
}
// RemoveNode sends a Remove request for node nodeID of the tree treeID in the
// bucket container.
//
// The request is signed and sent through requestWithRetry; errors of the
// remote call are converted by handleError.
func (c *ServiceClientGRPC) RemoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error {
	reqBody := &grpcService.RemoveRequest_Body{
		ContainerId: bktInfo.CID[:],
		TreeId:      treeID,
		NodeId:      nodeID,
		BearerToken: getBearer(ctx, bktInfo),
	}
	req := &grpcService.RemoveRequest{Body: reqBody}

	// Stores the produced signature in the request.
	attachSignature := func(key, sign []byte) {
		req.Signature = &grpcService.Signature{Key: key, Sign: sign}
	}
	if err := c.signRequest(reqBody, attachSignature); err != nil {
		return err
	}

	// Selects the RPC method to invoke on whichever client is being tried.
	pickMethod := func(cl treeClient) grpcFunc[*grpcService.RemoveRequest, *grpcService.RemoveResponse] {
		return cl.service.Remove
	}
	if _, err := requestWithRetry(ctx, req, c, pickMethod); err != nil {
		return handleError("failed to remove node", err)
	}

	return nil
}
// grpcFunc is the common shape of the tree service client methods passed to
// requestWithRetry: it takes a context, a request of type Req and optional
// call options, and returns a value of type Resp and an error.
type grpcFunc[Req any, Resp any] func(context.Context, Req, ...grpc.CallOption) (Resp, error)
// requestWithRetry sends req to the tree clients of c one after another until
// one of them returns a result that shouldTryAgain does not consider
// retriable.
//
// fn selects the method to call on a particular client. Iteration starts from
// the client index stored in c.startIndex and wraps around, so each client is
// tried at most once per call. The index of the client that produced the
// final, non-retriable result is stored back to c.startIndex. Retriable
// errors are logged at debug level. If every client fails with a retriable
// error, the result and error of the last attempt are returned.
//
// If c has no clients, an error is returned without performing any call.
func requestWithRetry[Req any, Resp any](ctx context.Context, req Req, c *ServiceClientGRPC, fn func(client treeClient) grpcFunc[Req, Resp]) (res Resp, err error) {
	total := len(c.clients)
	if total == 0 {
		// Without this guard the loop below would not run and a zero response
		// would be returned together with a nil error.
		return res, errors.New("no tree grpc clients to send request to")
	}

	start := int(c.startIndex.Load())
	for i := start; i < start+total; i++ {
		index := i % total
		res, err = fn(c.clients[index])(ctx, req)
		if !shouldTryAgain(err) {
			c.startIndex.Store(int32(index))
			return res, err
		}
		c.log.Debug("tree request error", zap.String("address", c.clients[index].address), zap.Error(err))
	}

	return res, err
}
// shouldTryAgain reports whether a request that ended with err is worth
// repeating. It returns true when the gRPC status code of the innermost
// unwrapped error is Unavailable or Unimplemented, or when the error text
// contains one of the known retriable messages. A nil error is not retriable.
func shouldTryAgain(err error) bool {
	if err == nil {
		return false
	}

	switch status.Code(unwrapErr(err)) {
	case codes.Unavailable, codes.Unimplemented:
		return true
	}

	message := err.Error()
	retriableMarkers := []string{
		"not found",
		"shard is in read-only mode",
		"shard is in degraded mode",
	}
	for _, marker := range retriableMarkers {
		if strings.Contains(message, marker) {
			return true
		}
	}

	return false
}
// unwrapErr follows the errors.Unwrap chain of err to its end and returns the
// innermost error. An error that wraps nothing (including nil) is returned
// as is.
func unwrapErr(err error) error {
	for {
		inner := errors.Unwrap(err)
		if inner == nil {
			return err
		}
		err = inner
	}
}
// metaToKV converts a string map into a slice of tree service key-value
// pairs, storing each value as bytes. The order of the resulting pairs follows
// map iteration order and is therefore unspecified.
func metaToKV(meta map[string]string) []*grpcService.KeyValue {
	pairs := make([]*grpcService.KeyValue, len(meta))
	next := 0
	for k, v := range meta {
		pairs[next] = &grpcService.KeyValue{Key: k, Value: []byte(v)}
		next++
	}
	return pairs
}
// getBearer returns the marshaled bearer token found in the access box stored
// in ctx under api.BoxData, provided that the bucket owner equals the issuer
// of that token. In all other cases (no box, no gate, no token, or a different
// issuer) it returns nil.
func getBearer(ctx context.Context, bktInfo *data.BucketInfo) []byte {
	box, ok := ctx.Value(api.BoxData).(*accessbox.Box)
	if !ok || box == nil || box.Gate == nil || box.Gate.BearerToken == nil {
		return nil
	}

	token := box.Gate.BearerToken
	if !bktInfo.Owner.Equals(bearer.ResolveIssuer(*token)) {
		return nil
	}

	return token.Marshal()
}
// handleError converts an error of a tree service call into the error
// returned to callers. If the error text contains "not found", the result
// wraps tree.ErrNodeNotFound; if it contains "is denied by", the result wraps
// tree.ErrNodeAccessDenied; in both cases the original text is appended.
// Any other error is wrapped with msg as a prefix.
func handleError(msg string, err error) error {
	text := err.Error()

	switch {
	case strings.Contains(text, "not found"):
		return fmt.Errorf("%w: %s", tree.ErrNodeNotFound, text)
	case strings.Contains(text, "is denied by"):
		return fmt.Errorf("%w: %s", tree.ErrNodeAccessDenied, text)
	default:
		return fmt.Errorf("%s: %w", msg, err)
	}
}