From 7ba95dd5fcb280736b0560efd8bcf51cdb60a40e Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Wed, 18 Nov 2020 16:03:31 +0300 Subject: [PATCH] [#184] Use SDK client cache in object.Put Signed-off-by: Alex Vanin --- cmd/neofs-node/object.go | 3 ++- pkg/services/object/put/remote.go | 22 +++++++++++++--------- pkg/services/object/put/service.go | 9 +++++++++ pkg/services/object/put/streamer.go | 11 ++++++----- 4 files changed, 30 insertions(+), 15 deletions(-) diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index 70b12bfabe..d5020b3a34 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -200,7 +200,7 @@ func initObjectService(c *cfg) { ), replicator.WithLocalStorage(ls), replicator.WithRemoteSender( - putsvc.NewRemoteSender(keyStorage), + putsvc.NewRemoteSender(keyStorage, clientCache), ), ) @@ -246,6 +246,7 @@ func initObjectService(c *cfg) { sPut := putsvc.NewService( putsvc.WithKeyStorage(keyStorage), + putsvc.WithClientCache(clientCache), putsvc.WithMaxSizeSource(c), putsvc.WithLocalStorage(ls), putsvc.WithContainerSource(c.cfgObject.cnrStorage), diff --git a/pkg/services/object/put/remote.go b/pkg/services/object/put/remote.go index 57f9d51a7d..88e70871e1 100644 --- a/pkg/services/object/put/remote.go +++ b/pkg/services/object/put/remote.go @@ -7,6 +7,7 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/token" "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/network" + "github.com/nspcc-dev/neofs-node/pkg/network/cache" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer" "github.com/pkg/errors" @@ -26,12 +27,15 @@ type remoteTarget struct { addr *network.Address obj *object.Object + + clientCache *cache.ClientCache } // RemoteSender represents utility for // sending an object to a remote host. type RemoteSender struct { - keyStorage *util.KeyStorage + keyStorage *util.KeyStorage + clientCache *cache.ClientCache } // RemotePutPrm groups remote put operation parameters. @@ -58,9 +62,7 @@ func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) { return nil, err } - c, err := client.New(key, - client.WithAddress(addr), - ) + c, err := t.clientCache.Get(key, addr) if err != nil { return nil, errors.Wrapf(err, "(%T) could not create SDK client %s", t, addr) } @@ -81,9 +83,10 @@ func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) { } // NewRemoteSender creates, initializes and returns new RemoteSender instance. -func NewRemoteSender(keyStorage *util.KeyStorage) *RemoteSender { +func NewRemoteSender(keyStorage *util.KeyStorage, cache *cache.ClientCache) *RemoteSender { return &RemoteSender{ - keyStorage: keyStorage, + keyStorage: keyStorage, + clientCache: cache, } } @@ -108,9 +111,10 @@ func (p *RemotePutPrm) WithObject(v *object.Object) *RemotePutPrm { // PutObject sends object to remote node. func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error { t := &remoteTarget{ - ctx: ctx, - keyStorage: s.keyStorage, - addr: p.node, + ctx: ctx, + keyStorage: s.keyStorage, + addr: p.node, + clientCache: s.clientCache, } if err := t.WriteHeader(object.NewRawFromObject(p.obj)); err != nil { diff --git a/pkg/services/object/put/service.go b/pkg/services/object/put/service.go index 0659bac5f3..b238df5460 100644 --- a/pkg/services/object/put/service.go +++ b/pkg/services/object/put/service.go @@ -8,6 +8,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore" "github.com/nspcc-dev/neofs-node/pkg/network" + "github.com/nspcc-dev/neofs-node/pkg/network/cache" objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-node/pkg/util" ) @@ -46,6 +47,8 @@ type cfg struct { fmtValidatorOpts []object.FormatValidatorOption networkState netmap.State + + clientCache *cache.ClientCache } func defaultCfg() *cfg { @@ -128,3 +131,9 @@ func WithNetworkState(v netmap.State) Option { c.networkState = v } } + +func WithClientCache(v *cache.ClientCache) Option { + return func(c *cfg) { + c.clientCache = v + } +} diff --git a/pkg/services/object/put/streamer.go b/pkg/services/object/put/streamer.go index ab29331b2b..bc87116ba2 100644 --- a/pkg/services/object/put/streamer.go +++ b/pkg/services/object/put/streamer.go @@ -138,11 +138,12 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget { } } else { return &remoteTarget{ - ctx: p.ctx, - keyStorage: p.keyStorage, - token: prm.common.SessionToken(), - bearer: prm.common.BearerToken(), - addr: addr, + ctx: p.ctx, + keyStorage: p.keyStorage, + token: prm.common.SessionToken(), + bearer: prm.common.BearerToken(), + addr: addr, + clientCache: p.clientCache, } } },