Reconnect gRPC servers #836

@@ -2,12 +2,14 @@ package main

import (
	"context"
	"net"

	accountingGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/accounting/grpc"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/balance"
	accountingTransportGRPC "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/transport/accounting/grpc"
	accountingService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/accounting"
	accounting "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/accounting/morph"
	"google.golang.org/grpc"
)

func initAccountingService(ctx context.Context, c *cfg) {

@@ -28,7 +30,7 @@ func initAccountingService(ctx context.Context, c *cfg) {
		),
	)

	for _, srv := range c.cfgGRPC.servers {
		accountingGRPC.RegisterAccountingServiceServer(srv, server)
	}
	c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
		accountingGRPC.RegisterAccountingServiceServer(s, server)
	})
}

@@ -426,11 +426,26 @@ type dynamicConfiguration struct {
	metrics *httpComponent
}

type appConfigGuard struct {
	mtx sync.RWMutex
}

func (g *appConfigGuard) LockAppConfigShared() func() {
	g.mtx.RLock()
	return func() { g.mtx.RUnlock() }
}

func (g *appConfigGuard) LockAppConfigExclusive() func() {
	g.mtx.Lock()
	return func() { g.mtx.Unlock() }
}

type cfg struct {
	applicationConfiguration
	internals
	shared
	dynamicConfiguration
	appConfigGuard

	// configuration of the internal
	// services

@@ -460,16 +475,79 @@ func (c *cfg) ReadCurrentNetMap(msg *netmapV2.NetMap) error {
	return nil
}

type grpcServer struct {
	Listener net.Listener
	Server   *grpc.Server
	Endpoint string
}

type cfgGRPC struct {
	listeners []net.Listener
	servers   []*grpc.Server
	endpoints []string

	// guard protects connections and handlers
	guard sync.RWMutex
	// servers must be protected with guard
	servers []grpcServer
	// handlers must be protected with guard
	handlers []func(e string, l net.Listener, s *grpc.Server)

	maxChunkSize     uint64
	maxAddrAmount    uint64
	reconnectTimeout time.Duration
}

func (c *cfgGRPC) append(e string, l net.Listener, s *grpc.Server) {
	c.guard.Lock()
	defer c.guard.Unlock()

	c.servers = append(c.servers, grpcServer{
		Listener: l,
		Server:   s,
		Endpoint: e,
	})
}

func (c *cfgGRPC) appendAndHandle(e string, l net.Listener, s *grpc.Server) {
	c.guard.Lock()
	defer c.guard.Unlock()

	c.servers = append(c.servers, grpcServer{
		Listener: l,
		Server:   s,
		Endpoint: e,
	})

	for _, h := range c.handlers {
		h(e, l, s)
	}
}

func (c *cfgGRPC) performAndSave(handler func(e string, l net.Listener, s *grpc.Server)) {
	c.guard.Lock()
	defer c.guard.Unlock()

	for _, conn := range c.servers {
		handler(conn.Endpoint, conn.Listener, conn.Server)
	}

	c.handlers = append(c.handlers, handler)
}

func (c *cfgGRPC) dropConnection(endpoint string) {
	c.guard.Lock()
	defer c.guard.Unlock()

	pos := -1
	for idx, srv := range c.servers {
		if srv.Endpoint == endpoint {
			pos = idx
			break
		}
	}
	if pos < 0 {
		return
	}

	c.servers[pos].Server.Stop() // closes listener
	c.servers = append(c.servers[0:pos], c.servers[pos+1:]...)
}
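
The register-and-replay contract above is the core of the reconnect change: `performAndSave` applies a registration handler to every live server and remembers it, while `appendAndHandle` replays all remembered handlers on a freshly recreated server. A minimal, self-contained sketch of that idea (simplified stand-in types, not the node's real `cfgGRPC`):

```go
package main

import (
	"fmt"
	"sync"
)

// server stands in for *grpc.Server; registry stands in for cfgGRPC.
type server struct{ endpoint string }

type registry struct {
	mu       sync.Mutex
	servers  []*server
	handlers []func(*server)
}

// performAndSave applies the handler to all current servers and remembers it
// so it can be replayed on servers created later (e.g. after a reconnect).
func (r *registry) performAndSave(h func(*server)) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for _, s := range r.servers {
		h(s)
	}
	r.handlers = append(r.handlers, h)
}

// appendAndHandle stores a newly (re)created server and replays every saved
// handler on it, so the new server exposes the same services as the old one.
func (r *registry) appendAndHandle(s *server) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.servers = append(r.servers, s)
	for _, h := range r.handlers {
		h(s)
	}
}

func main() {
	r := &registry{servers: []*server{{endpoint: "10.0.0.1:8080"}}}
	r.performAndSave(func(s *server) { fmt.Println("register accounting service on", s.endpoint) })
	r.appendAndHandle(&server{endpoint: "10.0.0.2:8080"}) // replays the registration on the new server
}
```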

type cfgMorph struct {

@@ -1142,7 +1220,7 @@ func (c *cfg) reloadConfig(ctx context.Context) {
	}
	defer c.compareAndSwapHealthStatus(control.HealthStatus_RECONFIGURING, control.HealthStatus_READY)

	err := c.readConfig(c.appCfg)
	err := c.reloadAppConfig()
	if err != nil {
		c.log.Error(logs.FrostFSNodeConfigurationReading, zap.Error(err))
		return

@@ -1209,6 +1287,13 @@ func (c *cfg) reloadConfig(ctx context.Context) {
	c.log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully)
}

func (c *cfg) reloadAppConfig() error {
	unlock := c.LockAppConfigExclusive()
	defer unlock()

	return c.readConfig(c.appCfg)
}

func (c *cfg) createTombstoneSource() *tombstone.ExpirationChecker {
	var tssPrm tsourse.TombstoneSourcePrm
	tssPrm.SetGetService(c.cfgObject.getSvc)

@@ -1232,7 +1317,7 @@ func (c *cfg) shutdown() {
	}

	c.ctxCancel()
	c.done <- struct{}{}
	close(c.done)
	for i := range c.closers {
		c.closers[len(c.closers)-1-i].fn()
	}

@@ -3,6 +3,7 @@ package grpcconfig

import (
	"errors"
	"strconv"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
)

@@ -109,3 +110,17 @@ func IterateEndpoints(c *config.Config, f func(*Config)) {
		panic("no gRPC server configured")
	}
}

const DefaultReconnectInterval = time.Minute

// ReconnectTimeout returns the value of "reconnect_interval" gRPC config parameter.
//
// Returns DefaultReconnectInterval if value is not defined or invalid.
func ReconnectTimeout(c *config.Config) time.Duration {
	grpcConf := c.Sub("grpc")
	ri := config.DurationSafe(grpcConf, "reconnect_interval")
	if ri > 0 {
		return ri
	}
	return DefaultReconnectInterval
}
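
The fallback rule is small enough to state as code. A self-contained illustration of the documented behaviour (not the node's config plumbing; `reconnectInterval` is a hypothetical name used only here):

```go
package main

import (
	"fmt"
	"time"
)

// defaultReconnectInterval mirrors DefaultReconnectInterval above.
const defaultReconnectInterval = time.Minute

// reconnectInterval applies the documented rule: any unset (zero) or
// invalid (non-positive) value falls back to the one-minute default.
func reconnectInterval(configured time.Duration) time.Duration {
	if configured > 0 {
		return configured
	}
	return defaultReconnectInterval
}

func main() {
	fmt.Println(reconnectInterval(0))                // 1m0s: "reconnect_interval" not set
	fmt.Println(reconnectInterval(30 * time.Second)) // 30s: an explicit value wins
}
```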

@@ -3,6 +3,7 @@ package main

import (
	"bytes"
	"context"
	"net"

	containerGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container/grpc"
	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"

@@ -16,6 +17,7 @@ import (
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
	"go.uber.org/zap"
	"google.golang.org/grpc"
)

func initContainerService(_ context.Context, c *cfg) {

@@ -37,9 +39,9 @@ func initContainerService(_ context.Context, c *cfg) {
		),
	)

	for _, srv := range c.cfgGRPC.servers {
		containerGRPC.RegisterContainerServiceServer(srv, server)
	}
	c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
		containerGRPC.RegisterContainerServiceServer(s, server)
	})
}

func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc containerCore.Source) (*morphContainerReader, *morphContainerWriter) {

@@ -3,7 +3,6 @@ package main

import (
	"crypto/tls"
	"errors"
	"fmt"
	"net"
	"time"

@@ -20,91 +19,169 @@ import (
const maxRecvMsgSize = 256 << 20

func initGRPC(c *cfg) {
	var endpointsToReconnect []string
	var successCount int
	grpcconfig.IterateEndpoints(c.appCfg, func(sc *grpcconfig.Config) {
		serverOpts := []grpc.ServerOption{
			grpc.MaxRecvMsgSize(maxRecvMsgSize),
			grpc.ChainUnaryInterceptor(
				metrics.NewUnaryServerInterceptor(),
				tracing.NewUnaryServerInterceptor(),
			),
			grpc.ChainStreamInterceptor(
				metrics.NewStreamServerInterceptor(),
				tracing.NewStreamServerInterceptor(),
			),
		}

		tlsCfg := sc.TLS()

		if tlsCfg != nil {
			cert, err := tls.LoadX509KeyPair(tlsCfg.CertificateFile(), tlsCfg.KeyFile())
			if err != nil {
				c.log.Error(logs.FrostFSNodeCouldNotReadCertificateFromFile, zap.Error(err))
				return
			}

			var cipherSuites []uint16
			if !tlsCfg.UseInsecureCrypto() {
				// This more or less follows the list in https://wiki.mozilla.org/Security/Server_Side_TLS
				// excluding:
				// 1. TLS 1.3 suites need not be specified here.
				// 2. Suites that use DH key exchange are not implemented by stdlib.
				cipherSuites = []uint16{
					tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
					tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
					tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
					tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
					tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256,
					tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256,
				}
			}
			creds := credentials.NewTLS(&tls.Config{
				MinVersion:   tls.VersionTLS12,
				CipherSuites: cipherSuites,
				Certificates: []tls.Certificate{cert},
			})

			serverOpts = append(serverOpts, grpc.Creds(creds))
		serverOpts, ok := getGrpcServerOpts(c, sc)
		if !ok {
			return
		}

		lis, err := net.Listen("tcp", sc.Endpoint())
		if err != nil {
			c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(sc.Endpoint())
			c.log.Error(logs.FrostFSNodeCantListenGRPCEndpoint, zap.Error(err))
			endpointsToReconnect = append(endpointsToReconnect, sc.Endpoint())
			return
		}
		c.metricsCollector.GrpcServerMetrics().MarkHealthy(sc.Endpoint())

		c.cfgGRPC.listeners = append(c.cfgGRPC.listeners, lis)
		c.cfgGRPC.endpoints = append(c.cfgGRPC.endpoints, sc.Endpoint())

		srv := grpc.NewServer(serverOpts...)

		c.onShutdown(func() {
			stopGRPC("FrostFS Public API", srv, c.log)
		})

		c.cfgGRPC.servers = append(c.cfgGRPC.servers, srv)
		c.cfgGRPC.append(sc.Endpoint(), lis, srv)
		successCount++
	})

	if successCount == 0 {
		fatalOnErr(errors.New("could not listen to any gRPC endpoints"))
	}
	c.cfgGRPC.reconnectTimeout = grpcconfig.ReconnectTimeout(c.appCfg)

	for _, endpoint := range endpointsToReconnect {

fyrchik commented:
	Why have you decided to use separate goroutines to handle each reconnection?

dstepanov-yadro commented:
	Each address tries to reconnect independently so that waiting for one address does not affect the speed of reconnecting to the others.

		scheduleReconnect(endpoint, c)
	}
}
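
The rationale given above (one goroutine per endpoint, so a slow or dead endpoint cannot delay the others) can be seen in isolation in the following self-contained sketch; `tryListen` is a stand-in for the real `net.Listen` + `grpc.NewServer` path and all names here are illustrative only:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// reconnectLoop retries a single endpoint on its own ticker, so endpoints
// never wait on each other; it exits on success or when done is closed.
func reconnectLoop(endpoint string, interval time.Duration, done <-chan struct{},
	wg *sync.WaitGroup, tryListen func(string) bool,
) {
	defer wg.Done()
	t := time.NewTicker(interval)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			if tryListen(endpoint) {
				return
			}
		case <-done: // node shutdown: the goroutine does not leak
			return
		}
	}
}

func main() {
	done := make(chan struct{})
	var wg sync.WaitGroup
	for _, e := range []string{"127.0.0.1:8080", "127.0.0.1:8081"} {
		wg.Add(1)
		go reconnectLoop(e, 50*time.Millisecond, done, &wg, func(ep string) bool {
			fmt.Println("retrying", ep)
			return true // pretend the listen succeeds on the first retry
		})
	}
	wg.Wait()
	close(done)
}
```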

func scheduleReconnect(endpoint string, c *cfg) {
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()

		t := time.NewTicker(c.cfgGRPC.reconnectTimeout)
		for {
			select {
			case <-t.C:
				if tryReconnect(endpoint, c) {
					return
				}

fyrchik commented:
	Will there be a leak if we remove the endpoint from the configuration and send SIGHUP while this goroutine is running?

fyrchik commented:
	Also, this can be executed concurrently to SIGHUP, how can we easily see no data-race occurs?

dstepanov-yadro commented:
	Well, I used ostrich tactics in this case: as I see it, SIGHUP handling is currently not thread-safe in general.

dstepanov-yadro commented:
	> Will there be a leak if we remove the endpoint from the configuration and send SIGHUP while this goroutine is running?

	If there is no current endpoint after SIGHUP, then the reconnect goroutine will return. Or did I misunderstand the question?

fyrchik commented:
	> as I see SIGHUP handling now is not threadsafe in general

	What do you mean? Do we already have places like this (non-threadsafe)? E.g. for the engine it is thread-safe.

dstepanov-yadro commented:
```
func initApp(ctx context.Context, c *cfg) {
	c.wg.Add(1)
	go func() {
		c.signalWatcher(ctx) // <-- here the app starts to accept signals, so the config may be updated concurrently with the initAndLog calls
		c.wg.Done()
	}()
	initAndLog(c, ...)
	initAndLog(c, ...)
	initAndLog(c, ...)
	initAndLog(c, ...)
}
```

fyrchik commented:
	That's true (and it is a bug), but `initApp` is called only during startup, so there is much less probability of receiving SIGHUP there.
	With gRPC servers constantly relistening we introduce the race under normal operation.

fyrchik commented:
	If it looks hard to add now, we can do it in https://git.frostfs.info/TrueCloudLab/frostfs-node/issues/855, but IMO this is a high-priority task.

dstepanov-yadro commented:
	Ok, fixed

acid-ant commented:
	We are executing `reloadConfig` only for the state control.HealthStatus_READY; in other cases we skip reloading. The status will be READY only when all `initAndLog` calls have completed. Also, we have a special status HealthStatus_RECONFIGURING. So if we need to reload gRPC, at the first step we need to stop the connection and wait for completion of all background tasks. Looks like the mutex here is redundant. This functionality is already implemented on top of `internals.healthStatus *atomic.Int32`.

dstepanov-yadro commented:
	Thanks, my statement about SIGHUP thread safety is incorrect.
	But the situation when the node recreates a gRPC server is not RECONFIGURING: the node is healthy. Also, a gRPC reconnect should not have to wait for a full SIGHUP.
	The health status `CompareAndSwap` strategy allows running SIGHUP OR one gRPC reconnect; it is too strict.
	The mutex locks only the appConfig reload and allows many goroutines to read it.

			case <-c.done:
				return
			}
		}
	}()
}
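
The locking scheme discussed above can be sketched in isolation: a SIGHUP-style reload takes the exclusive lock, while each reconnecting endpoint only needs a shared lock to read the configuration, so reconnects do not serialize behind one another. This is a simplified, self-contained illustration, not the node's actual types:

```go
package main

import (
	"fmt"
	"sync"
)

// guard mirrors the shape of appConfigGuard above.
type guard struct{ mtx sync.RWMutex }

func (g *guard) lockShared() func()    { g.mtx.RLock(); return g.mtx.RUnlock }
func (g *guard) lockExclusive() func() { g.mtx.Lock(); return g.mtx.Unlock }

func main() {
	var g guard
	var wg sync.WaitGroup

	// Two "reconnects" read the app config concurrently under the shared lock.
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			unlock := g.lockShared()
			defer unlock()
			fmt.Println("reconnect", id, "reads app config")
		}(i)
	}
	wg.Wait()

	// A SIGHUP-style reload runs alone under the exclusive lock.
	unlock := g.lockExclusive()
	fmt.Println("SIGHUP reloads app config exclusively")
	unlock()
}
```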

func tryReconnect(endpoint string, c *cfg) bool {
	c.log.Info(logs.FrostFSNodeGRPCReconnecting, zap.String("endpoint", endpoint))

	serverOpts, found := getGRPCEndpointOpts(endpoint, c)
	if !found {
		c.log.Warn(logs.FrostFSNodeGRPCServerConfigNotFound, zap.String("endpoint", endpoint))
		return true
	}

	lis, err := net.Listen("tcp", endpoint)
	if err != nil {
		c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(endpoint)
		c.log.Error(logs.FrostFSNodeCantListenGRPCEndpoint, zap.Error(err))
		c.log.Warn(logs.FrostFSNodeGRPCReconnectFailed, zap.Duration("next_try_in", c.cfgGRPC.reconnectTimeout))
		return false
	}
	c.metricsCollector.GrpcServerMetrics().MarkHealthy(endpoint)

	srv := grpc.NewServer(serverOpts...)

	c.onShutdown(func() {
		stopGRPC("FrostFS Public API", srv, c.log)
	})

	c.cfgGRPC.appendAndHandle(endpoint, lis, srv)

	c.log.Info(logs.FrostFSNodeGRPCReconnectedSuccessfully, zap.String("endpoint", endpoint))
	return true
}

func getGRPCEndpointOpts(endpoint string, c *cfg) (result []grpc.ServerOption, found bool) {
	unlock := c.LockAppConfigShared()
	defer unlock()
	grpcconfig.IterateEndpoints(c.appCfg, func(sc *grpcconfig.Config) {
		if found {
			return
		}
		if sc.Endpoint() != endpoint {
			return
		}
		var ok bool
		result, ok = getGrpcServerOpts(c, sc)
		if !ok {
			return
		}
		found = true
	})
	return
}

func getGrpcServerOpts(c *cfg, sc *grpcconfig.Config) ([]grpc.ServerOption, bool) {
	serverOpts := []grpc.ServerOption{
		grpc.MaxRecvMsgSize(maxRecvMsgSize),
		grpc.ChainUnaryInterceptor(
			metrics.NewUnaryServerInterceptor(),
			tracing.NewUnaryServerInterceptor(),
		),
		grpc.ChainStreamInterceptor(
			metrics.NewStreamServerInterceptor(),
			tracing.NewStreamServerInterceptor(),
		),
	}

	tlsCfg := sc.TLS()

	if tlsCfg != nil {
		cert, err := tls.LoadX509KeyPair(tlsCfg.CertificateFile(), tlsCfg.KeyFile())
		if err != nil {
			c.log.Error(logs.FrostFSNodeCouldNotReadCertificateFromFile, zap.Error(err))
			return nil, false
		}

		var cipherSuites []uint16
		if !tlsCfg.UseInsecureCrypto() {
			// This more or less follows the list in https://wiki.mozilla.org/Security/Server_Side_TLS
			// excluding:
			// 1. TLS 1.3 suites need not be specified here.
			// 2. Suites that use DH key exchange are not implemented by stdlib.
			cipherSuites = []uint16{
				tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
				tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
				tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
				tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
				tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256,
				tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256,
			}
		}
		creds := credentials.NewTLS(&tls.Config{
			MinVersion:   tls.VersionTLS12,
			CipherSuites: cipherSuites,
			Certificates: []tls.Certificate{cert},
		})

		serverOpts = append(serverOpts, grpc.Creds(creds))
	}

	return serverOpts, true
}

func serveGRPC(c *cfg) {
	for i := range c.cfgGRPC.servers {
	c.cfgGRPC.performAndSave(func(e string, l net.Listener, s *grpc.Server) {
		c.wg.Add(1)

		srv := c.cfgGRPC.servers[i]
		lis := c.cfgGRPC.listeners[i]
		endpoint := c.cfgGRPC.endpoints[i]

		go func() {
			defer func() {
				c.log.Info(logs.FrostFSNodeStopListeningGRPCEndpoint,
					zap.Stringer("endpoint", lis.Addr()),
					zap.Stringer("endpoint", l.Addr()),
				)

				c.wg.Done()

@@ -112,15 +189,17 @@ func serveGRPC(c *cfg) {

			c.log.Info(logs.FrostFSNodeStartListeningEndpoint,
				zap.String("service", "gRPC"),
				zap.Stringer("endpoint", lis.Addr()),
				zap.Stringer("endpoint", l.Addr()),
			)

			if err := srv.Serve(lis); err != nil {
			c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(endpoint)
			fmt.Println("gRPC server error", err)
			if err := s.Serve(l); err != nil {
				c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(e)
				c.log.Error(logs.FrostFSNodeGRPCServerError, zap.Error(err))
				c.cfgGRPC.dropConnection(e)
				scheduleReconnect(e, c)
			}
		}()
	}
	})
}

func stopGRPC(name string, s *grpc.Server, l *logger.Logger) {

@@ -5,6 +5,7 @@ import (
	"context"
	"errors"
	"fmt"
	"net"
	"sync/atomic"

	netmapGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap/grpc"

@@ -21,6 +22,7 @@ import (
	netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
	"go.uber.org/zap"
	"google.golang.org/grpc"
)

// primary solution of local network state dump.

@@ -162,9 +164,9 @@ func initNetmapService(ctx context.Context, c *cfg) {
		),
	)

	for _, srv := range c.cfgGRPC.servers {
		netmapGRPC.RegisterNetmapServiceServer(srv, server)
	}
	c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
		netmapGRPC.RegisterNetmapServiceServer(s, server)
	})

	addNewEpochNotificationHandlers(c)
}

@@ -4,6 +4,7 @@ import (
	"context"
	"errors"
	"fmt"
	"net"

	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
	objectGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object/grpc"

@@ -43,6 +44,7 @@ import (
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
	"go.uber.org/zap"
	"google.golang.org/grpc"
)

type objectSvc struct {

@@ -204,9 +206,9 @@ func initObjectService(c *cfg) {
		signSvc, c.metricsCollector.ObjectService(), metricsconfig.Enabled(c.appCfg))
	server := objectTransportGRPC.New(c.shared.metricsSvc)

	for _, srv := range c.cfgGRPC.servers {
		objectGRPC.RegisterObjectServiceServer(srv, server)
	}
	c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
		objectGRPC.RegisterObjectServiceServer(s, server)
	})
}

func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.ClientCache) {

@@ -3,6 +3,7 @@ package main

import (
	"context"
	"fmt"
	"net"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"

@@ -16,6 +17,7 @@ import (
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/session/storage/persistent"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/session/storage/temporary"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
	"google.golang.org/grpc"
)

type sessionStorage interface {

@@ -57,7 +59,7 @@ func initSessionService(c *cfg) {
		),
	)

	for _, srv := range c.cfgGRPC.servers {
		sessionGRPC.RegisterSessionServiceServer(srv, server)
	}
	c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
		sessionGRPC.RegisterSessionServiceServer(s, server)
	})
}

@@ -3,6 +3,7 @@ package main

import (
	"context"
	"errors"
	"net"
	"time"

	treeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tree"

@@ -15,6 +16,7 @@ import (
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	"go.uber.org/zap"
	"google.golang.org/grpc"
)

type cnrSource struct {

@@ -63,9 +65,9 @@ func initTreeService(c *cfg) {
		tree.WithAuthorizedKeys(treeConfig.AuthorizedKeys()),
		tree.WithMetrics(c.metricsCollector.TreeService()))

	for _, srv := range c.cfgGRPC.servers {
		tree.RegisterTreeServiceServer(srv, c.treeService)
	}
	c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
		tree.RegisterTreeServiceServer(s, c.treeService)
	})

	c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) {
		c.treeService.Start(ctx)

@@ -423,6 +423,11 @@ const (
	FrostFSNodeStoppingGRPCServer                      = "stopping gRPC server..."
	FrostFSNodeGRPCCannotShutdownGracefullyForcingStop = "gRPC cannot shutdown gracefully, forcing stop"
	FrostFSNodeGRPCServerStoppedSuccessfully           = "gRPC server stopped successfully"
	FrostFSNodeGRPCServerError                         = "gRPC server error"
	FrostFSNodeGRPCReconnecting                        = "reconnecting gRPC server..."
	FrostFSNodeGRPCReconnectedSuccessfully             = "gRPC server reconnected successfully"
	FrostFSNodeGRPCServerConfigNotFound                = "gRPC server config not found"
	FrostFSNodeGRPCReconnectFailed                     = "failed to reconnect gRPC server"
	FrostFSNodeWaitingForAllProcessesToStop            = "waiting for all processes to stop"
	FrostFSNodeStartedLocalNodesMaintenance            = "started local node's maintenance"
	FrostFSNodeStoppedLocalNodesMaintenance            = "stopped local node's maintenance"

Don't we need to cleanup all resources (it seems `listener` can be open, this will prevent us from using this IP on the next reconnection)? Also `c[:pos] + c[pos:]` seems equal to `c`, do we forget +-1 somewhere?

`grpc.Server.Serve` will close the listener. But ok, added an explicit `Stop` call.

Fixed
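
On the `c[:pos] + c[pos:]` question: the merged code actually uses `append(c.servers[0:pos], c.servers[pos+1:]...)`, which drops exactly the element at `pos`. A tiny self-contained check (illustrative only, not the node's code):

```go
package main

import "fmt"

func main() {
	endpoints := []string{"a", "b", "c"}
	pos := 1

	// append(s[:pos], s[pos+1:]...) removes the element at pos;
	// s[:pos] followed by s[pos:] would indeed leave the slice unchanged.
	endpoints = append(endpoints[:pos], endpoints[pos+1:]...)
	fmt.Println(endpoints) // [a c]
}
```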