[#422] pkg/services: Provide client options on cache creation

Because options are not used when client is already in cache
providing them to shared cache is misleading at best.
In the worst case `dial_timeout` is set randomly (because of race
condition) which can lead to one service having `dial_timeout` of
another. Thus we set default client creation options when cache is
created.

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
Evgenii Stratonikov 2021-03-13 18:22:24 +03:00 committed by Leonard Lyubich
parent cc7287d6f7
commit 6679d59e89
13 changed files with 22 additions and 79 deletions

View file

@ -67,6 +67,7 @@ const (
cfgListenAddress = "grpc.endpoint" cfgListenAddress = "grpc.endpoint"
cfgMaxMsgSize = "grpc.maxmessagesize" cfgMaxMsgSize = "grpc.maxmessagesize"
cfgReflectService = "grpc.enable_reflect_service" cfgReflectService = "grpc.enable_reflect_service"
cfgDialTimeout = "grpc.dial_timeout"
// config keys for cfgMorph // config keys for cfgMorph
cfgMorphRPCAddress = "morph.rpc_endpoint" cfgMorphRPCAddress = "morph.rpc_endpoint"
@ -93,10 +94,8 @@ const (
cfgPolicerWorkScope = "policer.work_scope" cfgPolicerWorkScope = "policer.work_scope"
cfgPolicerExpRate = "policer.expansion_rate" cfgPolicerExpRate = "policer.expansion_rate"
cfgPolicerHeadTimeout = "policer.head_timeout" cfgPolicerHeadTimeout = "policer.head_timeout"
cfgPolicerDialTimeout = "policer.dial_timeout"
cfgReplicatorPutTimeout = "replicator.put_timeout" cfgReplicatorPutTimeout = "replicator.put_timeout"
cfgReplicatorDialTimeout = "replicator.dial_timeout"
cfgReBootstrapEnabled = "bootstrap.periodic.enabled" cfgReBootstrapEnabled = "bootstrap.periodic.enabled"
cfgReBootstrapInterval = "bootstrap.periodic.interval" cfgReBootstrapInterval = "bootstrap.periodic.interval"
@ -107,13 +106,6 @@ const (
cfgObjectSearchPoolSize = "pool.object.search.size" cfgObjectSearchPoolSize = "pool.object.search.size"
cfgObjectRangePoolSize = "pool.object.range.size" cfgObjectRangePoolSize = "pool.object.range.size"
cfgObjectRangeHashPoolSize = "pool.object.rangehash.size" cfgObjectRangeHashPoolSize = "pool.object.rangehash.size"
cfgObjectPutDialTimeout = "object.put.dial_timeout"
cfgObjectHeadDialTimeout = "object.head.dial_timeout"
cfgObjectRangeDialTimeout = "object.range.dial_timeout"
cfgObjectRangeHashDialTimeout = "object.rangehash.dial_timeout"
cfgObjectSearchDialTimeout = "object.search.dial_timeout"
cfgObjectGetDialTimeout = "object.get.dial_timeout"
) )
const ( const (
@ -401,6 +393,7 @@ func defaultConfiguration(v *viper.Viper) {
v.SetDefault(cfgListenAddress, "127.0.0.1:50501") // listen address v.SetDefault(cfgListenAddress, "127.0.0.1:50501") // listen address
v.SetDefault(cfgMaxMsgSize, 4<<20) // transport msg limit 4 MiB v.SetDefault(cfgMaxMsgSize, 4<<20) // transport msg limit 4 MiB
v.SetDefault(cfgReflectService, false) v.SetDefault(cfgReflectService, false)
v.SetDefault(cfgDialTimeout, 5*time.Second)
v.SetDefault(cfgAccountingContract, "1aeefe1d0dfade49740fff779c02cd4a0538ffb1") v.SetDefault(cfgAccountingContract, "1aeefe1d0dfade49740fff779c02cd4a0538ffb1")
v.SetDefault(cfgAccountingFee, "1") v.SetDefault(cfgAccountingFee, "1")
@ -428,10 +421,8 @@ func defaultConfiguration(v *viper.Viper) {
v.SetDefault(cfgPolicerWorkScope, 100) v.SetDefault(cfgPolicerWorkScope, 100)
v.SetDefault(cfgPolicerExpRate, 10) // in % v.SetDefault(cfgPolicerExpRate, 10) // in %
v.SetDefault(cfgPolicerHeadTimeout, 5*time.Second) v.SetDefault(cfgPolicerHeadTimeout, 5*time.Second)
v.SetDefault(cfgPolicerDialTimeout, 5*time.Second)
v.SetDefault(cfgReplicatorPutTimeout, 5*time.Second) v.SetDefault(cfgReplicatorPutTimeout, 5*time.Second)
v.SetDefault(cfgReplicatorDialTimeout, 5*time.Second)
v.SetDefault(cfgReBootstrapEnabled, false) // in epochs v.SetDefault(cfgReBootstrapEnabled, false) // in epochs
v.SetDefault(cfgReBootstrapInterval, 2) // in epochs v.SetDefault(cfgReBootstrapInterval, 2) // in epochs

View file

@ -144,7 +144,8 @@ func initObjectService(c *cfg) {
nodeOwner.SetNeo3Wallet(neo3Wallet) nodeOwner.SetNeo3Wallet(neo3Wallet)
clientCache := cache.NewSDKClientCache() clientCache := cache.NewSDKClientCache(
client.WithDialTimeout(c.viper.GetDuration(cfgDialTimeout)))
objRemover := &localObjectRemover{ objRemover := &localObjectRemover{
storage: ls, storage: ls,
@ -173,9 +174,7 @@ func initObjectService(c *cfg) {
), ),
replicator.WithLocalStorage(ls), replicator.WithLocalStorage(ls),
replicator.WithRemoteSender( replicator.WithRemoteSender(
putsvc.NewRemoteSender(keyStorage, clientCache, putsvc.NewRemoteSender(keyStorage, clientCache),
client.WithDialTimeout(c.viper.GetDuration(cfgReplicatorDialTimeout)),
),
), ),
) )
@ -198,9 +197,7 @@ func initObjectService(c *cfg) {
), ),
policer.WithTrigger(ch), policer.WithTrigger(ch),
policer.WithRemoteHeader( policer.WithRemoteHeader(
headsvc.NewRemoteHeader(keyStorage, clientCache, headsvc.NewRemoteHeader(keyStorage, clientCache),
client.WithDialTimeout(c.viper.GetDuration(cfgPolicerDialTimeout)),
),
), ),
policer.WithLocalAddressSource(c), policer.WithLocalAddressSource(c),
policer.WithHeadTimeout( policer.WithHeadTimeout(
@ -245,9 +242,6 @@ func initObjectService(c *cfg) {
putsvc.WithNetworkState(c.cfgNetmap.state), putsvc.WithNetworkState(c.cfgNetmap.state),
putsvc.WithWorkerPool(c.cfgObject.pool.put), putsvc.WithWorkerPool(c.cfgObject.pool.put),
putsvc.WithLogger(c.log), putsvc.WithLogger(c.log),
putsvc.WithClientOptions(
client.WithDialTimeout(c.viper.GetDuration(cfgObjectPutDialTimeout)),
),
) )
sPutV2 := putsvcV2.NewService( sPutV2 := putsvcV2.NewService(
@ -258,9 +252,6 @@ func initObjectService(c *cfg) {
searchsvc.WithLogger(c.log), searchsvc.WithLogger(c.log),
searchsvc.WithLocalStorageEngine(ls), searchsvc.WithLocalStorageEngine(ls),
searchsvc.WithClientCache(clientCache), searchsvc.WithClientCache(clientCache),
searchsvc.WithClientOptions(
client.WithDialTimeout(c.viper.GetDuration(cfgObjectSearchDialTimeout)),
),
searchsvc.WithTraverserGenerator( searchsvc.WithTraverserGenerator(
traverseGen.WithTraverseOptions( traverseGen.WithTraverseOptions(
placement.WithoutSuccessTracking(), placement.WithoutSuccessTracking(),
@ -278,9 +269,6 @@ func initObjectService(c *cfg) {
getsvc.WithLogger(c.log), getsvc.WithLogger(c.log),
getsvc.WithLocalStorageEngine(ls), getsvc.WithLocalStorageEngine(ls),
getsvc.WithClientCache(clientCache), getsvc.WithClientCache(clientCache),
getsvc.WithClientOptions(
client.WithDialTimeout(c.viper.GetDuration(cfgObjectGetDialTimeout)),
),
getsvc.WithTraverserGenerator( getsvc.WithTraverserGenerator(
traverseGen.WithTraverseOptions( traverseGen.WithTraverseOptions(
placement.SuccessAfter(1), placement.SuccessAfter(1),

View file

@ -27,7 +27,7 @@ type (
// NeoFSClientCache is an interface for cache of neofs RPC clients // NeoFSClientCache is an interface for cache of neofs RPC clients
NeoFSClientCache interface { NeoFSClientCache interface {
Get(address string, opts ...SDKClient.Option) (*SDKClient.Client, error) Get(address string) (*SDKClient.Client, error)
} }
TaskManager interface { TaskManager interface {

View file

@ -46,10 +46,10 @@ func newClientCache(p *clientCacheParams) *ClientCache {
} }
} }
func (c *ClientCache) Get(address string, opts ...client.Option) (*client.Client, error) { func (c *ClientCache) Get(address string) (*client.Client, error) {
// Because cache is used by `ClientCache` exclusively, // Because cache is used by `ClientCache` exclusively,
// client will always have valid key. // client will always have valid key.
return c.cache.Get(address, opts...) return c.cache.Get(address)
} }
// GetSG polls the container from audit task to get the object by id. // GetSG polls the container from audit task to get the object by id.

View file

@ -12,19 +12,22 @@ type (
ClientCache struct { ClientCache struct {
mu *sync.RWMutex mu *sync.RWMutex
clients map[string]*client.Client clients map[string]*client.Client
opts []client.Option
} }
) )
// NewSDKClientCache creates instance of client cache. // NewSDKClientCache creates instance of client cache.
func NewSDKClientCache() *ClientCache { // `opts` are used for new client creation.
func NewSDKClientCache(opts ...client.Option) *ClientCache {
return &ClientCache{ return &ClientCache{
mu: new(sync.RWMutex), mu: new(sync.RWMutex),
clients: make(map[string]*client.Client), clients: make(map[string]*client.Client),
opts: opts,
} }
} }
// Get function returns existing client or creates a new one. // Get function returns existing client or creates a new one.
func (c *ClientCache) Get(address string, opts ...client.Option) (*client.Client, error) { func (c *ClientCache) Get(address string) (*client.Client, error) {
c.mu.RLock() c.mu.RLock()
if cli, ok := c.clients[address]; ok { if cli, ok := c.clients[address]; ok {
// todo: check underlying connection neofs-api-go#196 // todo: check underlying connection neofs-api-go#196
@ -44,7 +47,7 @@ func (c *ClientCache) Get(address string, opts ...client.Option) (*client.Client
return cli, nil return cli, nil
} }
cli, err := client.New(nil, append(opts, client.WithAddress(address))...) cli, err := client.New(nil, append(c.opts, client.WithAddress(address))...)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -1,7 +1,6 @@
package getsvc package getsvc
import ( import (
"github.com/nspcc-dev/neofs-api-go/pkg/client"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/core/object"
@ -99,13 +98,6 @@ func WithClientCache(v *cache.ClientCache) Option {
} }
} }
// WithClientOptions returns option to specify options of remote node clients.
func WithClientOptions(opts ...client.Option) Option {
return func(c *cfg) {
c.clientCache.(*clientCacheWrapper).opts = opts
}
}
// WithTraverserGenerator returns option to set generator of // WithTraverserGenerator returns option to set generator of
// placement traverser to get the objects from containers. // placement traverser to get the objects from containers.
func WithTraverserGenerator(t *util.TraverserGenerator) Option { func WithTraverserGenerator(t *util.TraverserGenerator) Option {

View file

@ -19,8 +19,6 @@ type SimpleObjectWriter struct {
type clientCacheWrapper struct { type clientCacheWrapper struct {
cache *cache.ClientCache cache *cache.ClientCache
opts []client.Option
} }
type clientWrapper struct { type clientWrapper struct {
@ -75,7 +73,7 @@ func (s *SimpleObjectWriter) Object() *object.Object {
} }
func (c *clientCacheWrapper) get(addr string) (getClient, error) { func (c *clientCacheWrapper) get(addr string) (getClient, error) {
clt, err := c.cache.Get(addr, c.opts...) clt, err := c.cache.Get(addr)
return &clientWrapper{ return &clientWrapper{
client: clt, client: clt,

View file

@ -18,8 +18,6 @@ type RemoteHeader struct {
keyStorage *util.KeyStorage keyStorage *util.KeyStorage
clientCache *cache.ClientCache clientCache *cache.ClientCache
clientOpts []client.Option
} }
// RemoteHeadPrm groups remote header operation parameters. // RemoteHeadPrm groups remote header operation parameters.
@ -32,11 +30,10 @@ type RemoteHeadPrm struct {
var ErrNotFound = errors.New("object header not found") var ErrNotFound = errors.New("object header not found")
// NewRemoteHeader creates, initializes and returns new RemoteHeader instance. // NewRemoteHeader creates, initializes and returns new RemoteHeader instance.
func NewRemoteHeader(keyStorage *util.KeyStorage, cache *cache.ClientCache, opts ...client.Option) *RemoteHeader { func NewRemoteHeader(keyStorage *util.KeyStorage, cache *cache.ClientCache) *RemoteHeader {
return &RemoteHeader{ return &RemoteHeader{
keyStorage: keyStorage, keyStorage: keyStorage,
clientCache: cache, clientCache: cache,
clientOpts: opts,
} }
} }
@ -70,7 +67,7 @@ func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*object.Ob
return nil, err return nil, err
} }
c, err := h.clientCache.Get(addr, h.clientOpts...) c, err := h.clientCache.Get(addr)
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "(%T) could not create SDK client %s", h, addr) return nil, errors.Wrapf(err, "(%T) could not create SDK client %s", h, addr)
} }

View file

@ -26,8 +26,6 @@ type remoteTarget struct {
obj *object.Object obj *object.Object
clientCache *cache.ClientCache clientCache *cache.ClientCache
clientOpts []client.Option
} }
// RemoteSender represents utility for // RemoteSender represents utility for
@ -36,8 +34,6 @@ type RemoteSender struct {
keyStorage *util.KeyStorage keyStorage *util.KeyStorage
clientCache *cache.ClientCache clientCache *cache.ClientCache
clientOpts []client.Option
} }
// RemotePutPrm groups remote put operation parameters. // RemotePutPrm groups remote put operation parameters.
@ -64,7 +60,7 @@ func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) {
return nil, err return nil, err
} }
c, err := t.clientCache.Get(addr, t.clientOpts...) c, err := t.clientCache.Get(addr)
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "(%T) could not create SDK client %s", t, addr) return nil, errors.Wrapf(err, "(%T) could not create SDK client %s", t, addr)
} }
@ -88,11 +84,10 @@ func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) {
} }
// NewRemoteSender creates, initializes and returns new RemoteSender instance. // NewRemoteSender creates, initializes and returns new RemoteSender instance.
func NewRemoteSender(keyStorage *util.KeyStorage, cache *cache.ClientCache, opts ...client.Option) *RemoteSender { func NewRemoteSender(keyStorage *util.KeyStorage, cache *cache.ClientCache) *RemoteSender {
return &RemoteSender{ return &RemoteSender{
keyStorage: keyStorage, keyStorage: keyStorage,
clientCache: cache, clientCache: cache,
clientOpts: opts,
} }
} }
@ -121,7 +116,6 @@ func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error {
keyStorage: s.keyStorage, keyStorage: s.keyStorage,
addr: p.node, addr: p.node,
clientCache: s.clientCache, clientCache: s.clientCache,
clientOpts: s.clientOpts,
} }
if err := t.WriteHeader(object.NewRawFromObject(p.obj)); err != nil { if err := t.WriteHeader(object.NewRawFromObject(p.obj)); err != nil {

View file

@ -3,7 +3,6 @@ package putsvc
import ( import (
"context" "context"
"github.com/nspcc-dev/neofs-api-go/pkg/client"
"github.com/nspcc-dev/neofs-node/pkg/core/container" "github.com/nspcc-dev/neofs-node/pkg/core/container"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/core/object"
@ -54,8 +53,6 @@ type cfg struct {
clientCache *cache.ClientCache clientCache *cache.ClientCache
log *logger.Logger log *logger.Logger
clientOpts []client.Option
} }
func defaultCfg() *cfg { func defaultCfg() *cfg {
@ -152,9 +149,3 @@ func WithLogger(l *logger.Logger) Option {
c.log = l c.log = l
} }
} }
func WithClientOptions(opts ...client.Option) Option {
return func(c *cfg) {
c.clientOpts = opts
}
}

View file

@ -144,7 +144,6 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget {
commonPrm: prm.common, commonPrm: prm.common,
addr: addr, addr: addr,
clientCache: p.clientCache, clientCache: p.clientCache,
clientOpts: p.clientOpts,
} }
}, },
fmt: p.fmtValidator, fmt: p.fmtValidator,

View file

@ -1,7 +1,6 @@
package searchsvc package searchsvc
import ( import (
"github.com/nspcc-dev/neofs-api-go/pkg/client"
"github.com/nspcc-dev/neofs-api-go/pkg/container" "github.com/nspcc-dev/neofs-api-go/pkg/container"
"github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/core/netmap"
@ -89,13 +88,6 @@ func WithClientCache(v *cache.ClientCache) Option {
} }
} }
// WithClientOptions returns option to specify options of remote node clients.
func WithClientOptions(opts ...client.Option) Option {
return func(c *cfg) {
c.clientCache.(*clientCacheWrapper).opts = opts
}
}
// WithTraverserGenerator returns option to set generator of // WithTraverserGenerator returns option to set generator of
// placement traverser to get the objects from containers. // placement traverser to get the objects from containers.
func WithTraverserGenerator(t *util.TraverserGenerator) Option { func WithTraverserGenerator(t *util.TraverserGenerator) Option {

View file

@ -23,8 +23,6 @@ type uniqueIDWriter struct {
type clientCacheWrapper struct { type clientCacheWrapper struct {
cache *cache.ClientCache cache *cache.ClientCache
opts []client.Option
} }
type clientWrapper struct { type clientWrapper struct {
@ -71,7 +69,7 @@ func (w *uniqueIDWriter) WriteIDs(list []*objectSDK.ID) error {
} }
func (c *clientCacheWrapper) get(addr string) (searchClient, error) { func (c *clientCacheWrapper) get(addr string) (searchClient, error) {
clt, err := c.cache.Get(addr, c.opts...) clt, err := c.cache.Get(addr)
return &clientWrapper{ return &clientWrapper{
client: clt, client: clt,