forked from TrueCloudLab/frostfs-s3-gw
[#133] Drop sync-tree
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
parent
9dabaf6ecd
commit
0bcda6ea37
9 changed files with 1 additions and 677 deletions
|
@ -7,9 +7,6 @@ jobs:
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v3
|
- uses: actions/checkout@v3
|
||||||
|
|
||||||
- name: Sync tree service
|
|
||||||
run: make sync-tree
|
|
||||||
|
|
||||||
- name: golangci-lint
|
- name: golangci-lint
|
||||||
uses: https://github.com/golangci/golangci-lint-action@v2
|
uses: https://github.com/golangci/golangci-lint-action@v2
|
||||||
with:
|
with:
|
||||||
|
@ -30,9 +27,6 @@ jobs:
|
||||||
with:
|
with:
|
||||||
go-version: '${{ matrix.go_versions }}'
|
go-version: '${{ matrix.go_versions }}'
|
||||||
|
|
||||||
- name: Sync tree service
|
|
||||||
run: make sync-tree
|
|
||||||
|
|
||||||
- name: Update Go modules
|
- name: Update Go modules
|
||||||
run: make dep
|
run: make dep
|
||||||
|
|
||||||
|
|
|
@ -9,9 +9,6 @@ jobs:
|
||||||
with:
|
with:
|
||||||
fetch-depth: 0
|
fetch-depth: 0
|
||||||
|
|
||||||
- name: Sync tree service
|
|
||||||
run: make sync-tree
|
|
||||||
|
|
||||||
- name: Setup Go
|
- name: Setup Go
|
||||||
uses: actions/setup-go@v3
|
uses: actions/setup-go@v3
|
||||||
with:
|
with:
|
||||||
|
|
3
.gitignore
vendored
3
.gitignore
vendored
|
@ -2,9 +2,6 @@
|
||||||
.idea
|
.idea
|
||||||
.vscode
|
.vscode
|
||||||
|
|
||||||
# Tree service
|
|
||||||
internal/frostfs/services/tree/
|
|
||||||
|
|
||||||
# Vendoring
|
# Vendoring
|
||||||
vendor
|
vendor
|
||||||
|
|
||||||
|
|
8
Makefile
8
Makefile
|
@ -6,7 +6,6 @@ VERSION ?= $(shell git describe --tags --dirty --match "v*" --always --abbrev=8
|
||||||
GO_VERSION ?= 1.19
|
GO_VERSION ?= 1.19
|
||||||
LINT_VERSION ?= 1.49.0
|
LINT_VERSION ?= 1.49.0
|
||||||
BINDIR = bin
|
BINDIR = bin
|
||||||
SYNCDIR = internal/frostfs/services/tree
|
|
||||||
|
|
||||||
METRICS_DUMP_OUT ?= ./metrics-dump.json
|
METRICS_DUMP_OUT ?= ./metrics-dump.json
|
||||||
|
|
||||||
|
@ -31,7 +30,7 @@ PKG_VERSION ?= $(shell echo $(VERSION) | sed "s/^v//" | \
|
||||||
# Make all binaries
|
# Make all binaries
|
||||||
all: $(BINS)
|
all: $(BINS)
|
||||||
|
|
||||||
$(BINS): sync-tree $(BINDIR) dep
|
$(BINS): $(BINDIR) dep
|
||||||
@echo "⇒ Build $@"
|
@echo "⇒ Build $@"
|
||||||
CGO_ENABLED=0 \
|
CGO_ENABLED=0 \
|
||||||
go build -v -trimpath \
|
go build -v -trimpath \
|
||||||
|
@ -42,10 +41,6 @@ $(BINDIR):
|
||||||
@echo "⇒ Ensure dir: $@"
|
@echo "⇒ Ensure dir: $@"
|
||||||
@mkdir -p $@
|
@mkdir -p $@
|
||||||
|
|
||||||
# Synchronize tree service
|
|
||||||
sync-tree:
|
|
||||||
@./syncTree.sh
|
|
||||||
|
|
||||||
# Pull go dependencies
|
# Pull go dependencies
|
||||||
dep:
|
dep:
|
||||||
@printf "⇒ Download requirements: "
|
@printf "⇒ Download requirements: "
|
||||||
|
@ -134,7 +129,6 @@ version:
|
||||||
clean:
|
clean:
|
||||||
rm -rf .cache
|
rm -rf .cache
|
||||||
rm -rf $(BINDIR)
|
rm -rf $(BINDIR)
|
||||||
rm -rf $(SYNCDIR)
|
|
||||||
|
|
||||||
# Generate code from .proto files
|
# Generate code from .proto files
|
||||||
protoc:
|
protoc:
|
||||||
|
|
|
@ -1,70 +0,0 @@
|
||||||
package client
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
grpcService "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/services/tree"
|
|
||||||
"google.golang.org/grpc"
|
|
||||||
)
|
|
||||||
|
|
||||||
type TreeClient struct {
|
|
||||||
mu sync.RWMutex
|
|
||||||
address string
|
|
||||||
opts []grpc.DialOption
|
|
||||||
conn *grpc.ClientConn
|
|
||||||
service grpcService.TreeServiceClient
|
|
||||||
dialed bool
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewTreeClient creates new tree client with auto dial.
|
|
||||||
func NewTreeClient(addr string, opts ...grpc.DialOption) *TreeClient {
|
|
||||||
return &TreeClient{
|
|
||||||
address: addr,
|
|
||||||
opts: opts,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *TreeClient) dial(ctx context.Context) error {
|
|
||||||
c.mu.Lock()
|
|
||||||
defer c.mu.Unlock()
|
|
||||||
|
|
||||||
if c.dialed {
|
|
||||||
return fmt.Errorf("couldn't dial '%s': connection already established", c.address)
|
|
||||||
}
|
|
||||||
|
|
||||||
conn, err := grpc.Dial(c.address, c.opts...)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("grpc dial node tree service: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
serviceClient := grpcService.NewTreeServiceClient(conn)
|
|
||||||
if _, err = serviceClient.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil {
|
|
||||||
return fmt.Errorf("healthcheck tree service: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
c.conn = conn
|
|
||||||
c.service = serviceClient
|
|
||||||
c.dialed = true
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *TreeClient) TreeClient(ctx context.Context) (grpcService.TreeServiceClient, error) {
|
|
||||||
c.mu.RLock()
|
|
||||||
dialed := c.dialed
|
|
||||||
c.mu.RUnlock()
|
|
||||||
|
|
||||||
if !dialed {
|
|
||||||
if err := c.dial(ctx); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return c.service, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *TreeClient) Address() string {
|
|
||||||
return c.address
|
|
||||||
}
|
|
|
@ -1,411 +0,0 @@
|
||||||
package services
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"strings"
|
|
||||||
"sync/atomic"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
|
|
||||||
treeClient "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/services/client"
|
|
||||||
grpcService "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/services/tree"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/tree"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
|
||||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
"google.golang.org/grpc"
|
|
||||||
)
|
|
||||||
|
|
||||||
type GetNodeByPathResponseInfoWrapper struct {
|
|
||||||
response *grpcService.GetNodeByPathResponse_Info
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n GetNodeByPathResponseInfoWrapper) GetNodeID() uint64 {
|
|
||||||
return n.response.GetNodeId()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n GetNodeByPathResponseInfoWrapper) GetParentID() uint64 {
|
|
||||||
return n.response.GetParentId()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n GetNodeByPathResponseInfoWrapper) GetTimestamp() uint64 {
|
|
||||||
return n.response.GetTimestamp()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n GetNodeByPathResponseInfoWrapper) GetMeta() []tree.Meta {
|
|
||||||
res := make([]tree.Meta, len(n.response.Meta))
|
|
||||||
for i, value := range n.response.Meta {
|
|
||||||
res[i] = value
|
|
||||||
}
|
|
||||||
return res
|
|
||||||
}
|
|
||||||
|
|
||||||
type GetSubTreeResponseBodyWrapper struct {
|
|
||||||
response *grpcService.GetSubTreeResponse_Body
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n GetSubTreeResponseBodyWrapper) GetNodeID() uint64 {
|
|
||||||
return n.response.GetNodeId()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n GetSubTreeResponseBodyWrapper) GetParentID() uint64 {
|
|
||||||
return n.response.GetParentId()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n GetSubTreeResponseBodyWrapper) GetTimestamp() uint64 {
|
|
||||||
return n.response.GetTimestamp()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n GetSubTreeResponseBodyWrapper) GetMeta() []tree.Meta {
|
|
||||||
res := make([]tree.Meta, len(n.response.Meta))
|
|
||||||
for i, value := range n.response.Meta {
|
|
||||||
res[i] = value
|
|
||||||
}
|
|
||||||
return res
|
|
||||||
}
|
|
||||||
|
|
||||||
type TreeClient interface {
|
|
||||||
TreeClient(ctx context.Context) (grpcService.TreeServiceClient, error)
|
|
||||||
Address() string
|
|
||||||
}
|
|
||||||
|
|
||||||
type ServiceClientGRPC struct {
|
|
||||||
key *keys.PrivateKey
|
|
||||||
log *zap.Logger
|
|
||||||
clients []TreeClient
|
|
||||||
startIndex int32
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *ServiceClientGRPC) getStartIndex() int {
|
|
||||||
return int(atomic.LoadInt32(&c.startIndex))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *ServiceClientGRPC) setStartIndex(index int) {
|
|
||||||
atomic.StoreInt32(&c.startIndex, int32(index))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *ServiceClientGRPC) Endpoints() []string {
|
|
||||||
res := make([]string, len(c.clients))
|
|
||||||
for i, client := range c.clients {
|
|
||||||
res[i] = client.Address()
|
|
||||||
}
|
|
||||||
return res
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewTreeServiceClientGRPC(ctx context.Context, endpoints []string, key *keys.PrivateKey, log *zap.Logger, grpcOpts ...grpc.DialOption) (*ServiceClientGRPC, error) {
|
|
||||||
res := &ServiceClientGRPC{
|
|
||||||
key: key,
|
|
||||||
log: log,
|
|
||||||
}
|
|
||||||
|
|
||||||
firstHealthy := -1
|
|
||||||
|
|
||||||
res.clients = make([]TreeClient, len(endpoints))
|
|
||||||
for i, addr := range endpoints {
|
|
||||||
res.clients[i] = treeClient.NewTreeClient(addr, grpcOpts...)
|
|
||||||
if _, err := res.clients[i].TreeClient(ctx); err != nil {
|
|
||||||
log.Warn("dial tree", zap.String("address", addr), zap.Error(err))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if firstHealthy == -1 {
|
|
||||||
firstHealthy = i
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if firstHealthy == -1 {
|
|
||||||
return nil, errors.New("no healthy tree grpc client")
|
|
||||||
}
|
|
||||||
|
|
||||||
res.setStartIndex(firstHealthy)
|
|
||||||
|
|
||||||
return res, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *ServiceClientGRPC) GetNodes(ctx context.Context, p *tree.GetNodesParams) ([]tree.NodeResponse, error) {
|
|
||||||
request := &grpcService.GetNodeByPathRequest{
|
|
||||||
Body: &grpcService.GetNodeByPathRequest_Body{
|
|
||||||
ContainerId: p.BktInfo.CID[:],
|
|
||||||
TreeId: p.TreeID,
|
|
||||||
Path: p.Path,
|
|
||||||
Attributes: p.Meta,
|
|
||||||
PathAttribute: tree.FileNameKey,
|
|
||||||
LatestOnly: p.LatestOnly,
|
|
||||||
AllAttributes: p.AllAttrs,
|
|
||||||
BearerToken: getBearer(ctx, p.BktInfo),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := c.signRequest(request.Body, func(key, sign []byte) {
|
|
||||||
request.Signature = &grpcService.Signature{
|
|
||||||
Key: key,
|
|
||||||
Sign: sign,
|
|
||||||
}
|
|
||||||
}); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
log := c.log.With(zap.String("request_id", api.GetRequestID(ctx)), zap.String("tree", p.TreeID),
|
|
||||||
zap.String("method", "GetNodeByPath"))
|
|
||||||
|
|
||||||
var resp *grpcService.GetNodeByPathResponse
|
|
||||||
if err := c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) (inErr error) {
|
|
||||||
resp, inErr = client.GetNodeByPath(ctx, request)
|
|
||||||
return handleError("failed to get node by path", inErr)
|
|
||||||
}); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
res := make([]tree.NodeResponse, len(resp.GetBody().GetNodes()))
|
|
||||||
for i, info := range resp.GetBody().GetNodes() {
|
|
||||||
res[i] = GetNodeByPathResponseInfoWrapper{info}
|
|
||||||
}
|
|
||||||
|
|
||||||
return res, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *ServiceClientGRPC) GetSubTree(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID uint64, depth uint32) ([]tree.NodeResponse, error) {
|
|
||||||
request := &grpcService.GetSubTreeRequest{
|
|
||||||
Body: &grpcService.GetSubTreeRequest_Body{
|
|
||||||
ContainerId: bktInfo.CID[:],
|
|
||||||
TreeId: treeID,
|
|
||||||
RootId: rootID,
|
|
||||||
Depth: depth,
|
|
||||||
BearerToken: getBearer(ctx, bktInfo),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := c.signRequest(request.Body, func(key, sign []byte) {
|
|
||||||
request.Signature = &grpcService.Signature{
|
|
||||||
Key: key,
|
|
||||||
Sign: sign,
|
|
||||||
}
|
|
||||||
}); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
log := c.log.With(zap.String("request_id", api.GetRequestID(ctx)), zap.String("tree", treeID),
|
|
||||||
zap.String("method", "GetSubTree"))
|
|
||||||
|
|
||||||
var cli grpcService.TreeService_GetSubTreeClient
|
|
||||||
if err := c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) (inErr error) {
|
|
||||||
cli, inErr = client.GetSubTree(ctx, request)
|
|
||||||
return handleError("failed to get sub tree client", inErr)
|
|
||||||
}); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
var subtree []tree.NodeResponse
|
|
||||||
for {
|
|
||||||
resp, err := cli.Recv()
|
|
||||||
if err == io.EOF {
|
|
||||||
break
|
|
||||||
} else if err != nil {
|
|
||||||
return nil, handleError("failed to get sub tree", err)
|
|
||||||
}
|
|
||||||
subtree = append(subtree, GetSubTreeResponseBodyWrapper{resp.Body})
|
|
||||||
}
|
|
||||||
|
|
||||||
return subtree, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *ServiceClientGRPC) AddNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, parent uint64, meta map[string]string) (uint64, error) {
|
|
||||||
request := &grpcService.AddRequest{
|
|
||||||
Body: &grpcService.AddRequest_Body{
|
|
||||||
ContainerId: bktInfo.CID[:],
|
|
||||||
TreeId: treeID,
|
|
||||||
ParentId: parent,
|
|
||||||
Meta: metaToKV(meta),
|
|
||||||
BearerToken: getBearer(ctx, bktInfo),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
if err := c.signRequest(request.Body, func(key, sign []byte) {
|
|
||||||
request.Signature = &grpcService.Signature{
|
|
||||||
Key: key,
|
|
||||||
Sign: sign,
|
|
||||||
}
|
|
||||||
}); err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
log := c.log.With(zap.String("request_id", api.GetRequestID(ctx)), zap.String("tree", treeID),
|
|
||||||
zap.String("method", "Add"))
|
|
||||||
|
|
||||||
var resp *grpcService.AddResponse
|
|
||||||
if err := c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) (inErr error) {
|
|
||||||
resp, inErr = client.Add(ctx, request)
|
|
||||||
return handleError("failed to add node", inErr)
|
|
||||||
}); err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return resp.GetBody().GetNodeId(), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *ServiceClientGRPC) AddNodeByPath(ctx context.Context, bktInfo *data.BucketInfo, treeID string, path []string, meta map[string]string) (uint64, error) {
|
|
||||||
request := &grpcService.AddByPathRequest{
|
|
||||||
Body: &grpcService.AddByPathRequest_Body{
|
|
||||||
ContainerId: bktInfo.CID[:],
|
|
||||||
TreeId: treeID,
|
|
||||||
Path: path,
|
|
||||||
Meta: metaToKV(meta),
|
|
||||||
PathAttribute: tree.FileNameKey,
|
|
||||||
BearerToken: getBearer(ctx, bktInfo),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := c.signRequest(request.Body, func(key, sign []byte) {
|
|
||||||
request.Signature = &grpcService.Signature{
|
|
||||||
Key: key,
|
|
||||||
Sign: sign,
|
|
||||||
}
|
|
||||||
}); err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
log := c.log.With(zap.String("request_id", api.GetRequestID(ctx)), zap.String("tree", treeID),
|
|
||||||
zap.String("method", "AddByPath"))
|
|
||||||
|
|
||||||
var resp *grpcService.AddByPathResponse
|
|
||||||
if err := c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) (inErr error) {
|
|
||||||
resp, inErr = client.AddByPath(ctx, request)
|
|
||||||
return handleError("failed to add node by path", inErr)
|
|
||||||
}); err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
body := resp.GetBody()
|
|
||||||
if body == nil {
|
|
||||||
return 0, errors.New("nil body in tree service response")
|
|
||||||
} else if len(body.Nodes) == 0 {
|
|
||||||
return 0, errors.New("empty list of added nodes in tree service response")
|
|
||||||
}
|
|
||||||
|
|
||||||
// The first node is the leaf that we add, according to tree service docs.
|
|
||||||
return body.Nodes[0], nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *ServiceClientGRPC) MoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID, parentID uint64, meta map[string]string) error {
|
|
||||||
request := &grpcService.MoveRequest{
|
|
||||||
Body: &grpcService.MoveRequest_Body{
|
|
||||||
ContainerId: bktInfo.CID[:],
|
|
||||||
TreeId: treeID,
|
|
||||||
NodeId: nodeID,
|
|
||||||
ParentId: parentID,
|
|
||||||
Meta: metaToKV(meta),
|
|
||||||
BearerToken: getBearer(ctx, bktInfo),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := c.signRequest(request.Body, func(key, sign []byte) {
|
|
||||||
request.Signature = &grpcService.Signature{
|
|
||||||
Key: key,
|
|
||||||
Sign: sign,
|
|
||||||
}
|
|
||||||
}); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
log := c.log.With(zap.String("request_id", api.GetRequestID(ctx)), zap.String("tree", treeID),
|
|
||||||
zap.String("method", "Move"))
|
|
||||||
|
|
||||||
return c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) error {
|
|
||||||
if _, err := client.Move(ctx, request); err != nil {
|
|
||||||
return handleError("failed to move node", err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *ServiceClientGRPC) RemoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error {
|
|
||||||
request := &grpcService.RemoveRequest{
|
|
||||||
Body: &grpcService.RemoveRequest_Body{
|
|
||||||
ContainerId: bktInfo.CID[:],
|
|
||||||
TreeId: treeID,
|
|
||||||
NodeId: nodeID,
|
|
||||||
BearerToken: getBearer(ctx, bktInfo),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
if err := c.signRequest(request.Body, func(key, sign []byte) {
|
|
||||||
request.Signature = &grpcService.Signature{
|
|
||||||
Key: key,
|
|
||||||
Sign: sign,
|
|
||||||
}
|
|
||||||
}); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
log := c.log.With(zap.String("request_id", api.GetRequestID(ctx)), zap.String("tree", treeID),
|
|
||||||
zap.String("method", "Remove"))
|
|
||||||
|
|
||||||
return c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) error {
|
|
||||||
if _, err := client.Remove(ctx, request); err != nil {
|
|
||||||
return handleError("failed to remove node", err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *ServiceClientGRPC) requestWithRetry(ctx context.Context, log *zap.Logger, fn func(client grpcService.TreeServiceClient) error) error {
|
|
||||||
var (
|
|
||||||
err error
|
|
||||||
cl grpcService.TreeServiceClient
|
|
||||||
)
|
|
||||||
|
|
||||||
start := c.getStartIndex()
|
|
||||||
for i := start; i < start+len(c.clients); i++ {
|
|
||||||
index := i % len(c.clients)
|
|
||||||
if cl, err = c.clients[index].TreeClient(ctx); err == nil {
|
|
||||||
err = fn(cl)
|
|
||||||
}
|
|
||||||
if !shouldTryAgain(err) {
|
|
||||||
c.setStartIndex(index)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
log.Debug("tree request error", zap.String("address", c.clients[index].Address()), zap.Error(err))
|
|
||||||
}
|
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func shouldTryAgain(err error) bool {
|
|
||||||
return !(err == nil ||
|
|
||||||
errors.Is(err, tree.ErrNodeNotFound) ||
|
|
||||||
errors.Is(err, tree.ErrNodeAccessDenied))
|
|
||||||
}
|
|
||||||
|
|
||||||
func metaToKV(meta map[string]string) []*grpcService.KeyValue {
|
|
||||||
result := make([]*grpcService.KeyValue, 0, len(meta))
|
|
||||||
|
|
||||||
for key, value := range meta {
|
|
||||||
result = append(result, &grpcService.KeyValue{Key: key, Value: []byte(value)})
|
|
||||||
}
|
|
||||||
|
|
||||||
return result
|
|
||||||
}
|
|
||||||
|
|
||||||
func getBearer(ctx context.Context, bktInfo *data.BucketInfo) []byte {
|
|
||||||
if bd, ok := ctx.Value(api.BoxData).(*accessbox.Box); ok && bd != nil && bd.Gate != nil {
|
|
||||||
if bd.Gate.BearerToken != nil {
|
|
||||||
if bd.Gate.BearerToken.Impersonate() || bktInfo.Owner.Equals(bearer.ResolveIssuer(*bd.Gate.BearerToken)) {
|
|
||||||
return bd.Gate.BearerToken.Marshal()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func handleError(msg string, err error) error {
|
|
||||||
if err == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if strings.Contains(err.Error(), "not found") {
|
|
||||||
return fmt.Errorf("%w: %s", tree.ErrNodeNotFound, err.Error())
|
|
||||||
} else if strings.Contains(err.Error(), "is denied by") {
|
|
||||||
return fmt.Errorf("%w: %s", tree.ErrNodeAccessDenied, err.Error())
|
|
||||||
}
|
|
||||||
return fmt.Errorf("%s: %w", msg, err)
|
|
||||||
}
|
|
|
@ -1,27 +0,0 @@
|
||||||
/*REMOVE THIS AFTER SIGNATURE WILL BE AVAILABLE IN TREE CLIENT FROM FROSTFS NODE*/
|
|
||||||
package services
|
|
||||||
|
|
||||||
import (
|
|
||||||
crypto "git.frostfs.info/TrueCloudLab/frostfs-crypto"
|
|
||||||
"google.golang.org/protobuf/proto"
|
|
||||||
)
|
|
||||||
|
|
||||||
func (c *ServiceClientGRPC) signData(buf []byte, f func(key, sign []byte)) error {
|
|
||||||
// When SDK library adopts Tree service client, this should be dropped.
|
|
||||||
sign, err := crypto.Sign(&c.key.PrivateKey, buf)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
f(c.key.PublicKey().Bytes(), sign)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *ServiceClientGRPC) signRequest(requestBody proto.Message, f func(key, sign []byte)) error {
|
|
||||||
buf, err := proto.Marshal(requestBody)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return c.signData(buf, f)
|
|
||||||
}
|
|
|
@ -1,129 +0,0 @@
|
||||||
package services
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
|
||||||
grpcService "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/services/tree"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
"go.uber.org/zap/zaptest"
|
|
||||||
)
|
|
||||||
|
|
||||||
type treeClientMock struct {
|
|
||||||
address string
|
|
||||||
err bool
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *treeClientMock) TreeClient(context.Context) (grpcService.TreeServiceClient, error) {
|
|
||||||
if t.err {
|
|
||||||
return nil, errors.New("error")
|
|
||||||
}
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *treeClientMock) Address() string {
|
|
||||||
return t.address
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestHandleError(t *testing.T) {
|
|
||||||
defaultError := errors.New("default error")
|
|
||||||
for _, tc := range []struct {
|
|
||||||
err error
|
|
||||||
expectedError error
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
err: defaultError,
|
|
||||||
expectedError: defaultError,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
err: errors.New("something not found"),
|
|
||||||
expectedError: layer.ErrNodeNotFound,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
err: errors.New("something is denied by some acl rule"),
|
|
||||||
expectedError: layer.ErrNodeAccessDenied,
|
|
||||||
},
|
|
||||||
} {
|
|
||||||
t.Run("", func(t *testing.T) {
|
|
||||||
err := handleError("err message", tc.err)
|
|
||||||
require.True(t, errors.Is(err, tc.expectedError))
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestRetry(t *testing.T) {
|
|
||||||
ctx := context.Background()
|
|
||||||
log := zaptest.NewLogger(t)
|
|
||||||
|
|
||||||
cl := &ServiceClientGRPC{
|
|
||||||
log: zaptest.NewLogger(t),
|
|
||||||
clients: []TreeClient{
|
|
||||||
&treeClientMock{address: "node0"},
|
|
||||||
&treeClientMock{address: "node1"},
|
|
||||||
&treeClientMock{address: "node2"},
|
|
||||||
&treeClientMock{address: "node3"},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
makeFn := func(client grpcService.TreeServiceClient) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
t.Run("first ok", func(t *testing.T) {
|
|
||||||
err := cl.requestWithRetry(ctx, log, makeFn)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, 0, cl.getStartIndex())
|
|
||||||
resetClients(cl)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("first failed", func(t *testing.T) {
|
|
||||||
setErrors(cl.clients[:1])
|
|
||||||
err := cl.requestWithRetry(ctx, log, makeFn)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, 1, cl.getStartIndex())
|
|
||||||
resetClients(cl)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("all failed", func(t *testing.T) {
|
|
||||||
setErrors(cl.clients)
|
|
||||||
err := cl.requestWithRetry(ctx, log, makeFn)
|
|
||||||
require.Error(t, err)
|
|
||||||
require.Equal(t, 0, cl.getStartIndex())
|
|
||||||
resetClients(cl)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("round", func(t *testing.T) {
|
|
||||||
setErrors(cl.clients[:2])
|
|
||||||
err := cl.requestWithRetry(ctx, log, makeFn)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, 2, cl.getStartIndex())
|
|
||||||
resetClientsErrors(cl)
|
|
||||||
|
|
||||||
setErrors(cl.clients[2:])
|
|
||||||
err = cl.requestWithRetry(ctx, log, makeFn)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, 0, cl.getStartIndex())
|
|
||||||
resetClients(cl)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func resetClients(cl *ServiceClientGRPC) {
|
|
||||||
resetClientsErrors(cl)
|
|
||||||
cl.setStartIndex(0)
|
|
||||||
}
|
|
||||||
|
|
||||||
func resetClientsErrors(cl *ServiceClientGRPC) {
|
|
||||||
for _, client := range cl.clients {
|
|
||||||
node := client.(*treeClientMock)
|
|
||||||
node.err = false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func setErrors(clients []TreeClient) {
|
|
||||||
for _, client := range clients {
|
|
||||||
node := client.(*treeClientMock)
|
|
||||||
node.err = true
|
|
||||||
}
|
|
||||||
}
|
|
21
syncTree.sh
21
syncTree.sh
|
@ -1,21 +0,0 @@
|
||||||
#!/bin/bash
|
|
||||||
|
|
||||||
mkdir -p internal/frostfs/services/tree 2>/dev/null
|
|
||||||
|
|
||||||
REVISION="f07d4158f50ed5c7f44cc0bc224c3d03edf27f3b"
|
|
||||||
|
|
||||||
echo "tree service revision ${REVISION}"
|
|
||||||
|
|
||||||
# regexp below find all link to source code files which end with ".pb.go" and retrieve the file names
|
|
||||||
# we use `[^.]*` as non greedy workaround for `.*`
|
|
||||||
FILES=$(curl -s https://git.frostfs.info/TrueCloudLab/frostfs-node/src/commit/${REVISION}/pkg/services/tree | sed -n "s,.*\"/TrueCloudLab/frostfs-node/src/commit/${REVISION}/pkg/services/tree/\([^.]*\.pb\.go\)\".*,\1,p")
|
|
||||||
|
|
||||||
for file in $FILES; do
|
|
||||||
if [[ $file == *"frostfs"* ]]; then
|
|
||||||
echo "skip '$file'"
|
|
||||||
continue
|
|
||||||
else
|
|
||||||
echo "sync '$file' in tree service"
|
|
||||||
fi
|
|
||||||
curl -s "https://git.frostfs.info/TrueCloudLab/frostfs-node/raw/commit/${REVISION}/pkg/services/tree/${file}" -o "./internal/frostfs/services/tree/${file}"
|
|
||||||
done
|
|
Loading…
Reference in a new issue