forked from TrueCloudLab/frostfs-node
[#460] reputation/local: Implement local trust router
Implement reputation `Router` and its constructor, designed to define where to send local trusts. Router is based on dependencies that are hidden behind interfaces, that are declared in the router's package. Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
parent
e6c9fb283c
commit
91825a0162
4 changed files with 269 additions and 0 deletions
132
pkg/services/reputation/local/route/calls.go
Normal file
132
pkg/services/reputation/local/route/calls.go
Normal file
|
@ -0,0 +1,132 @@
|
||||||
|
package reputationroute
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/services/reputation"
|
||||||
|
reputationcontroller "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/controller"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
type routeContext struct {
|
||||||
|
reputationcontroller.Context
|
||||||
|
|
||||||
|
passedRoute []ServerInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewRouteContext wraps the main context of value passing with its traversal route and epoch.
|
||||||
|
func NewRouteContext(ctx reputationcontroller.Context, passed []ServerInfo) reputationcontroller.Context {
|
||||||
|
return &routeContext{
|
||||||
|
Context: ctx,
|
||||||
|
passedRoute: passed,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type trustWriter struct {
|
||||||
|
router *Router
|
||||||
|
|
||||||
|
ctx *routeContext
|
||||||
|
|
||||||
|
routeMtx sync.RWMutex
|
||||||
|
mServers map[string]reputationcontroller.Writer
|
||||||
|
}
|
||||||
|
|
||||||
|
// InitWriter initializes and returns Writer that sends each value to its next route point.
|
||||||
|
//
|
||||||
|
// If ctx was created by NewRouteContext, then the traversed route is taken into account,
|
||||||
|
// and the value will be sent to its continuation. Otherwise, the route will be laid
|
||||||
|
// from scratch and the value will be sent to its primary point.
|
||||||
|
//
|
||||||
|
// After building a list of remote points of the next leg of the route, the value is sent
|
||||||
|
// sequentially to all of them. If any transmissions (even all) fail, an error will not
|
||||||
|
// be returned.
|
||||||
|
//
|
||||||
|
// Close of the composed Writer calls Close method on each internal Writer generated in
|
||||||
|
// runtime and never returns an error.
|
||||||
|
//
|
||||||
|
// Always returns nil error.
|
||||||
|
func (r *Router) InitWriter(ctx reputationcontroller.Context) (reputationcontroller.Writer, error) {
|
||||||
|
var (
|
||||||
|
routeCtx *routeContext
|
||||||
|
ok bool
|
||||||
|
)
|
||||||
|
|
||||||
|
if routeCtx, ok = ctx.(*routeContext); !ok {
|
||||||
|
routeCtx = &routeContext{
|
||||||
|
Context: ctx,
|
||||||
|
passedRoute: []ServerInfo{r.localSrvInfo},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &trustWriter{
|
||||||
|
router: r,
|
||||||
|
ctx: routeCtx,
|
||||||
|
mServers: make(map[string]reputationcontroller.Writer),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *trustWriter) Write(a reputation.Trust) error {
|
||||||
|
w.routeMtx.Lock()
|
||||||
|
defer w.routeMtx.Unlock()
|
||||||
|
|
||||||
|
route, err := w.router.routeBuilder.NextStage(w.ctx.Epoch(), w.ctx.passedRoute)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
} else if len(route) == 0 {
|
||||||
|
route = []ServerInfo{nil}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, remoteInfo := range route {
|
||||||
|
var endpoint string
|
||||||
|
|
||||||
|
if remoteInfo != nil {
|
||||||
|
endpoint = remoteInfo.Address()
|
||||||
|
}
|
||||||
|
|
||||||
|
remoteWriter, ok := w.mServers[endpoint]
|
||||||
|
if !ok {
|
||||||
|
provider, err := w.router.remoteProvider.InitRemote(remoteInfo)
|
||||||
|
if err != nil {
|
||||||
|
w.router.log.Debug("could not initialize writer provider",
|
||||||
|
zap.String("error", err.Error()),
|
||||||
|
)
|
||||||
|
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
remoteWriter, err = provider.InitWriter(w.ctx)
|
||||||
|
if err != nil {
|
||||||
|
w.router.log.Debug("could not initialize writer",
|
||||||
|
zap.String("error", err.Error()),
|
||||||
|
)
|
||||||
|
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
w.mServers[endpoint] = remoteWriter
|
||||||
|
}
|
||||||
|
|
||||||
|
err := remoteWriter.Write(a)
|
||||||
|
if err != nil {
|
||||||
|
w.router.log.Debug("could not write the value",
|
||||||
|
zap.String("error", err.Error()),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *trustWriter) Close() error {
|
||||||
|
for endpoint, wRemote := range w.mServers {
|
||||||
|
err := wRemote.Close()
|
||||||
|
if err != nil {
|
||||||
|
w.router.log.Debug("could not close remote server writer",
|
||||||
|
zap.String("endpoint", endpoint),
|
||||||
|
zap.String("error", err.Error()),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
29
pkg/services/reputation/local/route/deps.go
Normal file
29
pkg/services/reputation/local/route/deps.go
Normal file
|
@ -0,0 +1,29 @@
|
||||||
|
package reputationroute
|
||||||
|
|
||||||
|
import (
|
||||||
|
reputationcontroller "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/controller"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ServerInfo describes a set of
|
||||||
|
// characteristics of a point in a route.
|
||||||
|
type ServerInfo interface {
|
||||||
|
// PublicKey returns public key of the node
|
||||||
|
// from the route in a binary representation.
|
||||||
|
PublicKey() []byte
|
||||||
|
|
||||||
|
// Returns network address of the node
|
||||||
|
// in the route.
|
||||||
|
//
|
||||||
|
// Can be empty.
|
||||||
|
Address() string
|
||||||
|
}
|
||||||
|
|
||||||
|
// RemoteWriterProvider describes the component
|
||||||
|
// for sending values to a fixed route point.
|
||||||
|
type RemoteWriterProvider interface {
|
||||||
|
// InitRemote must return WriterProvider to the route point
|
||||||
|
// corresponding to info.
|
||||||
|
//
|
||||||
|
// Nil info matches the end of the route.
|
||||||
|
InitRemote(info ServerInfo) (reputationcontroller.WriterProvider, error)
|
||||||
|
}
|
28
pkg/services/reputation/local/route/opts.go
Normal file
28
pkg/services/reputation/local/route/opts.go
Normal file
|
@ -0,0 +1,28 @@
|
||||||
|
package reputationroute
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Option sets an optional parameter of Router.
|
||||||
|
type Option func(*options)
|
||||||
|
|
||||||
|
type options struct {
|
||||||
|
log *logger.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func defaultOpts() *options {
|
||||||
|
return &options{
|
||||||
|
log: zap.L(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithLogger returns Option to specify logging component.
|
||||||
|
func WithLogger(l *logger.Logger) Option {
|
||||||
|
return func(o *options) {
|
||||||
|
if l != nil {
|
||||||
|
o.log = l
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
80
pkg/services/reputation/local/route/router.go
Normal file
80
pkg/services/reputation/local/route/router.go
Normal file
|
@ -0,0 +1,80 @@
|
||||||
|
package reputationroute
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Prm groups the required parameters of the Router's constructor.
|
||||||
|
//
|
||||||
|
// All values must comply with the requirements imposed on them.
|
||||||
|
// Passing incorrect parameter values will result in constructor
|
||||||
|
// failure (error or panic depending on the implementation).
|
||||||
|
type Prm struct {
|
||||||
|
// Characteristics of the local node's server.
|
||||||
|
//
|
||||||
|
// Must not be nil.
|
||||||
|
LocalServerInfo ServerInfo
|
||||||
|
|
||||||
|
// Component for sending values to a fixed route point.
|
||||||
|
//
|
||||||
|
// Must not be nil.
|
||||||
|
RemoteWriterProvider RemoteWriterProvider
|
||||||
|
|
||||||
|
// Route planner.
|
||||||
|
//
|
||||||
|
// Must not be nil.
|
||||||
|
Builder Builder
|
||||||
|
}
|
||||||
|
|
||||||
|
// Router represents component responsible for routing
|
||||||
|
// local trust values over the network.
|
||||||
|
//
|
||||||
|
// For each fixed pair (node peer, epoch) there is a
|
||||||
|
// single value route on the network. Router provides the
|
||||||
|
// interface for writing values to the next point of the route.
|
||||||
|
//
|
||||||
|
// For correct operation, Router must be created using
|
||||||
|
// the constructor (New) based on the required parameters
|
||||||
|
// and optional components. After successful creation,
|
||||||
|
// the Router is immediately ready to work through API.
|
||||||
|
type Router struct {
|
||||||
|
log *logger.Logger
|
||||||
|
|
||||||
|
remoteProvider RemoteWriterProvider
|
||||||
|
|
||||||
|
routeBuilder Builder
|
||||||
|
|
||||||
|
localSrvInfo ServerInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
const invalidPrmValFmt = "invalid parameter %s (%T):%v"
|
||||||
|
|
||||||
|
func panicOnPrmValue(n string, v interface{}) {
|
||||||
|
panic(fmt.Sprintf(invalidPrmValFmt, n, v, v))
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(prm Prm, opts ...Option) *Router {
|
||||||
|
switch {
|
||||||
|
case prm.RemoteWriterProvider == nil:
|
||||||
|
panicOnPrmValue("RemoteWriterProvider", prm.RemoteWriterProvider)
|
||||||
|
case prm.Builder == nil:
|
||||||
|
panicOnPrmValue("Builder", prm.Builder)
|
||||||
|
case prm.LocalServerInfo == nil:
|
||||||
|
panicOnPrmValue("LocalServerInfo", prm.LocalServerInfo)
|
||||||
|
}
|
||||||
|
|
||||||
|
o := defaultOpts()
|
||||||
|
|
||||||
|
for i := range opts {
|
||||||
|
opts[i](o)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Router{
|
||||||
|
log: o.log,
|
||||||
|
remoteProvider: prm.RemoteWriterProvider,
|
||||||
|
routeBuilder: prm.Builder,
|
||||||
|
localSrvInfo: prm.LocalServerInfo,
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue