From 6679d59e89c849aa02e3a79bebded98ace49bc9f Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Sat, 13 Mar 2021 18:22:24 +0300 Subject: [PATCH] [#422] pkg/services: Provide client options on cache creation Because options are not used when client is already in cache providing them to shared cache is misleading at best. In the worst case `dial_timeout` is set randomly (because of race condition) which can lead to one service having `dial_timeout` of another. Thus we set default client creation options when cache is created. Signed-off-by: Evgenii Stratonikov --- cmd/neofs-node/config.go | 15 +++------------ cmd/neofs-node/object.go | 20 ++++---------------- pkg/innerring/processors/audit/processor.go | 2 +- pkg/innerring/rpc.go | 4 ++-- pkg/network/cache/client.go | 9 ++++++--- pkg/services/object/get/service.go | 8 -------- pkg/services/object/get/util.go | 4 +--- pkg/services/object/head/remote.go | 7 ++----- pkg/services/object/put/remote.go | 10 ++-------- pkg/services/object/put/service.go | 9 --------- pkg/services/object/put/streamer.go | 1 - pkg/services/object/search/service.go | 8 -------- pkg/services/object/search/util.go | 4 +--- 13 files changed, 22 insertions(+), 79 deletions(-) diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index 6bc05016..bef394da 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -67,6 +67,7 @@ const ( cfgListenAddress = "grpc.endpoint" cfgMaxMsgSize = "grpc.maxmessagesize" cfgReflectService = "grpc.enable_reflect_service" + cfgDialTimeout = "grpc.dial_timeout" // config keys for cfgMorph cfgMorphRPCAddress = "morph.rpc_endpoint" @@ -93,10 +94,8 @@ const ( cfgPolicerWorkScope = "policer.work_scope" cfgPolicerExpRate = "policer.expansion_rate" cfgPolicerHeadTimeout = "policer.head_timeout" - cfgPolicerDialTimeout = "policer.dial_timeout" - cfgReplicatorPutTimeout = "replicator.put_timeout" - cfgReplicatorDialTimeout = "replicator.dial_timeout" + cfgReplicatorPutTimeout = "replicator.put_timeout" cfgReBootstrapEnabled = "bootstrap.periodic.enabled" cfgReBootstrapInterval = "bootstrap.periodic.interval" @@ -107,13 +106,6 @@ const ( cfgObjectSearchPoolSize = "pool.object.search.size" cfgObjectRangePoolSize = "pool.object.range.size" cfgObjectRangeHashPoolSize = "pool.object.rangehash.size" - - cfgObjectPutDialTimeout = "object.put.dial_timeout" - cfgObjectHeadDialTimeout = "object.head.dial_timeout" - cfgObjectRangeDialTimeout = "object.range.dial_timeout" - cfgObjectRangeHashDialTimeout = "object.rangehash.dial_timeout" - cfgObjectSearchDialTimeout = "object.search.dial_timeout" - cfgObjectGetDialTimeout = "object.get.dial_timeout" ) const ( @@ -401,6 +393,7 @@ func defaultConfiguration(v *viper.Viper) { v.SetDefault(cfgListenAddress, "127.0.0.1:50501") // listen address v.SetDefault(cfgMaxMsgSize, 4<<20) // transport msg limit 4 MiB v.SetDefault(cfgReflectService, false) + v.SetDefault(cfgDialTimeout, 5*time.Second) v.SetDefault(cfgAccountingContract, "1aeefe1d0dfade49740fff779c02cd4a0538ffb1") v.SetDefault(cfgAccountingFee, "1") @@ -428,10 +421,8 @@ func defaultConfiguration(v *viper.Viper) { v.SetDefault(cfgPolicerWorkScope, 100) v.SetDefault(cfgPolicerExpRate, 10) // in % v.SetDefault(cfgPolicerHeadTimeout, 5*time.Second) - v.SetDefault(cfgPolicerDialTimeout, 5*time.Second) v.SetDefault(cfgReplicatorPutTimeout, 5*time.Second) - v.SetDefault(cfgReplicatorDialTimeout, 5*time.Second) v.SetDefault(cfgReBootstrapEnabled, false) // in epochs v.SetDefault(cfgReBootstrapInterval, 2) // in epochs diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index 5a4372e7..62978fb0 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -144,7 +144,8 @@ func initObjectService(c *cfg) { nodeOwner.SetNeo3Wallet(neo3Wallet) - clientCache := cache.NewSDKClientCache() + clientCache := cache.NewSDKClientCache( + client.WithDialTimeout(c.viper.GetDuration(cfgDialTimeout))) objRemover := &localObjectRemover{ storage: ls, @@ -173,9 +174,7 @@ func initObjectService(c *cfg) { ), replicator.WithLocalStorage(ls), replicator.WithRemoteSender( - putsvc.NewRemoteSender(keyStorage, clientCache, - client.WithDialTimeout(c.viper.GetDuration(cfgReplicatorDialTimeout)), - ), + putsvc.NewRemoteSender(keyStorage, clientCache), ), ) @@ -198,9 +197,7 @@ func initObjectService(c *cfg) { ), policer.WithTrigger(ch), policer.WithRemoteHeader( - headsvc.NewRemoteHeader(keyStorage, clientCache, - client.WithDialTimeout(c.viper.GetDuration(cfgPolicerDialTimeout)), - ), + headsvc.NewRemoteHeader(keyStorage, clientCache), ), policer.WithLocalAddressSource(c), policer.WithHeadTimeout( @@ -245,9 +242,6 @@ func initObjectService(c *cfg) { putsvc.WithNetworkState(c.cfgNetmap.state), putsvc.WithWorkerPool(c.cfgObject.pool.put), putsvc.WithLogger(c.log), - putsvc.WithClientOptions( - client.WithDialTimeout(c.viper.GetDuration(cfgObjectPutDialTimeout)), - ), ) sPutV2 := putsvcV2.NewService( @@ -258,9 +252,6 @@ func initObjectService(c *cfg) { searchsvc.WithLogger(c.log), searchsvc.WithLocalStorageEngine(ls), searchsvc.WithClientCache(clientCache), - searchsvc.WithClientOptions( - client.WithDialTimeout(c.viper.GetDuration(cfgObjectSearchDialTimeout)), - ), searchsvc.WithTraverserGenerator( traverseGen.WithTraverseOptions( placement.WithoutSuccessTracking(), @@ -278,9 +269,6 @@ func initObjectService(c *cfg) { getsvc.WithLogger(c.log), getsvc.WithLocalStorageEngine(ls), getsvc.WithClientCache(clientCache), - getsvc.WithClientOptions( - client.WithDialTimeout(c.viper.GetDuration(cfgObjectGetDialTimeout)), - ), getsvc.WithTraverserGenerator( traverseGen.WithTraverseOptions( placement.SuccessAfter(1), diff --git a/pkg/innerring/processors/audit/processor.go b/pkg/innerring/processors/audit/processor.go index 83d6c27e..4fc8e476 100644 --- a/pkg/innerring/processors/audit/processor.go +++ b/pkg/innerring/processors/audit/processor.go @@ -27,7 +27,7 @@ type ( // NeoFSClientCache is an interface for cache of neofs RPC clients NeoFSClientCache interface { - Get(address string, opts ...SDKClient.Option) (*SDKClient.Client, error) + Get(address string) (*SDKClient.Client, error) } TaskManager interface { diff --git a/pkg/innerring/rpc.go b/pkg/innerring/rpc.go index ade8452f..2986fe05 100644 --- a/pkg/innerring/rpc.go +++ b/pkg/innerring/rpc.go @@ -46,10 +46,10 @@ func newClientCache(p *clientCacheParams) *ClientCache { } } -func (c *ClientCache) Get(address string, opts ...client.Option) (*client.Client, error) { +func (c *ClientCache) Get(address string) (*client.Client, error) { // Because cache is used by `ClientCache` exclusively, // client will always have valid key. - return c.cache.Get(address, opts...) + return c.cache.Get(address) } // GetSG polls the container from audit task to get the object by id. diff --git a/pkg/network/cache/client.go b/pkg/network/cache/client.go index 968faa41..18a7ac26 100644 --- a/pkg/network/cache/client.go +++ b/pkg/network/cache/client.go @@ -12,19 +12,22 @@ type ( ClientCache struct { mu *sync.RWMutex clients map[string]*client.Client + opts []client.Option } ) // NewSDKClientCache creates instance of client cache. -func NewSDKClientCache() *ClientCache { +// `opts` are used for new client creation. +func NewSDKClientCache(opts ...client.Option) *ClientCache { return &ClientCache{ mu: new(sync.RWMutex), clients: make(map[string]*client.Client), + opts: opts, } } // Get function returns existing client or creates a new one. -func (c *ClientCache) Get(address string, opts ...client.Option) (*client.Client, error) { +func (c *ClientCache) Get(address string) (*client.Client, error) { c.mu.RLock() if cli, ok := c.clients[address]; ok { // todo: check underlying connection neofs-api-go#196 @@ -44,7 +47,7 @@ func (c *ClientCache) Get(address string, opts ...client.Option) (*client.Client return cli, nil } - cli, err := client.New(nil, append(opts, client.WithAddress(address))...) + cli, err := client.New(nil, append(c.opts, client.WithAddress(address))...) if err != nil { return nil, err } diff --git a/pkg/services/object/get/service.go b/pkg/services/object/get/service.go index d67ae0d1..54c9e15d 100644 --- a/pkg/services/object/get/service.go +++ b/pkg/services/object/get/service.go @@ -1,7 +1,6 @@ package getsvc import ( - "github.com/nspcc-dev/neofs-api-go/pkg/client" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/core/object" @@ -99,13 +98,6 @@ func WithClientCache(v *cache.ClientCache) Option { } } -// WithClientOptions returns option to specify options of remote node clients. -func WithClientOptions(opts ...client.Option) Option { - return func(c *cfg) { - c.clientCache.(*clientCacheWrapper).opts = opts - } -} - // WithTraverserGenerator returns option to set generator of // placement traverser to get the objects from containers. func WithTraverserGenerator(t *util.TraverserGenerator) Option { diff --git a/pkg/services/object/get/util.go b/pkg/services/object/get/util.go index db2c7022..ab87d1fa 100644 --- a/pkg/services/object/get/util.go +++ b/pkg/services/object/get/util.go @@ -19,8 +19,6 @@ type SimpleObjectWriter struct { type clientCacheWrapper struct { cache *cache.ClientCache - - opts []client.Option } type clientWrapper struct { @@ -75,7 +73,7 @@ func (s *SimpleObjectWriter) Object() *object.Object { } func (c *clientCacheWrapper) get(addr string) (getClient, error) { - clt, err := c.cache.Get(addr, c.opts...) + clt, err := c.cache.Get(addr) return &clientWrapper{ client: clt, diff --git a/pkg/services/object/head/remote.go b/pkg/services/object/head/remote.go index a54e3786..39f31be0 100644 --- a/pkg/services/object/head/remote.go +++ b/pkg/services/object/head/remote.go @@ -18,8 +18,6 @@ type RemoteHeader struct { keyStorage *util.KeyStorage clientCache *cache.ClientCache - - clientOpts []client.Option } // RemoteHeadPrm groups remote header operation parameters. @@ -32,11 +30,10 @@ type RemoteHeadPrm struct { var ErrNotFound = errors.New("object header not found") // NewRemoteHeader creates, initializes and returns new RemoteHeader instance. -func NewRemoteHeader(keyStorage *util.KeyStorage, cache *cache.ClientCache, opts ...client.Option) *RemoteHeader { +func NewRemoteHeader(keyStorage *util.KeyStorage, cache *cache.ClientCache) *RemoteHeader { return &RemoteHeader{ keyStorage: keyStorage, clientCache: cache, - clientOpts: opts, } } @@ -70,7 +67,7 @@ func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*object.Ob return nil, err } - c, err := h.clientCache.Get(addr, h.clientOpts...) + c, err := h.clientCache.Get(addr) if err != nil { return nil, errors.Wrapf(err, "(%T) could not create SDK client %s", h, addr) } diff --git a/pkg/services/object/put/remote.go b/pkg/services/object/put/remote.go index 30d4be04..dc5a21ab 100644 --- a/pkg/services/object/put/remote.go +++ b/pkg/services/object/put/remote.go @@ -26,8 +26,6 @@ type remoteTarget struct { obj *object.Object clientCache *cache.ClientCache - - clientOpts []client.Option } // RemoteSender represents utility for @@ -36,8 +34,6 @@ type RemoteSender struct { keyStorage *util.KeyStorage clientCache *cache.ClientCache - - clientOpts []client.Option } // RemotePutPrm groups remote put operation parameters. @@ -64,7 +60,7 @@ func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) { return nil, err } - c, err := t.clientCache.Get(addr, t.clientOpts...) + c, err := t.clientCache.Get(addr) if err != nil { return nil, errors.Wrapf(err, "(%T) could not create SDK client %s", t, addr) } @@ -88,11 +84,10 @@ func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) { } // NewRemoteSender creates, initializes and returns new RemoteSender instance. -func NewRemoteSender(keyStorage *util.KeyStorage, cache *cache.ClientCache, opts ...client.Option) *RemoteSender { +func NewRemoteSender(keyStorage *util.KeyStorage, cache *cache.ClientCache) *RemoteSender { return &RemoteSender{ keyStorage: keyStorage, clientCache: cache, - clientOpts: opts, } } @@ -121,7 +116,6 @@ func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error { keyStorage: s.keyStorage, addr: p.node, clientCache: s.clientCache, - clientOpts: s.clientOpts, } if err := t.WriteHeader(object.NewRawFromObject(p.obj)); err != nil { diff --git a/pkg/services/object/put/service.go b/pkg/services/object/put/service.go index 207db1c2..186ffe1d 100644 --- a/pkg/services/object/put/service.go +++ b/pkg/services/object/put/service.go @@ -3,7 +3,6 @@ package putsvc import ( "context" - "github.com/nspcc-dev/neofs-api-go/pkg/client" "github.com/nspcc-dev/neofs-node/pkg/core/container" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/core/object" @@ -54,8 +53,6 @@ type cfg struct { clientCache *cache.ClientCache log *logger.Logger - - clientOpts []client.Option } func defaultCfg() *cfg { @@ -152,9 +149,3 @@ func WithLogger(l *logger.Logger) Option { c.log = l } } - -func WithClientOptions(opts ...client.Option) Option { - return func(c *cfg) { - c.clientOpts = opts - } -} diff --git a/pkg/services/object/put/streamer.go b/pkg/services/object/put/streamer.go index abc58bcf..9e158a3d 100644 --- a/pkg/services/object/put/streamer.go +++ b/pkg/services/object/put/streamer.go @@ -144,7 +144,6 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget { commonPrm: prm.common, addr: addr, clientCache: p.clientCache, - clientOpts: p.clientOpts, } }, fmt: p.fmtValidator, diff --git a/pkg/services/object/search/service.go b/pkg/services/object/search/service.go index 046b7732..f8ad71b9 100644 --- a/pkg/services/object/search/service.go +++ b/pkg/services/object/search/service.go @@ -1,7 +1,6 @@ package searchsvc import ( - "github.com/nspcc-dev/neofs-api-go/pkg/client" "github.com/nspcc-dev/neofs-api-go/pkg/container" "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" @@ -89,13 +88,6 @@ func WithClientCache(v *cache.ClientCache) Option { } } -// WithClientOptions returns option to specify options of remote node clients. -func WithClientOptions(opts ...client.Option) Option { - return func(c *cfg) { - c.clientCache.(*clientCacheWrapper).opts = opts - } -} - // WithTraverserGenerator returns option to set generator of // placement traverser to get the objects from containers. func WithTraverserGenerator(t *util.TraverserGenerator) Option { diff --git a/pkg/services/object/search/util.go b/pkg/services/object/search/util.go index 3485d559..5380f7eb 100644 --- a/pkg/services/object/search/util.go +++ b/pkg/services/object/search/util.go @@ -23,8 +23,6 @@ type uniqueIDWriter struct { type clientCacheWrapper struct { cache *cache.ClientCache - - opts []client.Option } type clientWrapper struct { @@ -71,7 +69,7 @@ func (w *uniqueIDWriter) WriteIDs(list []*objectSDK.ID) error { } func (c *clientCacheWrapper) get(addr string) (searchClient, error) { - clt, err := c.cache.Get(addr, c.opts...) + clt, err := c.cache.Get(addr) return &clientWrapper{ client: clt,