Refactor container service #217
11 changed files with 97 additions and 129 deletions
|
@ -39,7 +39,7 @@ const (
|
||||||
stopEstimationNotifyEvent = "StopEstimation"
|
stopEstimationNotifyEvent = "StopEstimation"
|
||||||
)
|
)
|
||||||
|
|
||||||
func initContainerService(c *cfg) {
|
func initContainerService(ctx context.Context, c *cfg) {
|
||||||
// container wrapper that tries to invoke notary
|
// container wrapper that tries to invoke notary
|
||||||
// requests if chain is configured so
|
// requests if chain is configured so
|
||||||
wrap, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, cntClient.TryNotary())
|
wrap, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, cntClient.TryNotary())
|
||||||
|
@ -77,7 +77,7 @@ func initContainerService(c *cfg) {
|
||||||
loadroute.WithLogger(c.log),
|
loadroute.WithLogger(c.log),
|
||||||
)
|
)
|
||||||
|
|
||||||
setLoadController(c, loadRouter, loadAccumulator)
|
setLoadController(ctx, c, loadRouter, loadAccumulator)
|
||||||
|
|
||||||
server := containerTransportGRPC.New(
|
server := containerTransportGRPC.New(
|
||||||
containerService.NewSignService(
|
containerService.NewSignService(
|
||||||
|
@ -180,7 +180,7 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
|
||||||
return cnrRdr, cnrWrt
|
return cnrRdr, cnrWrt
|
||||||
}
|
}
|
||||||
|
|
||||||
func setLoadController(c *cfg, loadRouter *loadroute.Router, loadAccumulator *loadstorage.Storage) {
|
func setLoadController(ctx context.Context, c *cfg, loadRouter *loadroute.Router, loadAccumulator *loadstorage.Storage) {
|
||||||
pubKey := c.key.PublicKey().Bytes()
|
pubKey := c.key.PublicKey().Bytes()
|
||||||
|
|
||||||
// container wrapper that always sends non-notary
|
// container wrapper that always sends non-notary
|
||||||
|
@ -211,14 +211,14 @@ func setLoadController(c *cfg, loadRouter *loadroute.Router, loadAccumulator *lo
|
||||||
|
|
||||||
setContainerNotificationParser(c, startEstimationNotifyEvent, containerEvent.ParseStartEstimation)
|
setContainerNotificationParser(c, startEstimationNotifyEvent, containerEvent.ParseStartEstimation)
|
||||||
addContainerAsyncNotificationHandler(c, startEstimationNotifyEvent, func(ev event.Event) {
|
addContainerAsyncNotificationHandler(c, startEstimationNotifyEvent, func(ev event.Event) {
|
||||||
ctrl.Start(loadcontroller.StartPrm{
|
ctrl.Start(ctx, loadcontroller.StartPrm{
|
||||||
Epoch: ev.(containerEvent.StartEstimation).Epoch(),
|
Epoch: ev.(containerEvent.StartEstimation).Epoch(),
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
setContainerNotificationParser(c, stopEstimationNotifyEvent, containerEvent.ParseStopEstimation)
|
setContainerNotificationParser(c, stopEstimationNotifyEvent, containerEvent.ParseStopEstimation)
|
||||||
addContainerAsyncNotificationHandler(c, stopEstimationNotifyEvent, func(ev event.Event) {
|
addContainerAsyncNotificationHandler(c, stopEstimationNotifyEvent, func(ev event.Event) {
|
||||||
ctrl.Stop(loadcontroller.StopPrm{
|
ctrl.Stop(ctx, loadcontroller.StopPrm{
|
||||||
Epoch: ev.(containerEvent.StopEstimation).Epoch(),
|
Epoch: ev.(containerEvent.StopEstimation).Epoch(),
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
@ -335,7 +335,7 @@ type remoteLoadAnnounceProvider struct {
|
||||||
deadEndProvider loadcontroller.WriterProvider
|
deadEndProvider loadcontroller.WriterProvider
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *remoteLoadAnnounceProvider) InitRemote(srv loadroute.ServerInfo) (loadcontroller.WriterProvider, error) {
|
func (r *remoteLoadAnnounceProvider) InitRemote(srv loadcontroller.ServerInfo) (loadcontroller.WriterProvider, error) {
|
||||||
if srv == nil {
|
if srv == nil {
|
||||||
return r.deadEndProvider, nil
|
return r.deadEndProvider, nil
|
||||||
}
|
}
|
||||||
|
@ -366,7 +366,7 @@ type remoteLoadAnnounceWriterProvider struct {
|
||||||
client client.Client
|
client client.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *remoteLoadAnnounceWriterProvider) InitWriter(ctx context.Context) (loadcontroller.Writer, error) {
|
func (p *remoteLoadAnnounceWriterProvider) InitWriter([]loadcontroller.ServerInfo) (loadcontroller.Writer, error) {
|
||||||
return &remoteLoadAnnounceWriter{
|
return &remoteLoadAnnounceWriter{
|
||||||
client: p.client,
|
client: p.client,
|
||||||
}, nil
|
}, nil
|
||||||
|
@ -536,7 +536,7 @@ func (c *usedSpaceService) ExternalAddresses() []string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *usedSpaceService) AnnounceUsedSpace(ctx context.Context, req *containerV2.AnnounceUsedSpaceRequest) (*containerV2.AnnounceUsedSpaceResponse, error) {
|
func (c *usedSpaceService) AnnounceUsedSpace(ctx context.Context, req *containerV2.AnnounceUsedSpaceRequest) (*containerV2.AnnounceUsedSpaceResponse, error) {
|
||||||
var passedRoute []loadroute.ServerInfo
|
var passedRoute []loadcontroller.ServerInfo
|
||||||
|
|
||||||
for hdr := req.GetVerificationHeader(); hdr != nil; hdr = hdr.GetOrigin() {
|
for hdr := req.GetVerificationHeader(); hdr != nil; hdr = hdr.GetOrigin() {
|
||||||
passedRoute = append(passedRoute, &containerOnlyKeyRemoteServerInfo{
|
passedRoute = append(passedRoute, &containerOnlyKeyRemoteServerInfo{
|
||||||
|
@ -550,7 +550,7 @@ func (c *usedSpaceService) AnnounceUsedSpace(ctx context.Context, req *container
|
||||||
|
|
||||||
passedRoute = append(passedRoute, c)
|
passedRoute = append(passedRoute, c)
|
||||||
|
|
||||||
w, err := c.loadWriterProvider.InitWriter(loadroute.NewRouteContext(ctx, passedRoute))
|
w, err := c.loadWriterProvider.InitWriter(passedRoute)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not initialize container's used space writer: %w", err)
|
return nil, fmt.Errorf("could not initialize container's used space writer: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -615,7 +615,7 @@ func (l *loadPlacementBuilder) isNodeFromContainerKey(epoch uint64, cnr cid.ID,
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *usedSpaceService) processLoadValue(_ context.Context, a containerSDK.SizeEstimation,
|
func (c *usedSpaceService) processLoadValue(_ context.Context, a containerSDK.SizeEstimation,
|
||||||
route []loadroute.ServerInfo, w loadcontroller.Writer) error {
|
route []loadcontroller.ServerInfo, w loadcontroller.Writer) error {
|
||||||
fromCnr, err := c.loadPlacementBuilder.isNodeFromContainerKey(a.Epoch(), a.Container(), route[0].PublicKey())
|
fromCnr, err := c.loadPlacementBuilder.isNodeFromContainerKey(a.Epoch(), a.Container(), route[0].PublicKey())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not verify that the sender belongs to the container: %w", err)
|
return fmt.Errorf("could not verify that the sender belongs to the container: %w", err)
|
||||||
|
|
|
@ -97,7 +97,7 @@ func initApp(ctx context.Context, c *cfg) {
|
||||||
initAndLog(c, "gRPC", initGRPC)
|
initAndLog(c, "gRPC", initGRPC)
|
||||||
initAndLog(c, "netmap", initNetmapService)
|
initAndLog(c, "netmap", initNetmapService)
|
||||||
initAndLog(c, "accounting", initAccountingService)
|
initAndLog(c, "accounting", initAccountingService)
|
||||||
initAndLog(c, "container", initContainerService)
|
initAndLog(c, "container", func(c *cfg) { initContainerService(ctx, c) })
|
||||||
initAndLog(c, "session", initSessionService)
|
initAndLog(c, "session", initSessionService)
|
||||||
initAndLog(c, "reputation", initReputationService)
|
initAndLog(c, "reputation", initReputationService)
|
||||||
initAndLog(c, "notification", initNotifications)
|
initAndLog(c, "notification", initNotifications)
|
||||||
|
|
|
@ -15,18 +15,15 @@ type StartPrm struct {
|
||||||
Epoch uint64
|
Epoch uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
// nolint: containedctx
|
|
||||||
type commonContext struct {
|
type commonContext struct {
|
||||||
epoch uint64
|
epoch uint64
|
||||||
|
|
||||||
ctrl *Controller
|
ctrl *Controller
|
||||||
|
|
||||||
log *logger.Logger
|
log *logger.Logger
|
||||||
|
|
||||||
ctx context.Context
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type announceContext struct {
|
type announcer struct {
|
||||||
commonContext
|
commonContext
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -39,21 +36,22 @@ type announceContext struct {
|
||||||
//
|
//
|
||||||
// Each call acquires an announcement context for an Epoch parameter.
|
// Each call acquires an announcement context for an Epoch parameter.
|
||||||
// At the very end of the operation, the context is released.
|
// At the very end of the operation, the context is released.
|
||||||
func (c *Controller) Start(prm StartPrm) {
|
func (c *Controller) Start(ctx context.Context, prm StartPrm) {
|
||||||
|
var announcer *announcer
|
||||||
// acquire announcement
|
// acquire announcement
|
||||||
execCtx := c.acquireAnnouncement(prm)
|
ctx, announcer = c.acquireAnnouncement(ctx, prm)
|
||||||
if execCtx == nil {
|
if announcer == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// finally stop and free the announcement
|
// finally stop and free the announcement
|
||||||
defer execCtx.freeAnnouncement()
|
defer announcer.freeAnnouncement()
|
||||||
|
|
||||||
// announce local values
|
// announce local values
|
||||||
execCtx.announce()
|
announcer.announce(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *announceContext) announce() {
|
func (c *announcer) announce(ctx context.Context) {
|
||||||
c.log.Debug("starting to announce the values of the metrics")
|
c.log.Debug("starting to announce the values of the metrics")
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -62,7 +60,7 @@ func (c *announceContext) announce() {
|
||||||
)
|
)
|
||||||
|
|
||||||
// initialize iterator over locally collected metrics
|
// initialize iterator over locally collected metrics
|
||||||
metricsIterator, err = c.ctrl.prm.LocalMetrics.InitIterator(c.ctx)
|
metricsIterator, err = c.ctrl.prm.LocalMetrics.InitIterator()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.log.Debug("could not initialize iterator over locally collected metrics",
|
c.log.Debug("could not initialize iterator over locally collected metrics",
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
|
@ -72,7 +70,7 @@ func (c *announceContext) announce() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// initialize target of local announcements
|
// initialize target of local announcements
|
||||||
targetWriter, err := c.ctrl.prm.LocalAnnouncementTarget.InitWriter(c.ctx)
|
targetWriter, err := c.ctrl.prm.LocalAnnouncementTarget.InitWriter(nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.log.Debug("could not initialize announcement accumulator",
|
c.log.Debug("could not initialize announcement accumulator",
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
|
@ -100,7 +98,7 @@ func (c *announceContext) announce() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// finish writing
|
// finish writing
|
||||||
err = targetWriter.Close(c.ctx)
|
err = targetWriter.Close(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.log.Debug("could not finish writing local announcements",
|
c.log.Debug("could not finish writing local announcements",
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
|
@ -112,35 +110,32 @@ func (c *announceContext) announce() {
|
||||||
c.log.Debug("trust announcement successfully finished")
|
c.log.Debug("trust announcement successfully finished")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Controller) acquireAnnouncement(prm StartPrm) *announceContext {
|
func (c *Controller) acquireAnnouncement(ctx context.Context, prm StartPrm) (context.Context, *announcer) {
|
||||||
var ctx context.Context
|
started := true
|
||||||
|
|
||||||
c.announceMtx.Lock()
|
c.announceMtx.Lock()
|
||||||
|
|
||||||
{
|
{
|
||||||
if cancel := c.mAnnounceCtx[prm.Epoch]; cancel == nil {
|
if cancel := c.mAnnounceCtx[prm.Epoch]; cancel == nil {
|
||||||
ctx, cancel = context.WithCancel(context.Background())
|
ctx, cancel = context.WithCancel(ctx)
|
||||||
c.mAnnounceCtx[prm.Epoch] = cancel
|
c.mAnnounceCtx[prm.Epoch] = cancel
|
||||||
|
started = false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
c.announceMtx.Unlock()
|
c.announceMtx.Unlock()
|
||||||
|
|
||||||
log := &logger.Logger{Logger: c.opts.log.With(
|
log := &logger.Logger{Logger: c.opts.log.With(
|
||||||
zap.Uint64("epoch", prm.Epoch),
|
zap.Uint64("epoch", prm.Epoch),
|
||||||
)}
|
)}
|
||||||
|
|
||||||
if ctx == nil {
|
if started {
|
||||||
log.Debug("announcement is already started")
|
log.Debug("announcement is already started")
|
||||||
return nil
|
return ctx, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return &announceContext{
|
return ctx, &announcer{
|
||||||
commonContext: commonContext{
|
commonContext: commonContext{
|
||||||
epoch: prm.Epoch,
|
epoch: prm.Epoch,
|
||||||
ctrl: c,
|
ctrl: c,
|
||||||
log: log,
|
log: log,
|
||||||
ctx: ctx,
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -176,7 +171,7 @@ type StopPrm struct {
|
||||||
Epoch uint64
|
Epoch uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
type stopContext struct {
|
type reporter struct {
|
||||||
commonContext
|
commonContext
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -188,31 +183,32 @@ type stopContext struct {
|
||||||
//
|
//
|
||||||
// Each call acquires a report context for an Epoch parameter.
|
// Each call acquires a report context for an Epoch parameter.
|
||||||
// At the very end of the operation, the context is released.
|
// At the very end of the operation, the context is released.
|
||||||
func (c *Controller) Stop(prm StopPrm) {
|
func (c *Controller) Stop(ctx context.Context, prm StopPrm) {
|
||||||
execCtx := c.acquireReport(prm)
|
var reporter *reporter
|
||||||
if execCtx == nil {
|
ctx, reporter = c.acquireReport(ctx, prm)
|
||||||
|
if reporter == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// finally stop and free reporting
|
// finally stop and free reporting
|
||||||
defer execCtx.freeReport()
|
defer reporter.freeReport()
|
||||||
|
|
||||||
// interrupt announcement
|
// interrupt announcement
|
||||||
execCtx.freeAnnouncement()
|
reporter.freeAnnouncement()
|
||||||
|
|
||||||
// report the estimations
|
// report the estimations
|
||||||
execCtx.report()
|
reporter.report(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Controller) acquireReport(prm StopPrm) *stopContext {
|
func (c *Controller) acquireReport(ctx context.Context, prm StopPrm) (context.Context, *reporter) {
|
||||||
var ctx context.Context
|
started := true
|
||||||
|
|
||||||
c.reportMtx.Lock()
|
c.reportMtx.Lock()
|
||||||
|
|
||||||
{
|
{
|
||||||
if cancel := c.mReportCtx[prm.Epoch]; cancel == nil {
|
if cancel := c.mReportCtx[prm.Epoch]; cancel == nil {
|
||||||
ctx, cancel = context.WithCancel(context.Background())
|
ctx, cancel = context.WithCancel(ctx)
|
||||||
c.mReportCtx[prm.Epoch] = cancel
|
c.mReportCtx[prm.Epoch] = cancel
|
||||||
|
started = false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -222,12 +218,12 @@ func (c *Controller) acquireReport(prm StopPrm) *stopContext {
|
||||||
zap.Uint64("epoch", prm.Epoch),
|
zap.Uint64("epoch", prm.Epoch),
|
||||||
)}
|
)}
|
||||||
|
|
||||||
if ctx == nil {
|
if started {
|
||||||
log.Debug("report is already started")
|
log.Debug("report is already started")
|
||||||
return nil
|
return ctx, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return &stopContext{
|
return ctx, &reporter{
|
||||||
commonContext: commonContext{
|
commonContext: commonContext{
|
||||||
epoch: prm.Epoch,
|
epoch: prm.Epoch,
|
||||||
ctrl: c,
|
ctrl: c,
|
||||||
|
@ -261,14 +257,14 @@ func (c *commonContext) freeReport() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *stopContext) report() {
|
func (c *reporter) report(ctx context.Context) {
|
||||||
var (
|
var (
|
||||||
localIterator Iterator
|
localIterator Iterator
|
||||||
err error
|
err error
|
||||||
)
|
)
|
||||||
|
|
||||||
// initialize iterator over locally accumulated announcements
|
// initialize iterator over locally accumulated announcements
|
||||||
localIterator, err = c.ctrl.prm.AnnouncementAccumulator.InitIterator(c.ctx)
|
localIterator, err = c.ctrl.prm.AnnouncementAccumulator.InitIterator()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.log.Debug("could not initialize iterator over locally accumulated announcements",
|
c.log.Debug("could not initialize iterator over locally accumulated announcements",
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
|
@ -278,7 +274,7 @@ func (c *stopContext) report() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// initialize final destination of load estimations
|
// initialize final destination of load estimations
|
||||||
resultWriter, err := c.ctrl.prm.ResultReceiver.InitWriter(c.ctx)
|
resultWriter, err := c.ctrl.prm.ResultReceiver.InitWriter(nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.log.Debug("could not initialize result target",
|
c.log.Debug("could not initialize result target",
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
|
@ -301,7 +297,7 @@ func (c *stopContext) report() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// finish writing
|
// finish writing
|
||||||
err = resultWriter.Close(c.ctx)
|
err = resultWriter.Close(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.log.Debug("could not finish writing load estimations",
|
c.log.Debug("could not finish writing load estimations",
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
|
|
|
@ -28,7 +28,7 @@ func newTestStorage() *testAnnouncementStorage {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *testAnnouncementStorage) InitIterator(context.Context) (loadcontroller.Iterator, error) {
|
func (s *testAnnouncementStorage) InitIterator() (loadcontroller.Iterator, error) {
|
||||||
if s.i != nil {
|
if s.i != nil {
|
||||||
return s.i, nil
|
return s.i, nil
|
||||||
}
|
}
|
||||||
|
@ -53,7 +53,7 @@ func (s *testAnnouncementStorage) Iterate(f loadcontroller.UsedSpaceFilter, h lo
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *testAnnouncementStorage) InitWriter(context.Context) (loadcontroller.Writer, error) {
|
func (s *testAnnouncementStorage) InitWriter([]loadcontroller.ServerInfo) (loadcontroller.Writer, error) {
|
||||||
if s.w != nil {
|
if s.w != nil {
|
||||||
return s.w, nil
|
return s.w, nil
|
||||||
}
|
}
|
||||||
|
@ -143,12 +143,12 @@ func TestSimpleScenario(t *testing.T) {
|
||||||
|
|
||||||
// start both controllers
|
// start both controllers
|
||||||
go func() {
|
go func() {
|
||||||
ctrlN1.Start(startPrm)
|
ctrlN1.Start(context.Background(), startPrm)
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
ctrlN2.Start(startPrm)
|
ctrlN2.Start(context.Background(), startPrm)
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
@ -161,12 +161,12 @@ func TestSimpleScenario(t *testing.T) {
|
||||||
|
|
||||||
// stop both controllers
|
// stop both controllers
|
||||||
go func() {
|
go func() {
|
||||||
ctrlN1.Stop(stopPrm)
|
ctrlN1.Stop(context.Background(), stopPrm)
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
ctrlN2.Stop(stopPrm)
|
ctrlN2.Stop(context.Background(), stopPrm)
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
|
|
@ -45,7 +45,7 @@ type IteratorProvider interface {
|
||||||
//
|
//
|
||||||
// Implementations can have different logic for different
|
// Implementations can have different logic for different
|
||||||
// contexts, so specific ones may document their own behavior.
|
// contexts, so specific ones may document their own behavior.
|
||||||
InitIterator(context.Context) (Iterator, error)
|
InitIterator() (Iterator, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Writer describes the interface for storing container.SizeEstimation values.
|
// Writer describes the interface for storing container.SizeEstimation values.
|
||||||
|
@ -80,8 +80,24 @@ type WriterProvider interface {
|
||||||
//
|
//
|
||||||
// Initialization problems are reported via error.
|
// Initialization problems are reported via error.
|
||||||
// If no error was returned, then the Writer must not be nil.
|
// If no error was returned, then the Writer must not be nil.
|
||||||
//
|
InitWriter(route []ServerInfo) (Writer, error)
|
||||||
// Implementations can have different logic for different
|
}
|
||||||
// contexts, so specific ones may document their own behavior.
|
|
||||||
InitWriter(context.Context) (Writer, error)
|
// ServerInfo describes a set of
|
||||||
|
// characteristics of a point in a route.
|
||||||
|
type ServerInfo interface {
|
||||||
|
// PublicKey returns public key of the node
|
||||||
|
// from the route in a binary representation.
|
||||||
|
PublicKey() []byte
|
||||||
|
|
||||||
|
// Iterates over network addresses of the node
|
||||||
|
// in the route. Breaks iterating on true return
|
||||||
|
// of the handler.
|
||||||
|
IterateAddresses(func(string) bool)
|
||||||
|
|
||||||
|
// Returns number of server's network addresses.
|
||||||
|
NumberOfAddresses() int
|
||||||
|
|
||||||
|
// ExternalAddresses returns external node's addresses.
|
||||||
|
ExternalAddresses() []string
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,8 +1,6 @@
|
||||||
package loadcontroller
|
package loadcontroller
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -17,11 +15,11 @@ type storageWrapper struct {
|
||||||
i Iterator
|
i Iterator
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s storageWrapper) InitIterator(context.Context) (Iterator, error) {
|
func (s storageWrapper) InitIterator() (Iterator, error) {
|
||||||
return s.i, nil
|
return s.i, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s storageWrapper) InitWriter(context.Context) (Writer, error) {
|
func (s storageWrapper) InitWriter([]ServerInfo) (Writer, error) {
|
||||||
return s.w, nil
|
return s.w, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -10,26 +10,9 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
// nolint: containedctx
|
|
||||||
type routeContext struct {
|
|
||||||
context.Context
|
|
||||||
|
|
||||||
passedRoute []ServerInfo
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewRouteContext wraps the main context of value passing with its traversal route.
|
|
||||||
//
|
|
||||||
// Passing the result to Router.InitWriter method will allow you to continue this route.
|
|
||||||
func NewRouteContext(ctx context.Context, passed []ServerInfo) context.Context {
|
|
||||||
return &routeContext{
|
|
||||||
Context: ctx,
|
|
||||||
passedRoute: passed,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// InitWriter initializes and returns Writer that sends each value to its next route point.
|
// InitWriter initializes and returns Writer that sends each value to its next route point.
|
||||||
//
|
//
|
||||||
// If ctx was created by NewRouteContext, then the traversed route is taken into account,
|
// If route is present, then it is taken into account,
|
||||||
// and the value will be sent to its continuation. Otherwise, the route will be laid
|
// and the value will be sent to its continuation. Otherwise, the route will be laid
|
||||||
// from scratch and the value will be sent to its primary point.
|
// from scratch and the value will be sent to its primary point.
|
||||||
//
|
//
|
||||||
|
@ -41,22 +24,14 @@ func NewRouteContext(ctx context.Context, passed []ServerInfo) context.Context {
|
||||||
// runtime and never returns an error.
|
// runtime and never returns an error.
|
||||||
//
|
//
|
||||||
// Always returns nil error.
|
// Always returns nil error.
|
||||||
func (r *Router) InitWriter(ctx context.Context) (loadcontroller.Writer, error) {
|
func (r *Router) InitWriter(route []loadcontroller.ServerInfo) (loadcontroller.Writer, error) {
|
||||||
var (
|
if len(route) == 0 {
|
||||||
routeCtx *routeContext
|
route = []loadcontroller.ServerInfo{r.localSrvInfo}
|
||||||
ok bool
|
|
||||||
)
|
|
||||||
|
|
||||||
if routeCtx, ok = ctx.(*routeContext); !ok {
|
|
||||||
routeCtx = &routeContext{
|
|
||||||
Context: ctx,
|
|
||||||
passedRoute: []ServerInfo{r.localSrvInfo},
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return &loadWriter{
|
return &loadWriter{
|
||||||
router: r,
|
router: r,
|
||||||
ctx: routeCtx,
|
route: route,
|
||||||
mRoute: make(map[routeKey]*valuesRoute),
|
mRoute: make(map[routeKey]*valuesRoute),
|
||||||
mServers: make(map[string]loadcontroller.Writer),
|
mServers: make(map[string]loadcontroller.Writer),
|
||||||
}, nil
|
}, nil
|
||||||
|
@ -69,7 +44,7 @@ type routeKey struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type valuesRoute struct {
|
type valuesRoute struct {
|
||||||
route []ServerInfo
|
route []loadcontroller.ServerInfo
|
||||||
|
|
||||||
values []container.SizeEstimation
|
values []container.SizeEstimation
|
||||||
}
|
}
|
||||||
|
@ -77,7 +52,7 @@ type valuesRoute struct {
|
||||||
type loadWriter struct {
|
type loadWriter struct {
|
||||||
router *Router
|
router *Router
|
||||||
|
|
||||||
ctx *routeContext
|
route []loadcontroller.ServerInfo
|
||||||
|
|
||||||
routeMtx sync.RWMutex
|
routeMtx sync.RWMutex
|
||||||
mRoute map[routeKey]*valuesRoute
|
mRoute map[routeKey]*valuesRoute
|
||||||
|
@ -96,11 +71,11 @@ func (w *loadWriter) Put(a container.SizeEstimation) error {
|
||||||
|
|
||||||
routeValues, ok := w.mRoute[key]
|
routeValues, ok := w.mRoute[key]
|
||||||
if !ok {
|
if !ok {
|
||||||
route, err := w.router.routeBuilder.NextStage(a, w.ctx.passedRoute)
|
route, err := w.router.routeBuilder.NextStage(a, w.route)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
} else if len(route) == 0 {
|
} else if len(route) == 0 {
|
||||||
route = []ServerInfo{nil}
|
route = []loadcontroller.ServerInfo{nil}
|
||||||
}
|
}
|
||||||
|
|
||||||
routeValues = &valuesRoute{
|
routeValues = &valuesRoute{
|
||||||
|
@ -129,7 +104,7 @@ func (w *loadWriter) Put(a container.SizeEstimation) error {
|
||||||
continue // best effort
|
continue // best effort
|
||||||
}
|
}
|
||||||
|
|
||||||
remoteWriter, err = provider.InitWriter(w.ctx)
|
remoteWriter, err = provider.InitWriter(w.route)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
w.router.log.Debug("could not initialize writer",
|
w.router.log.Debug("could not initialize writer",
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
|
|
|
@ -5,25 +5,6 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ServerInfo describes a set of
|
|
||||||
// characteristics of a point in a route.
|
|
||||||
type ServerInfo interface {
|
|
||||||
// PublicKey returns public key of the node
|
|
||||||
// from the route in a binary representation.
|
|
||||||
PublicKey() []byte
|
|
||||||
|
|
||||||
// Iterates over network addresses of the node
|
|
||||||
// in the route. Breaks iterating on true return
|
|
||||||
// of the handler.
|
|
||||||
IterateAddresses(func(string) bool)
|
|
||||||
|
|
||||||
// Returns number of server's network addresses.
|
|
||||||
NumberOfAddresses() int
|
|
||||||
|
|
||||||
// ExternalAddresses returns external node's addresses.
|
|
||||||
ExternalAddresses() []string
|
|
||||||
}
|
|
||||||
|
|
||||||
// Builder groups methods to route values in the network.
|
// Builder groups methods to route values in the network.
|
||||||
type Builder interface {
|
type Builder interface {
|
||||||
// NextStage must return next group of route points for the value a
|
// NextStage must return next group of route points for the value a
|
||||||
|
@ -36,7 +17,7 @@ type Builder interface {
|
||||||
// in that list (means that point is the last point in one of the route groups),
|
// in that list (means that point is the last point in one of the route groups),
|
||||||
// returned route must contain nil point that should be interpreted as signal to,
|
// returned route must contain nil point that should be interpreted as signal to,
|
||||||
// among sending to other route points, save the announcement in that point.
|
// among sending to other route points, save the announcement in that point.
|
||||||
NextStage(a container.SizeEstimation, passed []ServerInfo) ([]ServerInfo, error)
|
NextStage(a container.SizeEstimation, passed []loadcontroller.ServerInfo) ([]loadcontroller.ServerInfo, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// RemoteWriterProvider describes the component
|
// RemoteWriterProvider describes the component
|
||||||
|
@ -46,5 +27,5 @@ type RemoteWriterProvider interface {
|
||||||
// corresponding to info.
|
// corresponding to info.
|
||||||
//
|
//
|
||||||
// Nil info matches the end of the route.
|
// Nil info matches the end of the route.
|
||||||
InitRemote(info ServerInfo) (loadcontroller.WriterProvider, error)
|
InitRemote(info loadcontroller.ServerInfo) (loadcontroller.WriterProvider, error)
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,7 +5,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||||
loadroute "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/route"
|
loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -15,7 +15,7 @@ import (
|
||||||
// If passed route has more than one point, then endpoint of the route is reached.
|
// If passed route has more than one point, then endpoint of the route is reached.
|
||||||
//
|
//
|
||||||
// The traversed route is not checked, it is assumed to be correct.
|
// The traversed route is not checked, it is assumed to be correct.
|
||||||
func (b *Builder) NextStage(a container.SizeEstimation, passed []loadroute.ServerInfo) ([]loadroute.ServerInfo, error) {
|
func (b *Builder) NextStage(a container.SizeEstimation, passed []loadcontroller.ServerInfo) ([]loadcontroller.ServerInfo, error) {
|
||||||
if len(passed) > 1 {
|
if len(passed) > 1 {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
@ -27,7 +27,7 @@ func (b *Builder) NextStage(a container.SizeEstimation, passed []loadroute.Serve
|
||||||
return nil, fmt.Errorf("could not build placement %s: %w", cnr, err)
|
return nil, fmt.Errorf("could not build placement %s: %w", cnr, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
res := make([]loadroute.ServerInfo, 0, len(placement))
|
res := make([]loadcontroller.ServerInfo, 0, len(placement))
|
||||||
|
|
||||||
for i := range placement {
|
for i := range placement {
|
||||||
if len(placement[i]) == 0 {
|
if len(placement[i]) == 0 {
|
||||||
|
|
|
@ -3,6 +3,7 @@ package loadroute
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -15,7 +16,7 @@ type Prm struct {
|
||||||
// Characteristics of the local node's server.
|
// Characteristics of the local node's server.
|
||||||
//
|
//
|
||||||
// Must not be nil.
|
// Must not be nil.
|
||||||
LocalServerInfo ServerInfo
|
LocalServerInfo loadcontroller.ServerInfo
|
||||||
|
|
||||||
// Component for sending values to a fixed route point.
|
// Component for sending values to a fixed route point.
|
||||||
//
|
//
|
||||||
|
@ -46,7 +47,7 @@ type Router struct {
|
||||||
|
|
||||||
routeBuilder Builder
|
routeBuilder Builder
|
||||||
|
|
||||||
localSrvInfo ServerInfo
|
localSrvInfo loadcontroller.ServerInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
const invalidPrmValFmt = "invalid parameter %s (%T):%v"
|
const invalidPrmValFmt = "invalid parameter %s (%T):%v"
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
|
loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -12,7 +13,7 @@ var errWrongRoute = errors.New("wrong route")
|
||||||
// CheckRoute checks if the route is a route correctly constructed by the builder for value a.
|
// CheckRoute checks if the route is a route correctly constructed by the builder for value a.
|
||||||
//
|
//
|
||||||
// Returns nil if route is correct, otherwise an error clarifying the inconsistency.
|
// Returns nil if route is correct, otherwise an error clarifying the inconsistency.
|
||||||
func CheckRoute(builder Builder, a container.SizeEstimation, route []ServerInfo) error {
|
func CheckRoute(builder Builder, a container.SizeEstimation, route []loadcontroller.ServerInfo) error {
|
||||||
for i := 1; i < len(route); i++ {
|
for i := 1; i < len(route); i++ {
|
||||||
servers, err := builder.NextStage(a, route[:i])
|
servers, err := builder.NextStage(a, route[:i])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
Loading…
Reference in a new issue