Compare commits

...

5 commits

Author SHA1 Message Date
2be1aa781d [#1266] .forgejo: Make 'fumpt' job fail on changed files
`gofumpt` always returns an exit code of 0, even when it finds
misformatted files. To make `fumpt` action behave as expected
we need to check if `gofumpt` changed any files.

Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2024-09-13 17:27:58 +03:00
89d0435b1d [#1374] tree: Use NewClient to create grpc connection in cache
Created grpc connection should be established, so perform Healthcheck request
to check connection is ok.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-09-13 15:59:26 +03:00
54fe8383a4 [#1374] tree: Use NewClient to create grpc connection for sync
Created connection will be used to sync trees, so it is ok to defer
dial to the first RPC call.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-09-13 15:59:26 +03:00
944160427b [#1374] cli: Drop deprecated grpc connection method
For `frostfs-cli` it is ok to use grpc-client without blocking,
as `frostfs-cli` will perform RPC call anyway.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-09-13 15:59:26 +03:00
bb44867491 [#1374] go.mod: Upgrade grpc to v1.66.2
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-09-13 15:59:00 +03:00
16 changed files with 78 additions and 48 deletions

View file

@ -106,4 +106,6 @@ jobs:
run: make fumpt-install
- name: Run gofumpt
run: make fumpt
run: |
make fumpt
git diff --exit-code --quiet

View file

@ -47,9 +47,10 @@ func add(cmd *cobra.Command, _ []string) {
meta, err := parseMeta(cmd)
commonCmd.ExitOnErr(cmd, "meta data parsing: %w", err)
ctx := cmd.Context()
ctx, cancel := contextWithTimeout(cmd)
defer cancel()
cli, err := _client(ctx)
cli, err := _client()
commonCmd.ExitOnErr(cmd, "failed to create client: %w", err)
rawCID := make([]byte, sha256.Size)

View file

@ -50,9 +50,10 @@ func addByPath(cmd *cobra.Command, _ []string) {
commonCmd.ExitOnErr(cmd, "decode container ID string: %w", err)
tid, _ := cmd.Flags().GetString(treeIDFlagKey)
ctx := cmd.Context()
ctx, cancel := contextWithTimeout(cmd)
defer cancel()
cli, err := _client(ctx)
cli, err := _client()
commonCmd.ExitOnErr(cmd, "failed to create client: %w", err)
rawCID := make([]byte, sha256.Size)

View file

@ -3,13 +3,14 @@ package tree
import (
"context"
"strings"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
@ -17,7 +18,7 @@ import (
// _client returns grpc Tree service client. Should be removed
// after making Tree API public.
func _client(ctx context.Context) (tree.TreeServiceClient, error) {
func _client() (tree.TreeServiceClient, error) {
var netAddr network.Address
err := netAddr.FromString(viper.GetString(commonflags.RPC))
if err != nil {
@ -25,7 +26,6 @@ func _client(ctx context.Context) (tree.TreeServiceClient, error) {
}
opts := []grpc.DialOption{
grpc.WithBlock(),
grpc.WithChainUnaryInterceptor(
metrics.NewUnaryClientInterceptor(),
tracing.NewUnaryClientInteceptor(),
@ -40,12 +40,14 @@ func _client(ctx context.Context) (tree.TreeServiceClient, error) {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
// a default connection establishing timeout
const defaultClientConnectTimeout = time.Second * 2
ctx, cancel := context.WithTimeout(ctx, defaultClientConnectTimeout)
cc, err := grpc.DialContext(ctx, netAddr.URIAddr(), opts...)
cancel()
cc, err := grpc.NewClient(netAddr.URIAddr(), opts...)
return tree.NewTreeServiceClient(cc), err
}
func contextWithTimeout(cmd *cobra.Command) (context.Context, context.CancelFunc) {
if timeout := viper.GetDuration(commonflags.Timeout); timeout > 0 {
common.PrintVerbose(cmd, "Set request timeout to %s.", timeout)
return context.WithTimeout(cmd.Context(), timeout)
}
return context.WithTimeout(cmd.Context(), commonflags.TimeoutDefault)
}

View file

@ -50,9 +50,10 @@ func getByPath(cmd *cobra.Command, _ []string) {
commonCmd.ExitOnErr(cmd, "decode container ID string: %w", err)
tid, _ := cmd.Flags().GetString(treeIDFlagKey)
ctx := cmd.Context()
ctx, cancel := contextWithTimeout(cmd)
defer cancel()
cli, err := _client(ctx)
cli, err := _client()
commonCmd.ExitOnErr(cmd, "failed to create client: %w", err)
rawCID := make([]byte, sha256.Size)

View file

@ -44,9 +44,10 @@ func getOpLog(cmd *cobra.Command, _ []string) {
commonCmd.ExitOnErr(cmd, "decode container ID string: %w", err)
tid, _ := cmd.Flags().GetString(treeIDFlagKey)
ctx := cmd.Context()
ctx, cancel := contextWithTimeout(cmd)
defer cancel()
cli, err := _client(ctx)
cli, err := _client()
commonCmd.ExitOnErr(cmd, "failed to create client: %w", err)
rawCID := make([]byte, sha256.Size)

View file

@ -26,9 +26,10 @@ func initHealthcheckCmd() {
func healthcheck(cmd *cobra.Command, _ []string) {
pk := key.GetOrGenerate(cmd)
ctx := cmd.Context()
ctx, cancel := contextWithTimeout(cmd)
defer cancel()
cli, err := _client(ctx)
cli, err := _client()
commonCmd.ExitOnErr(cmd, "failed to create client: %w", err)
req := &tree.HealthcheckRequest{

View file

@ -38,9 +38,10 @@ func list(cmd *cobra.Command, _ []string) {
err := cnr.DecodeString(cidString)
commonCmd.ExitOnErr(cmd, "decode container ID string: %w", err)
ctx := cmd.Context()
ctx, cancel := contextWithTimeout(cmd)
defer cancel()
cli, err := _client(ctx)
cli, err := _client()
commonCmd.ExitOnErr(cmd, "failed to create client: %w", err)
rawCID := make([]byte, sha256.Size)

View file

@ -45,9 +45,10 @@ func move(cmd *cobra.Command, _ []string) {
err := cnr.DecodeString(cidString)
commonCmd.ExitOnErr(cmd, "decode container ID string: %w", err)
ctx := cmd.Context()
ctx, cancel := contextWithTimeout(cmd)
defer cancel()
cli, err := _client(ctx)
cli, err := _client()
commonCmd.ExitOnErr(cmd, "failed to create client: %w", err)
rawCID := make([]byte, sha256.Size)

View file

@ -41,9 +41,10 @@ func remove(cmd *cobra.Command, _ []string) {
err := cnr.DecodeString(cidString)
commonCmd.ExitOnErr(cmd, "decode container ID string: %w", err)
ctx := cmd.Context()
ctx, cancel := contextWithTimeout(cmd)
defer cancel()
cli, err := _client(ctx)
cli, err := _client()
commonCmd.ExitOnErr(cmd, "failed to create client: %w", err)
rawCID := make([]byte, sha256.Size)

View file

@ -46,9 +46,10 @@ func getSubTree(cmd *cobra.Command, _ []string) {
err := cnr.DecodeString(cidString)
commonCmd.ExitOnErr(cmd, "decode container ID string: %w", err)
ctx := cmd.Context()
ctx, cancel := contextWithTimeout(cmd)
defer cancel()
cli, err := _client(ctx)
cli, err := _client()
commonCmd.ExitOnErr(cmd, "failed to create client: %w", err)
rawCID := make([]byte, sha256.Size)

14
go.mod
View file

@ -46,9 +46,9 @@ require (
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56
golang.org/x/sync v0.7.0
golang.org/x/sys v0.22.0
golang.org/x/term v0.18.0
google.golang.org/grpc v1.63.2
google.golang.org/protobuf v1.33.0
golang.org/x/term v0.21.0
google.golang.org/grpc v1.66.2
google.golang.org/protobuf v1.34.1
gopkg.in/yaml.v3 v3.0.1
)
@ -122,11 +122,11 @@ require (
go.opentelemetry.io/otel/sdk v1.22.0 // indirect
go.opentelemetry.io/proto/otlp v1.1.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/crypto v0.24.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/text v0.16.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240604185151-ef581f913117 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
rsc.io/tmplfunc v0.0.3 // indirect

BIN
go.sum

Binary file not shown.

View file

@ -2,6 +2,7 @@ package tree
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"sync"
@ -19,6 +20,7 @@ import (
type clientCache struct {
sync.Mutex
simplelru.LRU[string, cacheItem]
key *ecdsa.PrivateKey
}
type cacheItem struct {
@ -34,13 +36,14 @@ const (
var errRecentlyFailed = errors.New("client has recently failed")
func (c *clientCache) init() {
func (c *clientCache) init(pk *ecdsa.PrivateKey) {
l, _ := simplelru.NewLRU(defaultClientCacheSize, func(_ string, value cacheItem) {
if conn := value.cc; conn != nil {
_ = conn.Close()
}
})
c.LRU = *l
c.key = pk
}
func (c *clientCache) get(ctx context.Context, netmapAddr string) (TreeServiceClient, error) {
@ -63,7 +66,7 @@ func (c *clientCache) get(ctx context.Context, netmapAddr string) (TreeServiceCl
}
}
cc, err := dialTreeService(ctx, netmapAddr)
cc, err := c.dialTreeService(ctx, netmapAddr)
lastTry := time.Now()
c.Lock()
@ -81,14 +84,13 @@ func (c *clientCache) get(ctx context.Context, netmapAddr string) (TreeServiceCl
return NewTreeServiceClient(cc), nil
}
func dialTreeService(ctx context.Context, netmapAddr string) (*grpc.ClientConn, error) {
func (c *clientCache) dialTreeService(ctx context.Context, netmapAddr string) (*grpc.ClientConn, error) {
var netAddr network.Address
if err := netAddr.FromString(netmapAddr); err != nil {
return nil, err
}
opts := []grpc.DialOption{
grpc.WithBlock(),
grpc.WithChainUnaryInterceptor(
metrics.NewUnaryClientInterceptor(),
tracing.NewUnaryClientInteceptor(),
@ -103,9 +105,24 @@ func dialTreeService(ctx context.Context, netmapAddr string) (*grpc.ClientConn,
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
ctx, cancel := context.WithTimeout(ctx, defaultClientConnectTimeout)
cc, err := grpc.DialContext(ctx, netAddr.URIAddr(), opts...)
cancel()
req := &HealthcheckRequest{
Body: &HealthcheckRequest_Body{},
}
if err := SignMessage(req, c.key); err != nil {
return nil, err
}
return cc, err
cc, err := grpc.NewClient(netAddr.URIAddr(), opts...)
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(ctx, defaultClientConnectTimeout)
defer cancel()
// perform some request to check connection
if _, err := NewTreeServiceClient(cc).Healthcheck(ctx, req); err != nil {
_ = cc.Close()
return nil, err
}
return cc, nil
}

View file

@ -65,7 +65,7 @@ func New(opts ...Option) *Service {
s.log = &logger.Logger{Logger: zap.NewNop()}
}
s.cache.init()
s.cache.init(s.key)
s.closeCh = make(chan struct{})
s.replicateCh = make(chan movePair, s.replicatorChannelCapacity)
s.replicateLocalCh = make(chan applyOp)

View file

@ -294,7 +294,7 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
return false
}
cc, err := s.dialCtx(egCtx, a)
cc, err := s.createConnection(a)
if err != nil {
s.log.Warn(logs.TreeFailedToConnectForTreeSynchronization, zap.Error(err), zap.String("address", addr))
return false
@ -332,8 +332,8 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
return from
}
func (*Service) dialCtx(egCtx context.Context, a network.Address) (*grpc.ClientConn, error) {
return grpc.DialContext(egCtx, a.URIAddr(),
func (*Service) createConnection(a network.Address) (*grpc.ClientConn, error) {
return grpc.NewClient(a.URIAddr(),
grpc.WithChainUnaryInterceptor(
metrics.NewUnaryClientInterceptor(),
tracing_grpc.NewUnaryClientInteceptor(),