forked from TrueCloudLab/frostfs-node
[#460] reputation: Add Router
to reputation server
Add `Router` to the reputation server. `Router` is called on every incoming request and inits `Writer` that sends `Trust`s to the next route point or handle(logs in that implementation) them if current node is the end point of the route. Rename `onlyKeyRemoteServerInfo` struct for container to separate it from the same implementation of the same `ServerInfo` interface for reputation. Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
parent
39f55611c6
commit
60cc3b3e16
3 changed files with 123 additions and 17 deletions
|
@ -380,7 +380,7 @@ func (c *usedSpaceService) AnnounceUsedSpace(ctx context.Context, req *container
|
|||
var passedRoute []loadroute.ServerInfo
|
||||
|
||||
for hdr := req.GetVerificationHeader(); hdr != nil; hdr = hdr.GetOrigin() {
|
||||
passedRoute = append(passedRoute, &onlyKeyRemoteServerInfo{
|
||||
passedRoute = append(passedRoute, &containerOnlyKeyRemoteServerInfo{
|
||||
key: hdr.GetBodySignature().GetKey(),
|
||||
})
|
||||
}
|
||||
|
@ -412,15 +412,15 @@ func (c *usedSpaceService) AnnounceUsedSpace(ctx context.Context, req *container
|
|||
|
||||
var errNodeOutsideContainer = errors.New("node outside the container")
|
||||
|
||||
type onlyKeyRemoteServerInfo struct {
|
||||
type containerOnlyKeyRemoteServerInfo struct {
|
||||
key []byte
|
||||
}
|
||||
|
||||
func (i *onlyKeyRemoteServerInfo) PublicKey() []byte {
|
||||
func (i *containerOnlyKeyRemoteServerInfo) PublicKey() []byte {
|
||||
return i.key
|
||||
}
|
||||
|
||||
func (*onlyKeyRemoteServerInfo) Address() string {
|
||||
func (*containerOnlyKeyRemoteServerInfo) Address() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
|
|
|
@ -122,7 +122,7 @@ type localTrustLogger struct {
|
|||
}
|
||||
|
||||
func (l *localTrustLogger) Write(t reputation.Trust) error {
|
||||
l.log.Info("new local trust",
|
||||
l.log.Info("received local trust",
|
||||
zap.Uint64("epoch", l.ctx.Epoch()),
|
||||
zap.String("peer", hex.EncodeToString(t.Peer().Bytes())),
|
||||
zap.Stringer("value", t.Value()),
|
||||
|
@ -324,8 +324,11 @@ func initReputationService(c *cfg) {
|
|||
reputationrpc.NewSignService(
|
||||
c.key,
|
||||
reputationrpc.NewResponseService(
|
||||
&loggingReputationServer{
|
||||
log: c.log,
|
||||
&reputationServer{
|
||||
cfg: c,
|
||||
log: c.log,
|
||||
router: router,
|
||||
routeBuilder: routeBuilder,
|
||||
},
|
||||
c.respSvc,
|
||||
),
|
||||
|
@ -334,20 +337,65 @@ func initReputationService(c *cfg) {
|
|||
)
|
||||
}
|
||||
|
||||
type loggingReputationServer struct {
|
||||
log *logger.Logger
|
||||
type reputationServer struct {
|
||||
*cfg
|
||||
log *logger.Logger
|
||||
router trustcontroller.WriterProvider
|
||||
routeBuilder reputationroute.Builder
|
||||
}
|
||||
|
||||
func (s *loggingReputationServer) SendLocalTrust(_ context.Context, req *v2reputation.SendLocalTrustRequest) (*v2reputation.SendLocalTrustResponse, error) {
|
||||
type epochContext struct {
|
||||
context.Context
|
||||
epoch uint64
|
||||
}
|
||||
|
||||
func (ctx *epochContext) Epoch() uint64 {
|
||||
return ctx.epoch
|
||||
}
|
||||
|
||||
type reputationOnlyKeyRemoteServerInfo struct {
|
||||
key []byte
|
||||
}
|
||||
|
||||
func (i *reputationOnlyKeyRemoteServerInfo) PublicKey() []byte {
|
||||
return i.key
|
||||
}
|
||||
|
||||
func (*reputationOnlyKeyRemoteServerInfo) Address() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func (s *reputationServer) SendLocalTrust(ctx context.Context, req *v2reputation.SendLocalTrustRequest) (*v2reputation.SendLocalTrustResponse, error) {
|
||||
var passedRoute []reputationroute.ServerInfo
|
||||
|
||||
for hdr := req.GetVerificationHeader(); hdr != nil; hdr = hdr.GetOrigin() {
|
||||
passedRoute = append(passedRoute, &reputationOnlyKeyRemoteServerInfo{
|
||||
key: hdr.GetBodySignature().GetKey(),
|
||||
})
|
||||
}
|
||||
|
||||
for left, right := 0, len(passedRoute)-1; left < right; left, right = left+1, right-1 {
|
||||
passedRoute[left], passedRoute[right] = passedRoute[right], passedRoute[left]
|
||||
}
|
||||
|
||||
passedRoute = append(passedRoute, s)
|
||||
|
||||
body := req.GetBody()
|
||||
|
||||
log := s.log.With(zap.Uint64("epoch", body.GetEpoch()))
|
||||
eCtx := &epochContext{
|
||||
Context: ctx,
|
||||
epoch: body.GetEpoch(),
|
||||
}
|
||||
|
||||
for _, t := range body.GetTrusts() {
|
||||
log.Info("local trust received",
|
||||
zap.String("peer", hex.EncodeToString(t.GetPeer().GetValue())),
|
||||
zap.Float64("value", t.GetValue()),
|
||||
)
|
||||
w, err := s.router.InitWriter(reputationroute.NewRouteContext(eCtx, passedRoute))
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not initialize local trust writer")
|
||||
}
|
||||
|
||||
for _, trust := range body.GetTrusts() {
|
||||
if err = s.processTrust(body.GetEpoch(), apiToLocalTrust(trust), passedRoute, w); err != nil {
|
||||
return nil, errors.Wrap(err, "could not write one of trusts")
|
||||
}
|
||||
}
|
||||
|
||||
resp := new(v2reputation.SendLocalTrustResponse)
|
||||
|
@ -356,10 +404,29 @@ func (s *loggingReputationServer) SendLocalTrust(_ context.Context, req *v2reput
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
func (s *loggingReputationServer) SendIntermediateResult(_ context.Context, req *v2reputation.SendIntermediateResultRequest) (*v2reputation.SendIntermediateResultResponse, error) {
|
||||
func (s *reputationServer) SendIntermediateResult(_ context.Context, req *v2reputation.SendIntermediateResultRequest) (*v2reputation.SendIntermediateResultResponse, error) {
|
||||
resp := new(v2reputation.SendIntermediateResultResponse)
|
||||
|
||||
// todo: implement me
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func apiToLocalTrust(t *v2reputation.Trust) reputation.Trust {
|
||||
localTrust := reputation.Trust{}
|
||||
|
||||
localTrust.SetValue(reputation.TrustValueFromFloat64(t.GetValue()))
|
||||
localTrust.SetPeer(reputation.PeerIDFromBytes(t.GetPeer().GetValue()))
|
||||
|
||||
return localTrust
|
||||
}
|
||||
|
||||
func (s *reputationServer) processTrust(epoch uint64, t reputation.Trust,
|
||||
passedRoute []reputationroute.ServerInfo, w trustcontroller.Writer) error {
|
||||
err := reputationroute.CheckRoute(s.routeBuilder, epoch, reputation.PeerIDFromBytes(passedRoute[0].PublicKey()), passedRoute)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "wrong route of reputation trust value")
|
||||
}
|
||||
|
||||
return w.Write(t)
|
||||
}
|
||||
|
|
39
pkg/services/reputation/local/route/util.go
Normal file
39
pkg/services/reputation/local/route/util.go
Normal file
|
@ -0,0 +1,39 @@
|
|||
package reputationroute
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/reputation"
|
||||
)
|
||||
|
||||
var errWrongRoute = errors.New("wrong route")
|
||||
|
||||
// CheckRoute checks if the route is a route correctly constructed by the builder for value a.
|
||||
//
|
||||
// Returns nil if route is correct, otherwise an error clarifying the inconsistency.
|
||||
func CheckRoute(builder Builder, epoch uint64, peer reputation.PeerID, route []ServerInfo) error {
|
||||
for i := 1; i < len(route); i++ {
|
||||
servers, err := builder.NextStage(epoch, peer, route[:i])
|
||||
if err != nil {
|
||||
return err
|
||||
} else if len(servers) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
found := false
|
||||
|
||||
for j := range servers {
|
||||
if bytes.Equal(servers[j].PublicKey(), route[i].PublicKey()) {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !found {
|
||||
return errWrongRoute
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
Loading…
Reference in a new issue