package loadcontroller

import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
	"go.uber.org/zap"
)

// StartPrm groups the required parameters of the Controller.Start method.
type StartPrm struct {
	// Epoch is the epoch number for which the used space values
	// of containers are announced.
	Epoch uint64
}

// commonContext holds the state shared by a single announcement or report
// operation. It is embedded into announcer and reporter.
type commonContext struct {
	// epoch is the epoch the operation was started for.
	epoch uint64

	// ctrl is the Controller that owns the operation.
	ctrl *Controller

	// log is the logger used for all records of the operation.
	log *logger.Logger
}

// announcer is the context of a single Controller.Start operation.
type announcer struct {
	commonContext
}

// Start starts the processing of container.SizeEstimation values.
//
// A single Start operation transfers all data from LocalMetrics to
// LocalAnnouncementTarget (Controller's parameters).
// No filter by epoch is used for the iterator, since it is expected
// that the source of metrics does not track the change of epochs.
//
// Each call acquires an announcement context for the Epoch parameter.
// At the very end of the operation, the context is released.
// If an announcement for the same epoch is already in progress, the call
// only writes a debug log record and returns.
func (c *Controller) Start(ctx context.Context, prm StartPrm) { var announcer *announcer // acquire announcement ctx, announcer = c.acquireAnnouncement(ctx, prm) if announcer == nil { return } // finally stop and free the announcement defer announcer.freeAnnouncement() // announce local values announcer.announce(ctx) } func (c *announcer) announce(ctx context.Context) { c.log.Debug(logs.ControllerStartingToAnnounceTheValuesOfTheMetrics) var ( metricsIterator Iterator err error ) // initialize iterator over locally collected metrics metricsIterator, err = c.ctrl.prm.LocalMetrics.InitIterator() if err != nil { c.log.Debug(logs.ControllerCouldNotInitializeIteratorOverLocallyCollectedMetrics, zap.String("error", err.Error()), ) return } // initialize target of local announcements targetWriter, err := c.ctrl.prm.LocalAnnouncementTarget.InitWriter(nil) if err != nil { c.log.Debug(logs.ControllerCouldNotInitializeAnnouncementAccumulator, zap.String("error", err.Error()), ) return } // iterate over all collected metrics and write them to the target err = metricsIterator.Iterate( func(container.SizeEstimation) bool { return true // local metrics don't know about epochs }, func(a container.SizeEstimation) error { a.SetEpoch(c.epoch) // set epoch explicitly return targetWriter.Put(a) }, ) if err != nil { c.log.Debug(logs.ControllerIteratorOverLocallyCollectedMetricsAborted, zap.String("error", err.Error()), ) return } // finish writing err = targetWriter.Close(ctx) if err != nil { c.log.Debug(logs.ControllerCouldNotFinishWritingLocalAnnouncements, zap.String("error", err.Error()), ) return } c.log.Debug(logs.ControllerTrustAnnouncementSuccessfullyFinished) } func (c *Controller) acquireAnnouncement(ctx context.Context, prm StartPrm) (context.Context, *announcer) { started := true c.announceMtx.Lock() { if cancel := c.mAnnounceCtx[prm.Epoch]; cancel == nil { ctx, cancel = context.WithCancel(ctx) c.mAnnounceCtx[prm.Epoch] = cancel started = false } } c.announceMtx.Unlock() log 
:= &logger.Logger{Logger: c.opts.log.With( zap.Uint64("epoch", prm.Epoch), )} if started { log.Debug(logs.ControllerAnnouncementIsAlreadyStarted) return ctx, nil } return ctx, &announcer{ commonContext: commonContext{ epoch: prm.Epoch, ctrl: c, log: log, }, } } func (c *commonContext) freeAnnouncement() { var stopped bool c.ctrl.announceMtx.Lock() { var cancel context.CancelFunc cancel, stopped = c.ctrl.mAnnounceCtx[c.epoch] if stopped { cancel() delete(c.ctrl.mAnnounceCtx, c.epoch) } } c.ctrl.announceMtx.Unlock() if stopped { c.log.Debug(logs.ControllerAnnouncementSuccessfullyInterrupted) } else { c.log.Debug(logs.ControllerAnnouncementIsNotStartedOrAlreadyInterrupted) } } // StopPrm groups the required parameters of the Controller.Stop method. type StopPrm struct { // Epoch number the analysis of the values of which must be interrupted. Epoch uint64 } type reporter struct { commonContext } // Stop interrupts the processing of container.SizeEstimation values. // // Single Stop operation releases an announcement context and overtakes // all data from AnnouncementAccumulator to ResultReceiver (Controller's // parameters). Only values for the specified Epoch parameter are processed. // // Each call acquires a report context for an Epoch parameter. // At the very end of the operation, the context is released. 
func (c *Controller) Stop(ctx context.Context, prm StopPrm) { var reporter *reporter ctx, reporter = c.acquireReport(ctx, prm) if reporter == nil { return } // finally stop and free reporting defer reporter.freeReport() // interrupt announcement reporter.freeAnnouncement() // report the estimations reporter.report(ctx) } func (c *Controller) acquireReport(ctx context.Context, prm StopPrm) (context.Context, *reporter) { started := true c.reportMtx.Lock() { if cancel := c.mReportCtx[prm.Epoch]; cancel == nil { ctx, cancel = context.WithCancel(ctx) c.mReportCtx[prm.Epoch] = cancel started = false } } c.reportMtx.Unlock() log := &logger.Logger{Logger: c.opts.log.With( zap.Uint64("epoch", prm.Epoch), )} if started { log.Debug(logs.ControllerReportIsAlreadyStarted) return ctx, nil } return ctx, &reporter{ commonContext: commonContext{ epoch: prm.Epoch, ctrl: c, log: log, }, } } func (c *commonContext) freeReport() { var stopped bool c.ctrl.reportMtx.Lock() { var cancel context.CancelFunc cancel, stopped = c.ctrl.mReportCtx[c.epoch] if stopped { cancel() delete(c.ctrl.mReportCtx, c.epoch) } } c.ctrl.reportMtx.Unlock() if stopped { c.log.Debug(logs.ControllerAnnouncementSuccessfullyInterrupted) } else { c.log.Debug(logs.ControllerAnnouncementIsNotStartedOrAlreadyInterrupted) } } func (c *reporter) report(ctx context.Context) { var ( localIterator Iterator err error ) // initialize iterator over locally accumulated announcements localIterator, err = c.ctrl.prm.AnnouncementAccumulator.InitIterator() if err != nil { c.log.Debug(logs.ControllerCouldNotInitializeIteratorOverLocallyAccumulatedAnnouncements, zap.String("error", err.Error()), ) return } // initialize final destination of load estimations resultWriter, err := c.ctrl.prm.ResultReceiver.InitWriter(nil) if err != nil { c.log.Debug(logs.ControllerCouldNotInitializeResultTarget, zap.String("error", err.Error()), ) return } // iterate over all accumulated announcements and write them to the target err = 
localIterator.Iterate( usedSpaceFilterEpochEQ(c.epoch), resultWriter.Put, ) if err != nil { c.log.Debug(logs.ControllerIteratorOverLocalAnnouncementsAborted, zap.String("error", err.Error()), ) return } // finish writing err = resultWriter.Close(ctx) if err != nil { c.log.Debug(logs.ControllerCouldNotFinishWritingLoadEstimations, zap.String("error", err.Error()), ) } }