[#488] cmd/reputation: Collect all reputation wrappers

Init all wrappers eigenTrust algorithm needed in
`main` packages. Implement `SendIntermediateResult`
GRPC method of `reputationServer`.

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
Pavel Karpy 2021-04-29 09:43:15 +03:00 committed by Alex Vanin
parent eb74a9cafc
commit 0d34d7c508

View file

@ -3,13 +3,18 @@ package main
import ( import (
"context" "context"
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
v2reputation "github.com/nspcc-dev/neofs-api-go/v2/reputation" v2reputation "github.com/nspcc-dev/neofs-api-go/v2/reputation"
v2reputationgrpc "github.com/nspcc-dev/neofs-api-go/v2/reputation/grpc" v2reputationgrpc "github.com/nspcc-dev/neofs-api-go/v2/reputation/grpc"
"github.com/nspcc-dev/neofs-api-go/v2/session"
crypto "github.com/nspcc-dev/neofs-crypto" crypto "github.com/nspcc-dev/neofs-crypto"
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/reputation/common" "github.com/nspcc-dev/neofs-node/cmd/neofs-node/reputation/common"
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/reputation/intermediate" "github.com/nspcc-dev/neofs-node/cmd/neofs-node/reputation/intermediate"
intermediatereputation "github.com/nspcc-dev/neofs-node/cmd/neofs-node/reputation/intermediate" intermediatereputation "github.com/nspcc-dev/neofs-node/cmd/neofs-node/reputation/intermediate"
localreputation "github.com/nspcc-dev/neofs-node/cmd/neofs-node/reputation/local" localreputation "github.com/nspcc-dev/neofs-node/cmd/neofs-node/reputation/local"
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
rptclient "github.com/nspcc-dev/neofs-node/pkg/morph/client/reputation"
rtpwrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/reputation/wrapper"
"github.com/nspcc-dev/neofs-node/pkg/morph/event" "github.com/nspcc-dev/neofs-node/pkg/morph/event"
"github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap" "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap"
"github.com/nspcc-dev/neofs-node/pkg/network/cache" "github.com/nspcc-dev/neofs-node/pkg/network/cache"
@ -17,10 +22,13 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/services/reputation" "github.com/nspcc-dev/neofs-node/pkg/services/reputation"
reputationcommon "github.com/nspcc-dev/neofs-node/pkg/services/reputation/common" reputationcommon "github.com/nspcc-dev/neofs-node/pkg/services/reputation/common"
reputationrouter "github.com/nspcc-dev/neofs-node/pkg/services/reputation/common/router" reputationrouter "github.com/nspcc-dev/neofs-node/pkg/services/reputation/common/router"
"github.com/nspcc-dev/neofs-node/pkg/services/reputation/eigentrust"
eigentrustcalc "github.com/nspcc-dev/neofs-node/pkg/services/reputation/eigentrust/calculator"
eigentrustctrl "github.com/nspcc-dev/neofs-node/pkg/services/reputation/eigentrust/controller"
intermediateroutes "github.com/nspcc-dev/neofs-node/pkg/services/reputation/eigentrust/routes" intermediateroutes "github.com/nspcc-dev/neofs-node/pkg/services/reputation/eigentrust/routes"
consumerstorage "github.com/nspcc-dev/neofs-node/pkg/services/reputation/eigentrust/storage/consumers" consumerstorage "github.com/nspcc-dev/neofs-node/pkg/services/reputation/eigentrust/storage/consumers"
"github.com/nspcc-dev/neofs-node/pkg/services/reputation/eigentrust/storage/daughters" "github.com/nspcc-dev/neofs-node/pkg/services/reputation/eigentrust/storage/daughters"
trustcontroller "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/controller" localtrustcontroller "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/controller"
localroutes "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/routes" localroutes "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/routes"
truststorage "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/storage" truststorage "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/storage"
reputationrpc "github.com/nspcc-dev/neofs-node/pkg/services/reputation/rpc" reputationrpc "github.com/nspcc-dev/neofs-node/pkg/services/reputation/rpc"
@ -29,29 +37,53 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
const EigenTrustAlpha = 0.5
func initReputationService(c *cfg) { func initReputationService(c *cfg) {
staticClient, err := client.NewStatic(
c.cfgMorph.client,
c.cfgReputation.scriptHash,
fixedn.Fixed8(0),
)
fatalOnErr(err)
rptClient, err := rptclient.New(staticClient)
fatalOnErr(err)
wrap := rtpwrapper.WrapClient(rptClient)
fatalOnErr(err)
localKey := crypto.MarshalPublicKey(&c.key.PublicKey)
// consider sharing this between application components // consider sharing this between application components
nmSrc := newCachedNetmapStorage(c.cfgNetmap.state, c.cfgNetmap.wrapper) nmSrc := newCachedNetmapStorage(c.cfgNetmap.state, c.cfgNetmap.wrapper)
// storing calculated trusts as a daughter // storing calculated trusts as a daughter
c.cfgReputation.localTrustStorage = truststorage.New(truststorage.Prm{}) c.cfgReputation.localTrustStorage = truststorage.New(
truststorage.Prm{
LocalServer: c,
},
)
daughterStorage := daughters.New(daughters.Prm{})
consumerStorage := consumerstorage.New(consumerstorage.Prm{})
// storing received daughter(of current node) trusts as a manager // storing received daughter(of current node) trusts as a manager
daughterStorageWriterProvider := &intermediate.DaughterStorageWriterProvider{ daughterStorageWriterProvider := &intermediate.DaughterStorageWriterProvider{
Log: c.log, Log: c.log,
Storage: daughters.New(daughters.Prm{}), Storage: daughterStorage,
} }
consumerStorageWriterProvider := &intermediate.ConsumerStorageWriterProvider{ consumerStorageWriterProvider := &intermediate.ConsumerStorageWriterProvider{
Log: c.log, Log: c.log,
Storage: consumerstorage.New(consumerstorage.Prm{}), Storage: consumerStorage,
} }
trustStorage := &localreputation.TrustStorage{ localTrustStorage := &localreputation.TrustStorage{
Log: c.log, Log: c.log,
Storage: c.cfgReputation.localTrustStorage, Storage: c.cfgReputation.localTrustStorage,
NmSrc: nmSrc, NmSrc: nmSrc,
LocalKey: crypto.MarshalPublicKey(&c.key.PublicKey), LocalKey: localKey,
} }
managerBuilder := common.NewManagerBuilder( managerBuilder := common.NewManagerBuilder(
@ -109,7 +141,7 @@ func initReputationService(c *cfg) {
}, },
) )
_ = reputationrouter.New( // intermediateTrustRouter intermediateTrustRouter := reputationrouter.New(
reputationrouter.Prm{ reputationrouter.Prm{
LocalServerInfo: c, LocalServerInfo: c,
RemoteWriterProvider: remoteIntermediateTrustProvider, RemoteWriterProvider: remoteIntermediateTrustProvider,
@ -117,9 +149,43 @@ func initReputationService(c *cfg) {
}, },
) )
c.cfgReputation.localTrustCtrl = trustcontroller.New( eigenTrustCalculator := eigentrustcalc.New(
trustcontroller.Prm{ eigentrustcalc.Prm{
LocalTrustSource: trustStorage, AlphaProvider: intermediate.AlphaProvider{
Alpha: EigenTrustAlpha,
},
InitialTrustSource: intermediatereputation.InitialTrustSource{},
IntermediateValueTarget: intermediateTrustRouter,
WorkerPool: c.cfgReputation.workerPool,
FinalResultTarget: intermediate.NewFinalWriterProvider(
intermediate.FinalWriterProviderPrm{
PrivatKey: c.key,
PubKey: localKey,
Client: wrap,
},
intermediate.FinalWriterWithLogger(c.log),
),
DaughterTrustSource: &intermediate.DaughterTrustIteratorProvider{
DaughterStorage: daughterStorage,
ConsumerStorage: consumerStorage,
},
},
eigentrustcalc.WithLogger(c.log),
)
eigenTrustController := eigentrustctrl.New(
eigentrustctrl.Prm{
DaughtersTrustCalculator: &intermediate.DaughtersTrustCalculator{
Calculator: eigenTrustCalculator,
},
IterationsProvider: c.cfgNetmap.wrapper,
WorkerPool: c.cfgReputation.workerPool,
},
)
c.cfgReputation.localTrustCtrl = localtrustcontroller.New(
localtrustcontroller.Prm{
LocalTrustSource: localTrustStorage,
LocalTrustTarget: localTrustRouter, LocalTrustTarget: localTrustRouter,
}, },
) )
@ -127,7 +193,7 @@ func initReputationService(c *cfg) {
addNewEpochAsyncNotificationHandler( addNewEpochAsyncNotificationHandler(
c, c,
func(ev event.Event) { func(ev event.Event) {
var reportPrm trustcontroller.ReportPrm var reportPrm localtrustcontroller.ReportPrm
// report collected values from previous epoch // report collected values from previous epoch
reportPrm.SetEpoch(ev.(netmap.NewEpoch).EpochNumber() - 1) reportPrm.SetEpoch(ev.(netmap.NewEpoch).EpochNumber() - 1)
@ -142,10 +208,11 @@ func initReputationService(c *cfg) {
c.key, c.key,
reputationrpc.NewResponseService( reputationrpc.NewResponseService(
&reputationServer{ &reputationServer{
cfg: c, cfg: c,
log: c.log, log: c.log,
router: localTrustRouter, localRouter: localTrustRouter,
routeBuilder: localRouteBuilder, intermediateRouter: intermediateTrustRouter,
routeBuilder: localRouteBuilder,
}, },
c.respSvc, c.respSvc,
), ),
@ -157,7 +224,21 @@ func initReputationService(c *cfg) {
durationMeter := NewEigenTrustDuration(c.cfgNetmap.wrapper) durationMeter := NewEigenTrustDuration(c.cfgNetmap.wrapper)
newEigenTrustIterTimer(c, durationMeter, func() { newEigenTrustIterTimer(c, durationMeter, func() {
c.log.Debug("todo: start next EigenTrust iteration round") epoch, err := c.cfgNetmap.wrapper.Epoch()
if err != nil {
c.log.Debug(
"could not get current epoch",
zap.String("error", err.Error()),
)
return
}
eigenTrustController.Continue(
eigentrustctrl.ContinuePrm{
Epoch: epoch - 1,
},
)
}) })
addNewEpochAsyncNotificationHandler( addNewEpochAsyncNotificationHandler(
@ -176,24 +257,14 @@ func initReputationService(c *cfg) {
type reputationServer struct { type reputationServer struct {
*cfg *cfg
log *logger.Logger log *logger.Logger
router reputationcommon.WriterProvider localRouter reputationcommon.WriterProvider
routeBuilder reputationrouter.Builder intermediateRouter reputationcommon.WriterProvider
routeBuilder reputationrouter.Builder
} }
func (s *reputationServer) SendLocalTrust(ctx context.Context, req *v2reputation.SendLocalTrustRequest) (*v2reputation.SendLocalTrustResponse, error) { func (s *reputationServer) SendLocalTrust(ctx context.Context, req *v2reputation.SendLocalTrustRequest) (*v2reputation.SendLocalTrustResponse, error) {
var passedRoute []reputationcommon.ServerInfo passedRoute := reverseRoute(req.GetVerificationHeader())
for hdr := req.GetVerificationHeader(); hdr != nil; hdr = hdr.GetOrigin() {
passedRoute = append(passedRoute, &common.OnlyKeyRemoteServerInfo{
Key: hdr.GetBodySignature().GetKey(),
})
}
for left, right := 0, len(passedRoute)-1; left < right; left, right = left+1, right-1 {
passedRoute[left], passedRoute[right] = passedRoute[right], passedRoute[left]
}
passedRoute = append(passedRoute, s) passedRoute = append(passedRoute, s)
body := req.GetBody() body := req.GetBody()
@ -203,15 +274,15 @@ func (s *reputationServer) SendLocalTrust(ctx context.Context, req *v2reputation
E: body.GetEpoch(), E: body.GetEpoch(),
} }
w, err := s.router.InitWriter(reputationrouter.NewRouteContext(eCtx, passedRoute)) w, err := s.localRouter.InitWriter(reputationrouter.NewRouteContext(eCtx, passedRoute))
if err != nil { if err != nil {
return nil, errors.Wrap(err, "could not initialize local trust writer") return nil, errors.Wrap(err, "could not initialize local trust writer")
} }
for _, trust := range body.GetTrusts() { for _, trust := range body.GetTrusts() {
err = s.processTrust(body.GetEpoch(), apiToLocalTrust(trust, passedRoute[0].PublicKey()), passedRoute, w) err = s.processLocalTrust(body.GetEpoch(), apiToLocalTrust(trust, passedRoute[0].PublicKey()), passedRoute, w)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "could not write one of trusts") return nil, errors.Wrap(err, "could not write one of local trusts")
} }
} }
@ -221,14 +292,49 @@ func (s *reputationServer) SendLocalTrust(ctx context.Context, req *v2reputation
return resp, nil return resp, nil
} }
func (s *reputationServer) SendIntermediateResult(_ context.Context, req *v2reputation.SendIntermediateResultRequest) (*v2reputation.SendIntermediateResultResponse, error) { func (s *reputationServer) SendIntermediateResult(ctx context.Context, req *v2reputation.SendIntermediateResultRequest) (*v2reputation.SendIntermediateResultResponse, error) {
resp := new(v2reputation.SendIntermediateResultResponse) passedRoute := reverseRoute(req.GetVerificationHeader())
passedRoute = append(passedRoute, s)
// todo: implement me body := req.GetBody()
eCtx := &common.EpochContext{
Context: ctx,
E: body.GetEpoch(),
}
w, err := s.intermediateRouter.InitWriter(reputationrouter.NewRouteContext(eCtx, passedRoute))
if err != nil {
return nil, errors.Wrap(err, "could not initialize intermediate trust writer")
}
v2Trust := body.GetTrust()
trust := apiToLocalTrust(v2Trust.GetTrust(), v2Trust.GetTrustingPeer().GetValue())
eiCtx := eigentrust.NewIterContext(ctx, body.GetEpoch(), body.GetIteration())
err = w.Write(eiCtx, trust)
if err != nil {
return nil, errors.Wrap(err, "could not write intermediate trust")
}
resp := new(v2reputation.SendIntermediateResultResponse)
resp.SetBody(new(v2reputation.SendIntermediateResultResponseBody))
return resp, nil return resp, nil
} }
func (s *reputationServer) processLocalTrust(epoch uint64, t reputation.Trust,
passedRoute []reputationcommon.ServerInfo, w reputationcommon.Writer) error {
err := reputationrouter.CheckRoute(s.routeBuilder, epoch, t, passedRoute)
if err != nil {
return errors.Wrap(err, "wrong route of reputation trust value")
}
return w.Write(&common.EpochContext{E: epoch}, t)
}
// apiToLocalTrust converts v2 Trust to local reputation.Trust, adding trustingPeer. // apiToLocalTrust converts v2 Trust to local reputation.Trust, adding trustingPeer.
func apiToLocalTrust(t *v2reputation.Trust, trustingPeer []byte) reputation.Trust { func apiToLocalTrust(t *v2reputation.Trust, trustingPeer []byte) reputation.Trust {
localTrust := reputation.Trust{} localTrust := reputation.Trust{}
@ -240,12 +346,14 @@ func apiToLocalTrust(t *v2reputation.Trust, trustingPeer []byte) reputation.Trus
return localTrust return localTrust
} }
func (s *reputationServer) processTrust(epoch uint64, t reputation.Trust, func reverseRoute(hdr *session.RequestVerificationHeader) (passedRoute []reputationcommon.ServerInfo) {
passedRoute []reputationcommon.ServerInfo, w reputationcommon.Writer) error { for hdr != nil {
err := reputationrouter.CheckRoute(s.routeBuilder, epoch, t, passedRoute) passedRoute = append(passedRoute, &common.OnlyKeyRemoteServerInfo{
if err != nil { Key: hdr.GetBodySignature().GetKey(),
return errors.Wrap(err, "wrong route of reputation trust value") })
hdr = hdr.GetOrigin()
} }
return w.Write(&common.EpochContext{E: epoch}, t) return
} }