Reconnect gRPC servers #836
10 changed files with 280 additions and 84 deletions
|
@ -2,12 +2,14 @@ package main
|
|||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
|
||||
accountingGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/accounting/grpc"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/balance"
|
||||
accountingTransportGRPC "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/transport/accounting/grpc"
|
||||
accountingService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/accounting"
|
||||
accounting "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/accounting/morph"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
func initAccountingService(ctx context.Context, c *cfg) {
|
||||
|
@ -28,7 +30,7 @@ func initAccountingService(ctx context.Context, c *cfg) {
|
|||
),
|
||||
)
|
||||
|
||||
for _, srv := range c.cfgGRPC.servers {
|
||||
accountingGRPC.RegisterAccountingServiceServer(srv, server)
|
||||
}
|
||||
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
|
||||
accountingGRPC.RegisterAccountingServiceServer(s, server)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -426,11 +426,26 @@ type dynamicConfiguration struct {
|
|||
metrics *httpComponent
|
||||
}
|
||||
|
||||
type appConfigGuard struct {
|
||||
mtx sync.RWMutex
|
||||
}
|
||||
|
||||
func (g *appConfigGuard) LockAppConfigShared() func() {
|
||||
g.mtx.RLock()
|
||||
return func() { g.mtx.RUnlock() }
|
||||
}
|
||||
|
||||
func (g *appConfigGuard) LockAppConfigExclusive() func() {
|
||||
g.mtx.Lock()
|
||||
return func() { g.mtx.Unlock() }
|
||||
}
|
||||
|
||||
type cfg struct {
|
||||
applicationConfiguration
|
||||
internals
|
||||
shared
|
||||
dynamicConfiguration
|
||||
appConfigGuard
|
||||
|
||||
// configuration of the internal
|
||||
// services
|
||||
|
@ -460,16 +475,79 @@ func (c *cfg) ReadCurrentNetMap(msg *netmapV2.NetMap) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
type grpcServer struct {
|
||||
Listener net.Listener
|
||||
Server *grpc.Server
|
||||
Endpoint string
|
||||
}
|
||||
|
||||
type cfgGRPC struct {
|
||||
listeners []net.Listener
|
||||
// guard protects connections and handlers
|
||||
guard sync.RWMutex
|
||||
// servers must be protected with guard
|
||||
servers []grpcServer
|
||||
// handlers must be protected with guard
|
||||
handlers []func(e string, l net.Listener, s *grpc.Server)
|
||||
|
||||
servers []*grpc.Server
|
||||
maxChunkSize uint64
|
||||
maxAddrAmount uint64
|
||||
reconnectTimeout time.Duration
|
||||
}
|
||||
|
||||
endpoints []string
|
||||
func (c *cfgGRPC) append(e string, l net.Listener, s *grpc.Server) {
|
||||
c.guard.Lock()
|
||||
defer c.guard.Unlock()
|
||||
|
||||
maxChunkSize uint64
|
||||
c.servers = append(c.servers, grpcServer{
|
||||
Listener: l,
|
||||
Server: s,
|
||||
Endpoint: e,
|
||||
})
|
||||
}
|
||||
|
||||
maxAddrAmount uint64
|
||||
func (c *cfgGRPC) appendAndHandle(e string, l net.Listener, s *grpc.Server) {
|
||||
c.guard.Lock()
|
||||
defer c.guard.Unlock()
|
||||
|
||||
c.servers = append(c.servers, grpcServer{
|
||||
Listener: l,
|
||||
Server: s,
|
||||
Endpoint: e,
|
||||
})
|
||||
|
||||
for _, h := range c.handlers {
|
||||
h(e, l, s)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cfgGRPC) performAndSave(handler func(e string, l net.Listener, s *grpc.Server)) {
|
||||
c.guard.Lock()
|
||||
defer c.guard.Unlock()
|
||||
|
||||
for _, conn := range c.servers {
|
||||
handler(conn.Endpoint, conn.Listener, conn.Server)
|
||||
}
|
||||
|
||||
c.handlers = append(c.handlers, handler)
|
||||
}
|
||||
|
||||
func (c *cfgGRPC) dropConnection(endpoint string) {
|
||||
c.guard.Lock()
|
||||
defer c.guard.Unlock()
|
||||
|
||||
pos := -1
|
||||
for idx, srv := range c.servers {
|
||||
if srv.Endpoint == endpoint {
|
||||
pos = idx
|
||||
break
|
||||
}
|
||||
}
|
||||
if pos < 0 {
|
||||
return
|
||||
}
|
||||
|
||||
c.servers[pos].Server.Stop() // closes listener
|
||||
c.servers = append(c.servers[0:pos], c.servers[pos+1:]...)
|
||||
}
|
||||
|
||||
type cfgMorph struct {
|
||||
|
@ -1142,7 +1220,7 @@ func (c *cfg) reloadConfig(ctx context.Context) {
|
|||
}
|
||||
defer c.compareAndSwapHealthStatus(control.HealthStatus_RECONFIGURING, control.HealthStatus_READY)
|
||||
|
||||
err := c.readConfig(c.appCfg)
|
||||
err := c.reloadAppConfig()
|
||||
if err != nil {
|
||||
c.log.Error(logs.FrostFSNodeConfigurationReading, zap.Error(err))
|
||||
return
|
||||
|
@ -1209,6 +1287,13 @@ func (c *cfg) reloadConfig(ctx context.Context) {
|
|||
c.log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully)
|
||||
}
|
||||
|
||||
func (c *cfg) reloadAppConfig() error {
|
||||
unlock := c.LockAppConfigExclusive()
|
||||
defer unlock()
|
||||
|
||||
return c.readConfig(c.appCfg)
|
||||
}
|
||||
|
||||
func (c *cfg) createTombstoneSource() *tombstone.ExpirationChecker {
|
||||
var tssPrm tsourse.TombstoneSourcePrm
|
||||
tssPrm.SetGetService(c.cfgObject.getSvc)
|
||||
|
@ -1232,7 +1317,7 @@ func (c *cfg) shutdown() {
|
|||
}
|
||||
|
||||
c.ctxCancel()
|
||||
c.done <- struct{}{}
|
||||
close(c.done)
|
||||
for i := range c.closers {
|
||||
c.closers[len(c.closers)-1-i].fn()
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package grpcconfig
|
|||
import (
|
||||
"errors"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||
)
|
||||
|
@ -109,3 +110,17 @@ func IterateEndpoints(c *config.Config, f func(*Config)) {
|
|||
panic("no gRPC server configured")
|
||||
}
|
||||
}
|
||||
|
||||
const DefaultReconnectInterval = time.Minute
|
||||
|
||||
// ReconnectTimeout returns the value of "reconnect_interval" gRPC config parameter.
|
||||
//
|
||||
// Returns DefaultReconnectInterval if value is not defined or invalid.
|
||||
func ReconnectTimeout(c *config.Config) time.Duration {
|
||||
grpcConf := c.Sub("grpc")
|
||||
ri := config.DurationSafe(grpcConf, "reconnect_interval")
|
||||
if ri > 0 {
|
||||
return ri
|
||||
}
|
||||
return DefaultReconnectInterval
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package main
|
|||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"net"
|
||||
|
||||
containerGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container/grpc"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
|
@ -16,6 +17,7 @@ import (
|
|||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
func initContainerService(_ context.Context, c *cfg) {
|
||||
|
@ -37,9 +39,9 @@ func initContainerService(_ context.Context, c *cfg) {
|
|||
),
|
||||
)
|
||||
|
||||
for _, srv := range c.cfgGRPC.servers {
|
||||
containerGRPC.RegisterContainerServiceServer(srv, server)
|
||||
}
|
||||
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
|
||||
containerGRPC.RegisterContainerServiceServer(s, server)
|
||||
})
|
||||
}
|
||||
|
||||
func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc containerCore.Source) (*morphContainerReader, *morphContainerWriter) {
|
||||
|
|
|
@ -3,7 +3,6 @@ package main
|
|||
import (
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
|
@ -20,91 +19,169 @@ import (
|
|||
const maxRecvMsgSize = 256 << 20
|
||||
|
||||
func initGRPC(c *cfg) {
|
||||
var endpointsToReconnect []string
|
||||
var successCount int
|
||||
grpcconfig.IterateEndpoints(c.appCfg, func(sc *grpcconfig.Config) {
|
||||
serverOpts := []grpc.ServerOption{
|
||||
grpc.MaxRecvMsgSize(maxRecvMsgSize),
|
||||
grpc.ChainUnaryInterceptor(
|
||||
metrics.NewUnaryServerInterceptor(),
|
||||
tracing.NewUnaryServerInterceptor(),
|
||||
),
|
||||
grpc.ChainStreamInterceptor(
|
||||
metrics.NewStreamServerInterceptor(),
|
||||
tracing.NewStreamServerInterceptor(),
|
||||
),
|
||||
}
|
||||
|
||||
tlsCfg := sc.TLS()
|
||||
|
||||
if tlsCfg != nil {
|
||||
cert, err := tls.LoadX509KeyPair(tlsCfg.CertificateFile(), tlsCfg.KeyFile())
|
||||
if err != nil {
|
||||
c.log.Error(logs.FrostFSNodeCouldNotReadCertificateFromFile, zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
var cipherSuites []uint16
|
||||
if !tlsCfg.UseInsecureCrypto() {
|
||||
// This more or less follows the list in https://wiki.mozilla.org/Security/Server_Side_TLS
|
||||
// excluding:
|
||||
// 1. TLS 1.3 suites need not be specified here.
|
||||
// 2. Suites that use DH key exchange are not implemented by stdlib.
|
||||
cipherSuites = []uint16{
|
||||
tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
|
||||
tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
|
||||
tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
|
||||
tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
|
||||
tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256,
|
||||
tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256,
|
||||
}
|
||||
}
|
||||
creds := credentials.NewTLS(&tls.Config{
|
||||
MinVersion: tls.VersionTLS12,
|
||||
CipherSuites: cipherSuites,
|
||||
Certificates: []tls.Certificate{cert},
|
||||
})
|
||||
|
||||
serverOpts = append(serverOpts, grpc.Creds(creds))
|
||||
serverOpts, ok := getGrpcServerOpts(c, sc)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
lis, err := net.Listen("tcp", sc.Endpoint())
|
||||
if err != nil {
|
||||
c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(sc.Endpoint())
|
||||
c.log.Error(logs.FrostFSNodeCantListenGRPCEndpoint, zap.Error(err))
|
||||
endpointsToReconnect = append(endpointsToReconnect, sc.Endpoint())
|
||||
return
|
||||
}
|
||||
c.metricsCollector.GrpcServerMetrics().MarkHealthy(sc.Endpoint())
|
||||
|
||||
c.cfgGRPC.listeners = append(c.cfgGRPC.listeners, lis)
|
||||
c.cfgGRPC.endpoints = append(c.cfgGRPC.endpoints, sc.Endpoint())
|
||||
|
||||
srv := grpc.NewServer(serverOpts...)
|
||||
|
||||
c.onShutdown(func() {
|
||||
stopGRPC("FrostFS Public API", srv, c.log)
|
||||
})
|
||||
|
||||
c.cfgGRPC.servers = append(c.cfgGRPC.servers, srv)
|
||||
c.cfgGRPC.append(sc.Endpoint(), lis, srv)
|
||||
successCount++
|
||||
})
|
||||
|
||||
if successCount == 0 {
|
||||
fatalOnErr(errors.New("could not listen to any gRPC endpoints"))
|
||||
}
|
||||
c.cfgGRPC.reconnectTimeout = grpcconfig.ReconnectTimeout(c.appCfg)
|
||||
|
||||
for _, endpoint := range endpointsToReconnect {
|
||||
scheduleReconnect(endpoint, c)
|
||||
}
|
||||
}
|
||||
|
||||
func scheduleReconnect(endpoint string, c *cfg) {
|
||||
c.wg.Add(1)
|
||||
go func() {
|
||||
defer c.wg.Done()
|
||||
|
||||
t := time.NewTicker(c.cfgGRPC.reconnectTimeout)
|
||||
for {
|
||||
select {
|
||||
case <-t.C:
|
||||
if tryReconnect(endpoint, c) {
|
||||
return
|
||||
}
|
||||
case <-c.done:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func tryReconnect(endpoint string, c *cfg) bool {
|
||||
c.log.Info(logs.FrostFSNodeGRPCReconnecting, zap.String("endpoint", endpoint))
|
||||
|
||||
serverOpts, found := getGRPCEndpointOpts(endpoint, c)
|
||||
if !found {
|
||||
c.log.Warn(logs.FrostFSNodeGRPCServerConfigNotFound, zap.String("endpoint", endpoint))
|
||||
return true
|
||||
}
|
||||
|
||||
lis, err := net.Listen("tcp", endpoint)
|
||||
if err != nil {
|
||||
c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(endpoint)
|
||||
c.log.Error(logs.FrostFSNodeCantListenGRPCEndpoint, zap.Error(err))
|
||||
c.log.Warn(logs.FrostFSNodeGRPCReconnectFailed, zap.Duration("next_try_in", c.cfgGRPC.reconnectTimeout))
|
||||
return false
|
||||
}
|
||||
c.metricsCollector.GrpcServerMetrics().MarkHealthy(endpoint)
|
||||
|
||||
srv := grpc.NewServer(serverOpts...)
|
||||
|
||||
c.onShutdown(func() {
|
||||
stopGRPC("FrostFS Public API", srv, c.log)
|
||||
})
|
||||
|
||||
c.cfgGRPC.appendAndHandle(endpoint, lis, srv)
|
||||
|
||||
c.log.Info(logs.FrostFSNodeGRPCReconnectedSuccessfully, zap.String("endpoint", endpoint))
|
||||
return true
|
||||
}
|
||||
|
||||
func getGRPCEndpointOpts(endpoint string, c *cfg) (result []grpc.ServerOption, found bool) {
|
||||
unlock := c.LockAppConfigShared()
|
||||
defer unlock()
|
||||
grpcconfig.IterateEndpoints(c.appCfg, func(sc *grpcconfig.Config) {
|
||||
if found {
|
||||
return
|
||||
}
|
||||
if sc.Endpoint() != endpoint {
|
||||
return
|
||||
}
|
||||
var ok bool
|
||||
result, ok = getGrpcServerOpts(c, sc)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
found = true
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
func getGrpcServerOpts(c *cfg, sc *grpcconfig.Config) ([]grpc.ServerOption, bool) {
|
||||
serverOpts := []grpc.ServerOption{
|
||||
grpc.MaxRecvMsgSize(maxRecvMsgSize),
|
||||
grpc.ChainUnaryInterceptor(
|
||||
metrics.NewUnaryServerInterceptor(),
|
||||
tracing.NewUnaryServerInterceptor(),
|
||||
),
|
||||
grpc.ChainStreamInterceptor(
|
||||
metrics.NewStreamServerInterceptor(),
|
||||
tracing.NewStreamServerInterceptor(),
|
||||
),
|
||||
}
|
||||
|
||||
tlsCfg := sc.TLS()
|
||||
|
||||
if tlsCfg != nil {
|
||||
cert, err := tls.LoadX509KeyPair(tlsCfg.CertificateFile(), tlsCfg.KeyFile())
|
||||
if err != nil {
|
||||
c.log.Error(logs.FrostFSNodeCouldNotReadCertificateFromFile, zap.Error(err))
|
||||
return nil, false
|
||||
}
|
||||
|
||||
var cipherSuites []uint16
|
||||
if !tlsCfg.UseInsecureCrypto() {
|
||||
// This more or less follows the list in https://wiki.mozilla.org/Security/Server_Side_TLS
|
||||
// excluding:
|
||||
// 1. TLS 1.3 suites need not be specified here.
|
||||
// 2. Suites that use DH key exchange are not implemented by stdlib.
|
||||
cipherSuites = []uint16{
|
||||
tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
|
||||
tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
|
||||
tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
|
||||
tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
|
||||
tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256,
|
||||
tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256,
|
||||
}
|
||||
}
|
||||
creds := credentials.NewTLS(&tls.Config{
|
||||
MinVersion: tls.VersionTLS12,
|
||||
CipherSuites: cipherSuites,
|
||||
Certificates: []tls.Certificate{cert},
|
||||
})
|
||||
|
||||
serverOpts = append(serverOpts, grpc.Creds(creds))
|
||||
}
|
||||
|
||||
return serverOpts, true
|
||||
}
|
||||
|
||||
func serveGRPC(c *cfg) {
|
||||
for i := range c.cfgGRPC.servers {
|
||||
c.cfgGRPC.performAndSave(func(e string, l net.Listener, s *grpc.Server) {
|
||||
c.wg.Add(1)
|
||||
|
||||
srv := c.cfgGRPC.servers[i]
|
||||
lis := c.cfgGRPC.listeners[i]
|
||||
endpoint := c.cfgGRPC.endpoints[i]
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
c.log.Info(logs.FrostFSNodeStopListeningGRPCEndpoint,
|
||||
zap.Stringer("endpoint", lis.Addr()),
|
||||
zap.Stringer("endpoint", l.Addr()),
|
||||
)
|
||||
|
||||
c.wg.Done()
|
||||
|
@ -112,15 +189,17 @@ func serveGRPC(c *cfg) {
|
|||
|
||||
c.log.Info(logs.FrostFSNodeStartListeningEndpoint,
|
||||
zap.String("service", "gRPC"),
|
||||
zap.Stringer("endpoint", lis.Addr()),
|
||||
zap.Stringer("endpoint", l.Addr()),
|
||||
)
|
||||
|
||||
if err := srv.Serve(lis); err != nil {
|
||||
c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(endpoint)
|
||||
fmt.Println("gRPC server error", err)
|
||||
if err := s.Serve(l); err != nil {
|
||||
c.metricsCollector.GrpcServerMetrics().MarkUnhealthy(e)
|
||||
c.log.Error(logs.FrostFSNodeGRPCServerError, zap.Error(err))
|
||||
c.cfgGRPC.dropConnection(e)
|
||||
scheduleReconnect(e, c)
|
||||
}
|
||||
}()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func stopGRPC(name string, s *grpc.Server, l *logger.Logger) {
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"sync/atomic"
|
||||
|
||||
netmapGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap/grpc"
|
||||
|
@ -21,6 +22,7 @@ import (
|
|||
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
// primary solution of local network state dump.
|
||||
|
@ -162,9 +164,9 @@ func initNetmapService(ctx context.Context, c *cfg) {
|
|||
),
|
||||
)
|
||||
|
||||
for _, srv := range c.cfgGRPC.servers {
|
||||
netmapGRPC.RegisterNetmapServiceServer(srv, server)
|
||||
}
|
||||
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
|
||||
netmapGRPC.RegisterNetmapServiceServer(s, server)
|
||||
})
|
||||
|
||||
addNewEpochNotificationHandlers(c)
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||
objectGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object/grpc"
|
||||
|
@ -43,6 +44,7 @@ import (
|
|||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type objectSvc struct {
|
||||
|
@ -204,9 +206,9 @@ func initObjectService(c *cfg) {
|
|||
signSvc, c.metricsCollector.ObjectService(), metricsconfig.Enabled(c.appCfg))
|
||||
server := objectTransportGRPC.New(c.shared.metricsSvc)
|
||||
|
||||
for _, srv := range c.cfgGRPC.servers {
|
||||
objectGRPC.RegisterObjectServiceServer(srv, server)
|
||||
}
|
||||
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
|
||||
objectGRPC.RegisterObjectServiceServer(s, server)
|
||||
})
|
||||
}
|
||||
|
||||
func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.ClientCache) {
|
||||
|
|
|
@ -3,6 +3,7 @@ package main
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
||||
|
@ -16,6 +17,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/session/storage/persistent"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/session/storage/temporary"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type sessionStorage interface {
|
||||
|
@ -57,7 +59,7 @@ func initSessionService(c *cfg) {
|
|||
),
|
||||
)
|
||||
|
||||
for _, srv := range c.cfgGRPC.servers {
|
||||
sessionGRPC.RegisterSessionServiceServer(srv, server)
|
||||
}
|
||||
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
|
||||
sessionGRPC.RegisterSessionServiceServer(s, server)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package main
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
treeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tree"
|
||||
|
@ -15,6 +16,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type cnrSource struct {
|
||||
|
@ -63,9 +65,9 @@ func initTreeService(c *cfg) {
|
|||
tree.WithAuthorizedKeys(treeConfig.AuthorizedKeys()),
|
||||
tree.WithMetrics(c.metricsCollector.TreeService()))
|
||||
|
||||
for _, srv := range c.cfgGRPC.servers {
|
||||
tree.RegisterTreeServiceServer(srv, c.treeService)
|
||||
}
|
||||
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
|
||||
tree.RegisterTreeServiceServer(s, c.treeService)
|
||||
})
|
||||
|
||||
c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) {
|
||||
c.treeService.Start(ctx)
|
||||
|
|
|
@ -423,6 +423,11 @@ const (
|
|||
FrostFSNodeStoppingGRPCServer = "stopping gRPC server..."
|
||||
FrostFSNodeGRPCCannotShutdownGracefullyForcingStop = "gRPC cannot shutdown gracefully, forcing stop"
|
||||
FrostFSNodeGRPCServerStoppedSuccessfully = "gRPC server stopped successfully"
|
||||
FrostFSNodeGRPCServerError = "gRPC server error"
|
||||
FrostFSNodeGRPCReconnecting = "reconnecting gRPC server..."
|
||||
FrostFSNodeGRPCReconnectedSuccessfully = "gRPC server reconnected successfully"
|
||||
FrostFSNodeGRPCServerConfigNotFound = "gRPC server config not found"
|
||||
FrostFSNodeGRPCReconnectFailed = "failed to reconnect gRPC server"
|
||||
FrostFSNodeWaitingForAllProcessesToStop = "waiting for all processes to stop"
|
||||
FrostFSNodeStartedLocalNodesMaintenance = "started local node's maintenance"
|
||||
FrostFSNodeStoppedLocalNodesMaintenance = "stopped local node's maintenance"
|
||||
|
|
Loading…
Reference in a new issue