diff --git a/cmd/frostfs-node/container.go b/cmd/frostfs-node/container.go index bcd9e3be..23b17585 100644 --- a/cmd/frostfs-node/container.go +++ b/cmd/frostfs-node/container.go @@ -39,7 +39,6 @@ const ( stopEstimationNotifyEvent = "StopEstimation" ) -// nolint: funlen func initContainerService(c *cfg) { // container wrapper that tries to invoke notary // requests if chain is configured so @@ -48,21 +47,68 @@ func initContainerService(c *cfg) { c.shared.cnrClient = wrap - // container wrapper that always sends non-notary - // requests - wrapperNoNotary, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0) - fatalOnErr(err) - cnrSrc := cntClient.AsContainerSource(wrap) + cnrRdr, cnrWrt := configureEACLAndContainerSources(c, wrap, cnrSrc) + + loadAccumulator := loadstorage.New(loadstorage.Prm{}) + + loadPlacementBuilder := &loadPlacementBuilder{ + log: c.log, + nmSrc: c.netMapSource, + cnrSrc: cnrSrc, + } + + routeBuilder := placementrouter.New(placementrouter.Prm{ + PlacementBuilder: loadPlacementBuilder, + }) + + loadRouter := loadroute.New( + loadroute.Prm{ + LocalServerInfo: c, + RemoteWriterProvider: &remoteLoadAnnounceProvider{ + key: &c.key.PrivateKey, + netmapKeys: c, + clientCache: c.bgClientCache, + deadEndProvider: loadcontroller.SimpleWriterProvider(loadAccumulator), + }, + Builder: routeBuilder, + }, + loadroute.WithLogger(c.log), + ) + + setLoadController(c, loadRouter, loadAccumulator) + + server := containerTransportGRPC.New( + containerService.NewSignService( + &c.key.PrivateKey, + containerService.NewResponseService( + &usedSpaceService{ + Server: containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt)), + loadWriterProvider: loadRouter, + loadPlacementBuilder: loadPlacementBuilder, + routeBuilder: routeBuilder, + cfg: c, + }, + c.respSvc, + ), + ), + ) + + for _, srv := range c.cfgGRPC.servers { + containerGRPC.RegisterContainerServiceServer(srv, server) + } +} + +func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc containerCore.Source) (*morphContainerReader, *morphContainerWriter) { eACLFetcher := &morphEACLFetcher{ - w: wrap, + w: client, } cnrRdr := new(morphContainerReader) cnrWrt := &morphContainerWriter{ - neoClient: wrap, + neoClient: client, } if c.cfgMorph.cacheTTL <= 0 { @@ -70,12 +116,12 @@ func initContainerService(c *cfg) { cnrRdr.eacl = eACLFetcher c.cfgObject.cnrSource = cnrSrc cnrRdr.get = cnrSrc - cnrRdr.lister = wrap + cnrRdr.lister = client } else { // use RPC node as source of Container contract items (with caching) cachedContainerStorage := newCachedContainerStorage(cnrSrc, c.cfgMorph.cacheTTL) cachedEACLStorage := newCachedEACLStorage(eACLFetcher, c.cfgMorph.cacheTTL) - cachedContainerLister := newCachedContainerLister(wrap, c.cfgMorph.cacheTTL) + cachedContainerLister := newCachedContainerLister(client, c.cfgMorph.cacheTTL) subscribeToContainerCreation(c, func(e event.Event) { ev := e.(containerEvent.PutSuccess) @@ -131,45 +177,28 @@ func initContainerService(c *cfg) { cnrWrt.eacls = cachedEACLStorage } - localMetrics := &localStorageLoad{ - log: c.log, - engine: c.cfgObject.cfgLocalStorage.localStorage, - } + return cnrRdr, cnrWrt +} +func setLoadController(c *cfg, loadRouter *loadroute.Router, loadAccumulator *loadstorage.Storage) { pubKey := c.key.PublicKey().Bytes() + // container wrapper that always sends non-notary + // requests + wrapperNoNotary, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0) + fatalOnErr(err) + resultWriter := &morphLoadWriter{ log: c.log, cnrMorphClient: wrapperNoNotary, key: pubKey, } - loadAccumulator := loadstorage.New(loadstorage.Prm{}) - - loadPlacementBuilder := &loadPlacementBuilder{ + localMetrics := &localStorageLoad{ log: c.log, - nmSrc: c.netMapSource, - cnrSrc: cnrSrc, + engine: c.cfgObject.cfgLocalStorage.localStorage, } - routeBuilder := placementrouter.New(placementrouter.Prm{ - PlacementBuilder: loadPlacementBuilder, - }) - - loadRouter := loadroute.New( - loadroute.Prm{ - LocalServerInfo: c, - RemoteWriterProvider: &remoteLoadAnnounceProvider{ - key: &c.key.PrivateKey, - netmapKeys: c, - clientCache: c.bgClientCache, - deadEndProvider: loadcontroller.SimpleWriterProvider(loadAccumulator), - }, - Builder: routeBuilder, - }, - loadroute.WithLogger(c.log), - ) - ctrl := loadcontroller.New( loadcontroller.Prm{ LocalMetrics: loadcontroller.SimpleIteratorProvider(localMetrics), @@ -193,26 +222,6 @@ func initContainerService(c *cfg) { Epoch: ev.(containerEvent.StopEstimation).Epoch(), }) }) - - server := containerTransportGRPC.New( - containerService.NewSignService( - &c.key.PrivateKey, - containerService.NewResponseService( - &usedSpaceService{ - Server: containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt)), - loadWriterProvider: loadRouter, - loadPlacementBuilder: loadPlacementBuilder, - routeBuilder: routeBuilder, - cfg: c, - }, - c.respSvc, - ), - ), - ) - - for _, srv := range c.cfgGRPC.servers { - containerGRPC.RegisterContainerServiceServer(srv, server) - } } // addContainerNotificationHandler adds handler that will be executed synchronously.