forked from TrueCloudLab/frostfs-node
[#328] container/load: Implement route controller
Implement a component for transmitting the value of the used container space along a route defined in the system. Implement WriterProvider interface on it. By implementation, it is the link between the route planner and the point-to-point transmitter, and abstracts from the implementation of both of them. In the future, this implementation will be used as a transmitter of local estimates of storage nodes among themselves. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
1c6d37e821
commit
d48fb81193
5 changed files with 361 additions and 0 deletions
167
pkg/services/container/announcement/load/route/calls.go
Normal file
167
pkg/services/container/announcement/load/route/calls.go
Normal file
|
@ -0,0 +1,167 @@
|
|||
package loadroute
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
||||
loadcontroller "github.com/nspcc-dev/neofs-node/pkg/services/container/announcement/load/controller"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type routeContext struct {
|
||||
context.Context
|
||||
|
||||
passedRoute []ServerInfo
|
||||
}
|
||||
|
||||
// NewRouteContext wraps the main context of value passing with its traversal route.
|
||||
//
|
||||
// Passing the result to Router.InitWriter method will allow you to continue this route.
|
||||
func NewRouteContext(ctx context.Context, passed []ServerInfo) context.Context {
|
||||
return &routeContext{
|
||||
Context: ctx,
|
||||
passedRoute: passed,
|
||||
}
|
||||
}
|
||||
|
||||
// InitWriter initializes and returns Writer that sends each value to its next route point.
|
||||
//
|
||||
// If ctx was created by NewRouteContext, then the traversed route is taken into account,
|
||||
// and the value will be sent to its continuation. Otherwise, the route will be laid
|
||||
// from scratch and the value will be sent to its primary point.
|
||||
//
|
||||
// After building a list of remote points of the next leg of the route, the value is sent
|
||||
// sequentially to all of them. If any transmissions (even all) fail, an error will not
|
||||
// be returned.
|
||||
//
|
||||
// Close of the composed Writer calls Close method on each internal Writer generated in
|
||||
// runtime and never returns an error.
|
||||
//
|
||||
// Always returns nil error.
|
||||
func (r *Router) InitWriter(ctx context.Context) (loadcontroller.Writer, error) {
|
||||
var (
|
||||
routeCtx *routeContext
|
||||
ok bool
|
||||
)
|
||||
|
||||
if routeCtx, ok = ctx.(*routeContext); !ok {
|
||||
routeCtx = &routeContext{
|
||||
Context: ctx,
|
||||
passedRoute: []ServerInfo{r.localSrvInfo},
|
||||
}
|
||||
}
|
||||
|
||||
return &loadWriter{
|
||||
router: r,
|
||||
ctx: routeCtx,
|
||||
mRoute: make(map[routeKey]*valuesRoute),
|
||||
mServers: make(map[string]loadcontroller.Writer),
|
||||
}, nil
|
||||
}
|
||||
|
||||
type routeKey struct {
|
||||
epoch uint64
|
||||
|
||||
cid string
|
||||
}
|
||||
|
||||
type valuesRoute struct {
|
||||
route []ServerInfo
|
||||
|
||||
values []container.UsedSpaceAnnouncement
|
||||
}
|
||||
|
||||
type loadWriter struct {
|
||||
router *Router
|
||||
|
||||
ctx *routeContext
|
||||
|
||||
routeMtx sync.RWMutex
|
||||
mRoute map[routeKey]*valuesRoute
|
||||
|
||||
mServers map[string]loadcontroller.Writer
|
||||
}
|
||||
|
||||
func (w *loadWriter) Put(a container.UsedSpaceAnnouncement) error {
|
||||
w.routeMtx.Lock()
|
||||
defer w.routeMtx.Unlock()
|
||||
|
||||
key := routeKey{
|
||||
epoch: a.Epoch(),
|
||||
cid: a.ContainerID().String(),
|
||||
}
|
||||
|
||||
routeValues, ok := w.mRoute[key]
|
||||
if !ok {
|
||||
route, err := w.router.routeBuilder.NextStage(a, w.ctx.passedRoute)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if len(route) == 0 {
|
||||
route = []ServerInfo{nil}
|
||||
}
|
||||
|
||||
routeValues = &valuesRoute{
|
||||
route: route,
|
||||
values: []container.UsedSpaceAnnouncement{a},
|
||||
}
|
||||
|
||||
w.mRoute[key] = routeValues
|
||||
}
|
||||
|
||||
for _, remoteInfo := range routeValues.route {
|
||||
var endpoint string
|
||||
|
||||
if remoteInfo != nil {
|
||||
endpoint = remoteInfo.Address()
|
||||
}
|
||||
|
||||
remoteWriter, ok := w.mServers[endpoint]
|
||||
if !ok {
|
||||
provider, err := w.router.remoteProvider.InitRemote(remoteInfo)
|
||||
if err != nil {
|
||||
w.router.log.Debug("could not initialize writer provider",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
continue // best effort
|
||||
}
|
||||
|
||||
remoteWriter, err = provider.InitWriter(w.ctx)
|
||||
if err != nil {
|
||||
w.router.log.Debug("could not initialize writer",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
continue // best effort
|
||||
}
|
||||
|
||||
w.mServers[endpoint] = remoteWriter
|
||||
}
|
||||
|
||||
err := remoteWriter.Put(a)
|
||||
if err != nil {
|
||||
w.router.log.Debug("could not put the value",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
|
||||
// continue best effort
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *loadWriter) Close() error {
|
||||
for endpoint, wRemote := range w.mServers {
|
||||
err := wRemote.Close()
|
||||
if err != nil {
|
||||
w.router.log.Debug("could not close remote server writer",
|
||||
zap.String("endpoint", endpoint),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
41
pkg/services/container/announcement/load/route/deps.go
Normal file
41
pkg/services/container/announcement/load/route/deps.go
Normal file
|
@ -0,0 +1,41 @@
|
|||
package loadroute
|
||||
|
||||
import (
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
||||
loadcontroller "github.com/nspcc-dev/neofs-node/pkg/services/container/announcement/load/controller"
|
||||
)
|
||||
|
||||
// ServerInfo describes a set of
|
||||
// characteristics of a point in a route.
|
||||
type ServerInfo interface {
|
||||
// PublicKey returns public key of the node
|
||||
// from the route in a binary representation.
|
||||
PublicKey() []byte
|
||||
|
||||
// Returns network address of the node
|
||||
// in the route.
|
||||
//
|
||||
// Can be empty.
|
||||
Address() string
|
||||
}
|
||||
|
||||
// Builder groups methods to route values in the network.
|
||||
type Builder interface {
|
||||
// NextStage must return next group of route points for the value a
|
||||
// based on the passed route.
|
||||
//
|
||||
// Empty passed list means being at the starting point of the route.
|
||||
//
|
||||
// Must return empty list and no error if the endpoint of the route is reached.
|
||||
NextStage(a container.UsedSpaceAnnouncement, passed []ServerInfo) ([]ServerInfo, error)
|
||||
}
|
||||
|
||||
// RemoteWriterProvider describes the component
|
||||
// for sending values to a fixed route point.
|
||||
type RemoteWriterProvider interface {
|
||||
// InitRemote must return WriterProvider to the route point
|
||||
// corresponding to info.
|
||||
//
|
||||
// Nil info matches the end of the route.
|
||||
InitRemote(info ServerInfo) (loadcontroller.WriterProvider, error)
|
||||
}
|
28
pkg/services/container/announcement/load/route/opts.go
Normal file
28
pkg/services/container/announcement/load/route/opts.go
Normal file
|
@ -0,0 +1,28 @@
|
|||
package loadroute
|
||||
|
||||
import (
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Option sets an optional parameter of Router.
|
||||
type Option func(*options)
|
||||
|
||||
type options struct {
|
||||
log *logger.Logger
|
||||
}
|
||||
|
||||
func defaultOpts() *options {
|
||||
return &options{
|
||||
log: zap.L(),
|
||||
}
|
||||
}
|
||||
|
||||
// WithLogger returns Option to specify logging component.
|
||||
func WithLogger(l *logger.Logger) Option {
|
||||
return func(o *options) {
|
||||
if l != nil {
|
||||
o.log = l
|
||||
}
|
||||
}
|
||||
}
|
86
pkg/services/container/announcement/load/route/router.go
Normal file
86
pkg/services/container/announcement/load/route/router.go
Normal file
|
@ -0,0 +1,86 @@
|
|||
package loadroute
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||
)
|
||||
|
||||
// Prm groups the required parameters of the Router's constructor.
|
||||
//
|
||||
// All values must comply with the requirements imposed on them.
|
||||
// Passing incorrect parameter values will result in constructor
|
||||
// failure (error or panic depending on the implementation).
|
||||
type Prm struct {
|
||||
// Characteristics of the local node's server.
|
||||
//
|
||||
// Must not be nil.
|
||||
LocalServerInfo ServerInfo
|
||||
|
||||
// Component for sending values to a fixed route point.
|
||||
//
|
||||
// Must not be nil.
|
||||
RemoteWriterProvider RemoteWriterProvider
|
||||
|
||||
// Route planner.
|
||||
//
|
||||
// Must not be nil.
|
||||
Builder Builder
|
||||
}
|
||||
|
||||
// Router represents component responsible for routing
|
||||
// used container space values over the network.
|
||||
//
|
||||
// For each fixed pair (container ID, epoch) there is a
|
||||
// single value route on the network. Router provides the
|
||||
// interface for writing values to the next point of the route.
|
||||
//
|
||||
// For correct operation, Router must be created using
|
||||
// the constructor (New) based on the required parameters
|
||||
// and optional components. After successful creation,
|
||||
// the Router is immediately ready to work through API.
|
||||
type Router struct {
|
||||
log *logger.Logger
|
||||
|
||||
remoteProvider RemoteWriterProvider
|
||||
|
||||
routeBuilder Builder
|
||||
|
||||
localSrvInfo ServerInfo
|
||||
}
|
||||
|
||||
const invalidPrmValFmt = "invalid parameter %s (%T):%v"
|
||||
|
||||
func panicOnPrmValue(n string, v interface{}) {
|
||||
panic(fmt.Sprintf(invalidPrmValFmt, n, v, v))
|
||||
}
|
||||
|
||||
// New creates a new instance of the Router.
|
||||
//
|
||||
// Panics if at least one value of the parameters is invalid.
|
||||
//
|
||||
// The created Router does not require additional
|
||||
// initialization and is completely ready for work.
|
||||
func New(prm Prm, opts ...Option) *Router {
|
||||
switch {
|
||||
case prm.RemoteWriterProvider == nil:
|
||||
panicOnPrmValue("RemoteWriterProvider", prm.RemoteWriterProvider)
|
||||
case prm.Builder == nil:
|
||||
panicOnPrmValue("Builder", prm.Builder)
|
||||
case prm.LocalServerInfo == nil:
|
||||
panicOnPrmValue("LocalServerInfo", prm.LocalServerInfo)
|
||||
}
|
||||
|
||||
o := defaultOpts()
|
||||
|
||||
for i := range opts {
|
||||
opts[i](o)
|
||||
}
|
||||
|
||||
return &Router{
|
||||
log: o.log,
|
||||
remoteProvider: prm.RemoteWriterProvider,
|
||||
routeBuilder: prm.Builder,
|
||||
localSrvInfo: prm.LocalServerInfo,
|
||||
}
|
||||
}
|
39
pkg/services/container/announcement/load/route/util.go
Normal file
39
pkg/services/container/announcement/load/route/util.go
Normal file
|
@ -0,0 +1,39 @@
|
|||
package loadroute
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
||||
)
|
||||
|
||||
var errWrongRoute = errors.New("wrong route")
|
||||
|
||||
// CheckRoute checks if the route is a route correctly constructed by the builder for value a.
|
||||
//
|
||||
// Returns nil if route is correct, otherwise an error clarifying the inconsistency.
|
||||
func CheckRoute(builder Builder, a container.UsedSpaceAnnouncement, route []ServerInfo) error {
|
||||
for i := 1; i < len(route); i++ {
|
||||
servers, err := builder.NextStage(a, route[:i])
|
||||
if err != nil {
|
||||
return err
|
||||
} else if len(servers) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
found := false
|
||||
|
||||
for j := range servers {
|
||||
if bytes.Equal(servers[j].PublicKey(), route[i].PublicKey()) {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !found {
|
||||
return errWrongRoute
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
Loading…
Reference in a new issue