From 91825a0162c8c39055d45666d0875c9996b4d6ee Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Fri, 2 Apr 2021 21:16:09 +0300 Subject: [PATCH] [#460] reputation/local: Implement local trust router Implement reputation `Router` and its constructor, designed to define where to send local trusts. Router is based on dependencies that are hidden behind interfaces, that are declared in the router's package. Signed-off-by: Pavel Karpy --- pkg/services/reputation/local/route/calls.go | 132 ++++++++++++++++++ pkg/services/reputation/local/route/deps.go | 29 ++++ pkg/services/reputation/local/route/opts.go | 28 ++++ pkg/services/reputation/local/route/router.go | 80 +++++++++++ 4 files changed, 269 insertions(+) create mode 100644 pkg/services/reputation/local/route/calls.go create mode 100644 pkg/services/reputation/local/route/deps.go create mode 100644 pkg/services/reputation/local/route/opts.go create mode 100644 pkg/services/reputation/local/route/router.go diff --git a/pkg/services/reputation/local/route/calls.go b/pkg/services/reputation/local/route/calls.go new file mode 100644 index 000000000..c239088ca --- /dev/null +++ b/pkg/services/reputation/local/route/calls.go @@ -0,0 +1,132 @@ +package reputationroute + +import ( + "sync" + + "github.com/nspcc-dev/neofs-node/pkg/services/reputation" + reputationcontroller "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/controller" + "go.uber.org/zap" +) + +type routeContext struct { + reputationcontroller.Context + + passedRoute []ServerInfo +} + +// NewRouteContext wraps the main context of value passing with its traversal route and epoch. +func NewRouteContext(ctx reputationcontroller.Context, passed []ServerInfo) reputationcontroller.Context { + return &routeContext{ + Context: ctx, + passedRoute: passed, + } +} + +type trustWriter struct { + router *Router + + ctx *routeContext + + routeMtx sync.RWMutex + mServers map[string]reputationcontroller.Writer +} + +// InitWriter initializes and returns Writer that sends each value to its next route point. +// +// If ctx was created by NewRouteContext, then the traversed route is taken into account, +// and the value will be sent to its continuation. Otherwise, the route will be laid +// from scratch and the value will be sent to its primary point. +// +// After building a list of remote points of the next leg of the route, the value is sent +// sequentially to all of them. If any transmissions (even all) fail, an error will not +// be returned. +// +// Close of the composed Writer calls Close method on each internal Writer generated in +// runtime and never returns an error. +// +// Always returns nil error. +func (r *Router) InitWriter(ctx reputationcontroller.Context) (reputationcontroller.Writer, error) { + var ( + routeCtx *routeContext + ok bool + ) + + if routeCtx, ok = ctx.(*routeContext); !ok { + routeCtx = &routeContext{ + Context: ctx, + passedRoute: []ServerInfo{r.localSrvInfo}, + } + } + + return &trustWriter{ + router: r, + ctx: routeCtx, + mServers: make(map[string]reputationcontroller.Writer), + }, nil +} + +func (w *trustWriter) Write(a reputation.Trust) error { + w.routeMtx.Lock() + defer w.routeMtx.Unlock() + + route, err := w.router.routeBuilder.NextStage(w.ctx.Epoch(), w.ctx.passedRoute) + if err != nil { + return err + } else if len(route) == 0 { + route = []ServerInfo{nil} + } + + for _, remoteInfo := range route { + var endpoint string + + if remoteInfo != nil { + endpoint = remoteInfo.Address() + } + + remoteWriter, ok := w.mServers[endpoint] + if !ok { + provider, err := w.router.remoteProvider.InitRemote(remoteInfo) + if err != nil { + w.router.log.Debug("could not initialize writer provider", + zap.String("error", err.Error()), + ) + + continue + } + + remoteWriter, err = provider.InitWriter(w.ctx) + if err != nil { + w.router.log.Debug("could not initialize writer", + zap.String("error", err.Error()), + ) + + continue + } + + w.mServers[endpoint] = remoteWriter + } + + err := remoteWriter.Write(a) + if err != nil { + w.router.log.Debug("could not write the value", + zap.String("error", err.Error()), + ) + } + } + + return nil +} + +func (w *trustWriter) Close() error { + for endpoint, wRemote := range w.mServers { + err := wRemote.Close() + if err != nil { + w.router.log.Debug("could not close remote server writer", + zap.String("endpoint", endpoint), + zap.String("error", err.Error()), + ) + } + } + + return nil +} diff --git a/pkg/services/reputation/local/route/deps.go b/pkg/services/reputation/local/route/deps.go new file mode 100644 index 000000000..0975f3812 --- /dev/null +++ b/pkg/services/reputation/local/route/deps.go @@ -0,0 +1,29 @@ +package reputationroute + +import ( + reputationcontroller "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/controller" +) + +// ServerInfo describes a set of +// characteristics of a point in a route. +type ServerInfo interface { + // PublicKey returns public key of the node + // from the route in a binary representation. + PublicKey() []byte + + // Returns network address of the node + // in the route. + // + // Can be empty. + Address() string +} + +// RemoteWriterProvider describes the component +// for sending values to a fixed route point. +type RemoteWriterProvider interface { + // InitRemote must return WriterProvider to the route point + // corresponding to info. + // + // Nil info matches the end of the route. + InitRemote(info ServerInfo) (reputationcontroller.WriterProvider, error) +} diff --git a/pkg/services/reputation/local/route/opts.go b/pkg/services/reputation/local/route/opts.go new file mode 100644 index 000000000..1cf3f61f4 --- /dev/null +++ b/pkg/services/reputation/local/route/opts.go @@ -0,0 +1,28 @@ +package reputationroute + +import ( + "github.com/nspcc-dev/neofs-node/pkg/util/logger" + "go.uber.org/zap" +) + +// Option sets an optional parameter of Router. +type Option func(*options) + +type options struct { + log *logger.Logger +} + +func defaultOpts() *options { + return &options{ + log: zap.L(), + } +} + +// WithLogger returns Option to specify logging component. +func WithLogger(l *logger.Logger) Option { + return func(o *options) { + if l != nil { + o.log = l + } + } +} diff --git a/pkg/services/reputation/local/route/router.go b/pkg/services/reputation/local/route/router.go new file mode 100644 index 000000000..895f77593 --- /dev/null +++ b/pkg/services/reputation/local/route/router.go @@ -0,0 +1,80 @@ +package reputationroute + +import ( + "fmt" + + "github.com/nspcc-dev/neofs-node/pkg/util/logger" +) + +// Prm groups the required parameters of the Router's constructor. +// +// All values must comply with the requirements imposed on them. +// Passing incorrect parameter values will result in constructor +// failure (error or panic depending on the implementation). +type Prm struct { + // Characteristics of the local node's server. + // + // Must not be nil. + LocalServerInfo ServerInfo + + // Component for sending values to a fixed route point. + // + // Must not be nil. + RemoteWriterProvider RemoteWriterProvider + + // Route planner. + // + // Must not be nil. + Builder Builder +} + +// Router represents component responsible for routing +// local trust values over the network. +// +// For each fixed pair (node peer, epoch) there is a +// single value route on the network. Router provides the +// interface for writing values to the next point of the route. +// +// For correct operation, Router must be created using +// the constructor (New) based on the required parameters +// and optional components. After successful creation, +// the Router is immediately ready to work through API. +type Router struct { + log *logger.Logger + + remoteProvider RemoteWriterProvider + + routeBuilder Builder + + localSrvInfo ServerInfo +} + +const invalidPrmValFmt = "invalid parameter %s (%T):%v" + +func panicOnPrmValue(n string, v interface{}) { + panic(fmt.Sprintf(invalidPrmValFmt, n, v, v)) +} + +func New(prm Prm, opts ...Option) *Router { + switch { + case prm.RemoteWriterProvider == nil: + panicOnPrmValue("RemoteWriterProvider", prm.RemoteWriterProvider) + case prm.Builder == nil: + panicOnPrmValue("Builder", prm.Builder) + case prm.LocalServerInfo == nil: + panicOnPrmValue("LocalServerInfo", prm.LocalServerInfo) + } + + o := defaultOpts() + + for i := range opts { + opts[i](o) + } + + return &Router{ + log: o.log, + remoteProvider: prm.RemoteWriterProvider, + routeBuilder: prm.Builder, + localSrvInfo: prm.LocalServerInfo, + } +}