From d48fb81193ede80a20d76368746dff508e653348 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Fri, 29 Jan 2021 03:03:30 +0300 Subject: [PATCH] [#328] container/load: Implement route controller Implement a component for transmitting the value of the used container space along a route defined in the system. Implement WriterProvider interface on it. By implementation, it is the link between the route planner and the point-to-point transmitter, and abstracts from the implementation of both of them. In the future, this implementation will be used as a transmitter of local estimates of storage nodes among themselves. Signed-off-by: Leonard Lyubich --- .../announcement/load/route/calls.go | 167 ++++++++++++++++++ .../container/announcement/load/route/deps.go | 41 +++++ .../container/announcement/load/route/opts.go | 28 +++ .../announcement/load/route/router.go | 86 +++++++++ .../container/announcement/load/route/util.go | 39 ++++ 5 files changed, 361 insertions(+) create mode 100644 pkg/services/container/announcement/load/route/calls.go create mode 100644 pkg/services/container/announcement/load/route/deps.go create mode 100644 pkg/services/container/announcement/load/route/opts.go create mode 100644 pkg/services/container/announcement/load/route/router.go create mode 100644 pkg/services/container/announcement/load/route/util.go diff --git a/pkg/services/container/announcement/load/route/calls.go b/pkg/services/container/announcement/load/route/calls.go new file mode 100644 index 000000000..f0e4147f1 --- /dev/null +++ b/pkg/services/container/announcement/load/route/calls.go @@ -0,0 +1,167 @@ +package loadroute + +import ( + "context" + "sync" + + "github.com/nspcc-dev/neofs-api-go/pkg/container" + loadcontroller "github.com/nspcc-dev/neofs-node/pkg/services/container/announcement/load/controller" + "go.uber.org/zap" +) + +type routeContext struct { + context.Context + + passedRoute []ServerInfo +} + +// NewRouteContext wraps the main context of value passing with its traversal route. +// +// Passing the result to Router.InitWriter method will allow you to continue this route. +func NewRouteContext(ctx context.Context, passed []ServerInfo) context.Context { + return &routeContext{ + Context: ctx, + passedRoute: passed, + } +} + +// InitWriter initializes and returns Writer that sends each value to its next route point. +// +// If ctx was created by NewRouteContext, then the traversed route is taken into account, +// and the value will be sent to its continuation. Otherwise, the route will be laid +// from scratch and the value will be sent to its primary point. +// +// After building a list of remote points of the next leg of the route, the value is sent +// sequentially to all of them. If any transmissions (even all) fail, an error will not +// be returned. +// +// Close of the composed Writer calls Close method on each internal Writer generated in +// runtime and never returns an error. +// +// Always returns nil error. +func (r *Router) InitWriter(ctx context.Context) (loadcontroller.Writer, error) { + var ( + routeCtx *routeContext + ok bool + ) + + if routeCtx, ok = ctx.(*routeContext); !ok { + routeCtx = &routeContext{ + Context: ctx, + passedRoute: []ServerInfo{r.localSrvInfo}, + } + } + + return &loadWriter{ + router: r, + ctx: routeCtx, + mRoute: make(map[routeKey]*valuesRoute), + mServers: make(map[string]loadcontroller.Writer), + }, nil +} + +type routeKey struct { + epoch uint64 + + cid string +} + +type valuesRoute struct { + route []ServerInfo + + values []container.UsedSpaceAnnouncement +} + +type loadWriter struct { + router *Router + + ctx *routeContext + + routeMtx sync.RWMutex + mRoute map[routeKey]*valuesRoute + + mServers map[string]loadcontroller.Writer +} + +func (w *loadWriter) Put(a container.UsedSpaceAnnouncement) error { + w.routeMtx.Lock() + defer w.routeMtx.Unlock() + + key := routeKey{ + epoch: a.Epoch(), + cid: a.ContainerID().String(), + } + + routeValues, ok := w.mRoute[key] + if !ok { + route, err := w.router.routeBuilder.NextStage(a, w.ctx.passedRoute) + if err != nil { + return err + } else if len(route) == 0 { + route = []ServerInfo{nil} + } + + routeValues = &valuesRoute{ + route: route, + values: []container.UsedSpaceAnnouncement{a}, + } + + w.mRoute[key] = routeValues + } + + for _, remoteInfo := range routeValues.route { + var endpoint string + + if remoteInfo != nil { + endpoint = remoteInfo.Address() + } + + remoteWriter, ok := w.mServers[endpoint] + if !ok { + provider, err := w.router.remoteProvider.InitRemote(remoteInfo) + if err != nil { + w.router.log.Debug("could not initialize writer provider", + zap.String("error", err.Error()), + ) + + continue // best effort + } + + remoteWriter, err = provider.InitWriter(w.ctx) + if err != nil { + w.router.log.Debug("could not initialize writer", + zap.String("error", err.Error()), + ) + + continue // best effort + } + + w.mServers[endpoint] = remoteWriter + } + + err := remoteWriter.Put(a) + if err != nil { + w.router.log.Debug("could not put the value", + zap.String("error", err.Error()), + ) + } + + // continue best effort + } + + return nil +} + +func (w *loadWriter) Close() error { + for endpoint, wRemote := range w.mServers { + err := wRemote.Close() + if err != nil { + w.router.log.Debug("could not close remote server writer", + zap.String("endpoint", endpoint), + zap.String("error", err.Error()), + ) + } + } + + return nil +} diff --git a/pkg/services/container/announcement/load/route/deps.go b/pkg/services/container/announcement/load/route/deps.go new file mode 100644 index 000000000..e6dba9240 --- /dev/null +++ b/pkg/services/container/announcement/load/route/deps.go @@ -0,0 +1,41 @@ +package loadroute + +import ( + "github.com/nspcc-dev/neofs-api-go/pkg/container" + loadcontroller "github.com/nspcc-dev/neofs-node/pkg/services/container/announcement/load/controller" +) + +// ServerInfo describes a set of +// characteristics of a point in a route. +type ServerInfo interface { + // PublicKey returns public key of the node + // from the route in a binary representation. + PublicKey() []byte + + // Returns network address of the node + // in the route. + // + // Can be empty. + Address() string +} + +// Builder groups methods to route values in the network. +type Builder interface { + // NextStage must return next group of route points for the value a + // based on the passed route. + // + // Empty passed list means being at the starting point of the route. + // + // Must return empty list and no error if the endpoint of the route is reached. + NextStage(a container.UsedSpaceAnnouncement, passed []ServerInfo) ([]ServerInfo, error) +} + +// RemoteWriterProvider describes the component +// for sending values to a fixed route point. +type RemoteWriterProvider interface { + // InitRemote must return WriterProvider to the route point + // corresponding to info. + // + // Nil info matches the end of the route. + InitRemote(info ServerInfo) (loadcontroller.WriterProvider, error) +} diff --git a/pkg/services/container/announcement/load/route/opts.go b/pkg/services/container/announcement/load/route/opts.go new file mode 100644 index 000000000..abbc385c7 --- /dev/null +++ b/pkg/services/container/announcement/load/route/opts.go @@ -0,0 +1,28 @@ +package loadroute + +import ( + "github.com/nspcc-dev/neofs-node/pkg/util/logger" + "go.uber.org/zap" +) + +// Option sets an optional parameter of Router. +type Option func(*options) + +type options struct { + log *logger.Logger +} + +func defaultOpts() *options { + return &options{ + log: zap.L(), + } +} + +// WithLogger returns Option to specify logging component. +func WithLogger(l *logger.Logger) Option { + return func(o *options) { + if l != nil { + o.log = l + } + } +} diff --git a/pkg/services/container/announcement/load/route/router.go b/pkg/services/container/announcement/load/route/router.go new file mode 100644 index 000000000..ba4e9c582 --- /dev/null +++ b/pkg/services/container/announcement/load/route/router.go @@ -0,0 +1,86 @@ +package loadroute + +import ( + "fmt" + + "github.com/nspcc-dev/neofs-node/pkg/util/logger" +) + +// Prm groups the required parameters of the Router's constructor. +// +// All values must comply with the requirements imposed on them. +// Passing incorrect parameter values will result in constructor +// failure (error or panic depending on the implementation). +type Prm struct { + // Characteristics of the local node's server. + // + // Must not be nil. + LocalServerInfo ServerInfo + + // Component for sending values to a fixed route point. + // + // Must not be nil. + RemoteWriterProvider RemoteWriterProvider + + // Route planner. + // + // Must not be nil. + Builder Builder +} + +// Router represents component responsible for routing +// used container space values over the network. +// +// For each fixed pair (container ID, epoch) there is a +// single value route on the network. Router provides the +// interface for writing values to the next point of the route. +// +// For correct operation, Router must be created using +// the constructor (New) based on the required parameters +// and optional components. After successful creation, +// the Router is immediately ready to work through API. +type Router struct { + log *logger.Logger + + remoteProvider RemoteWriterProvider + + routeBuilder Builder + + localSrvInfo ServerInfo +} + +const invalidPrmValFmt = "invalid parameter %s (%T):%v" + +func panicOnPrmValue(n string, v interface{}) { + panic(fmt.Sprintf(invalidPrmValFmt, n, v, v)) +} + +// New creates a new instance of the Router. +// +// Panics if at least one value of the parameters is invalid. +// +// The created Router does not require additional +// initialization and is completely ready for work. +func New(prm Prm, opts ...Option) *Router { + switch { + case prm.RemoteWriterProvider == nil: + panicOnPrmValue("RemoteWriterProvider", prm.RemoteWriterProvider) + case prm.Builder == nil: + panicOnPrmValue("Builder", prm.Builder) + case prm.LocalServerInfo == nil: + panicOnPrmValue("LocalServerInfo", prm.LocalServerInfo) + } + + o := defaultOpts() + + for i := range opts { + opts[i](o) + } + + return &Router{ + log: o.log, + remoteProvider: prm.RemoteWriterProvider, + routeBuilder: prm.Builder, + localSrvInfo: prm.LocalServerInfo, + } +} diff --git a/pkg/services/container/announcement/load/route/util.go b/pkg/services/container/announcement/load/route/util.go new file mode 100644 index 000000000..941fe0c26 --- /dev/null +++ b/pkg/services/container/announcement/load/route/util.go @@ -0,0 +1,39 @@ +package loadroute + +import ( + "bytes" + "errors" + + "github.com/nspcc-dev/neofs-api-go/pkg/container" +) + +var errWrongRoute = errors.New("wrong route") + +// CheckRoute checks if the route is a route correctly constructed by the builder for value a. +// +// Returns nil if route is correct, otherwise an error clarifying the inconsistency. +func CheckRoute(builder Builder, a container.UsedSpaceAnnouncement, route []ServerInfo) error { + for i := 1; i < len(route); i++ { + servers, err := builder.NextStage(a, route[:i]) + if err != nil { + return err + } else if len(servers) == 0 { + break + } + + found := false + + for j := range servers { + if bytes.Equal(servers[j].PublicKey(), route[i].PublicKey()) { + found = true + break + } + } + + if !found { + return errWrongRoute + } + } + + return nil +}