package services import ( "context" "errors" "fmt" "io" "strings" "sync/atomic" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox" treeClient "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/services/client" grpcService "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/services/tree" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/tree" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "go.uber.org/zap" "google.golang.org/grpc" ) type GetNodeByPathResponseInfoWrapper struct { response *grpcService.GetNodeByPathResponse_Info } func (n GetNodeByPathResponseInfoWrapper) GetNodeID() uint64 { return n.response.GetNodeId() } func (n GetNodeByPathResponseInfoWrapper) GetParentID() uint64 { return n.response.GetParentId() } func (n GetNodeByPathResponseInfoWrapper) GetTimestamp() uint64 { return n.response.GetTimestamp() } func (n GetNodeByPathResponseInfoWrapper) GetMeta() []tree.Meta { res := make([]tree.Meta, len(n.response.Meta)) for i, value := range n.response.Meta { res[i] = value } return res } type GetSubTreeResponseBodyWrapper struct { response *grpcService.GetSubTreeResponse_Body } func (n GetSubTreeResponseBodyWrapper) GetNodeID() uint64 { return n.response.GetNodeId() } func (n GetSubTreeResponseBodyWrapper) GetParentID() uint64 { return n.response.GetParentId() } func (n GetSubTreeResponseBodyWrapper) GetTimestamp() uint64 { return n.response.GetTimestamp() } func (n GetSubTreeResponseBodyWrapper) GetMeta() []tree.Meta { res := make([]tree.Meta, len(n.response.Meta)) for i, value := range n.response.Meta { res[i] = value } return res } type TreeClient interface { TreeClient(ctx context.Context) (grpcService.TreeServiceClient, error) Address() string } type ServiceClientGRPC struct { key *keys.PrivateKey log *zap.Logger clients []TreeClient startIndex int32 } func (c *ServiceClientGRPC) getStartIndex() int { return int(atomic.LoadInt32(&c.startIndex)) } func (c *ServiceClientGRPC) setStartIndex(index int) { atomic.StoreInt32(&c.startIndex, int32(index)) } func (c *ServiceClientGRPC) Endpoints() []string { res := make([]string, len(c.clients)) for i, client := range c.clients { res[i] = client.Address() } return res } func NewTreeServiceClientGRPC(ctx context.Context, endpoints []string, key *keys.PrivateKey, log *zap.Logger, grpcOpts ...grpc.DialOption) (*ServiceClientGRPC, error) { res := &ServiceClientGRPC{ key: key, log: log, } firstHealthy := -1 res.clients = make([]TreeClient, len(endpoints)) for i, addr := range endpoints { res.clients[i] = treeClient.NewTreeClient(addr, grpcOpts...) if _, err := res.clients[i].TreeClient(ctx); err != nil { log.Warn("dial tree", zap.String("address", addr), zap.Error(err)) continue } if firstHealthy == -1 { firstHealthy = i } } if firstHealthy == -1 { return nil, errors.New("no healthy tree grpc client") } res.setStartIndex(firstHealthy) return res, nil } func (c *ServiceClientGRPC) GetNodes(ctx context.Context, p *tree.GetNodesParams) ([]tree.NodeResponse, error) { request := &grpcService.GetNodeByPathRequest{ Body: &grpcService.GetNodeByPathRequest_Body{ ContainerId: p.BktInfo.CID[:], TreeId: p.TreeID, Path: p.Path, Attributes: p.Meta, PathAttribute: tree.FileNameKey, LatestOnly: p.LatestOnly, AllAttributes: p.AllAttrs, BearerToken: getBearer(ctx, p.BktInfo), }, } if err := c.signRequest(request.Body, func(key, sign []byte) { request.Signature = &grpcService.Signature{ Key: key, Sign: sign, } }); err != nil { return nil, err } log := c.log.With(zap.String("request_id", api.GetRequestID(ctx)), zap.String("tree", p.TreeID), zap.String("method", "GetNodeByPath")) var resp *grpcService.GetNodeByPathResponse if err := c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) (inErr error) { resp, inErr = client.GetNodeByPath(ctx, request) return handleError("failed to get node by path", inErr) }); err != nil { return nil, err } res := make([]tree.NodeResponse, len(resp.GetBody().GetNodes())) for i, info := range resp.GetBody().GetNodes() { res[i] = GetNodeByPathResponseInfoWrapper{info} } return res, nil } func (c *ServiceClientGRPC) GetSubTree(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID uint64, depth uint32) ([]tree.NodeResponse, error) { request := &grpcService.GetSubTreeRequest{ Body: &grpcService.GetSubTreeRequest_Body{ ContainerId: bktInfo.CID[:], TreeId: treeID, RootId: rootID, Depth: depth, BearerToken: getBearer(ctx, bktInfo), }, } if err := c.signRequest(request.Body, func(key, sign []byte) { request.Signature = &grpcService.Signature{ Key: key, Sign: sign, } }); err != nil { return nil, err } log := c.log.With(zap.String("request_id", api.GetRequestID(ctx)), zap.String("tree", treeID), zap.String("method", "GetSubTree")) var cli grpcService.TreeService_GetSubTreeClient if err := c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) (inErr error) { cli, inErr = client.GetSubTree(ctx, request) return handleError("failed to get sub tree client", inErr) }); err != nil { return nil, err } var subtree []tree.NodeResponse for { resp, err := cli.Recv() if err == io.EOF { break } else if err != nil { return nil, handleError("failed to get sub tree", err) } subtree = append(subtree, GetSubTreeResponseBodyWrapper{resp.Body}) } return subtree, nil } func (c *ServiceClientGRPC) AddNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, parent uint64, meta map[string]string) (uint64, error) { request := &grpcService.AddRequest{ Body: &grpcService.AddRequest_Body{ ContainerId: bktInfo.CID[:], TreeId: treeID, ParentId: parent, Meta: metaToKV(meta), BearerToken: getBearer(ctx, bktInfo), }, } if err := c.signRequest(request.Body, func(key, sign []byte) { request.Signature = &grpcService.Signature{ Key: key, Sign: sign, } }); err != nil { return 0, err } log := c.log.With(zap.String("request_id", api.GetRequestID(ctx)), zap.String("tree", treeID), zap.String("method", "Add")) var resp *grpcService.AddResponse if err := c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) (inErr error) { resp, inErr = client.Add(ctx, request) return handleError("failed to add node", inErr) }); err != nil { return 0, err } return resp.GetBody().GetNodeId(), nil } func (c *ServiceClientGRPC) AddNodeByPath(ctx context.Context, bktInfo *data.BucketInfo, treeID string, path []string, meta map[string]string) (uint64, error) { request := &grpcService.AddByPathRequest{ Body: &grpcService.AddByPathRequest_Body{ ContainerId: bktInfo.CID[:], TreeId: treeID, Path: path, Meta: metaToKV(meta), PathAttribute: tree.FileNameKey, BearerToken: getBearer(ctx, bktInfo), }, } if err := c.signRequest(request.Body, func(key, sign []byte) { request.Signature = &grpcService.Signature{ Key: key, Sign: sign, } }); err != nil { return 0, err } log := c.log.With(zap.String("request_id", api.GetRequestID(ctx)), zap.String("tree", treeID), zap.String("method", "AddByPath")) var resp *grpcService.AddByPathResponse if err := c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) (inErr error) { resp, inErr = client.AddByPath(ctx, request) return handleError("failed to add node by path", inErr) }); err != nil { return 0, err } body := resp.GetBody() if body == nil { return 0, errors.New("nil body in tree service response") } else if len(body.Nodes) == 0 { return 0, errors.New("empty list of added nodes in tree service response") } // The first node is the leaf that we add, according to tree service docs. return body.Nodes[0], nil } func (c *ServiceClientGRPC) MoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID, parentID uint64, meta map[string]string) error { request := &grpcService.MoveRequest{ Body: &grpcService.MoveRequest_Body{ ContainerId: bktInfo.CID[:], TreeId: treeID, NodeId: nodeID, ParentId: parentID, Meta: metaToKV(meta), BearerToken: getBearer(ctx, bktInfo), }, } if err := c.signRequest(request.Body, func(key, sign []byte) { request.Signature = &grpcService.Signature{ Key: key, Sign: sign, } }); err != nil { return err } log := c.log.With(zap.String("request_id", api.GetRequestID(ctx)), zap.String("tree", treeID), zap.String("method", "Move")) return c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) error { if _, err := client.Move(ctx, request); err != nil { return handleError("failed to move node", err) } return nil }) } func (c *ServiceClientGRPC) RemoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error { request := &grpcService.RemoveRequest{ Body: &grpcService.RemoveRequest_Body{ ContainerId: bktInfo.CID[:], TreeId: treeID, NodeId: nodeID, BearerToken: getBearer(ctx, bktInfo), }, } if err := c.signRequest(request.Body, func(key, sign []byte) { request.Signature = &grpcService.Signature{ Key: key, Sign: sign, } }); err != nil { return err } log := c.log.With(zap.String("request_id", api.GetRequestID(ctx)), zap.String("tree", treeID), zap.String("method", "Remove")) return c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) error { if _, err := client.Remove(ctx, request); err != nil { return handleError("failed to remove node", err) } return nil }) } func (c *ServiceClientGRPC) requestWithRetry(ctx context.Context, log *zap.Logger, fn func(client grpcService.TreeServiceClient) error) error { var ( err error cl grpcService.TreeServiceClient ) start := c.getStartIndex() for i := start; i < start+len(c.clients); i++ { index := i % len(c.clients) if cl, err = c.clients[index].TreeClient(ctx); err == nil { err = fn(cl) } if !shouldTryAgain(err) { c.setStartIndex(index) return err } log.Debug("tree request error", zap.String("address", c.clients[index].Address()), zap.Error(err)) } return err } func shouldTryAgain(err error) bool { return !(err == nil || errors.Is(err, tree.ErrNodeNotFound) || errors.Is(err, tree.ErrNodeAccessDenied)) } func metaToKV(meta map[string]string) []*grpcService.KeyValue { result := make([]*grpcService.KeyValue, 0, len(meta)) for key, value := range meta { result = append(result, &grpcService.KeyValue{Key: key, Value: []byte(value)}) } return result } func getBearer(ctx context.Context, bktInfo *data.BucketInfo) []byte { if bd, ok := ctx.Value(api.BoxData).(*accessbox.Box); ok && bd != nil && bd.Gate != nil { if bd.Gate.BearerToken != nil { if bd.Gate.BearerToken.Impersonate() || bktInfo.Owner.Equals(bearer.ResolveIssuer(*bd.Gate.BearerToken)) { return bd.Gate.BearerToken.Marshal() } } } return nil } func handleError(msg string, err error) error { if err == nil { return nil } if strings.Contains(err.Error(), "not found") { return fmt.Errorf("%w: %s", tree.ErrNodeNotFound, err.Error()) } else if strings.Contains(err.Error(), "is denied by") { return fmt.Errorf("%w: %s", tree.ErrNodeAccessDenied, err.Error()) } return fmt.Errorf("%s: %w", msg, err) }