forked from TrueCloudLab/frostfs-node
[#168] node: Refactor container service init
Resolve funlen linter for initContainerService function Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
dcd39f8fdd
commit
28dc9e2190
1 changed files with 66 additions and 57 deletions
|
@ -39,7 +39,6 @@ const (
|
||||||
stopEstimationNotifyEvent = "StopEstimation"
|
stopEstimationNotifyEvent = "StopEstimation"
|
||||||
)
|
)
|
||||||
|
|
||||||
// nolint: funlen
|
|
||||||
func initContainerService(c *cfg) {
|
func initContainerService(c *cfg) {
|
||||||
// container wrapper that tries to invoke notary
|
// container wrapper that tries to invoke notary
|
||||||
// requests if chain is configured so
|
// requests if chain is configured so
|
||||||
|
@ -48,21 +47,68 @@ func initContainerService(c *cfg) {
|
||||||
|
|
||||||
c.shared.cnrClient = wrap
|
c.shared.cnrClient = wrap
|
||||||
|
|
||||||
// container wrapper that always sends non-notary
|
|
||||||
// requests
|
|
||||||
wrapperNoNotary, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0)
|
|
||||||
fatalOnErr(err)
|
|
||||||
|
|
||||||
cnrSrc := cntClient.AsContainerSource(wrap)
|
cnrSrc := cntClient.AsContainerSource(wrap)
|
||||||
|
|
||||||
|
cnrRdr, cnrWrt := configureEACLAndContainerSources(c, wrap, cnrSrc)
|
||||||
|
|
||||||
|
loadAccumulator := loadstorage.New(loadstorage.Prm{})
|
||||||
|
|
||||||
|
loadPlacementBuilder := &loadPlacementBuilder{
|
||||||
|
log: c.log,
|
||||||
|
nmSrc: c.netMapSource,
|
||||||
|
cnrSrc: cnrSrc,
|
||||||
|
}
|
||||||
|
|
||||||
|
routeBuilder := placementrouter.New(placementrouter.Prm{
|
||||||
|
PlacementBuilder: loadPlacementBuilder,
|
||||||
|
})
|
||||||
|
|
||||||
|
loadRouter := loadroute.New(
|
||||||
|
loadroute.Prm{
|
||||||
|
LocalServerInfo: c,
|
||||||
|
RemoteWriterProvider: &remoteLoadAnnounceProvider{
|
||||||
|
key: &c.key.PrivateKey,
|
||||||
|
netmapKeys: c,
|
||||||
|
clientCache: c.bgClientCache,
|
||||||
|
deadEndProvider: loadcontroller.SimpleWriterProvider(loadAccumulator),
|
||||||
|
},
|
||||||
|
Builder: routeBuilder,
|
||||||
|
},
|
||||||
|
loadroute.WithLogger(c.log),
|
||||||
|
)
|
||||||
|
|
||||||
|
setLoadController(c, loadRouter, loadAccumulator)
|
||||||
|
|
||||||
|
server := containerTransportGRPC.New(
|
||||||
|
containerService.NewSignService(
|
||||||
|
&c.key.PrivateKey,
|
||||||
|
containerService.NewResponseService(
|
||||||
|
&usedSpaceService{
|
||||||
|
Server: containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt)),
|
||||||
|
loadWriterProvider: loadRouter,
|
||||||
|
loadPlacementBuilder: loadPlacementBuilder,
|
||||||
|
routeBuilder: routeBuilder,
|
||||||
|
cfg: c,
|
||||||
|
},
|
||||||
|
c.respSvc,
|
||||||
|
),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
for _, srv := range c.cfgGRPC.servers {
|
||||||
|
containerGRPC.RegisterContainerServiceServer(srv, server)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc containerCore.Source) (*morphContainerReader, *morphContainerWriter) {
|
||||||
eACLFetcher := &morphEACLFetcher{
|
eACLFetcher := &morphEACLFetcher{
|
||||||
w: wrap,
|
w: client,
|
||||||
}
|
}
|
||||||
|
|
||||||
cnrRdr := new(morphContainerReader)
|
cnrRdr := new(morphContainerReader)
|
||||||
|
|
||||||
cnrWrt := &morphContainerWriter{
|
cnrWrt := &morphContainerWriter{
|
||||||
neoClient: wrap,
|
neoClient: client,
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.cfgMorph.cacheTTL <= 0 {
|
if c.cfgMorph.cacheTTL <= 0 {
|
||||||
|
@ -70,12 +116,12 @@ func initContainerService(c *cfg) {
|
||||||
cnrRdr.eacl = eACLFetcher
|
cnrRdr.eacl = eACLFetcher
|
||||||
c.cfgObject.cnrSource = cnrSrc
|
c.cfgObject.cnrSource = cnrSrc
|
||||||
cnrRdr.get = cnrSrc
|
cnrRdr.get = cnrSrc
|
||||||
cnrRdr.lister = wrap
|
cnrRdr.lister = client
|
||||||
} else {
|
} else {
|
||||||
// use RPC node as source of Container contract items (with caching)
|
// use RPC node as source of Container contract items (with caching)
|
||||||
cachedContainerStorage := newCachedContainerStorage(cnrSrc, c.cfgMorph.cacheTTL)
|
cachedContainerStorage := newCachedContainerStorage(cnrSrc, c.cfgMorph.cacheTTL)
|
||||||
cachedEACLStorage := newCachedEACLStorage(eACLFetcher, c.cfgMorph.cacheTTL)
|
cachedEACLStorage := newCachedEACLStorage(eACLFetcher, c.cfgMorph.cacheTTL)
|
||||||
cachedContainerLister := newCachedContainerLister(wrap, c.cfgMorph.cacheTTL)
|
cachedContainerLister := newCachedContainerLister(client, c.cfgMorph.cacheTTL)
|
||||||
|
|
||||||
subscribeToContainerCreation(c, func(e event.Event) {
|
subscribeToContainerCreation(c, func(e event.Event) {
|
||||||
ev := e.(containerEvent.PutSuccess)
|
ev := e.(containerEvent.PutSuccess)
|
||||||
|
@ -131,45 +177,28 @@ func initContainerService(c *cfg) {
|
||||||
cnrWrt.eacls = cachedEACLStorage
|
cnrWrt.eacls = cachedEACLStorage
|
||||||
}
|
}
|
||||||
|
|
||||||
localMetrics := &localStorageLoad{
|
return cnrRdr, cnrWrt
|
||||||
log: c.log,
|
|
||||||
engine: c.cfgObject.cfgLocalStorage.localStorage,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func setLoadController(c *cfg, loadRouter *loadroute.Router, loadAccumulator *loadstorage.Storage) {
|
||||||
pubKey := c.key.PublicKey().Bytes()
|
pubKey := c.key.PublicKey().Bytes()
|
||||||
|
|
||||||
|
// container wrapper that always sends non-notary
|
||||||
|
// requests
|
||||||
|
wrapperNoNotary, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0)
|
||||||
|
fatalOnErr(err)
|
||||||
|
|
||||||
resultWriter := &morphLoadWriter{
|
resultWriter := &morphLoadWriter{
|
||||||
log: c.log,
|
log: c.log,
|
||||||
cnrMorphClient: wrapperNoNotary,
|
cnrMorphClient: wrapperNoNotary,
|
||||||
key: pubKey,
|
key: pubKey,
|
||||||
}
|
}
|
||||||
|
|
||||||
loadAccumulator := loadstorage.New(loadstorage.Prm{})
|
localMetrics := &localStorageLoad{
|
||||||
|
|
||||||
loadPlacementBuilder := &loadPlacementBuilder{
|
|
||||||
log: c.log,
|
log: c.log,
|
||||||
nmSrc: c.netMapSource,
|
engine: c.cfgObject.cfgLocalStorage.localStorage,
|
||||||
cnrSrc: cnrSrc,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
routeBuilder := placementrouter.New(placementrouter.Prm{
|
|
||||||
PlacementBuilder: loadPlacementBuilder,
|
|
||||||
})
|
|
||||||
|
|
||||||
loadRouter := loadroute.New(
|
|
||||||
loadroute.Prm{
|
|
||||||
LocalServerInfo: c,
|
|
||||||
RemoteWriterProvider: &remoteLoadAnnounceProvider{
|
|
||||||
key: &c.key.PrivateKey,
|
|
||||||
netmapKeys: c,
|
|
||||||
clientCache: c.bgClientCache,
|
|
||||||
deadEndProvider: loadcontroller.SimpleWriterProvider(loadAccumulator),
|
|
||||||
},
|
|
||||||
Builder: routeBuilder,
|
|
||||||
},
|
|
||||||
loadroute.WithLogger(c.log),
|
|
||||||
)
|
|
||||||
|
|
||||||
ctrl := loadcontroller.New(
|
ctrl := loadcontroller.New(
|
||||||
loadcontroller.Prm{
|
loadcontroller.Prm{
|
||||||
LocalMetrics: loadcontroller.SimpleIteratorProvider(localMetrics),
|
LocalMetrics: loadcontroller.SimpleIteratorProvider(localMetrics),
|
||||||
|
@ -193,26 +222,6 @@ func initContainerService(c *cfg) {
|
||||||
Epoch: ev.(containerEvent.StopEstimation).Epoch(),
|
Epoch: ev.(containerEvent.StopEstimation).Epoch(),
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
server := containerTransportGRPC.New(
|
|
||||||
containerService.NewSignService(
|
|
||||||
&c.key.PrivateKey,
|
|
||||||
containerService.NewResponseService(
|
|
||||||
&usedSpaceService{
|
|
||||||
Server: containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt)),
|
|
||||||
loadWriterProvider: loadRouter,
|
|
||||||
loadPlacementBuilder: loadPlacementBuilder,
|
|
||||||
routeBuilder: routeBuilder,
|
|
||||||
cfg: c,
|
|
||||||
},
|
|
||||||
c.respSvc,
|
|
||||||
),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
for _, srv := range c.cfgGRPC.servers {
|
|
||||||
containerGRPC.RegisterContainerServiceServer(srv, server)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// addContainerNotificationHandler adds handler that will be executed synchronously.
|
// addContainerNotificationHandler adds handler that will be executed synchronously.
|
||||||
|
|
Loading…
Reference in a new issue