diff --git a/cmd/frostfs-node/object.go b/cmd/frostfs-node/object.go index 0f6fcc0b6b..8680aac28a 100644 --- a/cmd/frostfs-node/object.go +++ b/cmd/frostfs-node/object.go @@ -163,9 +163,7 @@ func (x *coreClientConstructor) Get(info coreclient.NodeInfo) (coreclient.MultiA return c.(coreclient.MultiAddressClient), nil } -// nolint: funlen func initObjectService(c *cfg) { - ls := c.cfgObject.cfgLocalStorage.localStorage keyStorage := util.NewKeyStorage(&c.key.PrivateKey, c.privateTokenStore, c.cfgNetmap.state) clientConstructor := &reputationClientConstructor{ @@ -184,37 +182,62 @@ func initObjectService(c *cfg) { basicConstructor: c.clientCache, } - putConstructor := &coreClientConstructor{ - log: c.log, - nmSrc: c.netMapSource, - netState: c.cfgNetmap.state, - trustStorage: c.cfgReputation.localTrustStorage, - basicConstructor: c.putClientCache, - } + c.replicator = createReplicator(c, keyStorage, clientConstructor) - var irFetcher v2.InnerRingFetcher + addPolicer(c, keyStorage, clientConstructor) - if c.cfgMorph.client.ProbeNotary() { - irFetcher = &innerRingFetcherWithNotary{ - sidechain: c.cfgMorph.client, - } - } else { - irFetcher = &innerRingFetcherWithoutNotary{ - nm: c.cfgNetmap.wrapper, - } - } + traverseGen := util.NewTraverserGenerator(c.netMapSource, c.cfgObject.cnrSource, c) - c.replicator = replicator.New( - replicator.WithLogger(c.log), - replicator.WithPutTimeout( - replicatorconfig.PutTimeout(c.appCfg), - ), - replicator.WithLocalStorage(ls), - replicator.WithRemoteSender( - putsvc.NewRemoteSender(keyStorage, (*coreClientConstructor)(clientConstructor)), - ), + sPut := createPutSvc(c, keyStorage) + + sPutV2 := createPutSvcV2(sPut, keyStorage) + + sSearch := createSearchSvc(c, keyStorage, traverseGen, coreConstructor) + + sSearchV2 := createSearchSvcV2(sSearch, keyStorage) + + sGet := createGetService(c, keyStorage, traverseGen, coreConstructor) + + *c.cfgObject.getSvc = *sGet // need smth better + + sGetV2 := createGetServiceV2(sGet, keyStorage) + + sDelete := createDeleteService(c, keyStorage, sGet, sSearch, sPut) + + sDeleteV2 := createDeleteServiceV2(sDelete) + + // build service pipeline + // grpc | | signature | response | acl | split + + splitSvc := createSplitService(c, sPutV2, sGetV2, sSearchV2, sDeleteV2) + + aclSvc := createACLServiceV2(c, splitSvc) + + var commonSvc objectService.Common + commonSvc.Init(&c.internals, aclSvc) + + respSvc := objectService.NewResponseService( + &commonSvc, + c.respSvc, ) + signSvc := objectService.NewSignService( + &c.key.PrivateKey, + respSvc, + ) + + c.shared.metricsSvc = objectService.NewMetricCollector( + signSvc, c.metricsCollector, metricsconfig.Enabled(c.appCfg)) + server := objectTransportGRPC.New(c.shared.metricsSvc) + + for _, srv := range c.cfgGRPC.servers { + objectGRPC.RegisterObjectServiceServer(srv, server) + } +} + +func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *reputationClientConstructor) { + ls := c.cfgObject.cfgLocalStorage.localStorage + pol := policer.New( policer.WithLogger(c.log), policer.WithLocalStorage(ls), @@ -246,13 +269,41 @@ func initObjectService(c *cfg) { policer.WithNodeLoader(c), ) - traverseGen := util.NewTraverserGenerator(c.netMapSource, c.cfgObject.cnrSource, c) - c.workers = append(c.workers, worker{ fn: func(ctx context.Context) { pol.Run(ctx) }, }) +} + +func createInnerRingFetcher(c *cfg) v2.InnerRingFetcher { + if c.cfgMorph.client.ProbeNotary() { + return &innerRingFetcherWithNotary{ + sidechain: c.cfgMorph.client, + } + } + return &innerRingFetcherWithoutNotary{ + nm: c.cfgNetmap.wrapper, + } +} + +func createReplicator(c *cfg, keyStorage *util.KeyStorage, clientConstructor *reputationClientConstructor) *replicator.Replicator { + ls := c.cfgObject.cfgLocalStorage.localStorage + + return replicator.New( + replicator.WithLogger(c.log), + replicator.WithPutTimeout( + replicatorconfig.PutTimeout(c.appCfg), + ), + replicator.WithLocalStorage(ls), + replicator.WithRemoteSender( + putsvc.NewRemoteSender(keyStorage, (*coreClientConstructor)(clientConstructor)), + ), + ) +} + +func createPutSvc(c *cfg, keyStorage *util.KeyStorage) *putsvc.Service { + ls := c.cfgObject.cfgLocalStorage.localStorage var os putsvc.ObjectStorage = engineWithoutNotifications{ engine: ls, @@ -267,7 +318,15 @@ func initObjectService(c *cfg) { } } - sPut := putsvc.NewService( + putConstructor := &coreClientConstructor{ + log: c.log, + nmSrc: c.netMapSource, + netState: c.cfgNetmap.state, + trustStorage: c.cfgReputation.localTrustStorage, + basicConstructor: c.putClientCache, + } + + return putsvc.NewService( putsvc.WithKeyStorage(keyStorage), putsvc.WithClientConstructor(putConstructor), putsvc.WithMaxSizeSource(newCachedMaxObjectSizeSource(c)), @@ -279,13 +338,19 @@ func initObjectService(c *cfg) { putsvc.WithWorkerPools(c.cfgObject.pool.putRemote, c.cfgObject.pool.putLocal), putsvc.WithLogger(c.log), ) +} - sPutV2 := putsvcV2.NewService( +func createPutSvcV2(sPut *putsvc.Service, keyStorage *util.KeyStorage) *putsvcV2.Service { + return putsvcV2.NewService( putsvcV2.WithInternalService(sPut), putsvcV2.WithKeyStorage(keyStorage), ) +} - sSearch := searchsvc.New( +func createSearchSvc(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, coreConstructor *coreClientConstructor) *searchsvc.Service { + ls := c.cfgObject.cfgLocalStorage.localStorage + + return searchsvc.New( searchsvc.WithLogger(c.log), searchsvc.WithLocalStorageEngine(ls), searchsvc.WithClientConstructor(coreConstructor), @@ -297,13 +362,20 @@ func initObjectService(c *cfg) { searchsvc.WithNetMapSource(c.netMapSource), searchsvc.WithKeyStorage(keyStorage), ) +} - sSearchV2 := searchsvcV2.NewService( +func createSearchSvcV2(sSearch *searchsvc.Service, keyStorage *util.KeyStorage) *searchsvcV2.Service { + return searchsvcV2.NewService( searchsvcV2.WithInternalService(sSearch), searchsvcV2.WithKeyStorage(keyStorage), ) +} - sGet := getsvc.New( +func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, + coreConstructor *coreClientConstructor) *getsvc.Service { + ls := c.cfgObject.cfgLocalStorage.localStorage + + return getsvc.New( getsvc.WithLogger(c.log), getsvc.WithLocalStorageEngine(ls), getsvc.WithClientConstructor(coreConstructor), @@ -315,15 +387,18 @@ func initObjectService(c *cfg) { getsvc.WithNetMapSource(c.netMapSource), getsvc.WithKeyStorage(keyStorage), ) +} - *c.cfgObject.getSvc = *sGet // need smth better - - sGetV2 := getsvcV2.NewService( +func createGetServiceV2(sGet *getsvc.Service, keyStorage *util.KeyStorage) *getsvcV2.Service { + return getsvcV2.NewService( getsvcV2.WithInternalService(sGet), getsvcV2.WithKeyStorage(keyStorage), ) +} - sDelete := deletesvc.New( +func createDeleteService(c *cfg, keyStorage *util.KeyStorage, sGet *getsvc.Service, + sSearch *searchsvc.Service, sPut *putsvc.Service) *deletesvc.Service { + return deletesvc.New( deletesvc.WithLogger(c.log), deletesvc.WithHeadService(sGet), deletesvc.WithSearchService(sSearch), @@ -336,15 +411,17 @@ func initObjectService(c *cfg) { }), deletesvc.WithKeyStorage(keyStorage), ) +} - sDeleteV2 := deletesvcV2.NewService( +func createDeleteServiceV2(sDelete *deletesvc.Service) *deletesvcV2.Service { + return deletesvcV2.NewService( deletesvcV2.WithInternalService(sDelete), ) +} - // build service pipeline - // grpc | | signature | response | acl | split - - splitSvc := objectService.NewTransportSplitter( +func createSplitService(c *cfg, sPutV2 *putsvcV2.Service, sGetV2 *getsvcV2.Service, + sSearchV2 *searchsvcV2.Service, sDeleteV2 *deletesvcV2.Service) *objectService.TransportSplitter { + return objectService.NewTransportSplitter( c.cfgGRPC.maxChunkSize, c.cfgGRPC.maxAddrAmount, &objectSvc{ @@ -354,8 +431,13 @@ func initObjectService(c *cfg) { delete: sDeleteV2, }, ) +} - aclSvc := v2.New( +func createACLServiceV2(c *cfg, splitSvc *objectService.TransportSplitter) v2.Service { + ls := c.cfgObject.cfgLocalStorage.localStorage + irFetcher := createInnerRingFetcher(c) + + return v2.New( v2.WithLogger(c.log), v2.WithIRFetcher(newCachedIRFetcher(irFetcher)), v2.WithNetmapSource(c.netMapSource), @@ -372,27 +454,6 @@ func initObjectService(c *cfg) { ), ), ) - - var commonSvc objectService.Common - commonSvc.Init(&c.internals, aclSvc) - - respSvc := objectService.NewResponseService( - &commonSvc, - c.respSvc, - ) - - signSvc := objectService.NewSignService( - &c.key.PrivateKey, - respSvc, - ) - - c.shared.metricsSvc = objectService.NewMetricCollector( - signSvc, c.metricsCollector, metricsconfig.Enabled(c.appCfg)) - server := objectTransportGRPC.New(c.shared.metricsSvc) - - for _, srv := range c.cfgGRPC.servers { - objectGRPC.RegisterObjectServiceServer(srv, server) - } } type morphEACLFetcher struct {