diff --git a/cmd/frostfs-node/container.go b/cmd/frostfs-node/container.go index 7a88497eb4..6c864431de 100644 --- a/cmd/frostfs-node/container.go +++ b/cmd/frostfs-node/container.go @@ -39,7 +39,7 @@ const ( stopEstimationNotifyEvent = "StopEstimation" ) -func initContainerService(c *cfg) { +func initContainerService(ctx context.Context, c *cfg) { // container wrapper that tries to invoke notary // requests if chain is configured so wrap, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, cntClient.TryNotary()) @@ -77,7 +77,7 @@ func initContainerService(c *cfg) { loadroute.WithLogger(c.log), ) - setLoadController(c, loadRouter, loadAccumulator) + setLoadController(ctx, c, loadRouter, loadAccumulator) server := containerTransportGRPC.New( containerService.NewSignService( @@ -180,7 +180,7 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c return cnrRdr, cnrWrt } -func setLoadController(c *cfg, loadRouter *loadroute.Router, loadAccumulator *loadstorage.Storage) { +func setLoadController(ctx context.Context, c *cfg, loadRouter *loadroute.Router, loadAccumulator *loadstorage.Storage) { pubKey := c.key.PublicKey().Bytes() // container wrapper that always sends non-notary @@ -211,14 +211,14 @@ func setLoadController(c *cfg, loadRouter *loadroute.Router, loadAccumulator *lo setContainerNotificationParser(c, startEstimationNotifyEvent, containerEvent.ParseStartEstimation) addContainerAsyncNotificationHandler(c, startEstimationNotifyEvent, func(ev event.Event) { - ctrl.Start(loadcontroller.StartPrm{ + ctrl.Start(ctx, loadcontroller.StartPrm{ Epoch: ev.(containerEvent.StartEstimation).Epoch(), }) }) setContainerNotificationParser(c, stopEstimationNotifyEvent, containerEvent.ParseStopEstimation) addContainerAsyncNotificationHandler(c, stopEstimationNotifyEvent, func(ev event.Event) { - ctrl.Stop(loadcontroller.StopPrm{ + ctrl.Stop(ctx, loadcontroller.StopPrm{ Epoch: ev.(containerEvent.StopEstimation).Epoch(), }) }) diff --git a/cmd/frostfs-node/main.go b/cmd/frostfs-node/main.go index 7768409b0f..fdb0032202 100644 --- a/cmd/frostfs-node/main.go +++ b/cmd/frostfs-node/main.go @@ -97,7 +97,7 @@ func initApp(ctx context.Context, c *cfg) { initAndLog(c, "gRPC", initGRPC) initAndLog(c, "netmap", initNetmapService) initAndLog(c, "accounting", initAccountingService) - initAndLog(c, "container", initContainerService) + initAndLog(c, "container", func(c *cfg) { initContainerService(ctx, c) }) initAndLog(c, "session", initSessionService) initAndLog(c, "reputation", initReputationService) initAndLog(c, "notification", initNotifications) diff --git a/pkg/services/container/announcement/load/controller/calls.go b/pkg/services/container/announcement/load/controller/calls.go index 8c5fbeacb0..f5d5d1a3dd 100644 --- a/pkg/services/container/announcement/load/controller/calls.go +++ b/pkg/services/container/announcement/load/controller/calls.go @@ -15,18 +15,15 @@ type StartPrm struct { Epoch uint64 } -// nolint: containedctx type commonContext struct { epoch uint64 ctrl *Controller log *logger.Logger - - ctx context.Context } -type announceContext struct { +type announcer struct { commonContext } @@ -39,21 +36,22 @@ type announceContext struct { // // Each call acquires an announcement context for an Epoch parameter. // At the very end of the operation, the context is released. -func (c *Controller) Start(prm StartPrm) { +func (c *Controller) Start(ctx context.Context, prm StartPrm) { + var announcer *announcer // acquire announcement - execCtx := c.acquireAnnouncement(prm) - if execCtx == nil { + ctx, announcer = c.acquireAnnouncement(ctx, prm) + if announcer == nil { return } // finally stop and free the announcement - defer execCtx.freeAnnouncement() + defer announcer.freeAnnouncement() // announce local values - execCtx.announce() + announcer.announce(ctx) } -func (c *announceContext) announce() { +func (c *announcer) announce(ctx context.Context) { c.log.Debug("starting to announce the values of the metrics") var ( @@ -100,7 +98,7 @@ func (c *announceContext) announce() { } // finish writing - err = targetWriter.Close(c.ctx) + err = targetWriter.Close(ctx) if err != nil { c.log.Debug("could not finish writing local announcements", zap.String("error", err.Error()), @@ -112,35 +110,32 @@ func (c *announceContext) announce() { c.log.Debug("trust announcement successfully finished") } -func (c *Controller) acquireAnnouncement(prm StartPrm) *announceContext { - var ctx context.Context - +func (c *Controller) acquireAnnouncement(ctx context.Context, prm StartPrm) (context.Context, *announcer) { + started := true c.announceMtx.Lock() - { if cancel := c.mAnnounceCtx[prm.Epoch]; cancel == nil { - ctx, cancel = context.WithCancel(context.Background()) + ctx, cancel = context.WithCancel(ctx) c.mAnnounceCtx[prm.Epoch] = cancel + started = false } } - c.announceMtx.Unlock() log := &logger.Logger{Logger: c.opts.log.With( zap.Uint64("epoch", prm.Epoch), )} - if ctx == nil { + if started { log.Debug("announcement is already started") - return nil + return ctx, nil } - return &announceContext{ + return ctx, &announcer{ commonContext: commonContext{ epoch: prm.Epoch, ctrl: c, log: log, - ctx: ctx, }, } } @@ -176,7 +171,7 @@ type StopPrm struct { Epoch uint64 } -type stopContext struct { +type reporter struct { commonContext } @@ -188,31 +183,32 @@ type stopContext struct { // // Each call acquires a report context for an Epoch parameter. // At the very end of the operation, the context is released. -func (c *Controller) Stop(prm StopPrm) { - execCtx := c.acquireReport(prm) - if execCtx == nil { +func (c *Controller) Stop(ctx context.Context, prm StopPrm) { + var reporter *reporter + ctx, reporter = c.acquireReport(ctx, prm) + if reporter == nil { return } // finally stop and free reporting - defer execCtx.freeReport() + defer reporter.freeReport() // interrupt announcement - execCtx.freeAnnouncement() + reporter.freeAnnouncement() // report the estimations - execCtx.report() + reporter.report(ctx) } -func (c *Controller) acquireReport(prm StopPrm) *stopContext { - var ctx context.Context +func (c *Controller) acquireReport(ctx context.Context, prm StopPrm) (context.Context, *reporter) { + started := true c.reportMtx.Lock() - { if cancel := c.mReportCtx[prm.Epoch]; cancel == nil { - ctx, cancel = context.WithCancel(context.Background()) + ctx, cancel = context.WithCancel(ctx) c.mReportCtx[prm.Epoch] = cancel + started = false } } @@ -222,12 +218,12 @@ func (c *Controller) acquireReport(prm StopPrm) *stopContext { zap.Uint64("epoch", prm.Epoch), )} - if ctx == nil { + if started { log.Debug("report is already started") - return nil + return ctx, nil } - return &stopContext{ + return ctx, &reporter{ commonContext: commonContext{ epoch: prm.Epoch, ctrl: c, @@ -261,7 +257,7 @@ func (c *commonContext) freeReport() { } } -func (c *stopContext) report() { +func (c *reporter) report(ctx context.Context) { var ( localIterator Iterator err error @@ -301,7 +297,7 @@ func (c *stopContext) report() { } // finish writing - err = resultWriter.Close(c.ctx) + err = resultWriter.Close(ctx) if err != nil { c.log.Debug("could not finish writing load estimations", zap.String("error", err.Error()), diff --git a/pkg/services/container/announcement/load/controller/calls_test.go b/pkg/services/container/announcement/load/controller/calls_test.go index 4a791f4bd1..6ca24e8699 100644 --- a/pkg/services/container/announcement/load/controller/calls_test.go +++ b/pkg/services/container/announcement/load/controller/calls_test.go @@ -143,12 +143,12 @@ func TestSimpleScenario(t *testing.T) { // start both controllers go func() { - ctrlN1.Start(startPrm) + ctrlN1.Start(context.Background(), startPrm) wg.Done() }() go func() { - ctrlN2.Start(startPrm) + ctrlN2.Start(context.Background(), startPrm) wg.Done() }() @@ -161,12 +161,12 @@ func TestSimpleScenario(t *testing.T) { // stop both controllers go func() { - ctrlN1.Stop(stopPrm) + ctrlN1.Stop(context.Background(), stopPrm) wg.Done() }() go func() { - ctrlN2.Stop(stopPrm) + ctrlN2.Stop(context.Background(), stopPrm) wg.Done() }()