Refactor container service #217

Merged
fyrchik merged 2 commits from dstepanov-yadro/frostfs-node:refactoring/object-3061-container into master 2023-04-05 14:38:01 +00:00
4 changed files with 43 additions and 47 deletions
Showing only changes of commit 6e90d5f117 - Show all commits

View file

@ -39,7 +39,7 @@ const (
stopEstimationNotifyEvent = "StopEstimation"
)
func initContainerService(c *cfg) {
func initContainerService(ctx context.Context, c *cfg) {
// container wrapper that tries to invoke notary
// requests if chain is configured so
wrap, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, cntClient.TryNotary())
@ -77,7 +77,7 @@ func initContainerService(c *cfg) {
loadroute.WithLogger(c.log),
)
setLoadController(c, loadRouter, loadAccumulator)
setLoadController(ctx, c, loadRouter, loadAccumulator)
server := containerTransportGRPC.New(
containerService.NewSignService(
@ -180,7 +180,7 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
return cnrRdr, cnrWrt
}
func setLoadController(c *cfg, loadRouter *loadroute.Router, loadAccumulator *loadstorage.Storage) {
func setLoadController(ctx context.Context, c *cfg, loadRouter *loadroute.Router, loadAccumulator *loadstorage.Storage) {
pubKey := c.key.PublicKey().Bytes()
// container wrapper that always sends non-notary
@ -211,14 +211,14 @@ func setLoadController(c *cfg, loadRouter *loadroute.Router, loadAccumulator *lo
setContainerNotificationParser(c, startEstimationNotifyEvent, containerEvent.ParseStartEstimation)
addContainerAsyncNotificationHandler(c, startEstimationNotifyEvent, func(ev event.Event) {
ctrl.Start(loadcontroller.StartPrm{
ctrl.Start(ctx, loadcontroller.StartPrm{
Epoch: ev.(containerEvent.StartEstimation).Epoch(),
})
})
setContainerNotificationParser(c, stopEstimationNotifyEvent, containerEvent.ParseStopEstimation)
addContainerAsyncNotificationHandler(c, stopEstimationNotifyEvent, func(ev event.Event) {
ctrl.Stop(loadcontroller.StopPrm{
ctrl.Stop(ctx, loadcontroller.StopPrm{
Epoch: ev.(containerEvent.StopEstimation).Epoch(),
})
})

View file

@ -97,7 +97,7 @@ func initApp(ctx context.Context, c *cfg) {
initAndLog(c, "gRPC", initGRPC)
initAndLog(c, "netmap", initNetmapService)
initAndLog(c, "accounting", initAccountingService)
initAndLog(c, "container", initContainerService)
initAndLog(c, "container", func(c *cfg) { initContainerService(ctx, c) })
initAndLog(c, "session", initSessionService)
initAndLog(c, "reputation", initReputationService)
initAndLog(c, "notification", initNotifications)

View file

@ -15,18 +15,15 @@ type StartPrm struct {
Epoch uint64
}
// nolint: containedctx
type commonContext struct {
epoch uint64
ctrl *Controller
log *logger.Logger
ctx context.Context
}
type announceContext struct {
type announcer struct {
commonContext
}
@ -39,21 +36,22 @@ type announceContext struct {
//
// Each call acquires an announcement context for an Epoch parameter.
// At the very end of the operation, the context is released.
func (c *Controller) Start(prm StartPrm) {
func (c *Controller) Start(ctx context.Context, prm StartPrm) {
var announcer *announcer
// acquire announcement
execCtx := c.acquireAnnouncement(prm)
if execCtx == nil {
ctx, announcer = c.acquireAnnouncement(ctx, prm)
if announcer == nil {
return
}
// finally stop and free the announcement
defer execCtx.freeAnnouncement()
defer announcer.freeAnnouncement()
// announce local values
execCtx.announce()
announcer.announce(ctx)
}
func (c *announceContext) announce() {
func (c *announcer) announce(ctx context.Context) {
c.log.Debug("starting to announce the values of the metrics")
var (
@ -100,7 +98,7 @@ func (c *announceContext) announce() {
}
// finish writing
err = targetWriter.Close(c.ctx)
err = targetWriter.Close(ctx)
if err != nil {
c.log.Debug("could not finish writing local announcements",
zap.String("error", err.Error()),
@ -112,35 +110,32 @@ func (c *announceContext) announce() {
c.log.Debug("trust announcement successfully finished")
}
func (c *Controller) acquireAnnouncement(prm StartPrm) *announceContext {
var ctx context.Context
func (c *Controller) acquireAnnouncement(ctx context.Context, prm StartPrm) (context.Context, *announcer) {
started := true
c.announceMtx.Lock()
{
if cancel := c.mAnnounceCtx[prm.Epoch]; cancel == nil {
ctx, cancel = context.WithCancel(context.Background())
ctx, cancel = context.WithCancel(ctx)
c.mAnnounceCtx[prm.Epoch] = cancel
started = false
}
}
c.announceMtx.Unlock()
log := &logger.Logger{Logger: c.opts.log.With(
zap.Uint64("epoch", prm.Epoch),
)}
if ctx == nil {
if started {
log.Debug("announcement is already started")
return nil
return ctx, nil
}
return &announceContext{
return ctx, &announcer{
commonContext: commonContext{
epoch: prm.Epoch,
ctrl: c,
log: log,
ctx: ctx,
},
}
}
@ -176,7 +171,7 @@ type StopPrm struct {
Epoch uint64
}
type stopContext struct {
type reporter struct {
commonContext
}
@ -188,31 +183,32 @@ type stopContext struct {
//
// Each call acquires a report context for an Epoch parameter.
// At the very end of the operation, the context is released.
func (c *Controller) Stop(prm StopPrm) {
execCtx := c.acquireReport(prm)
if execCtx == nil {
func (c *Controller) Stop(ctx context.Context, prm StopPrm) {
var reporter *reporter
ctx, reporter = c.acquireReport(ctx, prm)
if reporter == nil {
return
}
// finally stop and free reporting
defer execCtx.freeReport()
defer reporter.freeReport()
// interrupt announcement
execCtx.freeAnnouncement()
reporter.freeAnnouncement()
// report the estimations
execCtx.report()
reporter.report(ctx)
}
func (c *Controller) acquireReport(prm StopPrm) *stopContext {
var ctx context.Context
func (c *Controller) acquireReport(ctx context.Context, prm StopPrm) (context.Context, *reporter) {
started := true
c.reportMtx.Lock()
{
if cancel := c.mReportCtx[prm.Epoch]; cancel == nil {
ctx, cancel = context.WithCancel(context.Background())
ctx, cancel = context.WithCancel(ctx)
c.mReportCtx[prm.Epoch] = cancel
started = false
}
}
@ -222,12 +218,12 @@ func (c *Controller) acquireReport(prm StopPrm) *stopContext {
zap.Uint64("epoch", prm.Epoch),
)}
if ctx == nil {
if started {
log.Debug("report is already started")
return nil
return ctx, nil
}
return &stopContext{
return ctx, &reporter{
commonContext: commonContext{
epoch: prm.Epoch,
ctrl: c,
@ -261,7 +257,7 @@ func (c *commonContext) freeReport() {
}
}
func (c *stopContext) report() {
func (c *reporter) report(ctx context.Context) {
var (
localIterator Iterator
err error
@ -301,7 +297,7 @@ func (c *stopContext) report() {
}
// finish writing
err = resultWriter.Close(c.ctx)
err = resultWriter.Close(ctx)
if err != nil {
c.log.Debug("could not finish writing load estimations",
zap.String("error", err.Error()),

View file

@ -143,12 +143,12 @@ func TestSimpleScenario(t *testing.T) {
// start both controllers
go func() {
ctrlN1.Start(startPrm)
ctrlN1.Start(context.Background(), startPrm)
wg.Done()
}()
go func() {
ctrlN2.Start(startPrm)
ctrlN2.Start(context.Background(), startPrm)
wg.Done()
}()
@ -161,12 +161,12 @@ func TestSimpleScenario(t *testing.T) {
// stop both controllers
go func() {
ctrlN1.Stop(stopPrm)
ctrlN1.Stop(context.Background(), stopPrm)
wg.Done()
}()
go func() {
ctrlN2.Stop(stopPrm)
ctrlN2.Stop(context.Background(), stopPrm)
wg.Done()
}()