forked from TrueCloudLab/frostfs-node
130 lines
3.8 KiB
Go
130 lines
3.8 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
|
|
controlconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/control"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
|
controlSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/server"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sdnotify"
|
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
"go.uber.org/zap"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
const serviceNameControl = "control"
|
|
|
|
type treeSynchronizer struct {
|
|
treeSvc *tree.Service
|
|
}
|
|
|
|
func (t treeSynchronizer) Synchronize(ctx context.Context, cnr cid.ID, treeID string) error {
|
|
return t.treeSvc.SynchronizeTree(ctx, cnr, treeID)
|
|
}
|
|
|
|
func initControlService(c *cfg) {
|
|
endpoint := controlconfig.GRPC(c.appCfg).Endpoint()
|
|
if endpoint == controlconfig.GRPCEndpointDefault {
|
|
return
|
|
}
|
|
|
|
pubs := controlconfig.AuthorizedKeys(c.appCfg)
|
|
rawPubs := make([][]byte, 0, len(pubs)+1) // +1 for node key
|
|
|
|
rawPubs = append(rawPubs, c.key.PublicKey().Bytes())
|
|
|
|
for i := range pubs {
|
|
rawPubs = append(rawPubs, pubs[i].Bytes())
|
|
}
|
|
|
|
ctlSvc := controlSvc.New(
|
|
controlSvc.WithKey(&c.key.PrivateKey),
|
|
controlSvc.WithAuthorizedKeys(rawPubs),
|
|
controlSvc.WithHealthChecker(c),
|
|
controlSvc.WithNetMapSource(c.netMapSource),
|
|
controlSvc.WithContainerSource(c.cfgObject.cnrSource),
|
|
controlSvc.WithReplicator(c.replicator),
|
|
controlSvc.WithNodeState(c),
|
|
controlSvc.WithLocalStorage(c.cfgObject.cfgLocalStorage.localStorage),
|
|
controlSvc.WithTreeService(treeSynchronizer{
|
|
c.treeService,
|
|
}),
|
|
controlSvc.WithLocalOverrideStorage(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine),
|
|
)
|
|
|
|
lis, err := net.Listen("tcp", endpoint)
|
|
if err != nil {
|
|
c.log.Error(logs.FrostFSNodeCantListenGRPCEndpointControl, zap.Error(err))
|
|
return
|
|
}
|
|
|
|
c.cfgControlService.server = grpc.NewServer()
|
|
|
|
c.onShutdown(func() {
|
|
stopGRPC("FrostFS Control API", c.cfgControlService.server, c.log)
|
|
})
|
|
|
|
control.RegisterControlServiceServer(c.cfgControlService.server, ctlSvc)
|
|
|
|
c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) {
|
|
runAndLog(ctx, c, serviceNameControl, false, func(context.Context, *cfg) {
|
|
c.log.Info(logs.FrostFSNodeStartListeningEndpoint,
|
|
zap.String("service", serviceNameControl),
|
|
zap.String("endpoint", endpoint))
|
|
fatalOnErr(c.cfgControlService.server.Serve(lis))
|
|
})
|
|
}))
|
|
}
|
|
|
|
func (c *cfg) NetmapStatus() control.NetmapStatus {
|
|
return c.cfgNetmap.state.controlNetmapStatus()
|
|
}
|
|
|
|
func (c *cfg) setHealthStatus(st control.HealthStatus) {
|
|
c.notifySystemd(st)
|
|
c.healthStatus.Store(int32(st))
|
|
c.metricsCollector.State().SetHealth(int32(st))
|
|
}
|
|
|
|
func (c *cfg) compareAndSwapHealthStatus(oldSt, newSt control.HealthStatus) (swapped bool) {
|
|
if swapped = c.healthStatus.CompareAndSwap(int32(oldSt), int32(newSt)); swapped {
|
|
c.notifySystemd(newSt)
|
|
c.metricsCollector.State().SetHealth(int32(newSt))
|
|
}
|
|
return
|
|
}
|
|
|
|
func (c *cfg) swapHealthStatus(st control.HealthStatus) (old control.HealthStatus) {
|
|
old = control.HealthStatus(c.healthStatus.Swap(int32(st)))
|
|
c.notifySystemd(st)
|
|
c.metricsCollector.State().SetHealth(int32(st))
|
|
return
|
|
}
|
|
|
|
func (c *cfg) HealthStatus() control.HealthStatus {
|
|
return control.HealthStatus(c.healthStatus.Load())
|
|
}
|
|
|
|
func (c *cfg) notifySystemd(st control.HealthStatus) {
|
|
if !c.sdNotify {
|
|
return
|
|
}
|
|
var err error
|
|
switch st {
|
|
case control.HealthStatus_READY:
|
|
err = sdnotify.FlagAndStatus(sdnotify.ReadyEnabled)
|
|
case control.HealthStatus_SHUTTING_DOWN:
|
|
err = sdnotify.FlagAndStatus(sdnotify.StoppingEnabled)
|
|
case control.HealthStatus_RECONFIGURING:
|
|
err = sdnotify.FlagAndStatus(sdnotify.ReloadingEnabled)
|
|
default:
|
|
err = sdnotify.Status(fmt.Sprintf("%v", st))
|
|
}
|
|
if err != nil {
|
|
c.log.Error(logs.FailedToReportStatusToSystemd, zap.Error(err))
|
|
}
|
|
}
|