forked from TrueCloudLab/frostfs-node
Compare commits
5 commits
0a93738042
...
2be1aa781d
Author | SHA1 | Date | |
---|---|---|---|
2be1aa781d | |||
89d0435b1d | |||
54fe8383a4 | |||
944160427b | |||
bb44867491 |
16 changed files with 78 additions and 48 deletions
|
@ -106,4 +106,6 @@ jobs:
|
|||
run: make fumpt-install
|
||||
|
||||
- name: Run gofumpt
|
||||
run: make fumpt
|
||||
run: |
|
||||
make fumpt
|
||||
git diff --exit-code --quiet
|
||||
|
|
|
@ -47,9 +47,10 @@ func add(cmd *cobra.Command, _ []string) {
|
|||
meta, err := parseMeta(cmd)
|
||||
commonCmd.ExitOnErr(cmd, "meta data parsing: %w", err)
|
||||
|
||||
ctx := cmd.Context()
|
||||
ctx, cancel := contextWithTimeout(cmd)
|
||||
defer cancel()
|
||||
|
||||
cli, err := _client(ctx)
|
||||
cli, err := _client()
|
||||
commonCmd.ExitOnErr(cmd, "failed to create client: %w", err)
|
||||
|
||||
rawCID := make([]byte, sha256.Size)
|
||||
|
|
|
@ -50,9 +50,10 @@ func addByPath(cmd *cobra.Command, _ []string) {
|
|||
commonCmd.ExitOnErr(cmd, "decode container ID string: %w", err)
|
||||
|
||||
tid, _ := cmd.Flags().GetString(treeIDFlagKey)
|
||||
ctx := cmd.Context()
|
||||
ctx, cancel := contextWithTimeout(cmd)
|
||||
defer cancel()
|
||||
|
||||
cli, err := _client(ctx)
|
||||
cli, err := _client()
|
||||
commonCmd.ExitOnErr(cmd, "failed to create client: %w", err)
|
||||
|
||||
rawCID := make([]byte, sha256.Size)
|
||||
|
|
|
@ -3,13 +3,14 @@ package tree
|
|||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
|
||||
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
|
||||
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
|
@ -17,7 +18,7 @@ import (
|
|||
|
||||
// _client returns grpc Tree service client. Should be removed
|
||||
// after making Tree API public.
|
||||
func _client(ctx context.Context) (tree.TreeServiceClient, error) {
|
||||
func _client() (tree.TreeServiceClient, error) {
|
||||
var netAddr network.Address
|
||||
err := netAddr.FromString(viper.GetString(commonflags.RPC))
|
||||
if err != nil {
|
||||
|
@ -25,7 +26,6 @@ func _client(ctx context.Context) (tree.TreeServiceClient, error) {
|
|||
}
|
||||
|
||||
opts := []grpc.DialOption{
|
||||
grpc.WithBlock(),
|
||||
grpc.WithChainUnaryInterceptor(
|
||||
metrics.NewUnaryClientInterceptor(),
|
||||
tracing.NewUnaryClientInteceptor(),
|
||||
|
@ -40,12 +40,14 @@ func _client(ctx context.Context) (tree.TreeServiceClient, error) {
|
|||
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||
}
|
||||
|
||||
// a default connection establishing timeout
|
||||
const defaultClientConnectTimeout = time.Second * 2
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, defaultClientConnectTimeout)
|
||||
cc, err := grpc.DialContext(ctx, netAddr.URIAddr(), opts...)
|
||||
cancel()
|
||||
|
||||
cc, err := grpc.NewClient(netAddr.URIAddr(), opts...)
|
||||
return tree.NewTreeServiceClient(cc), err
|
||||
}
|
||||
|
||||
func contextWithTimeout(cmd *cobra.Command) (context.Context, context.CancelFunc) {
|
||||
if timeout := viper.GetDuration(commonflags.Timeout); timeout > 0 {
|
||||
common.PrintVerbose(cmd, "Set request timeout to %s.", timeout)
|
||||
return context.WithTimeout(cmd.Context(), timeout)
|
||||
}
|
||||
return context.WithTimeout(cmd.Context(), commonflags.TimeoutDefault)
|
||||
}
|
||||
|
|
|
@ -50,9 +50,10 @@ func getByPath(cmd *cobra.Command, _ []string) {
|
|||
commonCmd.ExitOnErr(cmd, "decode container ID string: %w", err)
|
||||
|
||||
tid, _ := cmd.Flags().GetString(treeIDFlagKey)
|
||||
ctx := cmd.Context()
|
||||
ctx, cancel := contextWithTimeout(cmd)
|
||||
defer cancel()
|
||||
|
||||
cli, err := _client(ctx)
|
||||
cli, err := _client()
|
||||
commonCmd.ExitOnErr(cmd, "failed to create client: %w", err)
|
||||
|
||||
rawCID := make([]byte, sha256.Size)
|
||||
|
|
|
@ -44,9 +44,10 @@ func getOpLog(cmd *cobra.Command, _ []string) {
|
|||
commonCmd.ExitOnErr(cmd, "decode container ID string: %w", err)
|
||||
|
||||
tid, _ := cmd.Flags().GetString(treeIDFlagKey)
|
||||
ctx := cmd.Context()
|
||||
ctx, cancel := contextWithTimeout(cmd)
|
||||
defer cancel()
|
||||
|
||||
cli, err := _client(ctx)
|
||||
cli, err := _client()
|
||||
commonCmd.ExitOnErr(cmd, "failed to create client: %w", err)
|
||||
|
||||
rawCID := make([]byte, sha256.Size)
|
||||
|
|
|
@ -26,9 +26,10 @@ func initHealthcheckCmd() {
|
|||
|
||||
func healthcheck(cmd *cobra.Command, _ []string) {
|
||||
pk := key.GetOrGenerate(cmd)
|
||||
ctx := cmd.Context()
|
||||
ctx, cancel := contextWithTimeout(cmd)
|
||||
defer cancel()
|
||||
|
||||
cli, err := _client(ctx)
|
||||
cli, err := _client()
|
||||
commonCmd.ExitOnErr(cmd, "failed to create client: %w", err)
|
||||
|
||||
req := &tree.HealthcheckRequest{
|
||||
|
|
|
@ -38,9 +38,10 @@ func list(cmd *cobra.Command, _ []string) {
|
|||
err := cnr.DecodeString(cidString)
|
||||
commonCmd.ExitOnErr(cmd, "decode container ID string: %w", err)
|
||||
|
||||
ctx := cmd.Context()
|
||||
ctx, cancel := contextWithTimeout(cmd)
|
||||
defer cancel()
|
||||
|
||||
cli, err := _client(ctx)
|
||||
cli, err := _client()
|
||||
commonCmd.ExitOnErr(cmd, "failed to create client: %w", err)
|
||||
|
||||
rawCID := make([]byte, sha256.Size)
|
||||
|
|
|
@ -45,9 +45,10 @@ func move(cmd *cobra.Command, _ []string) {
|
|||
err := cnr.DecodeString(cidString)
|
||||
commonCmd.ExitOnErr(cmd, "decode container ID string: %w", err)
|
||||
|
||||
ctx := cmd.Context()
|
||||
ctx, cancel := contextWithTimeout(cmd)
|
||||
defer cancel()
|
||||
|
||||
cli, err := _client(ctx)
|
||||
cli, err := _client()
|
||||
commonCmd.ExitOnErr(cmd, "failed to create client: %w", err)
|
||||
|
||||
rawCID := make([]byte, sha256.Size)
|
||||
|
|
|
@ -41,9 +41,10 @@ func remove(cmd *cobra.Command, _ []string) {
|
|||
err := cnr.DecodeString(cidString)
|
||||
commonCmd.ExitOnErr(cmd, "decode container ID string: %w", err)
|
||||
|
||||
ctx := cmd.Context()
|
||||
ctx, cancel := contextWithTimeout(cmd)
|
||||
defer cancel()
|
||||
|
||||
cli, err := _client(ctx)
|
||||
cli, err := _client()
|
||||
commonCmd.ExitOnErr(cmd, "failed to create client: %w", err)
|
||||
|
||||
rawCID := make([]byte, sha256.Size)
|
||||
|
|
|
@ -46,9 +46,10 @@ func getSubTree(cmd *cobra.Command, _ []string) {
|
|||
err := cnr.DecodeString(cidString)
|
||||
commonCmd.ExitOnErr(cmd, "decode container ID string: %w", err)
|
||||
|
||||
ctx := cmd.Context()
|
||||
ctx, cancel := contextWithTimeout(cmd)
|
||||
defer cancel()
|
||||
|
||||
cli, err := _client(ctx)
|
||||
cli, err := _client()
|
||||
commonCmd.ExitOnErr(cmd, "failed to create client: %w", err)
|
||||
|
||||
rawCID := make([]byte, sha256.Size)
|
||||
|
|
14
go.mod
14
go.mod
|
@ -46,9 +46,9 @@ require (
|
|||
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56
|
||||
golang.org/x/sync v0.7.0
|
||||
golang.org/x/sys v0.22.0
|
||||
golang.org/x/term v0.18.0
|
||||
google.golang.org/grpc v1.63.2
|
||||
google.golang.org/protobuf v1.33.0
|
||||
golang.org/x/term v0.21.0
|
||||
google.golang.org/grpc v1.66.2
|
||||
google.golang.org/protobuf v1.34.1
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
)
|
||||
|
||||
|
@ -122,11 +122,11 @@ require (
|
|||
go.opentelemetry.io/otel/sdk v1.22.0 // indirect
|
||||
go.opentelemetry.io/proto/otlp v1.1.0 // indirect
|
||||
go.uber.org/multierr v1.11.0 // indirect
|
||||
golang.org/x/crypto v0.21.0 // indirect
|
||||
golang.org/x/net v0.23.0 // indirect
|
||||
golang.org/x/crypto v0.24.0 // indirect
|
||||
golang.org/x/net v0.26.0 // indirect
|
||||
golang.org/x/text v0.16.0 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240604185151-ef581f913117 // indirect
|
||||
gopkg.in/ini.v1 v1.67.0 // indirect
|
||||
lukechampine.com/blake3 v1.2.1 // indirect
|
||||
rsc.io/tmplfunc v0.0.3 // indirect
|
||||
|
|
BIN
go.sum
BIN
go.sum
Binary file not shown.
|
@ -2,6 +2,7 @@ package tree
|
|||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
@ -19,6 +20,7 @@ import (
|
|||
type clientCache struct {
|
||||
sync.Mutex
|
||||
simplelru.LRU[string, cacheItem]
|
||||
key *ecdsa.PrivateKey
|
||||
}
|
||||
|
||||
type cacheItem struct {
|
||||
|
@ -34,13 +36,14 @@ const (
|
|||
|
||||
var errRecentlyFailed = errors.New("client has recently failed")
|
||||
|
||||
func (c *clientCache) init() {
|
||||
func (c *clientCache) init(pk *ecdsa.PrivateKey) {
|
||||
l, _ := simplelru.NewLRU(defaultClientCacheSize, func(_ string, value cacheItem) {
|
||||
if conn := value.cc; conn != nil {
|
||||
_ = conn.Close()
|
||||
}
|
||||
})
|
||||
c.LRU = *l
|
||||
c.key = pk
|
||||
}
|
||||
|
||||
func (c *clientCache) get(ctx context.Context, netmapAddr string) (TreeServiceClient, error) {
|
||||
|
@ -63,7 +66,7 @@ func (c *clientCache) get(ctx context.Context, netmapAddr string) (TreeServiceCl
|
|||
}
|
||||
}
|
||||
|
||||
cc, err := dialTreeService(ctx, netmapAddr)
|
||||
cc, err := c.dialTreeService(ctx, netmapAddr)
|
||||
lastTry := time.Now()
|
||||
|
||||
c.Lock()
|
||||
|
@ -81,14 +84,13 @@ func (c *clientCache) get(ctx context.Context, netmapAddr string) (TreeServiceCl
|
|||
return NewTreeServiceClient(cc), nil
|
||||
}
|
||||
|
||||
func dialTreeService(ctx context.Context, netmapAddr string) (*grpc.ClientConn, error) {
|
||||
func (c *clientCache) dialTreeService(ctx context.Context, netmapAddr string) (*grpc.ClientConn, error) {
|
||||
var netAddr network.Address
|
||||
if err := netAddr.FromString(netmapAddr); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
opts := []grpc.DialOption{
|
||||
grpc.WithBlock(),
|
||||
grpc.WithChainUnaryInterceptor(
|
||||
metrics.NewUnaryClientInterceptor(),
|
||||
tracing.NewUnaryClientInteceptor(),
|
||||
|
@ -103,9 +105,24 @@ func dialTreeService(ctx context.Context, netmapAddr string) (*grpc.ClientConn,
|
|||
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, defaultClientConnectTimeout)
|
||||
cc, err := grpc.DialContext(ctx, netAddr.URIAddr(), opts...)
|
||||
cancel()
|
||||
req := &HealthcheckRequest{
|
||||
Body: &HealthcheckRequest_Body{},
|
||||
}
|
||||
if err := SignMessage(req, c.key); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return cc, err
|
||||
cc, err := grpc.NewClient(netAddr.URIAddr(), opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, defaultClientConnectTimeout)
|
||||
defer cancel()
|
||||
// perform some request to check connection
|
||||
if _, err := NewTreeServiceClient(cc).Healthcheck(ctx, req); err != nil {
|
||||
_ = cc.Close()
|
||||
return nil, err
|
||||
}
|
||||
return cc, nil
|
||||
}
|
||||
|
|
|
@ -65,7 +65,7 @@ func New(opts ...Option) *Service {
|
|||
s.log = &logger.Logger{Logger: zap.NewNop()}
|
||||
}
|
||||
|
||||
s.cache.init()
|
||||
s.cache.init(s.key)
|
||||
s.closeCh = make(chan struct{})
|
||||
s.replicateCh = make(chan movePair, s.replicatorChannelCapacity)
|
||||
s.replicateLocalCh = make(chan applyOp)
|
||||
|
|
|
@ -294,7 +294,7 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
|
|||
return false
|
||||
}
|
||||
|
||||
cc, err := s.dialCtx(egCtx, a)
|
||||
cc, err := s.createConnection(a)
|
||||
if err != nil {
|
||||
s.log.Warn(logs.TreeFailedToConnectForTreeSynchronization, zap.Error(err), zap.String("address", addr))
|
||||
return false
|
||||
|
@ -332,8 +332,8 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
|
|||
return from
|
||||
}
|
||||
|
||||
func (*Service) dialCtx(egCtx context.Context, a network.Address) (*grpc.ClientConn, error) {
|
||||
return grpc.DialContext(egCtx, a.URIAddr(),
|
||||
func (*Service) createConnection(a network.Address) (*grpc.ClientConn, error) {
|
||||
return grpc.NewClient(a.URIAddr(),
|
||||
grpc.WithChainUnaryInterceptor(
|
||||
metrics.NewUnaryClientInterceptor(),
|
||||
tracing_grpc.NewUnaryClientInteceptor(),
|
||||
|
|
Loading…
Reference in a new issue