From 60cc3b3e164b9a5f0f2bfcd83db3dfb9fe1e396e Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Tue, 6 Apr 2021 20:15:09 +0300 Subject: [PATCH] [#460] reputation: Add `Router` to reputation server Add `Router` to the reputation server. `Router` is called on every incoming request and inits `Writer` that sends `Trust`s to the next route point or handle(logs in that implementation) them if current node is the end point of the route. Rename `onlyKeyRemoteServerInfo` struct for container to separate it from the same implementation of the same `ServerInfo` interface for reputation. Signed-off-by: Pavel Karpy --- cmd/neofs-node/container.go | 8 +- cmd/neofs-node/reputation.go | 93 ++++++++++++++++++--- pkg/services/reputation/local/route/util.go | 39 +++++++++ 3 files changed, 123 insertions(+), 17 deletions(-) create mode 100644 pkg/services/reputation/local/route/util.go diff --git a/cmd/neofs-node/container.go b/cmd/neofs-node/container.go index b38169a6..0155086b 100644 --- a/cmd/neofs-node/container.go +++ b/cmd/neofs-node/container.go @@ -380,7 +380,7 @@ func (c *usedSpaceService) AnnounceUsedSpace(ctx context.Context, req *container var passedRoute []loadroute.ServerInfo for hdr := req.GetVerificationHeader(); hdr != nil; hdr = hdr.GetOrigin() { - passedRoute = append(passedRoute, &onlyKeyRemoteServerInfo{ + passedRoute = append(passedRoute, &containerOnlyKeyRemoteServerInfo{ key: hdr.GetBodySignature().GetKey(), }) } @@ -412,15 +412,15 @@ func (c *usedSpaceService) AnnounceUsedSpace(ctx context.Context, req *container var errNodeOutsideContainer = errors.New("node outside the container") -type onlyKeyRemoteServerInfo struct { +type containerOnlyKeyRemoteServerInfo struct { key []byte } -func (i *onlyKeyRemoteServerInfo) PublicKey() []byte { +func (i *containerOnlyKeyRemoteServerInfo) PublicKey() []byte { return i.key } -func (*onlyKeyRemoteServerInfo) Address() string { +func (*containerOnlyKeyRemoteServerInfo) Address() string { return "" } diff --git a/cmd/neofs-node/reputation.go b/cmd/neofs-node/reputation.go index 39ec758f..714df7c6 100644 --- a/cmd/neofs-node/reputation.go +++ b/cmd/neofs-node/reputation.go @@ -122,7 +122,7 @@ type localTrustLogger struct { } func (l *localTrustLogger) Write(t reputation.Trust) error { - l.log.Info("new local trust", + l.log.Info("received local trust", zap.Uint64("epoch", l.ctx.Epoch()), zap.String("peer", hex.EncodeToString(t.Peer().Bytes())), zap.Stringer("value", t.Value()), @@ -324,8 +324,11 @@ func initReputationService(c *cfg) { reputationrpc.NewSignService( c.key, reputationrpc.NewResponseService( - &loggingReputationServer{ - log: c.log, + &reputationServer{ + cfg: c, + log: c.log, + router: router, + routeBuilder: routeBuilder, }, c.respSvc, ), @@ -334,20 +337,65 @@ func initReputationService(c *cfg) { ) } -type loggingReputationServer struct { - log *logger.Logger +type reputationServer struct { + *cfg + log *logger.Logger + router trustcontroller.WriterProvider + routeBuilder reputationroute.Builder } -func (s *loggingReputationServer) SendLocalTrust(_ context.Context, req *v2reputation.SendLocalTrustRequest) (*v2reputation.SendLocalTrustResponse, error) { +type epochContext struct { + context.Context + epoch uint64 +} + +func (ctx *epochContext) Epoch() uint64 { + return ctx.epoch +} + +type reputationOnlyKeyRemoteServerInfo struct { + key []byte +} + +func (i *reputationOnlyKeyRemoteServerInfo) PublicKey() []byte { + return i.key +} + +func (*reputationOnlyKeyRemoteServerInfo) Address() string { + return "" +} + +func (s *reputationServer) SendLocalTrust(ctx context.Context, req *v2reputation.SendLocalTrustRequest) (*v2reputation.SendLocalTrustResponse, error) { + var passedRoute []reputationroute.ServerInfo + + for hdr := req.GetVerificationHeader(); hdr != nil; hdr = hdr.GetOrigin() { + passedRoute = append(passedRoute, &reputationOnlyKeyRemoteServerInfo{ + key: hdr.GetBodySignature().GetKey(), + }) + } + + for left, right := 0, len(passedRoute)-1; left < right; left, right = left+1, right-1 { + passedRoute[left], passedRoute[right] = passedRoute[right], passedRoute[left] + } + + passedRoute = append(passedRoute, s) + body := req.GetBody() - log := s.log.With(zap.Uint64("epoch", body.GetEpoch())) + eCtx := &epochContext{ + Context: ctx, + epoch: body.GetEpoch(), + } - for _, t := range body.GetTrusts() { - log.Info("local trust received", - zap.String("peer", hex.EncodeToString(t.GetPeer().GetValue())), - zap.Float64("value", t.GetValue()), - ) + w, err := s.router.InitWriter(reputationroute.NewRouteContext(eCtx, passedRoute)) + if err != nil { + return nil, errors.Wrap(err, "could not initialize local trust writer") + } + + for _, trust := range body.GetTrusts() { + if err = s.processTrust(body.GetEpoch(), apiToLocalTrust(trust), passedRoute, w); err != nil { + return nil, errors.Wrap(err, "could not write one of trusts") + } } resp := new(v2reputation.SendLocalTrustResponse) @@ -356,10 +404,29 @@ func (s *loggingReputationServer) SendLocalTrust(_ context.Context, req *v2reput return resp, nil } -func (s *loggingReputationServer) SendIntermediateResult(_ context.Context, req *v2reputation.SendIntermediateResultRequest) (*v2reputation.SendIntermediateResultResponse, error) { +func (s *reputationServer) SendIntermediateResult(_ context.Context, req *v2reputation.SendIntermediateResultRequest) (*v2reputation.SendIntermediateResultResponse, error) { resp := new(v2reputation.SendIntermediateResultResponse) // todo: implement me return resp, nil } + +func apiToLocalTrust(t *v2reputation.Trust) reputation.Trust { + localTrust := reputation.Trust{} + + localTrust.SetValue(reputation.TrustValueFromFloat64(t.GetValue())) + localTrust.SetPeer(reputation.PeerIDFromBytes(t.GetPeer().GetValue())) + + return localTrust +} + +func (s *reputationServer) processTrust(epoch uint64, t reputation.Trust, + passedRoute []reputationroute.ServerInfo, w trustcontroller.Writer) error { + err := reputationroute.CheckRoute(s.routeBuilder, epoch, reputation.PeerIDFromBytes(passedRoute[0].PublicKey()), passedRoute) + if err != nil { + return errors.Wrap(err, "wrong route of reputation trust value") + } + + return w.Write(t) +} diff --git a/pkg/services/reputation/local/route/util.go b/pkg/services/reputation/local/route/util.go new file mode 100644 index 00000000..a700a506 --- /dev/null +++ b/pkg/services/reputation/local/route/util.go @@ -0,0 +1,39 @@ +package reputationroute + +import ( + "bytes" + "errors" + + "github.com/nspcc-dev/neofs-node/pkg/services/reputation" +) + +var errWrongRoute = errors.New("wrong route") + +// CheckRoute checks if the route is a route correctly constructed by the builder for value a. +// +// Returns nil if route is correct, otherwise an error clarifying the inconsistency. +func CheckRoute(builder Builder, epoch uint64, peer reputation.PeerID, route []ServerInfo) error { + for i := 1; i < len(route); i++ { + servers, err := builder.NextStage(epoch, peer, route[:i]) + if err != nil { + return err + } else if len(servers) == 0 { + break + } + + found := false + + for j := range servers { + if bytes.Equal(servers[j].PublicKey(), route[i].PublicKey()) { + found = true + break + } + } + + if !found { + return errWrongRoute + } + } + + return nil +}