diff --git a/cmd/frostfs-node/container.go b/cmd/frostfs-node/container.go index 45c6e390..6c864431 100644 --- a/cmd/frostfs-node/container.go +++ b/cmd/frostfs-node/container.go @@ -39,7 +39,7 @@ const ( stopEstimationNotifyEvent = "StopEstimation" ) -func initContainerService(c *cfg) { +func initContainerService(ctx context.Context, c *cfg) { // container wrapper that tries to invoke notary // requests if chain is configured so wrap, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, cntClient.TryNotary()) @@ -77,7 +77,7 @@ func initContainerService(c *cfg) { loadroute.WithLogger(c.log), ) - setLoadController(c, loadRouter, loadAccumulator) + setLoadController(ctx, c, loadRouter, loadAccumulator) server := containerTransportGRPC.New( containerService.NewSignService( @@ -180,7 +180,7 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c return cnrRdr, cnrWrt } -func setLoadController(c *cfg, loadRouter *loadroute.Router, loadAccumulator *loadstorage.Storage) { +func setLoadController(ctx context.Context, c *cfg, loadRouter *loadroute.Router, loadAccumulator *loadstorage.Storage) { pubKey := c.key.PublicKey().Bytes() // container wrapper that always sends non-notary @@ -211,14 +211,14 @@ func setLoadController(c *cfg, loadRouter *loadroute.Router, loadAccumulator *lo setContainerNotificationParser(c, startEstimationNotifyEvent, containerEvent.ParseStartEstimation) addContainerAsyncNotificationHandler(c, startEstimationNotifyEvent, func(ev event.Event) { - ctrl.Start(loadcontroller.StartPrm{ + ctrl.Start(ctx, loadcontroller.StartPrm{ Epoch: ev.(containerEvent.StartEstimation).Epoch(), }) }) setContainerNotificationParser(c, stopEstimationNotifyEvent, containerEvent.ParseStopEstimation) addContainerAsyncNotificationHandler(c, stopEstimationNotifyEvent, func(ev event.Event) { - ctrl.Stop(loadcontroller.StopPrm{ + ctrl.Stop(ctx, loadcontroller.StopPrm{ Epoch: ev.(containerEvent.StopEstimation).Epoch(), }) }) @@ -335,7 +335,7 @@ type remoteLoadAnnounceProvider struct { deadEndProvider loadcontroller.WriterProvider } -func (r *remoteLoadAnnounceProvider) InitRemote(srv loadroute.ServerInfo) (loadcontroller.WriterProvider, error) { +func (r *remoteLoadAnnounceProvider) InitRemote(srv loadcontroller.ServerInfo) (loadcontroller.WriterProvider, error) { if srv == nil { return r.deadEndProvider, nil } @@ -366,7 +366,7 @@ type remoteLoadAnnounceWriterProvider struct { client client.Client } -func (p *remoteLoadAnnounceWriterProvider) InitWriter(ctx context.Context) (loadcontroller.Writer, error) { +func (p *remoteLoadAnnounceWriterProvider) InitWriter([]loadcontroller.ServerInfo) (loadcontroller.Writer, error) { return &remoteLoadAnnounceWriter{ client: p.client, }, nil @@ -536,7 +536,7 @@ func (c *usedSpaceService) ExternalAddresses() []string { } func (c *usedSpaceService) AnnounceUsedSpace(ctx context.Context, req *containerV2.AnnounceUsedSpaceRequest) (*containerV2.AnnounceUsedSpaceResponse, error) { - var passedRoute []loadroute.ServerInfo + var passedRoute []loadcontroller.ServerInfo for hdr := req.GetVerificationHeader(); hdr != nil; hdr = hdr.GetOrigin() { passedRoute = append(passedRoute, &containerOnlyKeyRemoteServerInfo{ @@ -550,7 +550,7 @@ func (c *usedSpaceService) AnnounceUsedSpace(ctx context.Context, req *container passedRoute = append(passedRoute, c) - w, err := c.loadWriterProvider.InitWriter(loadroute.NewRouteContext(ctx, passedRoute)) + w, err := c.loadWriterProvider.InitWriter(passedRoute) if err != nil { return nil, fmt.Errorf("could not initialize container's used space writer: %w", err) } @@ -615,7 +615,7 @@ func (l *loadPlacementBuilder) isNodeFromContainerKey(epoch uint64, cnr cid.ID, } func (c *usedSpaceService) processLoadValue(_ context.Context, a containerSDK.SizeEstimation, - route []loadroute.ServerInfo, w loadcontroller.Writer) error { + route []loadcontroller.ServerInfo, w loadcontroller.Writer) error { fromCnr, err := c.loadPlacementBuilder.isNodeFromContainerKey(a.Epoch(), a.Container(), route[0].PublicKey()) if err != nil { return fmt.Errorf("could not verify that the sender belongs to the container: %w", err) diff --git a/cmd/frostfs-node/main.go b/cmd/frostfs-node/main.go index 7768409b..fdb00322 100644 --- a/cmd/frostfs-node/main.go +++ b/cmd/frostfs-node/main.go @@ -97,7 +97,7 @@ func initApp(ctx context.Context, c *cfg) { initAndLog(c, "gRPC", initGRPC) initAndLog(c, "netmap", initNetmapService) initAndLog(c, "accounting", initAccountingService) - initAndLog(c, "container", initContainerService) + initAndLog(c, "container", func(c *cfg) { initContainerService(ctx, c) }) initAndLog(c, "session", initSessionService) initAndLog(c, "reputation", initReputationService) initAndLog(c, "notification", initNotifications) diff --git a/pkg/services/container/announcement/load/controller/calls.go b/pkg/services/container/announcement/load/controller/calls.go index fde6913a..f5d5d1a3 100644 --- a/pkg/services/container/announcement/load/controller/calls.go +++ b/pkg/services/container/announcement/load/controller/calls.go @@ -15,18 +15,15 @@ type StartPrm struct { Epoch uint64 } -// nolint: containedctx type commonContext struct { epoch uint64 ctrl *Controller log *logger.Logger - - ctx context.Context } -type announceContext struct { +type announcer struct { commonContext } @@ -39,21 +36,22 @@ type announceContext struct { // // Each call acquires an announcement context for an Epoch parameter. // At the very end of the operation, the context is released. -func (c *Controller) Start(prm StartPrm) { +func (c *Controller) Start(ctx context.Context, prm StartPrm) { + var announcer *announcer // acquire announcement - execCtx := c.acquireAnnouncement(prm) - if execCtx == nil { + ctx, announcer = c.acquireAnnouncement(ctx, prm) + if announcer == nil { return } // finally stop and free the announcement - defer execCtx.freeAnnouncement() + defer announcer.freeAnnouncement() // announce local values - execCtx.announce() + announcer.announce(ctx) } -func (c *announceContext) announce() { +func (c *announcer) announce(ctx context.Context) { c.log.Debug("starting to announce the values of the metrics") var ( @@ -62,7 +60,7 @@ func (c *announceContext) announce() { ) // initialize iterator over locally collected metrics - metricsIterator, err = c.ctrl.prm.LocalMetrics.InitIterator(c.ctx) + metricsIterator, err = c.ctrl.prm.LocalMetrics.InitIterator() if err != nil { c.log.Debug("could not initialize iterator over locally collected metrics", zap.String("error", err.Error()), @@ -72,7 +70,7 @@ func (c *announceContext) announce() { } // initialize target of local announcements - targetWriter, err := c.ctrl.prm.LocalAnnouncementTarget.InitWriter(c.ctx) + targetWriter, err := c.ctrl.prm.LocalAnnouncementTarget.InitWriter(nil) if err != nil { c.log.Debug("could not initialize announcement accumulator", zap.String("error", err.Error()), @@ -100,7 +98,7 @@ func (c *announceContext) announce() { } // finish writing - err = targetWriter.Close(c.ctx) + err = targetWriter.Close(ctx) if err != nil { c.log.Debug("could not finish writing local announcements", zap.String("error", err.Error()), @@ -112,35 +110,32 @@ func (c *announceContext) announce() { c.log.Debug("trust announcement successfully finished") } -func (c *Controller) acquireAnnouncement(prm StartPrm) *announceContext { - var ctx context.Context - +func (c *Controller) acquireAnnouncement(ctx context.Context, prm StartPrm) (context.Context, *announcer) { + started := true c.announceMtx.Lock() - { if cancel := c.mAnnounceCtx[prm.Epoch]; cancel == nil { - ctx, cancel = context.WithCancel(context.Background()) + ctx, cancel = context.WithCancel(ctx) c.mAnnounceCtx[prm.Epoch] = cancel + started = false } } - c.announceMtx.Unlock() log := &logger.Logger{Logger: c.opts.log.With( zap.Uint64("epoch", prm.Epoch), )} - if ctx == nil { + if started { log.Debug("announcement is already started") - return nil + return ctx, nil } - return &announceContext{ + return ctx, &announcer{ commonContext: commonContext{ epoch: prm.Epoch, ctrl: c, log: log, - ctx: ctx, }, } } @@ -176,7 +171,7 @@ type StopPrm struct { Epoch uint64 } -type stopContext struct { +type reporter struct { commonContext } @@ -188,31 +183,32 @@ type stopContext struct { // // Each call acquires a report context for an Epoch parameter. // At the very end of the operation, the context is released. -func (c *Controller) Stop(prm StopPrm) { - execCtx := c.acquireReport(prm) - if execCtx == nil { +func (c *Controller) Stop(ctx context.Context, prm StopPrm) { + var reporter *reporter + ctx, reporter = c.acquireReport(ctx, prm) + if reporter == nil { return } // finally stop and free reporting - defer execCtx.freeReport() + defer reporter.freeReport() // interrupt announcement - execCtx.freeAnnouncement() + reporter.freeAnnouncement() // report the estimations - execCtx.report() + reporter.report(ctx) } -func (c *Controller) acquireReport(prm StopPrm) *stopContext { - var ctx context.Context +func (c *Controller) acquireReport(ctx context.Context, prm StopPrm) (context.Context, *reporter) { + started := true c.reportMtx.Lock() - { if cancel := c.mReportCtx[prm.Epoch]; cancel == nil { - ctx, cancel = context.WithCancel(context.Background()) + ctx, cancel = context.WithCancel(ctx) c.mReportCtx[prm.Epoch] = cancel + started = false } } @@ -222,12 +218,12 @@ func (c *Controller) acquireReport(prm StopPrm) *stopContext { zap.Uint64("epoch", prm.Epoch), )} - if ctx == nil { + if started { log.Debug("report is already started") - return nil + return ctx, nil } - return &stopContext{ + return ctx, &reporter{ commonContext: commonContext{ epoch: prm.Epoch, ctrl: c, @@ -261,14 +257,14 @@ func (c *commonContext) freeReport() { } } -func (c *stopContext) report() { +func (c *reporter) report(ctx context.Context) { var ( localIterator Iterator err error ) // initialize iterator over locally accumulated announcements - localIterator, err = c.ctrl.prm.AnnouncementAccumulator.InitIterator(c.ctx) + localIterator, err = c.ctrl.prm.AnnouncementAccumulator.InitIterator() if err != nil { c.log.Debug("could not initialize iterator over locally accumulated announcements", zap.String("error", err.Error()), @@ -278,7 +274,7 @@ func (c *stopContext) report() { } // initialize final destination of load estimations - resultWriter, err := c.ctrl.prm.ResultReceiver.InitWriter(c.ctx) + resultWriter, err := c.ctrl.prm.ResultReceiver.InitWriter(nil) if err != nil { c.log.Debug("could not initialize result target", zap.String("error", err.Error()), @@ -301,7 +297,7 @@ func (c *stopContext) report() { } // finish writing - err = resultWriter.Close(c.ctx) + err = resultWriter.Close(ctx) if err != nil { c.log.Debug("could not finish writing load estimations", zap.String("error", err.Error()), diff --git a/pkg/services/container/announcement/load/controller/calls_test.go b/pkg/services/container/announcement/load/controller/calls_test.go index 8e4a3ced..6ca24e86 100644 --- a/pkg/services/container/announcement/load/controller/calls_test.go +++ b/pkg/services/container/announcement/load/controller/calls_test.go @@ -28,7 +28,7 @@ func newTestStorage() *testAnnouncementStorage { } } -func (s *testAnnouncementStorage) InitIterator(context.Context) (loadcontroller.Iterator, error) { +func (s *testAnnouncementStorage) InitIterator() (loadcontroller.Iterator, error) { if s.i != nil { return s.i, nil } @@ -53,7 +53,7 @@ func (s *testAnnouncementStorage) Iterate(f loadcontroller.UsedSpaceFilter, h lo return nil } -func (s *testAnnouncementStorage) InitWriter(context.Context) (loadcontroller.Writer, error) { +func (s *testAnnouncementStorage) InitWriter([]loadcontroller.ServerInfo) (loadcontroller.Writer, error) { if s.w != nil { return s.w, nil } @@ -143,12 +143,12 @@ func TestSimpleScenario(t *testing.T) { // start both controllers go func() { - ctrlN1.Start(startPrm) + ctrlN1.Start(context.Background(), startPrm) wg.Done() }() go func() { - ctrlN2.Start(startPrm) + ctrlN2.Start(context.Background(), startPrm) wg.Done() }() @@ -161,12 +161,12 @@ func TestSimpleScenario(t *testing.T) { // stop both controllers go func() { - ctrlN1.Stop(stopPrm) + ctrlN1.Stop(context.Background(), stopPrm) wg.Done() }() go func() { - ctrlN2.Stop(stopPrm) + ctrlN2.Stop(context.Background(), stopPrm) wg.Done() }() diff --git a/pkg/services/container/announcement/load/controller/deps.go b/pkg/services/container/announcement/load/controller/deps.go index 7f7a270b..99da8594 100644 --- a/pkg/services/container/announcement/load/controller/deps.go +++ b/pkg/services/container/announcement/load/controller/deps.go @@ -45,7 +45,7 @@ type IteratorProvider interface { // // Implementations can have different logic for different // contexts, so specific ones may document their own behavior. - InitIterator(context.Context) (Iterator, error) + InitIterator() (Iterator, error) } // Writer describes the interface for storing container.SizeEstimation values. @@ -80,8 +80,24 @@ type WriterProvider interface { // // Initialization problems are reported via error. // If no error was returned, then the Writer must not be nil. - // - // Implementations can have different logic for different - // contexts, so specific ones may document their own behavior. - InitWriter(context.Context) (Writer, error) + InitWriter(route []ServerInfo) (Writer, error) +} + +// ServerInfo describes a set of +// characteristics of a point in a route. +type ServerInfo interface { + // PublicKey returns public key of the node + // from the route in a binary representation. + PublicKey() []byte + + // Iterates over network addresses of the node + // in the route. Breaks iterating on true return + // of the handler. + IterateAddresses(func(string) bool) + + // Returns number of server's network addresses. + NumberOfAddresses() int + + // ExternalAddresses returns external node's addresses. + ExternalAddresses() []string } diff --git a/pkg/services/container/announcement/load/controller/util.go b/pkg/services/container/announcement/load/controller/util.go index fb356393..223de13b 100644 --- a/pkg/services/container/announcement/load/controller/util.go +++ b/pkg/services/container/announcement/load/controller/util.go @@ -1,8 +1,6 @@ package loadcontroller import ( - "context" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" ) @@ -17,11 +15,11 @@ type storageWrapper struct { i Iterator } -func (s storageWrapper) InitIterator(context.Context) (Iterator, error) { +func (s storageWrapper) InitIterator() (Iterator, error) { return s.i, nil } -func (s storageWrapper) InitWriter(context.Context) (Writer, error) { +func (s storageWrapper) InitWriter([]ServerInfo) (Writer, error) { return s.w, nil } diff --git a/pkg/services/container/announcement/load/route/calls.go b/pkg/services/container/announcement/load/route/calls.go index 1cdd6591..83c368f5 100644 --- a/pkg/services/container/announcement/load/route/calls.go +++ b/pkg/services/container/announcement/load/route/calls.go @@ -10,26 +10,9 @@ import ( "go.uber.org/zap" ) -// nolint: containedctx -type routeContext struct { - context.Context - - passedRoute []ServerInfo -} - -// NewRouteContext wraps the main context of value passing with its traversal route. -// -// Passing the result to Router.InitWriter method will allow you to continue this route. -func NewRouteContext(ctx context.Context, passed []ServerInfo) context.Context { - return &routeContext{ - Context: ctx, - passedRoute: passed, - } -} - // InitWriter initializes and returns Writer that sends each value to its next route point. // -// If ctx was created by NewRouteContext, then the traversed route is taken into account, +// If route is present, then it is taken into account, // and the value will be sent to its continuation. Otherwise, the route will be laid // from scratch and the value will be sent to its primary point. // @@ -41,22 +24,14 @@ func NewRouteContext(ctx context.Context, passed []ServerInfo) context.Context { // runtime and never returns an error. // // Always returns nil error. -func (r *Router) InitWriter(ctx context.Context) (loadcontroller.Writer, error) { - var ( - routeCtx *routeContext - ok bool - ) - - if routeCtx, ok = ctx.(*routeContext); !ok { - routeCtx = &routeContext{ - Context: ctx, - passedRoute: []ServerInfo{r.localSrvInfo}, - } +func (r *Router) InitWriter(route []loadcontroller.ServerInfo) (loadcontroller.Writer, error) { + if len(route) == 0 { + route = []loadcontroller.ServerInfo{r.localSrvInfo} } return &loadWriter{ router: r, - ctx: routeCtx, + route: route, mRoute: make(map[routeKey]*valuesRoute), mServers: make(map[string]loadcontroller.Writer), }, nil @@ -69,7 +44,7 @@ type routeKey struct { } type valuesRoute struct { - route []ServerInfo + route []loadcontroller.ServerInfo values []container.SizeEstimation } @@ -77,7 +52,7 @@ type valuesRoute struct { type loadWriter struct { router *Router - ctx *routeContext + route []loadcontroller.ServerInfo routeMtx sync.RWMutex mRoute map[routeKey]*valuesRoute @@ -96,11 +71,11 @@ func (w *loadWriter) Put(a container.SizeEstimation) error { routeValues, ok := w.mRoute[key] if !ok { - route, err := w.router.routeBuilder.NextStage(a, w.ctx.passedRoute) + route, err := w.router.routeBuilder.NextStage(a, w.route) if err != nil { return err } else if len(route) == 0 { - route = []ServerInfo{nil} + route = []loadcontroller.ServerInfo{nil} } routeValues = &valuesRoute{ @@ -129,7 +104,7 @@ func (w *loadWriter) Put(a container.SizeEstimation) error { continue // best effort } - remoteWriter, err = provider.InitWriter(w.ctx) + remoteWriter, err = provider.InitWriter(w.route) if err != nil { w.router.log.Debug("could not initialize writer", zap.String("error", err.Error()), diff --git a/pkg/services/container/announcement/load/route/deps.go b/pkg/services/container/announcement/load/route/deps.go index 429cda3e..b255900f 100644 --- a/pkg/services/container/announcement/load/route/deps.go +++ b/pkg/services/container/announcement/load/route/deps.go @@ -5,25 +5,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" ) -// ServerInfo describes a set of -// characteristics of a point in a route. -type ServerInfo interface { - // PublicKey returns public key of the node - // from the route in a binary representation. - PublicKey() []byte - - // Iterates over network addresses of the node - // in the route. Breaks iterating on true return - // of the handler. - IterateAddresses(func(string) bool) - - // Returns number of server's network addresses. - NumberOfAddresses() int - - // ExternalAddresses returns external node's addresses. - ExternalAddresses() []string -} - // Builder groups methods to route values in the network. type Builder interface { // NextStage must return next group of route points for the value a @@ -36,7 +17,7 @@ type Builder interface { // in that list (means that point is the last point in one of the route groups), // returned route must contain nil point that should be interpreted as signal to, // among sending to other route points, save the announcement in that point. - NextStage(a container.SizeEstimation, passed []ServerInfo) ([]ServerInfo, error) + NextStage(a container.SizeEstimation, passed []loadcontroller.ServerInfo) ([]loadcontroller.ServerInfo, error) } // RemoteWriterProvider describes the component @@ -46,5 +27,5 @@ type RemoteWriterProvider interface { // corresponding to info. // // Nil info matches the end of the route. - InitRemote(info ServerInfo) (loadcontroller.WriterProvider, error) + InitRemote(info loadcontroller.ServerInfo) (loadcontroller.WriterProvider, error) } diff --git a/pkg/services/container/announcement/load/route/placement/calls.go b/pkg/services/container/announcement/load/route/placement/calls.go index 3db0d967..68bdb43a 100644 --- a/pkg/services/container/announcement/load/route/placement/calls.go +++ b/pkg/services/container/announcement/load/route/placement/calls.go @@ -5,7 +5,7 @@ import ( "fmt" netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" - loadroute "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/route" + loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" ) @@ -15,7 +15,7 @@ import ( // If passed route has more than one point, then endpoint of the route is reached. // // The traversed route is not checked, it is assumed to be correct. -func (b *Builder) NextStage(a container.SizeEstimation, passed []loadroute.ServerInfo) ([]loadroute.ServerInfo, error) { +func (b *Builder) NextStage(a container.SizeEstimation, passed []loadcontroller.ServerInfo) ([]loadcontroller.ServerInfo, error) { if len(passed) > 1 { return nil, nil } @@ -27,7 +27,7 @@ func (b *Builder) NextStage(a container.SizeEstimation, passed []loadroute.Serve return nil, fmt.Errorf("could not build placement %s: %w", cnr, err) } - res := make([]loadroute.ServerInfo, 0, len(placement)) + res := make([]loadcontroller.ServerInfo, 0, len(placement)) for i := range placement { if len(placement[i]) == 0 { diff --git a/pkg/services/container/announcement/load/route/router.go b/pkg/services/container/announcement/load/route/router.go index 6169a2ae..c8f784b1 100644 --- a/pkg/services/container/announcement/load/route/router.go +++ b/pkg/services/container/announcement/load/route/router.go @@ -3,6 +3,7 @@ package loadroute import ( "fmt" + loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" ) @@ -15,7 +16,7 @@ type Prm struct { // Characteristics of the local node's server. // // Must not be nil. - LocalServerInfo ServerInfo + LocalServerInfo loadcontroller.ServerInfo // Component for sending values to a fixed route point. // @@ -46,7 +47,7 @@ type Router struct { routeBuilder Builder - localSrvInfo ServerInfo + localSrvInfo loadcontroller.ServerInfo } const invalidPrmValFmt = "invalid parameter %s (%T):%v" diff --git a/pkg/services/container/announcement/load/route/util.go b/pkg/services/container/announcement/load/route/util.go index fca1e579..ea0f51aa 100644 --- a/pkg/services/container/announcement/load/route/util.go +++ b/pkg/services/container/announcement/load/route/util.go @@ -4,6 +4,7 @@ import ( "bytes" "errors" + loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" ) @@ -12,7 +13,7 @@ var errWrongRoute = errors.New("wrong route") // CheckRoute checks if the route is a route correctly constructed by the builder for value a. // // Returns nil if route is correct, otherwise an error clarifying the inconsistency. -func CheckRoute(builder Builder, a container.SizeEstimation, route []ServerInfo) error { +func CheckRoute(builder Builder, a container.SizeEstimation, route []loadcontroller.ServerInfo) error { for i := 1; i < len(route); i++ { servers, err := builder.NextStage(a, route[:i]) if err != nil {