From 0d34d7c50814b36d8b38922230db39cc531126ff Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Thu, 29 Apr 2021 09:43:15 +0300 Subject: [PATCH] [#488] cmd/reputation: Collect all reputation wrappers Init all wrappers eigenTrust algorithm needed in `main` packages. Implement `SendIntermediateResult` GRPC method of `reputationServer`. Signed-off-by: Pavel Karpy --- cmd/neofs-node/reputation.go | 194 +++++++++++++++++++++++++++-------- 1 file changed, 151 insertions(+), 43 deletions(-) diff --git a/cmd/neofs-node/reputation.go b/cmd/neofs-node/reputation.go index fa805f51c..d44bba5c0 100644 --- a/cmd/neofs-node/reputation.go +++ b/cmd/neofs-node/reputation.go @@ -3,13 +3,18 @@ package main import ( "context" + "github.com/nspcc-dev/neo-go/pkg/encoding/fixedn" v2reputation "github.com/nspcc-dev/neofs-api-go/v2/reputation" v2reputationgrpc "github.com/nspcc-dev/neofs-api-go/v2/reputation/grpc" + "github.com/nspcc-dev/neofs-api-go/v2/session" crypto "github.com/nspcc-dev/neofs-crypto" "github.com/nspcc-dev/neofs-node/cmd/neofs-node/reputation/common" "github.com/nspcc-dev/neofs-node/cmd/neofs-node/reputation/intermediate" intermediatereputation "github.com/nspcc-dev/neofs-node/cmd/neofs-node/reputation/intermediate" localreputation "github.com/nspcc-dev/neofs-node/cmd/neofs-node/reputation/local" + "github.com/nspcc-dev/neofs-node/pkg/morph/client" + rptclient "github.com/nspcc-dev/neofs-node/pkg/morph/client/reputation" + rtpwrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/reputation/wrapper" "github.com/nspcc-dev/neofs-node/pkg/morph/event" "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap" "github.com/nspcc-dev/neofs-node/pkg/network/cache" @@ -17,10 +22,13 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/services/reputation" reputationcommon "github.com/nspcc-dev/neofs-node/pkg/services/reputation/common" reputationrouter "github.com/nspcc-dev/neofs-node/pkg/services/reputation/common/router" + "github.com/nspcc-dev/neofs-node/pkg/services/reputation/eigentrust" + eigentrustcalc "github.com/nspcc-dev/neofs-node/pkg/services/reputation/eigentrust/calculator" + eigentrustctrl "github.com/nspcc-dev/neofs-node/pkg/services/reputation/eigentrust/controller" intermediateroutes "github.com/nspcc-dev/neofs-node/pkg/services/reputation/eigentrust/routes" consumerstorage "github.com/nspcc-dev/neofs-node/pkg/services/reputation/eigentrust/storage/consumers" "github.com/nspcc-dev/neofs-node/pkg/services/reputation/eigentrust/storage/daughters" - trustcontroller "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/controller" + localtrustcontroller "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/controller" localroutes "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/routes" truststorage "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/storage" reputationrpc "github.com/nspcc-dev/neofs-node/pkg/services/reputation/rpc" @@ -29,29 +37,53 @@ import ( "go.uber.org/zap" ) +const EigenTrustAlpha = 0.5 + func initReputationService(c *cfg) { + staticClient, err := client.NewStatic( + c.cfgMorph.client, + c.cfgReputation.scriptHash, + fixedn.Fixed8(0), + ) + fatalOnErr(err) + + rptClient, err := rptclient.New(staticClient) + fatalOnErr(err) + + wrap := rtpwrapper.WrapClient(rptClient) + fatalOnErr(err) + + localKey := crypto.MarshalPublicKey(&c.key.PublicKey) + // consider sharing this between application components nmSrc := newCachedNetmapStorage(c.cfgNetmap.state, c.cfgNetmap.wrapper) // storing calculated trusts as a daughter - c.cfgReputation.localTrustStorage = truststorage.New(truststorage.Prm{}) + c.cfgReputation.localTrustStorage = truststorage.New( + truststorage.Prm{ + LocalServer: c, + }, + ) + + daughterStorage := daughters.New(daughters.Prm{}) + consumerStorage := consumerstorage.New(consumerstorage.Prm{}) // storing received daughter(of current node) trusts as a manager daughterStorageWriterProvider := &intermediate.DaughterStorageWriterProvider{ Log: c.log, - Storage: daughters.New(daughters.Prm{}), + Storage: daughterStorage, } consumerStorageWriterProvider := &intermediate.ConsumerStorageWriterProvider{ Log: c.log, - Storage: consumerstorage.New(consumerstorage.Prm{}), + Storage: consumerStorage, } - trustStorage := &localreputation.TrustStorage{ + localTrustStorage := &localreputation.TrustStorage{ Log: c.log, Storage: c.cfgReputation.localTrustStorage, NmSrc: nmSrc, - LocalKey: crypto.MarshalPublicKey(&c.key.PublicKey), + LocalKey: localKey, } managerBuilder := common.NewManagerBuilder( @@ -109,7 +141,7 @@ func initReputationService(c *cfg) { }, ) - _ = reputationrouter.New( // intermediateTrustRouter + intermediateTrustRouter := reputationrouter.New( reputationrouter.Prm{ LocalServerInfo: c, RemoteWriterProvider: remoteIntermediateTrustProvider, @@ -117,9 +149,43 @@ func initReputationService(c *cfg) { }, ) - c.cfgReputation.localTrustCtrl = trustcontroller.New( - trustcontroller.Prm{ - LocalTrustSource: trustStorage, + eigenTrustCalculator := eigentrustcalc.New( + eigentrustcalc.Prm{ + AlphaProvider: intermediate.AlphaProvider{ + Alpha: EigenTrustAlpha, + }, + InitialTrustSource: intermediatereputation.InitialTrustSource{}, + IntermediateValueTarget: intermediateTrustRouter, + WorkerPool: c.cfgReputation.workerPool, + FinalResultTarget: intermediate.NewFinalWriterProvider( + intermediate.FinalWriterProviderPrm{ + PrivatKey: c.key, + PubKey: localKey, + Client: wrap, + }, + intermediate.FinalWriterWithLogger(c.log), + ), + DaughterTrustSource: &intermediate.DaughterTrustIteratorProvider{ + DaughterStorage: daughterStorage, + ConsumerStorage: consumerStorage, + }, + }, + eigentrustcalc.WithLogger(c.log), + ) + + eigenTrustController := eigentrustctrl.New( + eigentrustctrl.Prm{ + DaughtersTrustCalculator: &intermediate.DaughtersTrustCalculator{ + Calculator: eigenTrustCalculator, + }, + IterationsProvider: c.cfgNetmap.wrapper, + WorkerPool: c.cfgReputation.workerPool, + }, + ) + + c.cfgReputation.localTrustCtrl = localtrustcontroller.New( + localtrustcontroller.Prm{ + LocalTrustSource: localTrustStorage, LocalTrustTarget: localTrustRouter, }, ) @@ -127,7 +193,7 @@ func initReputationService(c *cfg) { addNewEpochAsyncNotificationHandler( c, func(ev event.Event) { - var reportPrm trustcontroller.ReportPrm + var reportPrm localtrustcontroller.ReportPrm // report collected values from previous epoch reportPrm.SetEpoch(ev.(netmap.NewEpoch).EpochNumber() - 1) @@ -142,10 +208,11 @@ func initReputationService(c *cfg) { c.key, reputationrpc.NewResponseService( &reputationServer{ - cfg: c, - log: c.log, - router: localTrustRouter, - routeBuilder: localRouteBuilder, + cfg: c, + log: c.log, + localRouter: localTrustRouter, + intermediateRouter: intermediateTrustRouter, + routeBuilder: localRouteBuilder, }, c.respSvc, ), @@ -157,7 +224,21 @@ func initReputationService(c *cfg) { durationMeter := NewEigenTrustDuration(c.cfgNetmap.wrapper) newEigenTrustIterTimer(c, durationMeter, func() { - c.log.Debug("todo: start next EigenTrust iteration round") + epoch, err := c.cfgNetmap.wrapper.Epoch() + if err != nil { + c.log.Debug( + "could not get current epoch", + zap.String("error", err.Error()), + ) + + return + } + + eigenTrustController.Continue( + eigentrustctrl.ContinuePrm{ + Epoch: epoch - 1, + }, + ) }) addNewEpochAsyncNotificationHandler( @@ -176,24 +257,14 @@ func initReputationService(c *cfg) { type reputationServer struct { *cfg - log *logger.Logger - router reputationcommon.WriterProvider - routeBuilder reputationrouter.Builder + log *logger.Logger + localRouter reputationcommon.WriterProvider + intermediateRouter reputationcommon.WriterProvider + routeBuilder reputationrouter.Builder } func (s *reputationServer) SendLocalTrust(ctx context.Context, req *v2reputation.SendLocalTrustRequest) (*v2reputation.SendLocalTrustResponse, error) { - var passedRoute []reputationcommon.ServerInfo - - for hdr := req.GetVerificationHeader(); hdr != nil; hdr = hdr.GetOrigin() { - passedRoute = append(passedRoute, &common.OnlyKeyRemoteServerInfo{ - Key: hdr.GetBodySignature().GetKey(), - }) - } - - for left, right := 0, len(passedRoute)-1; left < right; left, right = left+1, right-1 { - passedRoute[left], passedRoute[right] = passedRoute[right], passedRoute[left] - } - + passedRoute := reverseRoute(req.GetVerificationHeader()) passedRoute = append(passedRoute, s) body := req.GetBody() @@ -203,15 +274,15 @@ func (s *reputationServer) SendLocalTrust(ctx context.Context, req *v2reputation E: body.GetEpoch(), } - w, err := s.router.InitWriter(reputationrouter.NewRouteContext(eCtx, passedRoute)) + w, err := s.localRouter.InitWriter(reputationrouter.NewRouteContext(eCtx, passedRoute)) if err != nil { return nil, errors.Wrap(err, "could not initialize local trust writer") } for _, trust := range body.GetTrusts() { - err = s.processTrust(body.GetEpoch(), apiToLocalTrust(trust, passedRoute[0].PublicKey()), passedRoute, w) + err = s.processLocalTrust(body.GetEpoch(), apiToLocalTrust(trust, passedRoute[0].PublicKey()), passedRoute, w) if err != nil { - return nil, errors.Wrap(err, "could not write one of trusts") + return nil, errors.Wrap(err, "could not write one of local trusts") } } @@ -221,14 +292,49 @@ func (s *reputationServer) SendLocalTrust(ctx context.Context, req *v2reputation return resp, nil } -func (s *reputationServer) SendIntermediateResult(_ context.Context, req *v2reputation.SendIntermediateResultRequest) (*v2reputation.SendIntermediateResultResponse, error) { - resp := new(v2reputation.SendIntermediateResultResponse) +func (s *reputationServer) SendIntermediateResult(ctx context.Context, req *v2reputation.SendIntermediateResultRequest) (*v2reputation.SendIntermediateResultResponse, error) { + passedRoute := reverseRoute(req.GetVerificationHeader()) + passedRoute = append(passedRoute, s) - // todo: implement me + body := req.GetBody() + + eCtx := &common.EpochContext{ + Context: ctx, + E: body.GetEpoch(), + } + + w, err := s.intermediateRouter.InitWriter(reputationrouter.NewRouteContext(eCtx, passedRoute)) + if err != nil { + return nil, errors.Wrap(err, "could not initialize intermediate trust writer") + } + + v2Trust := body.GetTrust() + + trust := apiToLocalTrust(v2Trust.GetTrust(), v2Trust.GetTrustingPeer().GetValue()) + + eiCtx := eigentrust.NewIterContext(ctx, body.GetEpoch(), body.GetIteration()) + + err = w.Write(eiCtx, trust) + if err != nil { + return nil, errors.Wrap(err, "could not write intermediate trust") + } + + resp := new(v2reputation.SendIntermediateResultResponse) + resp.SetBody(new(v2reputation.SendIntermediateResultResponseBody)) return resp, nil } +func (s *reputationServer) processLocalTrust(epoch uint64, t reputation.Trust, + passedRoute []reputationcommon.ServerInfo, w reputationcommon.Writer) error { + err := reputationrouter.CheckRoute(s.routeBuilder, epoch, t, passedRoute) + if err != nil { + return errors.Wrap(err, "wrong route of reputation trust value") + } + + return w.Write(&common.EpochContext{E: epoch}, t) +} + // apiToLocalTrust converts v2 Trust to local reputation.Trust, adding trustingPeer. func apiToLocalTrust(t *v2reputation.Trust, trustingPeer []byte) reputation.Trust { localTrust := reputation.Trust{} @@ -240,12 +346,14 @@ func apiToLocalTrust(t *v2reputation.Trust, trustingPeer []byte) reputation.Trus return localTrust } -func (s *reputationServer) processTrust(epoch uint64, t reputation.Trust, - passedRoute []reputationcommon.ServerInfo, w reputationcommon.Writer) error { - err := reputationrouter.CheckRoute(s.routeBuilder, epoch, t, passedRoute) - if err != nil { - return errors.Wrap(err, "wrong route of reputation trust value") +func reverseRoute(hdr *session.RequestVerificationHeader) (passedRoute []reputationcommon.ServerInfo) { + for hdr != nil { + passedRoute = append(passedRoute, &common.OnlyKeyRemoteServerInfo{ + Key: hdr.GetBodySignature().GetKey(), + }) + + hdr = hdr.GetOrigin() } - return w.Write(&common.EpochContext{E: epoch}, t) + return }