forked from TrueCloudLab/frostfs-node
[#306] Rename Private service to Control service
Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
df3746fa68
commit
abd9952e46
16 changed files with 172 additions and 172 deletions
|
@ -28,7 +28,7 @@ import (
|
|||
nmwrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/private"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/control"
|
||||
tokenStorage "github.com/nspcc-dev/neofs-node/pkg/services/session/storage"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/util/response"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||
|
@ -182,7 +182,7 @@ type cfg struct {
|
|||
|
||||
respSvc *response.Service
|
||||
|
||||
cfgPrivateService cfgPrivateService
|
||||
cfgControlService cfgControlService
|
||||
|
||||
healthStatus *atomic.Int32
|
||||
}
|
||||
|
@ -264,7 +264,7 @@ type cfgObjectRoutines struct {
|
|||
get, head, put, search, rng, rngHash *ants.Pool
|
||||
}
|
||||
|
||||
type cfgPrivateService struct {
|
||||
type cfgControlService struct {
|
||||
server *grpc.Server
|
||||
}
|
||||
|
||||
|
@ -342,7 +342,7 @@ func initCfg(path string) *cfg {
|
|||
cfgObject: cfgObject{
|
||||
pool: initObjectPool(viperCfg),
|
||||
},
|
||||
healthStatus: atomic.NewInt32(int32(private.HealthStatus_STATUS_UNDEFINED)),
|
||||
healthStatus: atomic.NewInt32(int32(control.HealthStatus_STATUS_UNDEFINED)),
|
||||
}
|
||||
|
||||
initLocalStorage(c)
|
||||
|
@ -424,7 +424,7 @@ func defaultConfiguration(v *viper.Viper) {
|
|||
v.SetDefault(cfgObjectRangePoolSize, 10)
|
||||
v.SetDefault(cfgObjectRangeHashPoolSize, 10)
|
||||
|
||||
v.SetDefault(cfgPrivateSvcAllowedKeys, []string{})
|
||||
v.SetDefault(cfgCtrlSvcAllowedKeys, []string{})
|
||||
}
|
||||
|
||||
func (c *cfg) LocalAddress() *network.Address {
|
||||
|
|
76
cmd/neofs-node/control.go
Normal file
76
cmd/neofs-node/control.go
Normal file
|
@ -0,0 +1,76 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"net"
|
||||
|
||||
crypto "github.com/nspcc-dev/neofs-crypto"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/control"
|
||||
controlSvc "github.com/nspcc-dev/neofs-node/pkg/services/control/server"
|
||||
"github.com/pkg/errors"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
const (
|
||||
cfgCtrlSvcSection = "control"
|
||||
|
||||
cfgCtrlSvcAllowedKeys = cfgCtrlSvcSection + ".permitted_keys"
|
||||
|
||||
cfgCtrlSvcGRPCSection = cfgCtrlSvcSection + ".grpc"
|
||||
cfgCtrlGRPCEndpoint = cfgCtrlSvcGRPCSection + ".endpoint"
|
||||
)
|
||||
|
||||
func initControlService(c *cfg) {
|
||||
strKeys := c.viper.GetStringSlice(cfgCtrlSvcAllowedKeys)
|
||||
keys := make([][]byte, 0, len(strKeys)+1) // +1 for node key
|
||||
|
||||
keys = append(keys, crypto.MarshalPublicKey(&c.key.PublicKey))
|
||||
|
||||
for i := range strKeys {
|
||||
key, err := hex.DecodeString(strKeys[i])
|
||||
fatalOnErr(err)
|
||||
|
||||
if crypto.UnmarshalPublicKey(key) == nil {
|
||||
fatalOnErr(errors.Errorf("invalid permitted key for Control service %s", strKeys[i]))
|
||||
}
|
||||
|
||||
keys = append(keys, key)
|
||||
}
|
||||
|
||||
ctlSvc := controlSvc.New(
|
||||
controlSvc.WithKey(c.key),
|
||||
controlSvc.WithAllowedKeys(keys),
|
||||
controlSvc.WithHealthChecker(c),
|
||||
)
|
||||
|
||||
var (
|
||||
err error
|
||||
lis net.Listener
|
||||
endpoint = c.viper.GetString(cfgCtrlGRPCEndpoint)
|
||||
)
|
||||
|
||||
if endpoint == "" || endpoint == c.viper.GetString(cfgListenAddress) {
|
||||
lis = c.cfgGRPC.listener
|
||||
c.cfgControlService.server = c.cfgGRPC.server
|
||||
} else {
|
||||
lis, err = net.Listen("tcp", endpoint)
|
||||
fatalOnErr(err)
|
||||
|
||||
c.cfgControlService.server = grpc.NewServer()
|
||||
}
|
||||
|
||||
control.RegisterControlServiceServer(c.cfgControlService.server, ctlSvc)
|
||||
|
||||
c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) {
|
||||
fatalOnErr(c.cfgControlService.server.Serve(lis))
|
||||
}))
|
||||
}
|
||||
|
||||
func (c *cfg) setHealthStatus(st control.HealthStatus) {
|
||||
c.healthStatus.Store(int32(st))
|
||||
}
|
||||
|
||||
func (c *cfg) HealthStatus() control.HealthStatus {
|
||||
return control.HealthStatus(c.healthStatus.Load())
|
||||
}
|
|
@ -5,7 +5,7 @@ import (
|
|||
"flag"
|
||||
"log"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/private"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/control"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util/grace"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -42,7 +42,7 @@ func initApp(c *cfg) {
|
|||
initSessionService(c)
|
||||
initObjectService(c)
|
||||
initProfiler(c)
|
||||
initPrivateService(c)
|
||||
initControlService(c)
|
||||
|
||||
fatalOnErr(c.cfgObject.cfgLocalStorage.localStorage.Open())
|
||||
fatalOnErr(c.cfgObject.cfgLocalStorage.localStorage.Init())
|
||||
|
@ -56,7 +56,7 @@ func bootUp(c *cfg) {
|
|||
bootstrapNode(c)
|
||||
startWorkers(c)
|
||||
|
||||
c.setHealthStatus(private.HealthStatus_ONLINE)
|
||||
c.setHealthStatus(control.HealthStatus_ONLINE)
|
||||
}
|
||||
|
||||
func wait(c *cfg) {
|
||||
|
@ -75,7 +75,7 @@ func wait(c *cfg) {
|
|||
|
||||
func shutdown(c *cfg) {
|
||||
c.cfgGRPC.server.GracefulStop()
|
||||
c.cfgPrivateService.server.GracefulStop()
|
||||
c.cfgControlService.server.GracefulStop()
|
||||
|
||||
c.log.Info("gRPC server stopped")
|
||||
|
||||
|
|
|
@ -7,8 +7,8 @@ import (
|
|||
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
|
||||
netmapEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap"
|
||||
netmapTransportGRPC "github.com/nspcc-dev/neofs-node/pkg/network/transport/netmap/grpc"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/control"
|
||||
netmapService "github.com/nspcc-dev/neofs-node/pkg/services/netmap"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/private"
|
||||
"github.com/pkg/errors"
|
||||
"go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
|
@ -117,7 +117,7 @@ func addNewEpochNotificationHandler(c *cfg, h event.Handler) {
|
|||
}
|
||||
|
||||
func goOffline(c *cfg) {
|
||||
c.setHealthStatus(private.HealthStatus_OFFLINE)
|
||||
c.setHealthStatus(control.HealthStatus_OFFLINE)
|
||||
|
||||
err := c.cfgNetmap.wrapper.UpdatePeerState(
|
||||
crypto.MarshalPublicKey(&c.key.PublicKey),
|
||||
|
|
|
@ -1,76 +0,0 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"net"
|
||||
|
||||
crypto "github.com/nspcc-dev/neofs-crypto"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/private"
|
||||
privateSvc "github.com/nspcc-dev/neofs-node/pkg/services/private/server"
|
||||
"github.com/pkg/errors"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
const (
|
||||
cfgPrivateSvcSection = "private"
|
||||
|
||||
cfgPrivateSvcAllowedKeys = cfgPrivateSvcSection + ".permitted_keys"
|
||||
|
||||
cfgPrivateSvcGRPCSection = cfgPrivateSvcSection + ".grpc"
|
||||
cfgPrivateGRPCEndpoint = cfgPrivateSvcGRPCSection + ".endpoint"
|
||||
)
|
||||
|
||||
func initPrivateService(c *cfg) {
|
||||
strKeys := c.viper.GetStringSlice(cfgPrivateSvcAllowedKeys)
|
||||
keys := make([][]byte, 0, len(strKeys)+1) // +1 for node key
|
||||
|
||||
keys = append(keys, crypto.MarshalPublicKey(&c.key.PublicKey))
|
||||
|
||||
for i := range strKeys {
|
||||
key, err := hex.DecodeString(strKeys[i])
|
||||
fatalOnErr(err)
|
||||
|
||||
if crypto.UnmarshalPublicKey(key) == nil {
|
||||
fatalOnErr(errors.Errorf("invalid permitted key for private service %s", strKeys[i]))
|
||||
}
|
||||
|
||||
keys = append(keys, key)
|
||||
}
|
||||
|
||||
privSvc := privateSvc.New(
|
||||
privateSvc.WithKey(c.key),
|
||||
privateSvc.WithAllowedKeys(keys),
|
||||
privateSvc.WithHealthChecker(c),
|
||||
)
|
||||
|
||||
var (
|
||||
err error
|
||||
lis net.Listener
|
||||
endpoint = c.viper.GetString(cfgPrivateGRPCEndpoint)
|
||||
)
|
||||
|
||||
if endpoint == "" || endpoint == c.viper.GetString(cfgListenAddress) {
|
||||
lis = c.cfgGRPC.listener
|
||||
c.cfgPrivateService.server = c.cfgGRPC.server
|
||||
} else {
|
||||
lis, err = net.Listen("tcp", endpoint)
|
||||
fatalOnErr(err)
|
||||
|
||||
c.cfgPrivateService.server = grpc.NewServer()
|
||||
}
|
||||
|
||||
private.RegisterPrivateServiceServer(c.cfgPrivateService.server, privSvc)
|
||||
|
||||
c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) {
|
||||
fatalOnErr(c.cfgPrivateService.server.Serve(lis))
|
||||
}))
|
||||
}
|
||||
|
||||
func (c *cfg) setHealthStatus(st private.HealthStatus) {
|
||||
c.healthStatus.Store(int32(st))
|
||||
}
|
||||
|
||||
func (c *cfg) HealthStatus() private.HealthStatus {
|
||||
return private.HealthStatus(c.healthStatus.Load())
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue