Refactor container service #217

Merged
fyrchik merged 2 commits from dstepanov-yadro/frostfs-node:refactoring/object-3061-container into master 2023-04-05 14:38:01 +00:00
11 changed files with 97 additions and 129 deletions

View file

@ -39,7 +39,7 @@ const (
stopEstimationNotifyEvent = "StopEstimation" stopEstimationNotifyEvent = "StopEstimation"
) )
func initContainerService(c *cfg) { func initContainerService(ctx context.Context, c *cfg) {
// container wrapper that tries to invoke notary // container wrapper that tries to invoke notary
// requests if chain is configured so // requests if chain is configured so
wrap, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, cntClient.TryNotary()) wrap, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, cntClient.TryNotary())
@ -77,7 +77,7 @@ func initContainerService(c *cfg) {
loadroute.WithLogger(c.log), loadroute.WithLogger(c.log),
) )
setLoadController(c, loadRouter, loadAccumulator) setLoadController(ctx, c, loadRouter, loadAccumulator)
server := containerTransportGRPC.New( server := containerTransportGRPC.New(
containerService.NewSignService( containerService.NewSignService(
@ -180,7 +180,7 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
return cnrRdr, cnrWrt return cnrRdr, cnrWrt
} }
func setLoadController(c *cfg, loadRouter *loadroute.Router, loadAccumulator *loadstorage.Storage) { func setLoadController(ctx context.Context, c *cfg, loadRouter *loadroute.Router, loadAccumulator *loadstorage.Storage) {
pubKey := c.key.PublicKey().Bytes() pubKey := c.key.PublicKey().Bytes()
// container wrapper that always sends non-notary // container wrapper that always sends non-notary
@ -211,14 +211,14 @@ func setLoadController(c *cfg, loadRouter *loadroute.Router, loadAccumulator *lo
setContainerNotificationParser(c, startEstimationNotifyEvent, containerEvent.ParseStartEstimation) setContainerNotificationParser(c, startEstimationNotifyEvent, containerEvent.ParseStartEstimation)
addContainerAsyncNotificationHandler(c, startEstimationNotifyEvent, func(ev event.Event) { addContainerAsyncNotificationHandler(c, startEstimationNotifyEvent, func(ev event.Event) {
ctrl.Start(loadcontroller.StartPrm{ ctrl.Start(ctx, loadcontroller.StartPrm{
Epoch: ev.(containerEvent.StartEstimation).Epoch(), Epoch: ev.(containerEvent.StartEstimation).Epoch(),
}) })
}) })
setContainerNotificationParser(c, stopEstimationNotifyEvent, containerEvent.ParseStopEstimation) setContainerNotificationParser(c, stopEstimationNotifyEvent, containerEvent.ParseStopEstimation)
addContainerAsyncNotificationHandler(c, stopEstimationNotifyEvent, func(ev event.Event) { addContainerAsyncNotificationHandler(c, stopEstimationNotifyEvent, func(ev event.Event) {
ctrl.Stop(loadcontroller.StopPrm{ ctrl.Stop(ctx, loadcontroller.StopPrm{
Epoch: ev.(containerEvent.StopEstimation).Epoch(), Epoch: ev.(containerEvent.StopEstimation).Epoch(),
}) })
}) })
@ -335,7 +335,7 @@ type remoteLoadAnnounceProvider struct {
deadEndProvider loadcontroller.WriterProvider deadEndProvider loadcontroller.WriterProvider
} }
func (r *remoteLoadAnnounceProvider) InitRemote(srv loadroute.ServerInfo) (loadcontroller.WriterProvider, error) { func (r *remoteLoadAnnounceProvider) InitRemote(srv loadcontroller.ServerInfo) (loadcontroller.WriterProvider, error) {
if srv == nil { if srv == nil {
return r.deadEndProvider, nil return r.deadEndProvider, nil
} }
@ -366,7 +366,7 @@ type remoteLoadAnnounceWriterProvider struct {
client client.Client client client.Client
} }
func (p *remoteLoadAnnounceWriterProvider) InitWriter(ctx context.Context) (loadcontroller.Writer, error) { func (p *remoteLoadAnnounceWriterProvider) InitWriter([]loadcontroller.ServerInfo) (loadcontroller.Writer, error) {
return &remoteLoadAnnounceWriter{ return &remoteLoadAnnounceWriter{
client: p.client, client: p.client,
}, nil }, nil
@ -536,7 +536,7 @@ func (c *usedSpaceService) ExternalAddresses() []string {
} }
func (c *usedSpaceService) AnnounceUsedSpace(ctx context.Context, req *containerV2.AnnounceUsedSpaceRequest) (*containerV2.AnnounceUsedSpaceResponse, error) { func (c *usedSpaceService) AnnounceUsedSpace(ctx context.Context, req *containerV2.AnnounceUsedSpaceRequest) (*containerV2.AnnounceUsedSpaceResponse, error) {
var passedRoute []loadroute.ServerInfo var passedRoute []loadcontroller.ServerInfo
for hdr := req.GetVerificationHeader(); hdr != nil; hdr = hdr.GetOrigin() { for hdr := req.GetVerificationHeader(); hdr != nil; hdr = hdr.GetOrigin() {
passedRoute = append(passedRoute, &containerOnlyKeyRemoteServerInfo{ passedRoute = append(passedRoute, &containerOnlyKeyRemoteServerInfo{
@ -550,7 +550,7 @@ func (c *usedSpaceService) AnnounceUsedSpace(ctx context.Context, req *container
passedRoute = append(passedRoute, c) passedRoute = append(passedRoute, c)
w, err := c.loadWriterProvider.InitWriter(loadroute.NewRouteContext(ctx, passedRoute)) w, err := c.loadWriterProvider.InitWriter(passedRoute)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not initialize container's used space writer: %w", err) return nil, fmt.Errorf("could not initialize container's used space writer: %w", err)
} }
@ -615,7 +615,7 @@ func (l *loadPlacementBuilder) isNodeFromContainerKey(epoch uint64, cnr cid.ID,
} }
func (c *usedSpaceService) processLoadValue(_ context.Context, a containerSDK.SizeEstimation, func (c *usedSpaceService) processLoadValue(_ context.Context, a containerSDK.SizeEstimation,
route []loadroute.ServerInfo, w loadcontroller.Writer) error { route []loadcontroller.ServerInfo, w loadcontroller.Writer) error {
fromCnr, err := c.loadPlacementBuilder.isNodeFromContainerKey(a.Epoch(), a.Container(), route[0].PublicKey()) fromCnr, err := c.loadPlacementBuilder.isNodeFromContainerKey(a.Epoch(), a.Container(), route[0].PublicKey())
if err != nil { if err != nil {
return fmt.Errorf("could not verify that the sender belongs to the container: %w", err) return fmt.Errorf("could not verify that the sender belongs to the container: %w", err)

View file

@ -97,7 +97,7 @@ func initApp(ctx context.Context, c *cfg) {
initAndLog(c, "gRPC", initGRPC) initAndLog(c, "gRPC", initGRPC)
initAndLog(c, "netmap", initNetmapService) initAndLog(c, "netmap", initNetmapService)
initAndLog(c, "accounting", initAccountingService) initAndLog(c, "accounting", initAccountingService)
initAndLog(c, "container", initContainerService) initAndLog(c, "container", func(c *cfg) { initContainerService(ctx, c) })
initAndLog(c, "session", initSessionService) initAndLog(c, "session", initSessionService)
initAndLog(c, "reputation", initReputationService) initAndLog(c, "reputation", initReputationService)
initAndLog(c, "notification", initNotifications) initAndLog(c, "notification", initNotifications)

View file

@ -15,18 +15,15 @@ type StartPrm struct {
Epoch uint64 Epoch uint64
} }
// nolint: containedctx
type commonContext struct { type commonContext struct {
epoch uint64 epoch uint64
ctrl *Controller ctrl *Controller
log *logger.Logger log *logger.Logger
ctx context.Context
} }
type announceContext struct { type announcer struct {
commonContext commonContext
} }
@ -39,21 +36,22 @@ type announceContext struct {
// //
// Each call acquires an announcement context for an Epoch parameter. // Each call acquires an announcement context for an Epoch parameter.
// At the very end of the operation, the context is released. // At the very end of the operation, the context is released.
func (c *Controller) Start(prm StartPrm) { func (c *Controller) Start(ctx context.Context, prm StartPrm) {
var announcer *announcer
// acquire announcement // acquire announcement
execCtx := c.acquireAnnouncement(prm) ctx, announcer = c.acquireAnnouncement(ctx, prm)
if execCtx == nil { if announcer == nil {
return return
} }
// finally stop and free the announcement // finally stop and free the announcement
defer execCtx.freeAnnouncement() defer announcer.freeAnnouncement()
// announce local values // announce local values
execCtx.announce() announcer.announce(ctx)
} }
func (c *announceContext) announce() { func (c *announcer) announce(ctx context.Context) {
c.log.Debug("starting to announce the values of the metrics") c.log.Debug("starting to announce the values of the metrics")
var ( var (
@ -62,7 +60,7 @@ func (c *announceContext) announce() {
) )
// initialize iterator over locally collected metrics // initialize iterator over locally collected metrics
metricsIterator, err = c.ctrl.prm.LocalMetrics.InitIterator(c.ctx) metricsIterator, err = c.ctrl.prm.LocalMetrics.InitIterator()
if err != nil { if err != nil {
c.log.Debug("could not initialize iterator over locally collected metrics", c.log.Debug("could not initialize iterator over locally collected metrics",
zap.String("error", err.Error()), zap.String("error", err.Error()),
@ -72,7 +70,7 @@ func (c *announceContext) announce() {
} }
// initialize target of local announcements // initialize target of local announcements
targetWriter, err := c.ctrl.prm.LocalAnnouncementTarget.InitWriter(c.ctx) targetWriter, err := c.ctrl.prm.LocalAnnouncementTarget.InitWriter(nil)
if err != nil { if err != nil {
c.log.Debug("could not initialize announcement accumulator", c.log.Debug("could not initialize announcement accumulator",
zap.String("error", err.Error()), zap.String("error", err.Error()),
@ -100,7 +98,7 @@ func (c *announceContext) announce() {
} }
// finish writing // finish writing
err = targetWriter.Close(c.ctx) err = targetWriter.Close(ctx)
if err != nil { if err != nil {
c.log.Debug("could not finish writing local announcements", c.log.Debug("could not finish writing local announcements",
zap.String("error", err.Error()), zap.String("error", err.Error()),
@ -112,35 +110,32 @@ func (c *announceContext) announce() {
c.log.Debug("trust announcement successfully finished") c.log.Debug("trust announcement successfully finished")
} }
func (c *Controller) acquireAnnouncement(prm StartPrm) *announceContext { func (c *Controller) acquireAnnouncement(ctx context.Context, prm StartPrm) (context.Context, *announcer) {
var ctx context.Context started := true
c.announceMtx.Lock() c.announceMtx.Lock()
{ {
if cancel := c.mAnnounceCtx[prm.Epoch]; cancel == nil { if cancel := c.mAnnounceCtx[prm.Epoch]; cancel == nil {
ctx, cancel = context.WithCancel(context.Background()) ctx, cancel = context.WithCancel(ctx)
c.mAnnounceCtx[prm.Epoch] = cancel c.mAnnounceCtx[prm.Epoch] = cancel
started = false
} }
} }
c.announceMtx.Unlock() c.announceMtx.Unlock()
log := &logger.Logger{Logger: c.opts.log.With( log := &logger.Logger{Logger: c.opts.log.With(
zap.Uint64("epoch", prm.Epoch), zap.Uint64("epoch", prm.Epoch),
)} )}
if ctx == nil { if started {
log.Debug("announcement is already started") log.Debug("announcement is already started")
return nil return ctx, nil
} }
return &announceContext{ return ctx, &announcer{
commonContext: commonContext{ commonContext: commonContext{
epoch: prm.Epoch, epoch: prm.Epoch,
ctrl: c, ctrl: c,
log: log, log: log,
ctx: ctx,
}, },
} }
} }
@ -176,7 +171,7 @@ type StopPrm struct {
Epoch uint64 Epoch uint64
} }
type stopContext struct { type reporter struct {
commonContext commonContext
} }
@ -188,31 +183,32 @@ type stopContext struct {
// //
// Each call acquires a report context for an Epoch parameter. // Each call acquires a report context for an Epoch parameter.
// At the very end of the operation, the context is released. // At the very end of the operation, the context is released.
func (c *Controller) Stop(prm StopPrm) { func (c *Controller) Stop(ctx context.Context, prm StopPrm) {
execCtx := c.acquireReport(prm) var reporter *reporter
if execCtx == nil { ctx, reporter = c.acquireReport(ctx, prm)
if reporter == nil {
return return
} }
// finally stop and free reporting // finally stop and free reporting
defer execCtx.freeReport() defer reporter.freeReport()
// interrupt announcement // interrupt announcement
execCtx.freeAnnouncement() reporter.freeAnnouncement()
// report the estimations // report the estimations
execCtx.report() reporter.report(ctx)
} }
func (c *Controller) acquireReport(prm StopPrm) *stopContext { func (c *Controller) acquireReport(ctx context.Context, prm StopPrm) (context.Context, *reporter) {
var ctx context.Context started := true
c.reportMtx.Lock() c.reportMtx.Lock()
{ {
if cancel := c.mReportCtx[prm.Epoch]; cancel == nil { if cancel := c.mReportCtx[prm.Epoch]; cancel == nil {
ctx, cancel = context.WithCancel(context.Background()) ctx, cancel = context.WithCancel(ctx)
c.mReportCtx[prm.Epoch] = cancel c.mReportCtx[prm.Epoch] = cancel
started = false
} }
} }
@ -222,12 +218,12 @@ func (c *Controller) acquireReport(prm StopPrm) *stopContext {
zap.Uint64("epoch", prm.Epoch), zap.Uint64("epoch", prm.Epoch),
)} )}
if ctx == nil { if started {
log.Debug("report is already started") log.Debug("report is already started")
return nil return ctx, nil
} }
return &stopContext{ return ctx, &reporter{
commonContext: commonContext{ commonContext: commonContext{
epoch: prm.Epoch, epoch: prm.Epoch,
ctrl: c, ctrl: c,
@ -261,14 +257,14 @@ func (c *commonContext) freeReport() {
} }
} }
func (c *stopContext) report() { func (c *reporter) report(ctx context.Context) {
var ( var (
localIterator Iterator localIterator Iterator
err error err error
) )
// initialize iterator over locally accumulated announcements // initialize iterator over locally accumulated announcements
localIterator, err = c.ctrl.prm.AnnouncementAccumulator.InitIterator(c.ctx) localIterator, err = c.ctrl.prm.AnnouncementAccumulator.InitIterator()
if err != nil { if err != nil {
c.log.Debug("could not initialize iterator over locally accumulated announcements", c.log.Debug("could not initialize iterator over locally accumulated announcements",
zap.String("error", err.Error()), zap.String("error", err.Error()),
@ -278,7 +274,7 @@ func (c *stopContext) report() {
} }
// initialize final destination of load estimations // initialize final destination of load estimations
resultWriter, err := c.ctrl.prm.ResultReceiver.InitWriter(c.ctx) resultWriter, err := c.ctrl.prm.ResultReceiver.InitWriter(nil)
if err != nil { if err != nil {
c.log.Debug("could not initialize result target", c.log.Debug("could not initialize result target",
zap.String("error", err.Error()), zap.String("error", err.Error()),
@ -301,7 +297,7 @@ func (c *stopContext) report() {
} }
// finish writing // finish writing
err = resultWriter.Close(c.ctx) err = resultWriter.Close(ctx)
if err != nil { if err != nil {
c.log.Debug("could not finish writing load estimations", c.log.Debug("could not finish writing load estimations",
zap.String("error", err.Error()), zap.String("error", err.Error()),

View file

@ -28,7 +28,7 @@ func newTestStorage() *testAnnouncementStorage {
} }
} }
func (s *testAnnouncementStorage) InitIterator(context.Context) (loadcontroller.Iterator, error) { func (s *testAnnouncementStorage) InitIterator() (loadcontroller.Iterator, error) {
if s.i != nil { if s.i != nil {
return s.i, nil return s.i, nil
} }
@ -53,7 +53,7 @@ func (s *testAnnouncementStorage) Iterate(f loadcontroller.UsedSpaceFilter, h lo
return nil return nil
} }
func (s *testAnnouncementStorage) InitWriter(context.Context) (loadcontroller.Writer, error) { func (s *testAnnouncementStorage) InitWriter([]loadcontroller.ServerInfo) (loadcontroller.Writer, error) {
if s.w != nil { if s.w != nil {
return s.w, nil return s.w, nil
} }
@ -143,12 +143,12 @@ func TestSimpleScenario(t *testing.T) {
// start both controllers // start both controllers
go func() { go func() {
ctrlN1.Start(startPrm) ctrlN1.Start(context.Background(), startPrm)
wg.Done() wg.Done()
}() }()
go func() { go func() {
ctrlN2.Start(startPrm) ctrlN2.Start(context.Background(), startPrm)
wg.Done() wg.Done()
}() }()
@ -161,12 +161,12 @@ func TestSimpleScenario(t *testing.T) {
// stop both controllers // stop both controllers
go func() { go func() {
ctrlN1.Stop(stopPrm) ctrlN1.Stop(context.Background(), stopPrm)
wg.Done() wg.Done()
}() }()
go func() { go func() {
ctrlN2.Stop(stopPrm) ctrlN2.Stop(context.Background(), stopPrm)
wg.Done() wg.Done()
}() }()

View file

@ -45,7 +45,7 @@ type IteratorProvider interface {
// //
// Implementations can have different logic for different // Implementations can have different logic for different
// contexts, so specific ones may document their own behavior. // contexts, so specific ones may document their own behavior.
InitIterator(context.Context) (Iterator, error) InitIterator() (Iterator, error)
} }
// Writer describes the interface for storing container.SizeEstimation values. // Writer describes the interface for storing container.SizeEstimation values.
@ -80,8 +80,24 @@ type WriterProvider interface {
// //
// Initialization problems are reported via error. // Initialization problems are reported via error.
// If no error was returned, then the Writer must not be nil. // If no error was returned, then the Writer must not be nil.
// InitWriter(route []ServerInfo) (Writer, error)
// Implementations can have different logic for different }
// contexts, so specific ones may document their own behavior.
InitWriter(context.Context) (Writer, error) // ServerInfo describes a set of
// characteristics of a point in a route.
type ServerInfo interface {
// PublicKey returns public key of the node
// from the route in a binary representation.
PublicKey() []byte
// Iterates over network addresses of the node
// in the route. Breaks iterating on true return
// of the handler.
IterateAddresses(func(string) bool)
// Returns number of server's network addresses.
NumberOfAddresses() int
// ExternalAddresses returns external node's addresses.
ExternalAddresses() []string
} }

View file

@ -1,8 +1,6 @@
package loadcontroller package loadcontroller
import ( import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
) )
@ -17,11 +15,11 @@ type storageWrapper struct {
i Iterator i Iterator
} }
func (s storageWrapper) InitIterator(context.Context) (Iterator, error) { func (s storageWrapper) InitIterator() (Iterator, error) {
return s.i, nil return s.i, nil
} }
func (s storageWrapper) InitWriter(context.Context) (Writer, error) { func (s storageWrapper) InitWriter([]ServerInfo) (Writer, error) {
return s.w, nil return s.w, nil
} }

View file

@ -10,26 +10,9 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
// nolint: containedctx
type routeContext struct {
context.Context
passedRoute []ServerInfo
}
// NewRouteContext wraps the main context of value passing with its traversal route.
//
// Passing the result to Router.InitWriter method will allow you to continue this route.
func NewRouteContext(ctx context.Context, passed []ServerInfo) context.Context {
return &routeContext{
Context: ctx,
passedRoute: passed,
}
}
// InitWriter initializes and returns Writer that sends each value to its next route point. // InitWriter initializes and returns Writer that sends each value to its next route point.
// //
// If ctx was created by NewRouteContext, then the traversed route is taken into account, // If route is present, then it is taken into account,
// and the value will be sent to its continuation. Otherwise, the route will be laid // and the value will be sent to its continuation. Otherwise, the route will be laid
// from scratch and the value will be sent to its primary point. // from scratch and the value will be sent to its primary point.
// //
@ -41,22 +24,14 @@ func NewRouteContext(ctx context.Context, passed []ServerInfo) context.Context {
// runtime and never returns an error. // runtime and never returns an error.
// //
// Always returns nil error. // Always returns nil error.
func (r *Router) InitWriter(ctx context.Context) (loadcontroller.Writer, error) { func (r *Router) InitWriter(route []loadcontroller.ServerInfo) (loadcontroller.Writer, error) {
var ( if len(route) == 0 {
routeCtx *routeContext route = []loadcontroller.ServerInfo{r.localSrvInfo}
ok bool
)
if routeCtx, ok = ctx.(*routeContext); !ok {
routeCtx = &routeContext{
Context: ctx,
passedRoute: []ServerInfo{r.localSrvInfo},
}
} }
return &loadWriter{ return &loadWriter{
router: r, router: r,
ctx: routeCtx, route: route,
mRoute: make(map[routeKey]*valuesRoute), mRoute: make(map[routeKey]*valuesRoute),
mServers: make(map[string]loadcontroller.Writer), mServers: make(map[string]loadcontroller.Writer),
}, nil }, nil
@ -69,7 +44,7 @@ type routeKey struct {
} }
type valuesRoute struct { type valuesRoute struct {
route []ServerInfo route []loadcontroller.ServerInfo
values []container.SizeEstimation values []container.SizeEstimation
} }
@ -77,7 +52,7 @@ type valuesRoute struct {
type loadWriter struct { type loadWriter struct {
router *Router router *Router
ctx *routeContext route []loadcontroller.ServerInfo
routeMtx sync.RWMutex routeMtx sync.RWMutex
mRoute map[routeKey]*valuesRoute mRoute map[routeKey]*valuesRoute
@ -96,11 +71,11 @@ func (w *loadWriter) Put(a container.SizeEstimation) error {
routeValues, ok := w.mRoute[key] routeValues, ok := w.mRoute[key]
if !ok { if !ok {
route, err := w.router.routeBuilder.NextStage(a, w.ctx.passedRoute) route, err := w.router.routeBuilder.NextStage(a, w.route)
if err != nil { if err != nil {
return err return err
} else if len(route) == 0 { } else if len(route) == 0 {
route = []ServerInfo{nil} route = []loadcontroller.ServerInfo{nil}
} }
routeValues = &valuesRoute{ routeValues = &valuesRoute{
@ -129,7 +104,7 @@ func (w *loadWriter) Put(a container.SizeEstimation) error {
continue // best effort continue // best effort
} }
remoteWriter, err = provider.InitWriter(w.ctx) remoteWriter, err = provider.InitWriter(w.route)
if err != nil { if err != nil {
w.router.log.Debug("could not initialize writer", w.router.log.Debug("could not initialize writer",
zap.String("error", err.Error()), zap.String("error", err.Error()),

View file

@ -5,25 +5,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
) )
// ServerInfo describes a set of
// characteristics of a point in a route.
type ServerInfo interface {
// PublicKey returns public key of the node
// from the route in a binary representation.
PublicKey() []byte
// Iterates over network addresses of the node
// in the route. Breaks iterating on true return
// of the handler.
IterateAddresses(func(string) bool)
// Returns number of server's network addresses.
NumberOfAddresses() int
// ExternalAddresses returns external node's addresses.
ExternalAddresses() []string
}
// Builder groups methods to route values in the network. // Builder groups methods to route values in the network.
type Builder interface { type Builder interface {
// NextStage must return next group of route points for the value a // NextStage must return next group of route points for the value a
@ -36,7 +17,7 @@ type Builder interface {
// in that list (means that point is the last point in one of the route groups), // in that list (means that point is the last point in one of the route groups),
// returned route must contain nil point that should be interpreted as signal to, // returned route must contain nil point that should be interpreted as signal to,
// among sending to other route points, save the announcement in that point. // among sending to other route points, save the announcement in that point.
NextStage(a container.SizeEstimation, passed []ServerInfo) ([]ServerInfo, error) NextStage(a container.SizeEstimation, passed []loadcontroller.ServerInfo) ([]loadcontroller.ServerInfo, error)
} }
// RemoteWriterProvider describes the component // RemoteWriterProvider describes the component
@ -46,5 +27,5 @@ type RemoteWriterProvider interface {
// corresponding to info. // corresponding to info.
// //
// Nil info matches the end of the route. // Nil info matches the end of the route.
InitRemote(info ServerInfo) (loadcontroller.WriterProvider, error) InitRemote(info loadcontroller.ServerInfo) (loadcontroller.WriterProvider, error)
} }

View file

@ -5,7 +5,7 @@ import (
"fmt" "fmt"
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
loadroute "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/route" loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
) )
@ -15,7 +15,7 @@ import (
// If passed route has more than one point, then endpoint of the route is reached. // If passed route has more than one point, then endpoint of the route is reached.
// //
// The traversed route is not checked, it is assumed to be correct. // The traversed route is not checked, it is assumed to be correct.
func (b *Builder) NextStage(a container.SizeEstimation, passed []loadroute.ServerInfo) ([]loadroute.ServerInfo, error) { func (b *Builder) NextStage(a container.SizeEstimation, passed []loadcontroller.ServerInfo) ([]loadcontroller.ServerInfo, error) {
if len(passed) > 1 { if len(passed) > 1 {
return nil, nil return nil, nil
} }
@ -27,7 +27,7 @@ func (b *Builder) NextStage(a container.SizeEstimation, passed []loadroute.Serve
return nil, fmt.Errorf("could not build placement %s: %w", cnr, err) return nil, fmt.Errorf("could not build placement %s: %w", cnr, err)
} }
res := make([]loadroute.ServerInfo, 0, len(placement)) res := make([]loadcontroller.ServerInfo, 0, len(placement))
for i := range placement { for i := range placement {
if len(placement[i]) == 0 { if len(placement[i]) == 0 {

View file

@ -3,6 +3,7 @@ package loadroute
import ( import (
"fmt" "fmt"
loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
) )
@ -15,7 +16,7 @@ type Prm struct {
// Characteristics of the local node's server. // Characteristics of the local node's server.
// //
// Must not be nil. // Must not be nil.
LocalServerInfo ServerInfo LocalServerInfo loadcontroller.ServerInfo
// Component for sending values to a fixed route point. // Component for sending values to a fixed route point.
// //
@ -46,7 +47,7 @@ type Router struct {
routeBuilder Builder routeBuilder Builder
localSrvInfo ServerInfo localSrvInfo loadcontroller.ServerInfo
} }
const invalidPrmValFmt = "invalid parameter %s (%T):%v" const invalidPrmValFmt = "invalid parameter %s (%T):%v"

View file

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"errors" "errors"
loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
) )
@ -12,7 +13,7 @@ var errWrongRoute = errors.New("wrong route")
// CheckRoute checks if the route is a route correctly constructed by the builder for value a. // CheckRoute checks if the route is a route correctly constructed by the builder for value a.
// //
// Returns nil if route is correct, otherwise an error clarifying the inconsistency. // Returns nil if route is correct, otherwise an error clarifying the inconsistency.
func CheckRoute(builder Builder, a container.SizeEstimation, route []ServerInfo) error { func CheckRoute(builder Builder, a container.SizeEstimation, route []loadcontroller.ServerInfo) error {
for i := 1; i < len(route); i++ { for i := 1; i < len(route); i++ {
servers, err := builder.NextStage(a, route[:i]) servers, err := builder.NextStage(a, route[:i])
if err != nil { if err != nil {