From 806073573260c40ae45c5cad09abeeee1fbac320 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Tue, 22 Jun 2021 20:25:18 +0300 Subject: [PATCH] [#607] cmd/node: Serve gRPC on multiple interfaces Generalize single gRPC interface of the storage node to a group of interfaces. Each interface calls the same RPC handler. Signed-off-by: Leonard Lyubich --- cmd/neofs-node/accounting.go | 20 ++++--- cmd/neofs-node/config.go | 30 +--------- cmd/neofs-node/config/grpc/config.go | 72 +++++++++++++++-------- cmd/neofs-node/config/grpc/config_test.go | 60 ++++++++----------- cmd/neofs-node/container.go | 30 +++++----- cmd/neofs-node/grpc.go | 57 ++++++++++-------- cmd/neofs-node/netmap.go | 30 +++++----- cmd/neofs-node/object.go | 8 ++- cmd/neofs-node/reputation.go | 30 +++++----- cmd/neofs-node/session.go | 20 ++++--- config/example/node.env | 16 +++-- config/example/node.json | 19 ++++-- config/example/node.yaml | 17 ++++-- 13 files changed, 220 insertions(+), 189 deletions(-) diff --git a/cmd/neofs-node/accounting.go b/cmd/neofs-node/accounting.go index e0491709..ec2289fc 100644 --- a/cmd/neofs-node/accounting.go +++ b/cmd/neofs-node/accounting.go @@ -16,17 +16,19 @@ func initAccountingService(c *cfg) { balanceMorphWrapper, err := wrapper.NewFromMorph(c.cfgMorph.client, c.cfgAccounting.scriptHash, 0) fatalOnErr(err) - accountingGRPC.RegisterAccountingServiceServer(c.cfgGRPC.server, - accountingTransportGRPC.New( - accountingService.NewSignService( - &c.key.PrivateKey, - accountingService.NewResponseService( - accountingService.NewExecutionService( - accounting.NewExecutor(balanceMorphWrapper), - ), - c.respSvc, + server := accountingTransportGRPC.New( + accountingService.NewSignService( + &c.key.PrivateKey, + accountingService.NewResponseService( + accountingService.NewExecutionService( + accounting.NewExecutor(balanceMorphWrapper), ), + c.respSvc, ), ), ) + + for _, srv := range c.cfgGRPC.servers { + accountingGRPC.RegisterAccountingServiceServer(srv, server) + } } diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index b77a0272..74716fb3 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -17,7 +17,6 @@ import ( contractsconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/contracts" engineconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine" shardconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard" - grpcconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/grpc" loggerconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/logger" metricsconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/metrics" nodeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/node" @@ -112,17 +111,13 @@ type cfg struct { } type cfgGRPC struct { - listener net.Listener + listeners []net.Listener - server *grpc.Server + servers []*grpc.Server maxChunkSize uint64 maxAddrAmount uint64 - - tlsEnabled bool - tlsCertFile string - tlsKeyFile string } type cfgMorph struct { @@ -225,24 +220,6 @@ func initCfg(path string) *cfg { maxChunkSize := uint64(maxMsgSize) * 3 / 4 // 25% to meta, 75% to payload maxAddrAmount := uint64(maxChunkSize) / addressSize // each address is about 72 bytes - var ( - tlsEnabled bool - tlsCertFile string - tlsKeyFile string - - tlsConfig = grpcconfig.TLS(appCfg) - ) - - if tlsConfig.Enabled() { - tlsEnabled = true - tlsCertFile = tlsConfig.CertificateFile() - tlsKeyFile = tlsConfig.KeyFile() - } - - if tlsEnabled { - netAddr.AddTLS() - } - state := newNetworkState() containerWorkerPool, err := ants.NewPool(notificationHandlerPoolSize) @@ -281,9 +258,6 @@ func initCfg(path string) *cfg { cfgGRPC: cfgGRPC{ maxChunkSize: maxChunkSize, maxAddrAmount: maxAddrAmount, - tlsEnabled: tlsEnabled, - tlsCertFile: tlsCertFile, - tlsKeyFile: tlsKeyFile, }, localAddr: network.GroupFromAddress(netAddr), respSvc: response.NewService( diff --git a/cmd/neofs-node/config/grpc/config.go b/cmd/neofs-node/config/grpc/config.go index fd358605..6029b565 100644 --- a/cmd/neofs-node/config/grpc/config.go +++ b/cmd/neofs-node/config/grpc/config.go @@ -2,33 +2,28 @@ package grpcconfig import ( "errors" + "strconv" "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config" ) -const ( - subsection = "grpc" - tlsSubsection = "tls" -) - var ( errEndpointNotSet = errors.New("empty/not set endpoint, see `grpc.endpoint` section") errTLSKeyNotSet = errors.New("empty/not set TLS key file path, see `grpc.tls.key` section") errTLSCertNotSet = errors.New("empty/not set TLS certificate file path, see `grpc.tls.certificate` section") ) -// TLSConfig is a wrapper over "tls" config section which provides access -// to TLS configuration of gRPC connection. -type TLSConfig struct { - cfg *config.Config -} +// Config is a wrapper over the config section +// which provides access to gRPC server configurations. +type Config config.Config -// Endpoint returns value of "endpoint" config parameter -// from "grpc" section. +// Endpoint returns value of "endpoint" config parameter. // // Panics if value is not a non-empty string. -func Endpoint(c *config.Config) string { - v := config.StringSafe(c.Sub(subsection), "endpoint") +func (x *Config) Endpoint() string { + v := config.StringSafe( + (*config.Config)(x), + "endpoint") if v == "" { panic(errEndpointNotSet) } @@ -36,19 +31,27 @@ func Endpoint(c *config.Config) string { return v } -// TLS returns structure that provides access to "tls" subsection of -// "grpc" section. -func TLS(c *config.Config) TLSConfig { - return TLSConfig{ - cfg: c.Sub(subsection).Sub(tlsSubsection), +// TLS returns "tls" subsection as a TLSConfig. +// +// Returns nil if "enabled" value of "tls" subsection is false. +func (x *Config) TLS() *TLSConfig { + sub := (*config.Config)(x). + Sub("tls") + + if !config.BoolSafe(sub, "enabled") { + return nil + } + + return &TLSConfig{ + cfg: sub, } } -// Enabled returns value of "enabled" config parameter. -// -// Returns false if value is not set. -func (tls TLSConfig) Enabled() bool { - return config.BoolSafe(tls.cfg, "enabled") +// TLSConfig is a wrapper over the config section +// which provides access to TLS configurations +// of the gRPC server. +type TLSConfig struct { + cfg *config.Config } // KeyFile returns value of "key" config parameter. @@ -74,3 +77,24 @@ func (tls TLSConfig) CertificateFile() string { return v } + +// IterateEndpoints iterates over subsections ["0":"N") (N - "num" value) +// of "grpc" section of c, wrap them into Config and passes to f. +// +// Panics if N is not a positive number. +func IterateEndpoints(c *config.Config, f func(*Config)) { + c = c.Sub("grpc") + + num := config.Uint(c, "num") + if num == 0 { + panic("no gRPC server configured") + } + + for i := uint64(0); i < num; i++ { + si := strconv.FormatUint(i, 10) + + sc := (*Config)(c.Sub(si)) + + f(sc) + } +} diff --git a/cmd/neofs-node/config/grpc/config_test.go b/cmd/neofs-node/config/grpc/config_test.go index 5c6d5014..2247c4c7 100644 --- a/cmd/neofs-node/config/grpc/config_test.go +++ b/cmd/neofs-node/config/grpc/config_test.go @@ -10,49 +10,35 @@ import ( func TestGRPCSection(t *testing.T) { t.Run("defaults", func(t *testing.T) { - empty := configtest.EmptyConfig() - - tlsEnabled := TLS(empty).Enabled() - - require.Equal(t, false, tlsEnabled) - - require.PanicsWithError( - t, - errEndpointNotSet.Error(), - func() { - Endpoint(empty) - }, - ) - - require.PanicsWithError( - t, - errTLSKeyNotSet.Error(), - func() { - TLS(empty).KeyFile() - }, - ) - - require.PanicsWithError( - t, - errTLSCertNotSet.Error(), - func() { - TLS(empty).CertificateFile() - }, - ) + require.Panics(t, func() { + IterateEndpoints(configtest.EmptyConfig(), nil) + }) }) const path = "../../../../config/example/node" var fileConfigTest = func(c *config.Config) { - addr := Endpoint(c) - tlsEnabled := TLS(c).Enabled() - tlsCert := TLS(c).CertificateFile() - tlsKey := TLS(c).KeyFile() + num := 0 - require.Equal(t, "s01.neofs.devenv:8080", addr) - require.Equal(t, true, tlsEnabled) - require.Equal(t, "/path/to/cert", tlsCert) - require.Equal(t, "/path/to/key", tlsKey) + IterateEndpoints(c, func(sc *Config) { + defer func() { + num++ + }() + + tls := sc.TLS() + + switch num { + case 0: + require.Equal(t, "s01.neofs.devenv:8080", sc.Endpoint()) + + require.Equal(t, "/path/to/cert", tls.CertificateFile()) + require.Equal(t, "/path/to/key", tls.KeyFile()) + case 1: + require.Equal(t, "s02.neofs.devenv:8080", sc.Endpoint()) + + require.Nil(t, tls) + } + }) } configtest.ForEachFileType(path, fileConfigTest) diff --git a/cmd/neofs-node/container.go b/cmd/neofs-node/container.go index 44034ab8..63f0cc14 100644 --- a/cmd/neofs-node/container.go +++ b/cmd/neofs-node/container.go @@ -114,23 +114,25 @@ func initContainerService(c *cfg) { }) }) - containerGRPC.RegisterContainerServiceServer(c.cfgGRPC.server, - containerTransportGRPC.New( - containerService.NewSignService( - &c.key.PrivateKey, - containerService.NewResponseService( - &usedSpaceService{ - Server: containerService.NewExecutionService(containerMorph.NewExecutor(wrap)), - loadWriterProvider: loadRouter, - loadPlacementBuilder: loadPlacementBuilder, - routeBuilder: routeBuilder, - cfg: c, - }, - c.respSvc, - ), + server := containerTransportGRPC.New( + containerService.NewSignService( + &c.key.PrivateKey, + containerService.NewResponseService( + &usedSpaceService{ + Server: containerService.NewExecutionService(containerMorph.NewExecutor(wrap)), + loadWriterProvider: loadRouter, + loadPlacementBuilder: loadPlacementBuilder, + routeBuilder: routeBuilder, + cfg: c, + }, + c.respSvc, ), ), ) + + for _, srv := range c.cfgGRPC.servers { + containerGRPC.RegisterContainerServiceServer(srv, server) + } } // addContainerNotificationHandler adds handler that will be executed synchronously diff --git a/cmd/neofs-node/grpc.go b/cmd/neofs-node/grpc.go index ef9c39a0..a93ae222 100644 --- a/cmd/neofs-node/grpc.go +++ b/cmd/neofs-node/grpc.go @@ -12,41 +12,52 @@ import ( ) func initGRPC(c *cfg) { - var err error + grpcconfig.IterateEndpoints(c.appCfg, func(sc *grpcconfig.Config) { + lis, err := net.Listen("tcp", sc.Endpoint()) + fatalOnErr(err) - c.cfgGRPC.listener, err = net.Listen("tcp", grpcconfig.Endpoint(c.appCfg)) - fatalOnErr(err) + c.cfgGRPC.listeners = append(c.cfgGRPC.listeners, lis) - serverOpts := []grpc.ServerOption{ - grpc.MaxSendMsgSize(maxMsgSize), - } + serverOpts := []grpc.ServerOption{ + grpc.MaxSendMsgSize(maxMsgSize), + } - if c.cfgGRPC.tlsEnabled { - creds, err := credentials.NewServerTLSFromFile(c.cfgGRPC.tlsCertFile, c.cfgGRPC.tlsKeyFile) - fatalOnErrDetails("could not read credentials from file", err) + tlsCfg := sc.TLS() - serverOpts = append(serverOpts, grpc.Creds(creds)) - } + if tlsCfg != nil { + creds, err := credentials.NewServerTLSFromFile(tlsCfg.CertificateFile(), tlsCfg.KeyFile()) + fatalOnErrDetails("could not read credentials from file", err) - c.cfgGRPC.server = grpc.NewServer(serverOpts...) + serverOpts = append(serverOpts, grpc.Creds(creds)) + } - c.onShutdown(func() { - stopGRPC("NeoFS Public API", c.cfgGRPC.server, c.log) + srv := grpc.NewServer(serverOpts...) + + c.onShutdown(func() { + stopGRPC("NeoFS Public API", srv, c.log) + }) + + c.cfgGRPC.servers = append(c.cfgGRPC.servers, srv) }) } func serveGRPC(c *cfg) { - c.wg.Add(1) + for i := range c.cfgGRPC.servers { + c.wg.Add(1) - go func() { - defer func() { - c.wg.Done() + srv := c.cfgGRPC.servers[i] + lis := c.cfgGRPC.listeners[i] + + go func() { + defer func() { + c.wg.Done() + }() + + if err := srv.Serve(lis); err != nil { + fmt.Println("gRPC server error", err) + } }() - - if err := c.cfgGRPC.server.Serve(c.cfgGRPC.listener); err != nil { - fmt.Println("gRPC server error", err) - } - }() + } } func stopGRPC(name string, s *grpc.Server, l *logger.Logger) { diff --git a/cmd/neofs-node/netmap.go b/cmd/neofs-node/netmap.go index fca68409..6d6106cb 100644 --- a/cmd/neofs-node/netmap.go +++ b/cmd/neofs-node/netmap.go @@ -86,25 +86,27 @@ func initNetmapService(c *cfg) { initMorphComponents(c) } - netmapGRPC.RegisterNetmapServiceServer(c.cfgGRPC.server, - netmapTransportGRPC.New( - netmapService.NewSignService( - &c.key.PrivateKey, - netmapService.NewResponseService( - netmapService.NewExecutionService( - c, - c.apiVersion, - &netInfo{ - netState: c.cfgNetmap.state, - magic: c.cfgMorph.client, - }, - ), - c.respSvc, + server := netmapTransportGRPC.New( + netmapService.NewSignService( + &c.key.PrivateKey, + netmapService.NewResponseService( + netmapService.NewExecutionService( + c, + c.apiVersion, + &netInfo{ + netState: c.cfgNetmap.state, + magic: c.cfgMorph.client, + }, ), + c.respSvc, ), ), ) + for _, srv := range c.cfgGRPC.servers { + netmapGRPC.RegisterNetmapServiceServer(srv, server) + } + addNewEpochNotificationHandler(c, func(ev event.Event) { c.cfgNetmap.state.setCurrentEpoch(ev.(netmapEvent.NewEpoch).EpochNumber()) }) diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index 69ce9058..c9c9a442 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -371,9 +371,11 @@ func initObjectService(c *cfg) { firstSvc = objectService.NewMetricCollector(aclSvc, c.metricsCollector) } - objectGRPC.RegisterObjectServiceServer(c.cfgGRPC.server, - objectTransportGRPC.New(firstSvc), - ) + server := objectTransportGRPC.New(firstSvc) + + for _, srv := range c.cfgGRPC.servers { + objectGRPC.RegisterObjectServiceServer(srv, server) + } } type morphEACLStorage struct { diff --git a/cmd/neofs-node/reputation.go b/cmd/neofs-node/reputation.go index b4e02ebe..b6f3784f 100644 --- a/cmd/neofs-node/reputation.go +++ b/cmd/neofs-node/reputation.go @@ -186,24 +186,26 @@ func initReputationService(c *cfg) { }, ) - v2reputationgrpc.RegisterReputationServiceServer(c.cfgGRPC.server, - grpcreputation.New( - reputationrpc.NewSignService( - &c.key.PrivateKey, - reputationrpc.NewResponseService( - &reputationServer{ - cfg: c, - log: c.log, - localRouter: localTrustRouter, - intermediateRouter: intermediateTrustRouter, - routeBuilder: localRouteBuilder, - }, - c.respSvc, - ), + server := grpcreputation.New( + reputationrpc.NewSignService( + &c.key.PrivateKey, + reputationrpc.NewResponseService( + &reputationServer{ + cfg: c, + log: c.log, + localRouter: localTrustRouter, + intermediateRouter: intermediateTrustRouter, + routeBuilder: localRouteBuilder, + }, + c.respSvc, ), ), ) + for _, srv := range c.cfgGRPC.servers { + v2reputationgrpc.RegisterReputationServiceServer(srv, server) + } + // initialize eigen trust block timer durationMeter := NewEigenTrustDuration(c.cfgNetmap.wrapper) diff --git a/cmd/neofs-node/session.go b/cmd/neofs-node/session.go index 654236af..dfda3e5c 100644 --- a/cmd/neofs-node/session.go +++ b/cmd/neofs-node/session.go @@ -10,17 +10,19 @@ import ( func initSessionService(c *cfg) { c.privateTokenStore = storage.New() - sessionGRPC.RegisterSessionServiceServer(c.cfgGRPC.server, - sessionTransportGRPC.New( - sessionSvc.NewSignService( - &c.key.PrivateKey, - sessionSvc.NewResponseService( - sessionSvc.NewExecutionService( - c.privateTokenStore, - ), - c.respSvc, + server := sessionTransportGRPC.New( + sessionSvc.NewSignService( + &c.key.PrivateKey, + sessionSvc.NewResponseService( + sessionSvc.NewExecutionService( + c.privateTokenStore, ), + c.respSvc, ), ), ) + + for _, srv := range c.cfgGRPC.servers { + sessionGRPC.RegisterSessionServiceServer(srv, server) + } } diff --git a/config/example/node.env b/config/example/node.env index 9e492ffd..d979f5c5 100644 --- a/config/example/node.env +++ b/config/example/node.env @@ -17,10 +17,18 @@ NEOFS_NODE_ATTRIBUTE_1=UN-LOCODE:RU MSK NEOFS_NODE_RELAY=true # gRPC section -NEOFS_GRPC_ENDPOINT=s01.neofs.devenv:8080 -NEOFS_GRPC_TLS_ENABLED=true -NEOFS_GRPC_TLS_CERTIFICATE=/path/to/cert -NEOFS_GRPC_TLS_KEY=/path/to/key +NEOFS_GRPC_NUM=2 +## 0 server +NEOFS_GRPC_0_ENDPOINT=s01.neofs.devenv:8080 +### TLS config +NEOFS_GRPC_0_TLS_ENABLED=true +NEOFS_GRPC_0_TLS_CERTIFICATE=/path/to/cert +NEOFS_GRPC_0_TLS_KEY=/path/to/key + +## 1 server +NEOFS_GRPC_1_ENDPOINT=s02.neofs.devenv:8080 +### TLS config +NEOFS_GRPC_1_TLS_ENABLED=false # Control service section NEOFS_CONTROL_AUTHORIZED_KEYS=035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11 028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6 diff --git a/config/example/node.json b/config/example/node.json index c5fa0ee7..9b78d9d5 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -23,11 +23,20 @@ "relay": true }, "grpc": { - "endpoint": "s01.neofs.devenv:8080", - "tls": { - "enabled": true, - "certificate": "/path/to/cert", - "key": "/path/to/key" + "num": 2, + "0": { + "endpoint": "s01.neofs.devenv:8080", + "tls": { + "enabled": true, + "certificate": "/path/to/cert", + "key": "/path/to/key" + } + }, + "1": { + "endpoint": "s02.neofs.devenv:8080", + "tls": { + "enabled": false + } } }, "control": { diff --git a/config/example/node.yaml b/config/example/node.yaml index 65e0a3d9..bc79e973 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -21,11 +21,18 @@ node: relay: true grpc: - endpoint: s01.neofs.devenv:8080 - tls: - enabled: true - certificate: /path/to/cert - key: /path/to/key + num: 2 + 0: + endpoint: s01.neofs.devenv:8080 + tls: + enabled: true + certificate: /path/to/cert + key: /path/to/key + + 1: + endpoint: s02.neofs.devenv:8080 + tls: + enabled: false control: authorized_keys: