[] cmd/node: Serve gRPC on multiple interfaces

Generalize single gRPC interface of the storage node to a group of
interfaces. Each interface calls the same RPC handler.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2021-06-22 20:25:18 +03:00 committed by Leonard Lyubich
parent d1eb9c3b0f
commit 8060735732
13 changed files with 220 additions and 189 deletions

View file

@ -16,17 +16,19 @@ func initAccountingService(c *cfg) {
balanceMorphWrapper, err := wrapper.NewFromMorph(c.cfgMorph.client, c.cfgAccounting.scriptHash, 0)
fatalOnErr(err)
accountingGRPC.RegisterAccountingServiceServer(c.cfgGRPC.server,
accountingTransportGRPC.New(
accountingService.NewSignService(
&c.key.PrivateKey,
accountingService.NewResponseService(
accountingService.NewExecutionService(
accounting.NewExecutor(balanceMorphWrapper),
),
c.respSvc,
server := accountingTransportGRPC.New(
accountingService.NewSignService(
&c.key.PrivateKey,
accountingService.NewResponseService(
accountingService.NewExecutionService(
accounting.NewExecutor(balanceMorphWrapper),
),
c.respSvc,
),
),
)
for _, srv := range c.cfgGRPC.servers {
accountingGRPC.RegisterAccountingServiceServer(srv, server)
}
}

View file

@ -17,7 +17,6 @@ import (
contractsconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/contracts"
engineconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine"
shardconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard"
grpcconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/grpc"
loggerconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/logger"
metricsconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/metrics"
nodeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/node"
@ -112,17 +111,13 @@ type cfg struct {
}
type cfgGRPC struct {
listener net.Listener
listeners []net.Listener
server *grpc.Server
servers []*grpc.Server
maxChunkSize uint64
maxAddrAmount uint64
tlsEnabled bool
tlsCertFile string
tlsKeyFile string
}
type cfgMorph struct {
@ -225,24 +220,6 @@ func initCfg(path string) *cfg {
maxChunkSize := uint64(maxMsgSize) * 3 / 4 // 25% to meta, 75% to payload
maxAddrAmount := uint64(maxChunkSize) / addressSize // each address is about 72 bytes
var (
tlsEnabled bool
tlsCertFile string
tlsKeyFile string
tlsConfig = grpcconfig.TLS(appCfg)
)
if tlsConfig.Enabled() {
tlsEnabled = true
tlsCertFile = tlsConfig.CertificateFile()
tlsKeyFile = tlsConfig.KeyFile()
}
if tlsEnabled {
netAddr.AddTLS()
}
state := newNetworkState()
containerWorkerPool, err := ants.NewPool(notificationHandlerPoolSize)
@ -281,9 +258,6 @@ func initCfg(path string) *cfg {
cfgGRPC: cfgGRPC{
maxChunkSize: maxChunkSize,
maxAddrAmount: maxAddrAmount,
tlsEnabled: tlsEnabled,
tlsCertFile: tlsCertFile,
tlsKeyFile: tlsKeyFile,
},
localAddr: network.GroupFromAddress(netAddr),
respSvc: response.NewService(

View file

@ -2,33 +2,28 @@ package grpcconfig
import (
"errors"
"strconv"
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/config"
)
const (
subsection = "grpc"
tlsSubsection = "tls"
)
var (
errEndpointNotSet = errors.New("empty/not set endpoint, see `grpc.endpoint` section")
errTLSKeyNotSet = errors.New("empty/not set TLS key file path, see `grpc.tls.key` section")
errTLSCertNotSet = errors.New("empty/not set TLS certificate file path, see `grpc.tls.certificate` section")
)
// TLSConfig is a wrapper over "tls" config section which provides access
// to TLS configuration of gRPC connection.
type TLSConfig struct {
cfg *config.Config
}
// Config is a wrapper over the config section
// which provides access to gRPC server configurations.
type Config config.Config
// Endpoint returns value of "endpoint" config parameter
// from "grpc" section.
// Endpoint returns value of "endpoint" config parameter.
//
// Panics if value is not a non-empty string.
func Endpoint(c *config.Config) string {
v := config.StringSafe(c.Sub(subsection), "endpoint")
func (x *Config) Endpoint() string {
v := config.StringSafe(
(*config.Config)(x),
"endpoint")
if v == "" {
panic(errEndpointNotSet)
}
@ -36,19 +31,27 @@ func Endpoint(c *config.Config) string {
return v
}
// TLS returns structure that provides access to "tls" subsection of
// "grpc" section.
func TLS(c *config.Config) TLSConfig {
return TLSConfig{
cfg: c.Sub(subsection).Sub(tlsSubsection),
// TLS returns "tls" subsection as a TLSConfig.
//
// Returns nil if "enabled" value of "tls" subsection is false.
func (x *Config) TLS() *TLSConfig {
sub := (*config.Config)(x).
Sub("tls")
if !config.BoolSafe(sub, "enabled") {
return nil
}
return &TLSConfig{
cfg: sub,
}
}
// Enabled returns value of "enabled" config parameter.
//
// Returns false if value is not set.
func (tls TLSConfig) Enabled() bool {
return config.BoolSafe(tls.cfg, "enabled")
// TLSConfig is a wrapper over the config section
// which provides access to TLS configurations
// of the gRPC server.
type TLSConfig struct {
cfg *config.Config
}
// KeyFile returns value of "key" config parameter.
@ -74,3 +77,24 @@ func (tls TLSConfig) CertificateFile() string {
return v
}
// IterateEndpoints iterates over subsections ["0":"N") (N - "num" value)
// of "grpc" section of c, wrap them into Config and passes to f.
//
// Panics if N is not a positive number.
func IterateEndpoints(c *config.Config, f func(*Config)) {
c = c.Sub("grpc")
num := config.Uint(c, "num")
if num == 0 {
panic("no gRPC server configured")
}
for i := uint64(0); i < num; i++ {
si := strconv.FormatUint(i, 10)
sc := (*Config)(c.Sub(si))
f(sc)
}
}

View file

@ -10,49 +10,35 @@ import (
func TestGRPCSection(t *testing.T) {
t.Run("defaults", func(t *testing.T) {
empty := configtest.EmptyConfig()
tlsEnabled := TLS(empty).Enabled()
require.Equal(t, false, tlsEnabled)
require.PanicsWithError(
t,
errEndpointNotSet.Error(),
func() {
Endpoint(empty)
},
)
require.PanicsWithError(
t,
errTLSKeyNotSet.Error(),
func() {
TLS(empty).KeyFile()
},
)
require.PanicsWithError(
t,
errTLSCertNotSet.Error(),
func() {
TLS(empty).CertificateFile()
},
)
require.Panics(t, func() {
IterateEndpoints(configtest.EmptyConfig(), nil)
})
})
const path = "../../../../config/example/node"
var fileConfigTest = func(c *config.Config) {
addr := Endpoint(c)
tlsEnabled := TLS(c).Enabled()
tlsCert := TLS(c).CertificateFile()
tlsKey := TLS(c).KeyFile()
num := 0
require.Equal(t, "s01.neofs.devenv:8080", addr)
require.Equal(t, true, tlsEnabled)
require.Equal(t, "/path/to/cert", tlsCert)
require.Equal(t, "/path/to/key", tlsKey)
IterateEndpoints(c, func(sc *Config) {
defer func() {
num++
}()
tls := sc.TLS()
switch num {
case 0:
require.Equal(t, "s01.neofs.devenv:8080", sc.Endpoint())
require.Equal(t, "/path/to/cert", tls.CertificateFile())
require.Equal(t, "/path/to/key", tls.KeyFile())
case 1:
require.Equal(t, "s02.neofs.devenv:8080", sc.Endpoint())
require.Nil(t, tls)
}
})
}
configtest.ForEachFileType(path, fileConfigTest)

View file

@ -114,23 +114,25 @@ func initContainerService(c *cfg) {
})
})
containerGRPC.RegisterContainerServiceServer(c.cfgGRPC.server,
containerTransportGRPC.New(
containerService.NewSignService(
&c.key.PrivateKey,
containerService.NewResponseService(
&usedSpaceService{
Server: containerService.NewExecutionService(containerMorph.NewExecutor(wrap)),
loadWriterProvider: loadRouter,
loadPlacementBuilder: loadPlacementBuilder,
routeBuilder: routeBuilder,
cfg: c,
},
c.respSvc,
),
server := containerTransportGRPC.New(
containerService.NewSignService(
&c.key.PrivateKey,
containerService.NewResponseService(
&usedSpaceService{
Server: containerService.NewExecutionService(containerMorph.NewExecutor(wrap)),
loadWriterProvider: loadRouter,
loadPlacementBuilder: loadPlacementBuilder,
routeBuilder: routeBuilder,
cfg: c,
},
c.respSvc,
),
),
)
for _, srv := range c.cfgGRPC.servers {
containerGRPC.RegisterContainerServiceServer(srv, server)
}
}
// addContainerNotificationHandler adds handler that will be executed synchronously

View file

@ -12,41 +12,52 @@ import (
)
func initGRPC(c *cfg) {
var err error
grpcconfig.IterateEndpoints(c.appCfg, func(sc *grpcconfig.Config) {
lis, err := net.Listen("tcp", sc.Endpoint())
fatalOnErr(err)
c.cfgGRPC.listener, err = net.Listen("tcp", grpcconfig.Endpoint(c.appCfg))
fatalOnErr(err)
c.cfgGRPC.listeners = append(c.cfgGRPC.listeners, lis)
serverOpts := []grpc.ServerOption{
grpc.MaxSendMsgSize(maxMsgSize),
}
serverOpts := []grpc.ServerOption{
grpc.MaxSendMsgSize(maxMsgSize),
}
if c.cfgGRPC.tlsEnabled {
creds, err := credentials.NewServerTLSFromFile(c.cfgGRPC.tlsCertFile, c.cfgGRPC.tlsKeyFile)
fatalOnErrDetails("could not read credentials from file", err)
tlsCfg := sc.TLS()
serverOpts = append(serverOpts, grpc.Creds(creds))
}
if tlsCfg != nil {
creds, err := credentials.NewServerTLSFromFile(tlsCfg.CertificateFile(), tlsCfg.KeyFile())
fatalOnErrDetails("could not read credentials from file", err)
c.cfgGRPC.server = grpc.NewServer(serverOpts...)
serverOpts = append(serverOpts, grpc.Creds(creds))
}
c.onShutdown(func() {
stopGRPC("NeoFS Public API", c.cfgGRPC.server, c.log)
srv := grpc.NewServer(serverOpts...)
c.onShutdown(func() {
stopGRPC("NeoFS Public API", srv, c.log)
})
c.cfgGRPC.servers = append(c.cfgGRPC.servers, srv)
})
}
func serveGRPC(c *cfg) {
c.wg.Add(1)
for i := range c.cfgGRPC.servers {
c.wg.Add(1)
go func() {
defer func() {
c.wg.Done()
srv := c.cfgGRPC.servers[i]
lis := c.cfgGRPC.listeners[i]
go func() {
defer func() {
c.wg.Done()
}()
if err := srv.Serve(lis); err != nil {
fmt.Println("gRPC server error", err)
}
}()
if err := c.cfgGRPC.server.Serve(c.cfgGRPC.listener); err != nil {
fmt.Println("gRPC server error", err)
}
}()
}
}
func stopGRPC(name string, s *grpc.Server, l *logger.Logger) {

View file

@ -86,25 +86,27 @@ func initNetmapService(c *cfg) {
initMorphComponents(c)
}
netmapGRPC.RegisterNetmapServiceServer(c.cfgGRPC.server,
netmapTransportGRPC.New(
netmapService.NewSignService(
&c.key.PrivateKey,
netmapService.NewResponseService(
netmapService.NewExecutionService(
c,
c.apiVersion,
&netInfo{
netState: c.cfgNetmap.state,
magic: c.cfgMorph.client,
},
),
c.respSvc,
server := netmapTransportGRPC.New(
netmapService.NewSignService(
&c.key.PrivateKey,
netmapService.NewResponseService(
netmapService.NewExecutionService(
c,
c.apiVersion,
&netInfo{
netState: c.cfgNetmap.state,
magic: c.cfgMorph.client,
},
),
c.respSvc,
),
),
)
for _, srv := range c.cfgGRPC.servers {
netmapGRPC.RegisterNetmapServiceServer(srv, server)
}
addNewEpochNotificationHandler(c, func(ev event.Event) {
c.cfgNetmap.state.setCurrentEpoch(ev.(netmapEvent.NewEpoch).EpochNumber())
})

View file

@ -371,9 +371,11 @@ func initObjectService(c *cfg) {
firstSvc = objectService.NewMetricCollector(aclSvc, c.metricsCollector)
}
objectGRPC.RegisterObjectServiceServer(c.cfgGRPC.server,
objectTransportGRPC.New(firstSvc),
)
server := objectTransportGRPC.New(firstSvc)
for _, srv := range c.cfgGRPC.servers {
objectGRPC.RegisterObjectServiceServer(srv, server)
}
}
type morphEACLStorage struct {

View file

@ -186,24 +186,26 @@ func initReputationService(c *cfg) {
},
)
v2reputationgrpc.RegisterReputationServiceServer(c.cfgGRPC.server,
grpcreputation.New(
reputationrpc.NewSignService(
&c.key.PrivateKey,
reputationrpc.NewResponseService(
&reputationServer{
cfg: c,
log: c.log,
localRouter: localTrustRouter,
intermediateRouter: intermediateTrustRouter,
routeBuilder: localRouteBuilder,
},
c.respSvc,
),
server := grpcreputation.New(
reputationrpc.NewSignService(
&c.key.PrivateKey,
reputationrpc.NewResponseService(
&reputationServer{
cfg: c,
log: c.log,
localRouter: localTrustRouter,
intermediateRouter: intermediateTrustRouter,
routeBuilder: localRouteBuilder,
},
c.respSvc,
),
),
)
for _, srv := range c.cfgGRPC.servers {
v2reputationgrpc.RegisterReputationServiceServer(srv, server)
}
// initialize eigen trust block timer
durationMeter := NewEigenTrustDuration(c.cfgNetmap.wrapper)

View file

@ -10,17 +10,19 @@ import (
func initSessionService(c *cfg) {
c.privateTokenStore = storage.New()
sessionGRPC.RegisterSessionServiceServer(c.cfgGRPC.server,
sessionTransportGRPC.New(
sessionSvc.NewSignService(
&c.key.PrivateKey,
sessionSvc.NewResponseService(
sessionSvc.NewExecutionService(
c.privateTokenStore,
),
c.respSvc,
server := sessionTransportGRPC.New(
sessionSvc.NewSignService(
&c.key.PrivateKey,
sessionSvc.NewResponseService(
sessionSvc.NewExecutionService(
c.privateTokenStore,
),
c.respSvc,
),
),
)
for _, srv := range c.cfgGRPC.servers {
sessionGRPC.RegisterSessionServiceServer(srv, server)
}
}

View file

@ -17,10 +17,18 @@ NEOFS_NODE_ATTRIBUTE_1=UN-LOCODE:RU MSK
NEOFS_NODE_RELAY=true
# gRPC section
NEOFS_GRPC_ENDPOINT=s01.neofs.devenv:8080
NEOFS_GRPC_TLS_ENABLED=true
NEOFS_GRPC_TLS_CERTIFICATE=/path/to/cert
NEOFS_GRPC_TLS_KEY=/path/to/key
NEOFS_GRPC_NUM=2
## 0 server
NEOFS_GRPC_0_ENDPOINT=s01.neofs.devenv:8080
### TLS config
NEOFS_GRPC_0_TLS_ENABLED=true
NEOFS_GRPC_0_TLS_CERTIFICATE=/path/to/cert
NEOFS_GRPC_0_TLS_KEY=/path/to/key
## 1 server
NEOFS_GRPC_1_ENDPOINT=s02.neofs.devenv:8080
### TLS config
NEOFS_GRPC_1_TLS_ENABLED=false
# Control service section
NEOFS_CONTROL_AUTHORIZED_KEYS=035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11 028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6

View file

@ -23,11 +23,20 @@
"relay": true
},
"grpc": {
"endpoint": "s01.neofs.devenv:8080",
"tls": {
"enabled": true,
"certificate": "/path/to/cert",
"key": "/path/to/key"
"num": 2,
"0": {
"endpoint": "s01.neofs.devenv:8080",
"tls": {
"enabled": true,
"certificate": "/path/to/cert",
"key": "/path/to/key"
}
},
"1": {
"endpoint": "s02.neofs.devenv:8080",
"tls": {
"enabled": false
}
}
},
"control": {

View file

@ -21,11 +21,18 @@ node:
relay: true
grpc:
endpoint: s01.neofs.devenv:8080
tls:
enabled: true
certificate: /path/to/cert
key: /path/to/key
num: 2
0:
endpoint: s01.neofs.devenv:8080
tls:
enabled: true
certificate: /path/to/cert
key: /path/to/key
1:
endpoint: s02.neofs.devenv:8080
tls:
enabled: false
control:
authorized_keys: