411 lines
11 KiB
Go
411 lines
11 KiB
Go
package services
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"strings"
|
|
"sync/atomic"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
|
|
treeClient "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/services/client"
|
|
grpcService "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/services/tree"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/tree"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
|
"go.uber.org/zap"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
type GetNodeByPathResponseInfoWrapper struct {
|
|
response *grpcService.GetNodeByPathResponse_Info
|
|
}
|
|
|
|
func (n GetNodeByPathResponseInfoWrapper) GetNodeID() uint64 {
|
|
return n.response.GetNodeId()
|
|
}
|
|
|
|
func (n GetNodeByPathResponseInfoWrapper) GetParentID() uint64 {
|
|
return n.response.GetParentId()
|
|
}
|
|
|
|
func (n GetNodeByPathResponseInfoWrapper) GetTimestamp() uint64 {
|
|
return n.response.GetTimestamp()
|
|
}
|
|
|
|
func (n GetNodeByPathResponseInfoWrapper) GetMeta() []tree.Meta {
|
|
res := make([]tree.Meta, len(n.response.Meta))
|
|
for i, value := range n.response.Meta {
|
|
res[i] = value
|
|
}
|
|
return res
|
|
}
|
|
|
|
type GetSubTreeResponseBodyWrapper struct {
|
|
response *grpcService.GetSubTreeResponse_Body
|
|
}
|
|
|
|
func (n GetSubTreeResponseBodyWrapper) GetNodeID() uint64 {
|
|
return n.response.GetNodeId()
|
|
}
|
|
|
|
func (n GetSubTreeResponseBodyWrapper) GetParentID() uint64 {
|
|
return n.response.GetParentId()
|
|
}
|
|
|
|
func (n GetSubTreeResponseBodyWrapper) GetTimestamp() uint64 {
|
|
return n.response.GetTimestamp()
|
|
}
|
|
|
|
func (n GetSubTreeResponseBodyWrapper) GetMeta() []tree.Meta {
|
|
res := make([]tree.Meta, len(n.response.Meta))
|
|
for i, value := range n.response.Meta {
|
|
res[i] = value
|
|
}
|
|
return res
|
|
}
|
|
|
|
type TreeClient interface {
|
|
TreeClient(ctx context.Context) (grpcService.TreeServiceClient, error)
|
|
Address() string
|
|
}
|
|
|
|
type ServiceClientGRPC struct {
|
|
key *keys.PrivateKey
|
|
log *zap.Logger
|
|
clients []TreeClient
|
|
startIndex int32
|
|
}
|
|
|
|
func (c *ServiceClientGRPC) getStartIndex() int {
|
|
return int(atomic.LoadInt32(&c.startIndex))
|
|
}
|
|
|
|
func (c *ServiceClientGRPC) setStartIndex(index int) {
|
|
atomic.StoreInt32(&c.startIndex, int32(index))
|
|
}
|
|
|
|
func (c *ServiceClientGRPC) Endpoints() []string {
|
|
res := make([]string, len(c.clients))
|
|
for i, client := range c.clients {
|
|
res[i] = client.Address()
|
|
}
|
|
return res
|
|
}
|
|
|
|
func NewTreeServiceClientGRPC(ctx context.Context, endpoints []string, key *keys.PrivateKey, log *zap.Logger, grpcOpts ...grpc.DialOption) (*ServiceClientGRPC, error) {
|
|
res := &ServiceClientGRPC{
|
|
key: key,
|
|
log: log,
|
|
}
|
|
|
|
firstHealthy := -1
|
|
|
|
res.clients = make([]TreeClient, len(endpoints))
|
|
for i, addr := range endpoints {
|
|
res.clients[i] = treeClient.NewTreeClient(addr, grpcOpts...)
|
|
if _, err := res.clients[i].TreeClient(ctx); err != nil {
|
|
log.Warn("dial tree", zap.String("address", addr), zap.Error(err))
|
|
continue
|
|
}
|
|
if firstHealthy == -1 {
|
|
firstHealthy = i
|
|
}
|
|
}
|
|
|
|
if firstHealthy == -1 {
|
|
return nil, errors.New("no healthy tree grpc client")
|
|
}
|
|
|
|
res.setStartIndex(firstHealthy)
|
|
|
|
return res, nil
|
|
}
|
|
|
|
func (c *ServiceClientGRPC) GetNodes(ctx context.Context, p *tree.GetNodesParams) ([]tree.NodeResponse, error) {
|
|
request := &grpcService.GetNodeByPathRequest{
|
|
Body: &grpcService.GetNodeByPathRequest_Body{
|
|
ContainerId: p.BktInfo.CID[:],
|
|
TreeId: p.TreeID,
|
|
Path: p.Path,
|
|
Attributes: p.Meta,
|
|
PathAttribute: tree.FileNameKey,
|
|
LatestOnly: p.LatestOnly,
|
|
AllAttributes: p.AllAttrs,
|
|
BearerToken: getBearer(ctx, p.BktInfo),
|
|
},
|
|
}
|
|
|
|
if err := c.signRequest(request.Body, func(key, sign []byte) {
|
|
request.Signature = &grpcService.Signature{
|
|
Key: key,
|
|
Sign: sign,
|
|
}
|
|
}); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
log := c.log.With(zap.String("request_id", api.GetRequestID(ctx)), zap.String("tree", p.TreeID),
|
|
zap.String("method", "GetNodeByPath"))
|
|
|
|
var resp *grpcService.GetNodeByPathResponse
|
|
if err := c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) (inErr error) {
|
|
resp, inErr = client.GetNodeByPath(ctx, request)
|
|
return handleError("failed to get node by path", inErr)
|
|
}); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
res := make([]tree.NodeResponse, len(resp.GetBody().GetNodes()))
|
|
for i, info := range resp.GetBody().GetNodes() {
|
|
res[i] = GetNodeByPathResponseInfoWrapper{info}
|
|
}
|
|
|
|
return res, nil
|
|
}
|
|
|
|
func (c *ServiceClientGRPC) GetSubTree(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID uint64, depth uint32) ([]tree.NodeResponse, error) {
|
|
request := &grpcService.GetSubTreeRequest{
|
|
Body: &grpcService.GetSubTreeRequest_Body{
|
|
ContainerId: bktInfo.CID[:],
|
|
TreeId: treeID,
|
|
RootId: rootID,
|
|
Depth: depth,
|
|
BearerToken: getBearer(ctx, bktInfo),
|
|
},
|
|
}
|
|
|
|
if err := c.signRequest(request.Body, func(key, sign []byte) {
|
|
request.Signature = &grpcService.Signature{
|
|
Key: key,
|
|
Sign: sign,
|
|
}
|
|
}); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
log := c.log.With(zap.String("request_id", api.GetRequestID(ctx)), zap.String("tree", treeID),
|
|
zap.String("method", "GetSubTree"))
|
|
|
|
var cli grpcService.TreeService_GetSubTreeClient
|
|
if err := c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) (inErr error) {
|
|
cli, inErr = client.GetSubTree(ctx, request)
|
|
return handleError("failed to get sub tree client", inErr)
|
|
}); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var subtree []tree.NodeResponse
|
|
for {
|
|
resp, err := cli.Recv()
|
|
if err == io.EOF {
|
|
break
|
|
} else if err != nil {
|
|
return nil, handleError("failed to get sub tree", err)
|
|
}
|
|
subtree = append(subtree, GetSubTreeResponseBodyWrapper{resp.Body})
|
|
}
|
|
|
|
return subtree, nil
|
|
}
|
|
|
|
func (c *ServiceClientGRPC) AddNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, parent uint64, meta map[string]string) (uint64, error) {
|
|
request := &grpcService.AddRequest{
|
|
Body: &grpcService.AddRequest_Body{
|
|
ContainerId: bktInfo.CID[:],
|
|
TreeId: treeID,
|
|
ParentId: parent,
|
|
Meta: metaToKV(meta),
|
|
BearerToken: getBearer(ctx, bktInfo),
|
|
},
|
|
}
|
|
if err := c.signRequest(request.Body, func(key, sign []byte) {
|
|
request.Signature = &grpcService.Signature{
|
|
Key: key,
|
|
Sign: sign,
|
|
}
|
|
}); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
log := c.log.With(zap.String("request_id", api.GetRequestID(ctx)), zap.String("tree", treeID),
|
|
zap.String("method", "Add"))
|
|
|
|
var resp *grpcService.AddResponse
|
|
if err := c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) (inErr error) {
|
|
resp, inErr = client.Add(ctx, request)
|
|
return handleError("failed to add node", inErr)
|
|
}); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
return resp.GetBody().GetNodeId(), nil
|
|
}
|
|
|
|
func (c *ServiceClientGRPC) AddNodeByPath(ctx context.Context, bktInfo *data.BucketInfo, treeID string, path []string, meta map[string]string) (uint64, error) {
|
|
request := &grpcService.AddByPathRequest{
|
|
Body: &grpcService.AddByPathRequest_Body{
|
|
ContainerId: bktInfo.CID[:],
|
|
TreeId: treeID,
|
|
Path: path,
|
|
Meta: metaToKV(meta),
|
|
PathAttribute: tree.FileNameKey,
|
|
BearerToken: getBearer(ctx, bktInfo),
|
|
},
|
|
}
|
|
|
|
if err := c.signRequest(request.Body, func(key, sign []byte) {
|
|
request.Signature = &grpcService.Signature{
|
|
Key: key,
|
|
Sign: sign,
|
|
}
|
|
}); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
log := c.log.With(zap.String("request_id", api.GetRequestID(ctx)), zap.String("tree", treeID),
|
|
zap.String("method", "AddByPath"))
|
|
|
|
var resp *grpcService.AddByPathResponse
|
|
if err := c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) (inErr error) {
|
|
resp, inErr = client.AddByPath(ctx, request)
|
|
return handleError("failed to add node by path", inErr)
|
|
}); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
body := resp.GetBody()
|
|
if body == nil {
|
|
return 0, errors.New("nil body in tree service response")
|
|
} else if len(body.Nodes) == 0 {
|
|
return 0, errors.New("empty list of added nodes in tree service response")
|
|
}
|
|
|
|
// The first node is the leaf that we add, according to tree service docs.
|
|
return body.Nodes[0], nil
|
|
}
|
|
|
|
func (c *ServiceClientGRPC) MoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID, parentID uint64, meta map[string]string) error {
|
|
request := &grpcService.MoveRequest{
|
|
Body: &grpcService.MoveRequest_Body{
|
|
ContainerId: bktInfo.CID[:],
|
|
TreeId: treeID,
|
|
NodeId: nodeID,
|
|
ParentId: parentID,
|
|
Meta: metaToKV(meta),
|
|
BearerToken: getBearer(ctx, bktInfo),
|
|
},
|
|
}
|
|
|
|
if err := c.signRequest(request.Body, func(key, sign []byte) {
|
|
request.Signature = &grpcService.Signature{
|
|
Key: key,
|
|
Sign: sign,
|
|
}
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
log := c.log.With(zap.String("request_id", api.GetRequestID(ctx)), zap.String("tree", treeID),
|
|
zap.String("method", "Move"))
|
|
|
|
return c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) error {
|
|
if _, err := client.Move(ctx, request); err != nil {
|
|
return handleError("failed to move node", err)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (c *ServiceClientGRPC) RemoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error {
|
|
request := &grpcService.RemoveRequest{
|
|
Body: &grpcService.RemoveRequest_Body{
|
|
ContainerId: bktInfo.CID[:],
|
|
TreeId: treeID,
|
|
NodeId: nodeID,
|
|
BearerToken: getBearer(ctx, bktInfo),
|
|
},
|
|
}
|
|
if err := c.signRequest(request.Body, func(key, sign []byte) {
|
|
request.Signature = &grpcService.Signature{
|
|
Key: key,
|
|
Sign: sign,
|
|
}
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
log := c.log.With(zap.String("request_id", api.GetRequestID(ctx)), zap.String("tree", treeID),
|
|
zap.String("method", "Remove"))
|
|
|
|
return c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) error {
|
|
if _, err := client.Remove(ctx, request); err != nil {
|
|
return handleError("failed to remove node", err)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (c *ServiceClientGRPC) requestWithRetry(ctx context.Context, log *zap.Logger, fn func(client grpcService.TreeServiceClient) error) error {
|
|
var (
|
|
err error
|
|
cl grpcService.TreeServiceClient
|
|
)
|
|
|
|
start := c.getStartIndex()
|
|
for i := start; i < start+len(c.clients); i++ {
|
|
index := i % len(c.clients)
|
|
if cl, err = c.clients[index].TreeClient(ctx); err == nil {
|
|
err = fn(cl)
|
|
}
|
|
if !shouldTryAgain(err) {
|
|
c.setStartIndex(index)
|
|
return err
|
|
}
|
|
log.Debug("tree request error", zap.String("address", c.clients[index].Address()), zap.Error(err))
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func shouldTryAgain(err error) bool {
|
|
return !(err == nil ||
|
|
errors.Is(err, tree.ErrNodeNotFound) ||
|
|
errors.Is(err, tree.ErrNodeAccessDenied))
|
|
}
|
|
|
|
func metaToKV(meta map[string]string) []*grpcService.KeyValue {
|
|
result := make([]*grpcService.KeyValue, 0, len(meta))
|
|
|
|
for key, value := range meta {
|
|
result = append(result, &grpcService.KeyValue{Key: key, Value: []byte(value)})
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
func getBearer(ctx context.Context, bktInfo *data.BucketInfo) []byte {
|
|
if bd, ok := ctx.Value(api.BoxData).(*accessbox.Box); ok && bd != nil && bd.Gate != nil {
|
|
if bd.Gate.BearerToken != nil {
|
|
if bd.Gate.BearerToken.Impersonate() || bktInfo.Owner.Equals(bearer.ResolveIssuer(*bd.Gate.BearerToken)) {
|
|
return bd.Gate.BearerToken.Marshal()
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func handleError(msg string, err error) error {
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
if strings.Contains(err.Error(), "not found") {
|
|
return fmt.Errorf("%w: %s", tree.ErrNodeNotFound, err.Error())
|
|
} else if strings.Contains(err.Error(), "is denied by") {
|
|
return fmt.Errorf("%w: %s", tree.ErrNodeAccessDenied, err.Error())
|
|
}
|
|
return fmt.Errorf("%s: %w", msg, err)
|
|
}
|