frostfs-node/cmd/frostfs-node/reputation.go
Dmitrii Stepanov ed28ce24cd [#168] node: Refactor reputation service
Resolve funlen linter for initReputationService function

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-03-31 09:32:59 +03:00

386 lines
13 KiB
Go

package main
import (
"context"
"fmt"
v2reputation "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/reputation"
v2reputationgrpc "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/reputation/grpc"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/reputation/common"
intermediatereputation "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/reputation/intermediate"
localreputation "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/reputation/local"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/reputation/ticker"
repClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/reputation"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/netmap"
grpcreputation "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/transport/reputation/grpc"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation"
reputationcommon "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common"
reputationrouter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common/router"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/eigentrust"
eigentrustcalc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/eigentrust/calculator"
eigentrustctrl "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/eigentrust/controller"
intermediateroutes "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/eigentrust/routes"
consumerstorage "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/eigentrust/storage/consumers"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/eigentrust/storage/daughters"
localtrustcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/local/controller"
localroutes "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/local/routes"
truststorage "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/local/storage"
reputationrpc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/rpc"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
apireputation "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/reputation"
"go.uber.org/zap"
)
// initReputationService assembles the reputation subsystem of the node.
//
// It creates the reputation contract client, the trust storages, the local
// and intermediate trust routers and the local/EigenTrust controllers,
// subscribes the controllers to new epoch notifications and registers the
// reputation gRPC service on every server from c.cfgGRPC.servers.
//
// An error returned by the contract client constructor is handed to
// fatalOnErr.
func initReputationService(c *cfg) {
	repContract, err := repClient.NewFromMorph(
		c.cfgMorph.client,
		c.cfgReputation.scriptHash,
		0,
		repClient.TryNotary(),
	)
	fatalOnErr(err)

	pubKey := c.key.PublicKey().Bytes()

	// trusts calculated by this node itself are kept here
	// (the node plays the daughter role for them)
	c.cfgReputation.localTrustStorage = truststorage.New(truststorage.Prm{})

	var (
		daughterStore = daughters.New(daughters.Prm{})
		consumerStore = consumerstorage.New(consumerstorage.Prm{})
	)

	localLog := &logger.Logger{
		Logger: c.log.With(zap.String("trust_type", "local")),
	}

	managers := reputationcommon.NewManagerBuilder(
		reputationcommon.ManagersPrm{NetMapSource: c.netMapSource},
		reputationcommon.WithLogger(c.log),
	)

	localRoutes := localroutes.New(localroutes.Prm{
		ManagerBuilder: managers,
		Log:            localLog,
	})

	localRouter := createLocalTrustRouter(c, localRoutes, localLog, daughterStore)
	intermediateRouter := createIntermediateTrustRouter(c, consumerStore, managers)
	eigenTrustCtrl := createEigenTrustController(c, intermediateRouter, pubKey, repContract, daughterStore, consumerStore)

	// the controller must be in place before the report handler is subscribed
	c.cfgReputation.localTrustCtrl = createLocalTrustController(c, localLog, pubKey, localRouter)

	addReputationReportHandler(c)

	// build the service chain from the innermost handler outwards:
	// request handler -> response service -> sign service -> gRPC transport
	handler := &reputationServer{
		cfg:                c,
		log:                c.log,
		localRouter:        localRouter,
		intermediateRouter: intermediateRouter,
		routeBuilder:       localRoutes,
	}
	respService := reputationrpc.NewResponseService(handler, c.respSvc)
	signService := reputationrpc.NewSignService(&c.key.PrivateKey, respService)
	grpcService := grpcreputation.New(signService)

	for i := range c.cfgGRPC.servers {
		v2reputationgrpc.RegisterReputationServiceServer(c.cfgGRPC.servers[i], grpcService)
	}

	// initialize eigen trust block timer
	newEigenTrustIterTimer(c)

	addEigenTrustEpochHandler(c, eigenTrustCtrl)
}
// addReputationReportHandler registers a new epoch notification handler
// (via addNewEpochAsyncNotificationHandler) that makes the local trust
// controller report the values collected during the previous epoch.
//
// The handler expects the event to be a netmap.NewEpoch; any other event
// type makes the type assertion panic.
func addReputationReportHandler(c *cfg) {
	onNewEpoch := func(ev event.Event) {
		c.log.Debug("start reporting reputation on new epoch event")

		// report collected values from previous epoch
		// NOTE(review): the subtraction wraps around if the event ever carries
		// epoch 0 — confirm that cannot happen.
		prevEpoch := ev.(netmap.NewEpoch).EpochNumber() - 1

		var prm localtrustcontroller.ReportPrm
		prm.SetEpoch(prevEpoch)

		c.cfgReputation.localTrustCtrl.Report(prm)
	}

	addNewEpochAsyncNotificationHandler(c, onNewEpoch)
}
// addEigenTrustEpochHandler registers a new epoch notification handler that
// schedules EigenTrust iterations for the epoch that has just ended.
//
// On every netmap.NewEpoch event the handler reads the epoch duration and
// the number of EigenTrust iterations from c.cfgNetmap.wrapper, builds an
// iterations ticker whose callback calls eigenTrustController.Continue for
// the previous epoch, and stores that ticker in c.cfgMorph.eigenTrustTicker
// under the new epoch number. If any of these steps fails, the failure is
// logged at debug level and no ticker is added.
func addEigenTrustEpochHandler(c *cfg, eigenTrustController *eigentrustctrl.Controller) {
	onNewEpoch := func(e event.Event) {
		newEpoch := e.(netmap.NewEpoch).EpochNumber()
		epochLog := c.log.With(zap.Uint64("epoch", newEpoch))

		epochDuration, err := c.cfgNetmap.wrapper.EpochDuration()
		if err != nil {
			epochLog.Debug("could not fetch epoch duration", zap.Error(err))
			return
		}

		iterCount, err := c.cfgNetmap.wrapper.EigenTrustIterations()
		if err != nil {
			epochLog.Debug("could not fetch iteration number", zap.Error(err))
			return
		}

		// each tick continues the calculation for the epoch preceding the new one
		// NOTE(review): newEpoch - 1 wraps around for epoch 0 — confirm that
		// such an event is never delivered.
		onTick := func() {
			eigenTrustController.Continue(eigentrustctrl.ContinuePrm{
				Epoch: newEpoch - 1,
			})
		}

		iterTicker, err := ticker.NewIterationsTicker(epochDuration, iterCount, onTick)
		if err != nil {
			epochLog.Debug("could not create fixed epoch timer", zap.Error(err))
			return
		}

		c.cfgMorph.eigenTrustTicker.addEpochTimer(newEpoch, iterTicker)
	}

	addNewEpochAsyncNotificationHandler(c, onNewEpoch)
}
// createLocalTrustRouter builds the router used for local trust values.
//
// The router uses localRouteBuilder to build routes, c as the local server
// info and a remote trust provider that combines the local-reputation remote
// writer provider with a daughter storage writer provider as the dead-end
// provider.
func createLocalTrustRouter(c *cfg, localRouteBuilder *localroutes.Builder, localTrustLogger *logger.Logger, daughterStorage *daughters.Storage) *reputationrouter.Router {
	// as a manager, the node stores trusts received from its daughters here
	daughterWriters := &intermediatereputation.DaughterStorageWriterProvider{
		Log:     c.log,
		Storage: daughterStorage,
	}

	remoteWriters := localreputation.NewRemoteProvider(localreputation.RemoteProviderPrm{
		Key: &c.key.PrivateKey,
		Log: localTrustLogger,
	})

	remoteProvider := common.NewRemoteTrustProvider(common.RemoteProviderPrm{
		NetmapKeys:      c,
		DeadEndProvider: daughterWriters,
		ClientCache:     c.bgClientCache,
		WriterProvider:  remoteWriters,
		Log:             localTrustLogger,
	})

	routerPrm := reputationrouter.Prm{
		LocalServerInfo:      c,
		RemoteWriterProvider: remoteProvider,
		Builder:              localRouteBuilder,
	}

	return reputationrouter.New(routerPrm, reputationrouter.WithLogger(localTrustLogger))
}
// createIntermediateTrustRouter builds the router used for intermediate
// trust values.
//
// It creates a logger tagged with trust_type=intermediate, a remote trust
// provider whose dead-end provider writes to consumerStorage, and an
// intermediate route builder based on managerBuilder, then combines them
// into a router with c as the local server info.
func createIntermediateTrustRouter(c *cfg, consumerStorage *consumerstorage.Storage, managerBuilder reputationcommon.ManagerBuilder) *reputationrouter.Router {
	log := &logger.Logger{
		Logger: c.log.With(zap.String("trust_type", "intermediate")),
	}

	consumerWriters := &intermediatereputation.ConsumerStorageWriterProvider{
		Log:     c.log,
		Storage: consumerStorage,
	}

	remoteWriters := intermediatereputation.NewRemoteProvider(intermediatereputation.RemoteProviderPrm{
		Key: &c.key.PrivateKey,
		Log: log,
	})

	remoteProvider := common.NewRemoteTrustProvider(common.RemoteProviderPrm{
		NetmapKeys:      c,
		DeadEndProvider: consumerWriters,
		ClientCache:     c.bgClientCache,
		WriterProvider:  remoteWriters,
		Log:             log,
	})

	routeBuilder := intermediateroutes.New(intermediateroutes.Prm{
		ManagerBuilder: managerBuilder,
		Log:            log,
	})

	routerPrm := reputationrouter.Prm{
		LocalServerInfo:      c,
		RemoteWriterProvider: remoteProvider,
		Builder:              routeBuilder,
	}

	return reputationrouter.New(routerPrm, reputationrouter.WithLogger(log))
}
// createEigenTrustController builds the EigenTrust controller together with
// the calculator it drives.
//
// The calculator takes alpha and iteration settings from c.cfgNetmap.wrapper,
// initial trust from c.netMapSource, daughter trusts from the given daughter
// and consumer storages, sends intermediate values to intermediateTrustRouter
// and final results to a writer provider built from the node keys and wrap.
// Both the calculator and the controller share c.cfgReputation.workerPool.
func createEigenTrustController(c *cfg, intermediateTrustRouter *reputationrouter.Router, localKey []byte, wrap *repClient.Client,
	daughterStorage *daughters.Storage, consumerStorage *consumerstorage.Storage) *eigentrustctrl.Controller {
	finalTarget := intermediatereputation.NewFinalWriterProvider(
		intermediatereputation.FinalWriterProviderPrm{
			PrivatKey: &c.key.PrivateKey,
			PubKey:    localKey,
			Client:    wrap,
		},
		intermediatereputation.FinalWriterWithLogger(c.log),
	)

	daughterTrusts := &intermediatereputation.DaughterTrustIteratorProvider{
		DaughterStorage: daughterStorage,
		ConsumerStorage: consumerStorage,
	}

	calcPrm := eigentrustcalc.Prm{
		AlphaProvider:           c.cfgNetmap.wrapper,
		InitialTrustSource:      intermediatereputation.InitialTrustSource{NetMap: c.netMapSource},
		IntermediateValueTarget: intermediateTrustRouter,
		WorkerPool:              c.cfgReputation.workerPool,
		FinalResultTarget:       finalTarget,
		DaughterTrustSource:     daughterTrusts,
	}
	calculator := eigentrustcalc.New(calcPrm, eigentrustcalc.WithLogger(c.log))

	ctrlPrm := eigentrustctrl.Prm{
		DaughtersTrustCalculator: &intermediatereputation.DaughtersTrustCalculator{
			Calculator: calculator,
		},
		IterationsProvider: c.cfgNetmap.wrapper,
		WorkerPool:         c.cfgReputation.workerPool,
	}

	return eigentrustctrl.New(ctrlPrm, eigentrustctrl.WithLogger(c.log))
}
// createLocalTrustController builds the local trust controller.
//
// Its trust source wraps c.cfgReputation.localTrustStorage (together with the
// network map source and the local public key); its trust target is
// localTrustRouter. The controller itself logs through c.log, while the
// trust source uses localTrustLogger.
func createLocalTrustController(c *cfg, localTrustLogger *logger.Logger, localKey []byte, localTrustRouter *reputationrouter.Router) *localtrustcontroller.Controller {
	ctrlPrm := localtrustcontroller.Prm{
		LocalTrustSource: &localreputation.TrustStorage{
			Log:      localTrustLogger,
			Storage:  c.cfgReputation.localTrustStorage,
			NmSrc:    c.netMapSource,
			LocalKey: localKey,
		},
		LocalTrustTarget: localTrustRouter,
	}

	return localtrustcontroller.New(ctrlPrm, localtrustcontroller.WithLogger(c.log))
}
// reputationServer handles reputation announcement requests
// (AnnounceLocalTrust and AnnounceIntermediateResult).
type reputationServer struct {
	// Embedded node configuration. The server appends itself to passed
	// routes as a reputationcommon.ServerInfo, so the embedded *cfg
	// presumably supplies those methods — verify against cfg.
	*cfg

	log *logger.Logger

	// localRouter provides writers for announced local trusts.
	localRouter reputationcommon.WriterProvider

	// intermediateRouter provides writers for announced intermediate results.
	intermediateRouter reputationcommon.WriterProvider

	// routeBuilder is used to check the route of incoming local trusts.
	routeBuilder reputationrouter.Builder
}
// AnnounceLocalTrust handles an announcement of local trust values.
//
// The passed route is restored from the request verification headers and
// extended with the server itself. A writer is then initialized through the
// local router for the announced epoch, and every trust from the request
// body is route-checked and written via processLocalTrust. The public key of
// the first route element is used as the trusting peer of each trust.
//
// Returns a response with an empty body on success, or a wrapped error if
// the writer could not be initialized or any trust could not be written.
func (s *reputationServer) AnnounceLocalTrust(ctx context.Context, req *v2reputation.AnnounceLocalTrustRequest) (*v2reputation.AnnounceLocalTrustResponse, error) {
	route := append(reverseRoute(req.GetVerificationHeader()), s)

	reqBody := req.GetBody()
	epoch := reqBody.GetEpoch()

	routeCtx := reputationrouter.NewRouteContext(
		&common.EpochContext{Context: ctx, E: epoch},
		route,
	)

	writer, err := s.localRouter.InitWriter(routeCtx)
	if err != nil {
		return nil, fmt.Errorf("could not initialize local trust writer: %w", err)
	}

	// iterate by index so that the element itself is addressed, not a copy
	trusts := reqBody.GetTrusts()
	for i := range trusts {
		localTrust := apiToLocalTrust(&trusts[i], route[0].PublicKey())

		if err := s.processLocalTrust(epoch, localTrust, route, writer); err != nil {
			return nil, fmt.Errorf("could not write one of local trusts: %w", err)
		}
	}

	var resp v2reputation.AnnounceLocalTrustResponse
	resp.SetBody(new(v2reputation.AnnounceLocalTrustResponseBody))

	return &resp, nil
}
// AnnounceIntermediateResult handles an announcement of an intermediate
// EigenTrust result.
//
// The passed route is restored from the request verification headers and
// extended with the server itself. A writer is initialized through the
// intermediate router for the announced epoch and iteration, and the single
// trust from the request body is written to it. The trusting peer is taken
// from the announced trust itself.
//
// Returns a response with an empty body on success, or a wrapped error if
// the writer could not be initialized or the trust could not be written.
func (s *reputationServer) AnnounceIntermediateResult(ctx context.Context, req *v2reputation.AnnounceIntermediateResultRequest) (*v2reputation.AnnounceIntermediateResultResponse, error) {
	route := append(reverseRoute(req.GetVerificationHeader()), s)

	reqBody := req.GetBody()
	iterCtx := eigentrust.NewIterContext(ctx, reqBody.GetEpoch(), reqBody.GetIteration())

	writer, err := s.intermediateRouter.InitWriter(reputationrouter.NewRouteContext(iterCtx, route))
	if err != nil {
		return nil, fmt.Errorf("could not initialize trust writer: %w", err)
	}

	announced := reqBody.GetTrust()
	localTrust := apiToLocalTrust(
		announced.GetTrust(),
		announced.GetTrustingPeer().GetPublicKey(),
	)

	if err := writer.Write(localTrust); err != nil {
		return nil, fmt.Errorf("could not write trust: %w", err)
	}

	var resp v2reputation.AnnounceIntermediateResultResponse
	resp.SetBody(new(v2reputation.AnnounceIntermediateResultResponseBody))

	return &resp, nil
}
// processLocalTrust checks the passed route of trust t for the given epoch
// against the server's route builder and, if the check succeeds, writes t
// to w.
//
// A failed route check is returned as a wrapped error; the result of the
// write is returned as is.
func (s *reputationServer) processLocalTrust(epoch uint64, t reputation.Trust,
	passedRoute []reputationcommon.ServerInfo, w reputationcommon.Writer) error {
	if err := reputationrouter.CheckRoute(s.routeBuilder, epoch, t, passedRoute); err != nil {
		return fmt.Errorf("wrong route of reputation trust value: %w", err)
	}

	return w.Write(t)
}
// apiToLocalTrust builds a local reputation.Trust from the v2 API trust t.
//
// The trusted peer and the trust value are taken from t; the trusting peer
// is not part of t and is set from the trustingPeer public key instead.
func apiToLocalTrust(t *v2reputation.Trust, trustingPeer []byte) reputation.Trust {
	var trustedID apireputation.PeerID
	trustedID.SetPublicKey(t.GetPeer().GetPublicKey())

	var trustingID apireputation.PeerID
	trustingID.SetPublicKey(trustingPeer)

	var res reputation.Trust
	res.SetPeer(trustedID)
	res.SetTrustingPeer(trustingID)
	res.SetValue(reputation.TrustValueFromFloat64(t.GetValue()))

	return res
}
// reverseRoute turns a chain of request verification headers into a route.
//
// It walks from hdr along the GetOrigin links until a nil header is reached
// and emits one key-only server info per header, carrying the key of that
// header's body signature. The resulting order is therefore from hdr towards
// the origin. A nil hdr yields a nil route.
func reverseRoute(hdr *session.RequestVerificationHeader) (passedRoute []reputationcommon.ServerInfo) {
	for cur := hdr; cur != nil; cur = cur.GetOrigin() {
		hop := &common.OnlyKeyRemoteServerInfo{
			Key: cur.GetBodySignature().GetKey(),
		}
		passedRoute = append(passedRoute, hop)
	}

	return passedRoute
}