forked from TrueCloudLab/frostfs-node
[#168] node: Refactor object service init
Resolve funlen linter for initObjectService function Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
56161d39b4
commit
dd572825c7
1 changed files with 127 additions and 66 deletions
|
@ -163,9 +163,7 @@ func (x *coreClientConstructor) Get(info coreclient.NodeInfo) (coreclient.MultiA
|
||||||
return c.(coreclient.MultiAddressClient), nil
|
return c.(coreclient.MultiAddressClient), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// nolint: funlen
|
|
||||||
func initObjectService(c *cfg) {
|
func initObjectService(c *cfg) {
|
||||||
ls := c.cfgObject.cfgLocalStorage.localStorage
|
|
||||||
keyStorage := util.NewKeyStorage(&c.key.PrivateKey, c.privateTokenStore, c.cfgNetmap.state)
|
keyStorage := util.NewKeyStorage(&c.key.PrivateKey, c.privateTokenStore, c.cfgNetmap.state)
|
||||||
|
|
||||||
clientConstructor := &reputationClientConstructor{
|
clientConstructor := &reputationClientConstructor{
|
||||||
|
@ -184,37 +182,62 @@ func initObjectService(c *cfg) {
|
||||||
basicConstructor: c.clientCache,
|
basicConstructor: c.clientCache,
|
||||||
}
|
}
|
||||||
|
|
||||||
putConstructor := &coreClientConstructor{
|
c.replicator = createReplicator(c, keyStorage, clientConstructor)
|
||||||
log: c.log,
|
|
||||||
nmSrc: c.netMapSource,
|
|
||||||
netState: c.cfgNetmap.state,
|
|
||||||
trustStorage: c.cfgReputation.localTrustStorage,
|
|
||||||
basicConstructor: c.putClientCache,
|
|
||||||
}
|
|
||||||
|
|
||||||
var irFetcher v2.InnerRingFetcher
|
addPolicer(c, keyStorage, clientConstructor)
|
||||||
|
|
||||||
if c.cfgMorph.client.ProbeNotary() {
|
traverseGen := util.NewTraverserGenerator(c.netMapSource, c.cfgObject.cnrSource, c)
|
||||||
irFetcher = &innerRingFetcherWithNotary{
|
|
||||||
sidechain: c.cfgMorph.client,
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
irFetcher = &innerRingFetcherWithoutNotary{
|
|
||||||
nm: c.cfgNetmap.wrapper,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
c.replicator = replicator.New(
|
sPut := createPutSvc(c, keyStorage)
|
||||||
replicator.WithLogger(c.log),
|
|
||||||
replicator.WithPutTimeout(
|
sPutV2 := createPutSvcV2(sPut, keyStorage)
|
||||||
replicatorconfig.PutTimeout(c.appCfg),
|
|
||||||
),
|
sSearch := createSearchSvc(c, keyStorage, traverseGen, coreConstructor)
|
||||||
replicator.WithLocalStorage(ls),
|
|
||||||
replicator.WithRemoteSender(
|
sSearchV2 := createSearchSvcV2(sSearch, keyStorage)
|
||||||
putsvc.NewRemoteSender(keyStorage, (*coreClientConstructor)(clientConstructor)),
|
|
||||||
),
|
sGet := createGetService(c, keyStorage, traverseGen, coreConstructor)
|
||||||
|
|
||||||
|
*c.cfgObject.getSvc = *sGet // need smth better
|
||||||
|
|
||||||
|
sGetV2 := createGetServiceV2(sGet, keyStorage)
|
||||||
|
|
||||||
|
sDelete := createDeleteService(c, keyStorage, sGet, sSearch, sPut)
|
||||||
|
|
||||||
|
sDeleteV2 := createDeleteServiceV2(sDelete)
|
||||||
|
|
||||||
|
// build service pipeline
|
||||||
|
// grpc | <metrics> | signature | response | acl | split
|
||||||
|
|
||||||
|
splitSvc := createSplitService(c, sPutV2, sGetV2, sSearchV2, sDeleteV2)
|
||||||
|
|
||||||
|
aclSvc := createACLServiceV2(c, splitSvc)
|
||||||
|
|
||||||
|
var commonSvc objectService.Common
|
||||||
|
commonSvc.Init(&c.internals, aclSvc)
|
||||||
|
|
||||||
|
respSvc := objectService.NewResponseService(
|
||||||
|
&commonSvc,
|
||||||
|
c.respSvc,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
signSvc := objectService.NewSignService(
|
||||||
|
&c.key.PrivateKey,
|
||||||
|
respSvc,
|
||||||
|
)
|
||||||
|
|
||||||
|
c.shared.metricsSvc = objectService.NewMetricCollector(
|
||||||
|
signSvc, c.metricsCollector, metricsconfig.Enabled(c.appCfg))
|
||||||
|
server := objectTransportGRPC.New(c.shared.metricsSvc)
|
||||||
|
|
||||||
|
for _, srv := range c.cfgGRPC.servers {
|
||||||
|
objectGRPC.RegisterObjectServiceServer(srv, server)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *reputationClientConstructor) {
|
||||||
|
ls := c.cfgObject.cfgLocalStorage.localStorage
|
||||||
|
|
||||||
pol := policer.New(
|
pol := policer.New(
|
||||||
policer.WithLogger(c.log),
|
policer.WithLogger(c.log),
|
||||||
policer.WithLocalStorage(ls),
|
policer.WithLocalStorage(ls),
|
||||||
|
@ -246,13 +269,41 @@ func initObjectService(c *cfg) {
|
||||||
policer.WithNodeLoader(c),
|
policer.WithNodeLoader(c),
|
||||||
)
|
)
|
||||||
|
|
||||||
traverseGen := util.NewTraverserGenerator(c.netMapSource, c.cfgObject.cnrSource, c)
|
|
||||||
|
|
||||||
c.workers = append(c.workers, worker{
|
c.workers = append(c.workers, worker{
|
||||||
fn: func(ctx context.Context) {
|
fn: func(ctx context.Context) {
|
||||||
pol.Run(ctx)
|
pol.Run(ctx)
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func createInnerRingFetcher(c *cfg) v2.InnerRingFetcher {
|
||||||
|
if c.cfgMorph.client.ProbeNotary() {
|
||||||
|
return &innerRingFetcherWithNotary{
|
||||||
|
sidechain: c.cfgMorph.client,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return &innerRingFetcherWithoutNotary{
|
||||||
|
nm: c.cfgNetmap.wrapper,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func createReplicator(c *cfg, keyStorage *util.KeyStorage, clientConstructor *reputationClientConstructor) *replicator.Replicator {
|
||||||
|
ls := c.cfgObject.cfgLocalStorage.localStorage
|
||||||
|
|
||||||
|
return replicator.New(
|
||||||
|
replicator.WithLogger(c.log),
|
||||||
|
replicator.WithPutTimeout(
|
||||||
|
replicatorconfig.PutTimeout(c.appCfg),
|
||||||
|
),
|
||||||
|
replicator.WithLocalStorage(ls),
|
||||||
|
replicator.WithRemoteSender(
|
||||||
|
putsvc.NewRemoteSender(keyStorage, (*coreClientConstructor)(clientConstructor)),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func createPutSvc(c *cfg, keyStorage *util.KeyStorage) *putsvc.Service {
|
||||||
|
ls := c.cfgObject.cfgLocalStorage.localStorage
|
||||||
|
|
||||||
var os putsvc.ObjectStorage = engineWithoutNotifications{
|
var os putsvc.ObjectStorage = engineWithoutNotifications{
|
||||||
engine: ls,
|
engine: ls,
|
||||||
|
@ -267,7 +318,15 @@ func initObjectService(c *cfg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sPut := putsvc.NewService(
|
putConstructor := &coreClientConstructor{
|
||||||
|
log: c.log,
|
||||||
|
nmSrc: c.netMapSource,
|
||||||
|
netState: c.cfgNetmap.state,
|
||||||
|
trustStorage: c.cfgReputation.localTrustStorage,
|
||||||
|
basicConstructor: c.putClientCache,
|
||||||
|
}
|
||||||
|
|
||||||
|
return putsvc.NewService(
|
||||||
putsvc.WithKeyStorage(keyStorage),
|
putsvc.WithKeyStorage(keyStorage),
|
||||||
putsvc.WithClientConstructor(putConstructor),
|
putsvc.WithClientConstructor(putConstructor),
|
||||||
putsvc.WithMaxSizeSource(newCachedMaxObjectSizeSource(c)),
|
putsvc.WithMaxSizeSource(newCachedMaxObjectSizeSource(c)),
|
||||||
|
@ -279,13 +338,19 @@ func initObjectService(c *cfg) {
|
||||||
putsvc.WithWorkerPools(c.cfgObject.pool.putRemote, c.cfgObject.pool.putLocal),
|
putsvc.WithWorkerPools(c.cfgObject.pool.putRemote, c.cfgObject.pool.putLocal),
|
||||||
putsvc.WithLogger(c.log),
|
putsvc.WithLogger(c.log),
|
||||||
)
|
)
|
||||||
|
}
|
||||||
|
|
||||||
sPutV2 := putsvcV2.NewService(
|
func createPutSvcV2(sPut *putsvc.Service, keyStorage *util.KeyStorage) *putsvcV2.Service {
|
||||||
|
return putsvcV2.NewService(
|
||||||
putsvcV2.WithInternalService(sPut),
|
putsvcV2.WithInternalService(sPut),
|
||||||
putsvcV2.WithKeyStorage(keyStorage),
|
putsvcV2.WithKeyStorage(keyStorage),
|
||||||
)
|
)
|
||||||
|
}
|
||||||
|
|
||||||
sSearch := searchsvc.New(
|
func createSearchSvc(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, coreConstructor *coreClientConstructor) *searchsvc.Service {
|
||||||
|
ls := c.cfgObject.cfgLocalStorage.localStorage
|
||||||
|
|
||||||
|
return searchsvc.New(
|
||||||
searchsvc.WithLogger(c.log),
|
searchsvc.WithLogger(c.log),
|
||||||
searchsvc.WithLocalStorageEngine(ls),
|
searchsvc.WithLocalStorageEngine(ls),
|
||||||
searchsvc.WithClientConstructor(coreConstructor),
|
searchsvc.WithClientConstructor(coreConstructor),
|
||||||
|
@ -297,13 +362,20 @@ func initObjectService(c *cfg) {
|
||||||
searchsvc.WithNetMapSource(c.netMapSource),
|
searchsvc.WithNetMapSource(c.netMapSource),
|
||||||
searchsvc.WithKeyStorage(keyStorage),
|
searchsvc.WithKeyStorage(keyStorage),
|
||||||
)
|
)
|
||||||
|
}
|
||||||
|
|
||||||
sSearchV2 := searchsvcV2.NewService(
|
func createSearchSvcV2(sSearch *searchsvc.Service, keyStorage *util.KeyStorage) *searchsvcV2.Service {
|
||||||
|
return searchsvcV2.NewService(
|
||||||
searchsvcV2.WithInternalService(sSearch),
|
searchsvcV2.WithInternalService(sSearch),
|
||||||
searchsvcV2.WithKeyStorage(keyStorage),
|
searchsvcV2.WithKeyStorage(keyStorage),
|
||||||
)
|
)
|
||||||
|
}
|
||||||
|
|
||||||
sGet := getsvc.New(
|
func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator,
|
||||||
|
coreConstructor *coreClientConstructor) *getsvc.Service {
|
||||||
|
ls := c.cfgObject.cfgLocalStorage.localStorage
|
||||||
|
|
||||||
|
return getsvc.New(
|
||||||
getsvc.WithLogger(c.log),
|
getsvc.WithLogger(c.log),
|
||||||
getsvc.WithLocalStorageEngine(ls),
|
getsvc.WithLocalStorageEngine(ls),
|
||||||
getsvc.WithClientConstructor(coreConstructor),
|
getsvc.WithClientConstructor(coreConstructor),
|
||||||
|
@ -315,15 +387,18 @@ func initObjectService(c *cfg) {
|
||||||
getsvc.WithNetMapSource(c.netMapSource),
|
getsvc.WithNetMapSource(c.netMapSource),
|
||||||
getsvc.WithKeyStorage(keyStorage),
|
getsvc.WithKeyStorage(keyStorage),
|
||||||
)
|
)
|
||||||
|
}
|
||||||
|
|
||||||
*c.cfgObject.getSvc = *sGet // need smth better
|
func createGetServiceV2(sGet *getsvc.Service, keyStorage *util.KeyStorage) *getsvcV2.Service {
|
||||||
|
return getsvcV2.NewService(
|
||||||
sGetV2 := getsvcV2.NewService(
|
|
||||||
getsvcV2.WithInternalService(sGet),
|
getsvcV2.WithInternalService(sGet),
|
||||||
getsvcV2.WithKeyStorage(keyStorage),
|
getsvcV2.WithKeyStorage(keyStorage),
|
||||||
)
|
)
|
||||||
|
}
|
||||||
|
|
||||||
sDelete := deletesvc.New(
|
func createDeleteService(c *cfg, keyStorage *util.KeyStorage, sGet *getsvc.Service,
|
||||||
|
sSearch *searchsvc.Service, sPut *putsvc.Service) *deletesvc.Service {
|
||||||
|
return deletesvc.New(
|
||||||
deletesvc.WithLogger(c.log),
|
deletesvc.WithLogger(c.log),
|
||||||
deletesvc.WithHeadService(sGet),
|
deletesvc.WithHeadService(sGet),
|
||||||
deletesvc.WithSearchService(sSearch),
|
deletesvc.WithSearchService(sSearch),
|
||||||
|
@ -336,15 +411,17 @@ func initObjectService(c *cfg) {
|
||||||
}),
|
}),
|
||||||
deletesvc.WithKeyStorage(keyStorage),
|
deletesvc.WithKeyStorage(keyStorage),
|
||||||
)
|
)
|
||||||
|
}
|
||||||
|
|
||||||
sDeleteV2 := deletesvcV2.NewService(
|
func createDeleteServiceV2(sDelete *deletesvc.Service) *deletesvcV2.Service {
|
||||||
|
return deletesvcV2.NewService(
|
||||||
deletesvcV2.WithInternalService(sDelete),
|
deletesvcV2.WithInternalService(sDelete),
|
||||||
)
|
)
|
||||||
|
}
|
||||||
|
|
||||||
// build service pipeline
|
func createSplitService(c *cfg, sPutV2 *putsvcV2.Service, sGetV2 *getsvcV2.Service,
|
||||||
// grpc | <metrics> | signature | response | acl | split
|
sSearchV2 *searchsvcV2.Service, sDeleteV2 *deletesvcV2.Service) *objectService.TransportSplitter {
|
||||||
|
return objectService.NewTransportSplitter(
|
||||||
splitSvc := objectService.NewTransportSplitter(
|
|
||||||
c.cfgGRPC.maxChunkSize,
|
c.cfgGRPC.maxChunkSize,
|
||||||
c.cfgGRPC.maxAddrAmount,
|
c.cfgGRPC.maxAddrAmount,
|
||||||
&objectSvc{
|
&objectSvc{
|
||||||
|
@ -354,8 +431,13 @@ func initObjectService(c *cfg) {
|
||||||
delete: sDeleteV2,
|
delete: sDeleteV2,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
}
|
||||||
|
|
||||||
aclSvc := v2.New(
|
func createACLServiceV2(c *cfg, splitSvc *objectService.TransportSplitter) v2.Service {
|
||||||
|
ls := c.cfgObject.cfgLocalStorage.localStorage
|
||||||
|
irFetcher := createInnerRingFetcher(c)
|
||||||
|
|
||||||
|
return v2.New(
|
||||||
v2.WithLogger(c.log),
|
v2.WithLogger(c.log),
|
||||||
v2.WithIRFetcher(newCachedIRFetcher(irFetcher)),
|
v2.WithIRFetcher(newCachedIRFetcher(irFetcher)),
|
||||||
v2.WithNetmapSource(c.netMapSource),
|
v2.WithNetmapSource(c.netMapSource),
|
||||||
|
@ -372,27 +454,6 @@ func initObjectService(c *cfg) {
|
||||||
),
|
),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
var commonSvc objectService.Common
|
|
||||||
commonSvc.Init(&c.internals, aclSvc)
|
|
||||||
|
|
||||||
respSvc := objectService.NewResponseService(
|
|
||||||
&commonSvc,
|
|
||||||
c.respSvc,
|
|
||||||
)
|
|
||||||
|
|
||||||
signSvc := objectService.NewSignService(
|
|
||||||
&c.key.PrivateKey,
|
|
||||||
respSvc,
|
|
||||||
)
|
|
||||||
|
|
||||||
c.shared.metricsSvc = objectService.NewMetricCollector(
|
|
||||||
signSvc, c.metricsCollector, metricsconfig.Enabled(c.appCfg))
|
|
||||||
server := objectTransportGRPC.New(c.shared.metricsSvc)
|
|
||||||
|
|
||||||
for _, srv := range c.cfgGRPC.servers {
|
|
||||||
objectGRPC.RegisterObjectServiceServer(srv, server)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type morphEACLFetcher struct {
|
type morphEACLFetcher struct {
|
||||||
|
|
Loading…
Reference in a new issue