Reconnect gRPC servers #836

Merged
fyrchik merged 2 commits from dstepanov-yadro/frostfs-node:feat/grpc_init_lazy into master 2024-09-04 19:51:04 +00:00
10 changed files with 280 additions and 84 deletions

View file

@ -2,12 +2,14 @@ package main
import (
"context"
"net"
accountingGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/accounting/grpc"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/balance"
accountingTransportGRPC "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/transport/accounting/grpc"
accountingService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/accounting"
accounting "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/accounting/morph"
"google.golang.org/grpc"
)
func initAccountingService(ctx context.Context, c *cfg) {
@ -28,7 +30,7 @@ func initAccountingService(ctx context.Context, c *cfg) {
),
)
for _, srv := range c.cfgGRPC.servers {
accountingGRPC.RegisterAccountingServiceServer(srv, server)
}
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
accountingGRPC.RegisterAccountingServiceServer(s, server)
})
}

View file

@ -426,11 +426,26 @@ type dynamicConfiguration struct {
metrics *httpComponent
}
type appConfigGuard struct {
mtx sync.RWMutex
}
func (g *appConfigGuard) LockAppConfigShared() func() {
g.mtx.RLock()
return func() { g.mtx.RUnlock() }
}
func (g *appConfigGuard) LockAppConfigExclusive() func() {
g.mtx.Lock()
return func() { g.mtx.Unlock() }
}
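
For context, the two lock helpers are exercised later in this PR: the SIGHUP path takes the exclusive lock while re-reading the application config, and each reconnect goroutine takes the shared lock while it looks up endpoint options. An abridged preview of those hunks (locking discipline only, not new code):

```go
// SIGHUP path: exclusive lock while the application config is re-read
// (reloadAppConfig, added further down in this diff).
func (c *cfg) reloadAppConfig() error {
	unlock := c.LockAppConfigExclusive()
	defer unlock()

	return c.readConfig(c.appCfg)
}

// Reconnect path: shared lock while a reconnect goroutine looks up the
// server options for its endpoint (abridged from getGRPCEndpointOpts in a later hunk).
func getGRPCEndpointOpts(endpoint string, c *cfg) (result []grpc.ServerOption, found bool) {
	unlock := c.LockAppConfigShared()
	defer unlock()

	grpcconfig.IterateEndpoints(c.appCfg, func(sc *grpcconfig.Config) { /* ... */ })
	return
}
```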
type cfg struct {
applicationConfiguration
internals
shared
dynamicConfiguration
appConfigGuard
// configuration of the internal
// services
@ -460,16 +475,79 @@ func (c *cfg) ReadCurrentNetMap(msg *netmapV2.NetMap) error {
return nil
}
type grpcServer struct {
Listener net.Listener
Server *grpc.Server
Endpoint string
}
type cfgGRPC struct {
// guard protects connections and handlers
guard sync.RWMutex
// servers must be protected with guard
servers []grpcServer
// handlers must be protected with guard
handlers []func(e string, l net.Listener, s *grpc.Server)
maxChunkSize uint64
maxAddrAmount uint64
reconnectTimeout time.Duration
}
func (c *cfgGRPC) append(e string, l net.Listener, s *grpc.Server) {
c.guard.Lock()
defer c.guard.Unlock()
c.servers = append(c.servers, grpcServer{
Listener: l,
Server: s,
Endpoint: e,
})
}
func (c *cfgGRPC) appendAndHandle(e string, l net.Listener, s *grpc.Server) {
c.guard.Lock()
defer c.guard.Unlock()
c.servers = append(c.servers, grpcServer{
Listener: l,
Server: s,
Endpoint: e,
})
for _, h := range c.handlers {
h(e, l, s)
}
}
fyrchik marked this conversation as resolved Outdated

Don't we need to clean up all resources (it seems the `listener` can stay open, which will prevent us from using this IP on the next reconnection)?

Also, `c[:pos] + c[pos:]` seems equal to `c`; do we forget a ±1 somewhere?

> Don't we need to clean up all resources (it seems the `listener` can stay open, which will prevent us from using this IP on the next reconnection)?

`grpc.Server.Serve` will close the listener:

func (s *Server) Serve(lis net.Listener) error {
...
	ls := &listenSocket{Listener: lis}
	s.lis[ls] = true

	defer func() {
		s.mu.Lock()
		if s.lis != nil && s.lis[ls] {
			ls.Close() <-- closes listener
			delete(s.lis, ls)
		}
		s.mu.Unlock()
	}()

But OK, added an explicit `Stop` call.

> Also, `c[:pos] + c[pos:]` seems equal to `c`; do we forget a ±1 somewhere?

Fixed

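To make the slice question concrete: `append(c[:pos], c[pos:]...)` rebuilds the same contents, while skipping the element at `pos` needs the `pos+1` bound, which is what `dropConnection` below now does. A minimal sketch with a hypothetical helper name:

```go
// removeAt returns s without the element at index pos.
// append(s[:pos], s[pos:]...) would leave the contents unchanged;
// the pos+1 lower bound is what actually drops the element.
func removeAt(s []grpcServer, pos int) []grpcServer {
	return append(s[:pos], s[pos+1:]...)
}
```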
func (c *cfgGRPC) performAndSave(handler func(e string, l net.Listener, s *grpc.Server)) {
c.guard.Lock()
defer c.guard.Unlock()
for _, conn := range c.servers {
handler(conn.Endpoint, conn.Listener, conn.Server)
}
c.handlers = append(c.handlers, handler)
}
func (c *cfgGRPC) dropConnection(endpoint string) {
c.guard.Lock()
defer c.guard.Unlock()
pos := -1
for idx, srv := range c.servers {
if srv.Endpoint == endpoint {
pos = idx
break
}
}
if pos < 0 {
return
}
c.servers[pos].Server.Stop() // closes listener
c.servers = append(c.servers[0:pos], c.servers[pos+1:]...)
}
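
The `performAndSave`/`appendAndHandle` pair is the core mechanism of this PR: registration handlers are remembered and replayed on every server created later, e.g. after a reconnect. Below is a self-contained sketch of the same pattern with hypothetical names and locking omitted; it is an illustration, not the node's actual types:

```go
package main

import "fmt"

// registry mimics cfgGRPC: it keeps the live "servers" and the handlers
// that must be applied to each of them.
type registry struct {
	servers  []string
	handlers []func(endpoint string)
}

// saveAndApply plays the role of performAndSave: run the handler on every
// existing server, then remember it for servers created later.
func (r *registry) saveAndApply(h func(endpoint string)) {
	for _, srv := range r.servers {
		h(srv)
	}
	r.handlers = append(r.handlers, h)
}

// addServer plays the role of appendAndHandle: add the new server and
// replay every saved handler on it.
func (r *registry) addServer(endpoint string) {
	r.servers = append(r.servers, endpoint)
	for _, h := range r.handlers {
		h(endpoint)
	}
}

func main() {
	r := &registry{servers: []string{"127.0.0.1:8080"}}
	r.saveAndApply(func(e string) { fmt.Println("register service on", e) })
	r.addServer("127.0.0.1:8081") // the saved handler runs again here
}
```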
type cfgMorph struct {
@ -1142,7 +1220,7 @@ func (c *cfg) reloadConfig(ctx context.Context) {
}
defer c.compareAndSwapHealthStatus(control.HealthStatus_RECONFIGURING, control.HealthStatus_READY)
err := c.readConfig(c.appCfg)
err := c.reloadAppConfig()
if err != nil {
c.log.Error(logs.FrostFSNodeConfigurationReading, zap.Error(err))
return
@ -1209,6 +1287,13 @@ func (c *cfg) reloadConfig(ctx context.Context) {
c.log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully)
}
func (c *cfg) reloadAppConfig() error {
unlock := c.LockAppConfigExclusive()
defer unlock()
return c.readConfig(c.appCfg)
}
func (c *cfg) createTombstoneSource() *tombstone.ExpirationChecker {
var tssPrm tsourse.TombstoneSourcePrm
tssPrm.SetGetService(c.cfgObject.getSvc)
@ -1232,7 +1317,7 @@ func (c *cfg) shutdown() {
}
c.ctxCancel()
c.done <- struct{}{}
close(c.done)
for i := range c.closers {
c.closers[len(c.closers)-1-i].fn()
}

View file

@ -3,6 +3,7 @@ package grpcconfig
import (
"errors"
"strconv"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
)
@ -109,3 +110,17 @@ func IterateEndpoints(c *config.Config, f func(*Config)) {
panic("no gRPC server configured")
}
}
const DefaultReconnectInterval = time.Minute
// ReconnectTimeout returns the value of "reconnect_interval" gRPC config parameter.
//
// Returns DefaultReconnectInterval if value is not defined or invalid.
func ReconnectTimeout(c *config.Config) time.Duration {
grpcConf := c.Sub("grpc")
ri := config.DurationSafe(grpcConf, "reconnect_interval")
if ri > 0 {
return ri
}
return DefaultReconnectInterval
}
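
As a usage note, the interval is read once in `initGRPC` (see the hunk below) and shared by all reconnect goroutines; a missing or invalid `reconnect_interval` falls back to the one-minute default:

```go
// From initGRPC below: DurationSafe yields 0 for a missing or malformed
// "reconnect_interval", so ReconnectTimeout returns DefaultReconnectInterval.
c.cfgGRPC.reconnectTimeout = grpcconfig.ReconnectTimeout(c.appCfg)
```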

View file

@ -3,6 +3,7 @@ package main
import (
"bytes"
"context"
"net"
containerGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container/grpc"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@ -16,6 +17,7 @@ import (
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"go.uber.org/zap"
"google.golang.org/grpc"
)
func initContainerService(_ context.Context, c *cfg) {
@ -37,9 +39,9 @@ func initContainerService(_ context.Context, c *cfg) {
),
)
for _, srv := range c.cfgGRPC.servers {
containerGRPC.RegisterContainerServiceServer(srv, server)
}
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
containerGRPC.RegisterContainerServiceServer(s, server)
})
}
func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc containerCore.Source) (*morphContainerReader, *morphContainerWriter) {

View file

@ -3,7 +3,6 @@ package main
import (
"crypto/tls"
"errors"
"fmt"
"net"
"time"
@ -20,91 +19,169 @@ import (
const maxRecvMsgSize = 256 << 20
func initGRPC(c *cfg) {
var endpointsToReconnect []string
var successCount int
grpcconfig.IterateEndpoints(c.appCfg, func(sc *grpcconfig.Config) {
serverOpts := []grpc.ServerOption{
grpc.MaxRecvMsgSize(maxRecvMsgSize),
grpc.ChainUnaryInterceptor(
metrics.NewUnaryServerInterceptor(),
tracing.NewUnaryServerInterceptor(),
),
grpc.ChainStreamInterceptor(
metrics.NewStreamServerInterceptor(),
tracing.NewStreamServerInterceptor(),
),
}
tlsCfg := sc.TLS()
if tlsCfg != nil {
cert, err := tls.LoadX509KeyPair(tlsCfg.CertificateFile(), tlsCfg.KeyFile())
if err != nil {
c.log.Error(logs.FrostFSNodeCouldNotReadCertificateFromFile, zap.Error(err))
return
}
var cipherSuites []uint16
if !tlsCfg.UseInsecureCrypto() {
// This more or less follows the list in https://wiki.mozilla.org/Security/Server_Side_TLS
// excluding:
// 1. TLS 1.3 suites need not be specified here.
// 2. Suites that use DH key exchange are not implemented by stdlib.
cipherSuites = []uint16{
tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256,
tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256,
}
}
creds := credentials.NewTLS(&tls.Config{
MinVersion: tls.VersionTLS12,
CipherSuites: cipherSuites,
Certificates: []tls.Certificate{cert},
})
serverOpts = append(serverOpts, grpc.Creds(creds))
serverOpts, ok := getGrpcServerOpts(c, sc)
if !ok {
return
}
lis, err := net.Listen("tcp", sc.Endpoint())
if err != nil {
c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(sc.Endpoint())
c.log.Error(logs.FrostFSNodeCantListenGRPCEndpoint, zap.Error(err))
endpointsToReconnect = append(endpointsToReconnect, sc.Endpoint())
return
}
c.metricsCollector.GrpcServerMetrics().MarkHealthy(sc.Endpoint())
c.cfgGRPC.listeners = append(c.cfgGRPC.listeners, lis)
c.cfgGRPC.endpoints = append(c.cfgGRPC.endpoints, sc.Endpoint())
srv := grpc.NewServer(serverOpts...)
c.onShutdown(func() {
stopGRPC("FrostFS Public API", srv, c.log)
})
c.cfgGRPC.servers = append(c.cfgGRPC.servers, srv)
c.cfgGRPC.append(sc.Endpoint(), lis, srv)
successCount++
})
if successCount == 0 {
fatalOnErr(errors.New("could not listen to any gRPC endpoints"))
}
c.cfgGRPC.reconnectTimeout = grpcconfig.ReconnectTimeout(c.appCfg)
for _, endpoint := range endpointsToReconnect {
fyrchik marked this conversation as resolved Outdated

Why have you decided to use separate goroutines to handle each reconnection?

Each address tries to reconnect independently so that waiting for one address does not affect the speed of reconnecting to the others.

scheduleReconnect(endpoint, c)
}
}
func scheduleReconnect(endpoint string, c *cfg) {
c.wg.Add(1)
go func() {
defer c.wg.Done()
t := time.NewTicker(c.cfgGRPC.reconnectTimeout)
for {
select {
case <-t.C:
if tryReconnect(endpoint, c) {
return
}

Will there be a leak if we remove the endpoint from the configuration and send SIGHUP while this goroutine is running?

Also, this can be executed concurrently with SIGHUP; how can we easily see that no data race occurs?

Well, I used ostrich tactics in this case: as I see it, SIGHUP handling is currently not thread-safe in general.

> Will there be a leak if we remove the endpoint from the configuration and send SIGHUP while this goroutine is running?

If the endpoint is no longer present in the current config after SIGHUP, then the reconnect goroutine will return. Or did I misunderstand the question?

> as I see it, SIGHUP handling is currently not thread-safe in general

What do you mean? Do we already have places like this (non-thread-safe)? E.g. for the engine it is thread-safe.

func initApp(ctx context.Context, c *cfg) {
	c.wg.Add(1)
	go func() {
		c.signalWatcher(ctx) <-- here app starts to accept signals and it is possible that config will be updated concurrently with initAndLog calls
		c.wg.Done()
	}()

	initAndLog(c, ...)
	initAndLog(c, ...)
	initAndLog(c, ...)
	initAndLog(c, ...)

That's true (and it is a bug), but `initApp` is called only during startup, so there is much less probability of receiving SIGHUP there.
With gRPC servers constantly re-listening we introduce the race under normal operation.

If it looks hard to add now, we can do it in #855, but IMO this is a high-priority task.

Ok, fixed

We execute `reloadConfig` only in the control.HealthStatus_READY state; in other cases we skip reloading. The status becomes READY only when all `initAndLog` calls have completed. Also, we have the special status HealthStatus_RECONFIGURING. So if we need to reload gRPC, the first step is to stop the connection and wait for all background tasks to complete. The mutex here looks redundant: this functionality is already implemented on top of `internals.healthStatus *atomic.Int32`.

Thanks, my statement about SIGHUP thread safety is incorrect.

But the situation when the node recreates a gRPC server is not RECONFIGURING: the node is healthy. Also, a gRPC reconnect should not have to wait for a full SIGHUP.

The health status `CompareAndSwap` strategy allows only one of a SIGHUP reload or a single gRPC reconnect to run at a time; it is too strict.

The mutex locks only the appConfig reload and allows many goroutines to read it.

case <-c.done:
return
}
}
}()
}
func tryReconnect(endpoint string, c *cfg) bool {
c.log.Info(logs.FrostFSNodeGRPCReconnecting, zap.String("endpoint", endpoint))
serverOpts, found := getGRPCEndpointOpts(endpoint, c)
if !found {
c.log.Warn(logs.FrostFSNodeGRPCServerConfigNotFound, zap.String("endpoint", endpoint))
return true
}
lis, err := net.Listen("tcp", endpoint)
if err != nil {
c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(endpoint)
c.log.Error(logs.FrostFSNodeCantListenGRPCEndpoint, zap.Error(err))
c.log.Warn(logs.FrostFSNodeGRPCReconnectFailed, zap.Duration("next_try_in", c.cfgGRPC.reconnectTimeout))
return false
}
c.metricsCollector.GrpcServerMetrics().MarkHealthy(endpoint)
srv := grpc.NewServer(serverOpts...)
c.onShutdown(func() {
stopGRPC("FrostFS Public API", srv, c.log)
})
c.cfgGRPC.appendAndHandle(endpoint, lis, srv)
c.log.Info(logs.FrostFSNodeGRPCReconnectedSuccessfully, zap.String("endpoint", endpoint))
return true
}
func getGRPCEndpointOpts(endpoint string, c *cfg) (result []grpc.ServerOption, found bool) {
unlock := c.LockAppConfigShared()
defer unlock()
grpcconfig.IterateEndpoints(c.appCfg, func(sc *grpcconfig.Config) {
if found {
return
}
if sc.Endpoint() != endpoint {
return
}
var ok bool
result, ok = getGrpcServerOpts(c, sc)
if !ok {
return
}
found = true
})
return
}
func getGrpcServerOpts(c *cfg, sc *grpcconfig.Config) ([]grpc.ServerOption, bool) {
serverOpts := []grpc.ServerOption{
grpc.MaxRecvMsgSize(maxRecvMsgSize),
grpc.ChainUnaryInterceptor(
metrics.NewUnaryServerInterceptor(),
tracing.NewUnaryServerInterceptor(),
),
grpc.ChainStreamInterceptor(
metrics.NewStreamServerInterceptor(),
tracing.NewStreamServerInterceptor(),
),
}
tlsCfg := sc.TLS()
if tlsCfg != nil {
cert, err := tls.LoadX509KeyPair(tlsCfg.CertificateFile(), tlsCfg.KeyFile())
if err != nil {
c.log.Error(logs.FrostFSNodeCouldNotReadCertificateFromFile, zap.Error(err))
return nil, false
}
var cipherSuites []uint16
if !tlsCfg.UseInsecureCrypto() {
// This more or less follows the list in https://wiki.mozilla.org/Security/Server_Side_TLS
// excluding:
// 1. TLS 1.3 suites need not be specified here.
// 2. Suites that use DH key exchange are not implemented by stdlib.
cipherSuites = []uint16{
tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256,
tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256,
}
}
creds := credentials.NewTLS(&tls.Config{
MinVersion: tls.VersionTLS12,
CipherSuites: cipherSuites,
Certificates: []tls.Certificate{cert},
})
serverOpts = append(serverOpts, grpc.Creds(creds))
}
return serverOpts, true
}
func serveGRPC(c *cfg) {
for i := range c.cfgGRPC.servers {
c.cfgGRPC.performAndSave(func(e string, l net.Listener, s *grpc.Server) {
c.wg.Add(1)
srv := c.cfgGRPC.servers[i]
lis := c.cfgGRPC.listeners[i]
endpoint := c.cfgGRPC.endpoints[i]
go func() {
defer func() {
c.log.Info(logs.FrostFSNodeStopListeningGRPCEndpoint,
zap.Stringer("endpoint", lis.Addr()),
zap.Stringer("endpoint", l.Addr()),
)
c.wg.Done()
@ -112,15 +189,17 @@ func serveGRPC(c *cfg) {
c.log.Info(logs.FrostFSNodeStartListeningEndpoint,
zap.String("service", "gRPC"),
zap.Stringer("endpoint", lis.Addr()),
zap.Stringer("endpoint", l.Addr()),
)
if err := srv.Serve(lis); err != nil {
c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(endpoint)
fmt.Println("gRPC server error", err)
if err := s.Serve(l); err != nil {
c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(e)
c.log.Error(logs.FrostFSNodeGRPCServerError, zap.Error(err))
c.cfgGRPC.dropConnection(e)
scheduleReconnect(e, c)
}
}()
}
})
}
func stopGRPC(name string, s *grpc.Server, l *logger.Logger) {
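
Worth noting: `serveGRPC` now registers its serve loop through `performAndSave`, so a server re-created by `tryReconnect`/`appendAndHandle` starts serving without any extra wiring, and on a later failure the same branch tears it down and schedules another attempt. Annotated excerpt from the hunk above:

```go
if err := s.Serve(l); err != nil {
	c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(e)
	c.log.Error(logs.FrostFSNodeGRPCServerError, zap.Error(err))
	c.cfgGRPC.dropConnection(e) // Stop() the server, which also closes its listener
	scheduleReconnect(e, c)     // ticker-driven retries until success, config removal, or shutdown
}
```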

View file

@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"net"
"sync/atomic"
netmapGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap/grpc"
@ -21,6 +22,7 @@ import (
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
"go.uber.org/zap"
"google.golang.org/grpc"
)
// primary solution of local network state dump.
@ -162,9 +164,9 @@ func initNetmapService(ctx context.Context, c *cfg) {
),
)
for _, srv := range c.cfgGRPC.servers {
netmapGRPC.RegisterNetmapServiceServer(srv, server)
}
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
netmapGRPC.RegisterNetmapServiceServer(s, server)
})
addNewEpochNotificationHandlers(c)
}

View file

@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"net"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
objectGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object/grpc"
@ -43,6 +44,7 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"go.uber.org/zap"
"google.golang.org/grpc"
)
type objectSvc struct {
@ -204,9 +206,9 @@ func initObjectService(c *cfg) {
signSvc, c.metricsCollector.ObjectService(), metricsconfig.Enabled(c.appCfg))
server := objectTransportGRPC.New(c.shared.metricsSvc)
for _, srv := range c.cfgGRPC.servers {
objectGRPC.RegisterObjectServiceServer(srv, server)
}
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
objectGRPC.RegisterObjectServiceServer(s, server)
})
}
func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.ClientCache) {

View file

@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"net"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
@ -16,6 +17,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/session/storage/persistent"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/session/storage/temporary"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"google.golang.org/grpc"
)
type sessionStorage interface {
@ -57,7 +59,7 @@ func initSessionService(c *cfg) {
),
)
for _, srv := range c.cfgGRPC.servers {
sessionGRPC.RegisterSessionServiceServer(srv, server)
}
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
sessionGRPC.RegisterSessionServiceServer(s, server)
})
}

View file

@ -3,6 +3,7 @@ package main
import (
"context"
"errors"
"net"
"time"
treeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tree"
@ -15,6 +16,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"go.uber.org/zap"
"google.golang.org/grpc"
)
type cnrSource struct {
@ -63,9 +65,9 @@ func initTreeService(c *cfg) {
tree.WithAuthorizedKeys(treeConfig.AuthorizedKeys()),
tree.WithMetrics(c.metricsCollector.TreeService()))
for _, srv := range c.cfgGRPC.servers {
tree.RegisterTreeServiceServer(srv, c.treeService)
}
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
tree.RegisterTreeServiceServer(s, c.treeService)
})
c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) {
c.treeService.Start(ctx)

View file

@ -423,6 +423,11 @@ const (
FrostFSNodeStoppingGRPCServer = "stopping gRPC server..."
FrostFSNodeGRPCCannotShutdownGracefullyForcingStop = "gRPC cannot shutdown gracefully, forcing stop"
FrostFSNodeGRPCServerStoppedSuccessfully = "gRPC server stopped successfully"
FrostFSNodeGRPCServerError = "gRPC server error"
FrostFSNodeGRPCReconnecting = "reconnecting gRPC server..."
FrostFSNodeGRPCReconnectedSuccessfully = "gRPC server reconnected successfully"
FrostFSNodeGRPCServerConfigNotFound = "gRPC server config not found"
FrostFSNodeGRPCReconnectFailed = "failed to reconnect gRPC server"
FrostFSNodeWaitingForAllProcessesToStop = "waiting for all processes to stop"
FrostFSNodeStartedLocalNodesMaintenance = "started local node's maintenance"
FrostFSNodeStoppedLocalNodesMaintenance = "stopped local node's maintenance"