forked from TrueCloudLab/frostfs-node
[#184] Use SDK client cache in object.Put
Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
parent
e9a6365333
commit
7ba95dd5fc
4 changed files with 30 additions and 15 deletions
|
@ -200,7 +200,7 @@ func initObjectService(c *cfg) {
|
|||
),
|
||||
replicator.WithLocalStorage(ls),
|
||||
replicator.WithRemoteSender(
|
||||
putsvc.NewRemoteSender(keyStorage),
|
||||
putsvc.NewRemoteSender(keyStorage, clientCache),
|
||||
),
|
||||
)
|
||||
|
||||
|
@ -246,6 +246,7 @@ func initObjectService(c *cfg) {
|
|||
|
||||
sPut := putsvc.NewService(
|
||||
putsvc.WithKeyStorage(keyStorage),
|
||||
putsvc.WithClientCache(clientCache),
|
||||
putsvc.WithMaxSizeSource(c),
|
||||
putsvc.WithLocalStorage(ls),
|
||||
putsvc.WithContainerSource(c.cfgObject.cnrStorage),
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"github.com/nspcc-dev/neofs-api-go/pkg/token"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/network/cache"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer"
|
||||
"github.com/pkg/errors"
|
||||
|
@ -26,12 +27,15 @@ type remoteTarget struct {
|
|||
addr *network.Address
|
||||
|
||||
obj *object.Object
|
||||
|
||||
clientCache *cache.ClientCache
|
||||
}
|
||||
|
||||
// RemoteSender represents utility for
|
||||
// sending an object to a remote host.
|
||||
type RemoteSender struct {
|
||||
keyStorage *util.KeyStorage
|
||||
keyStorage *util.KeyStorage
|
||||
clientCache *cache.ClientCache
|
||||
}
|
||||
|
||||
// RemotePutPrm groups remote put operation parameters.
|
||||
|
@ -58,9 +62,7 @@ func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
c, err := client.New(key,
|
||||
client.WithAddress(addr),
|
||||
)
|
||||
c, err := t.clientCache.Get(key, addr)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "(%T) could not create SDK client %s", t, addr)
|
||||
}
|
||||
|
@ -81,9 +83,10 @@ func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) {
|
|||
}
|
||||
|
||||
// NewRemoteSender creates, initializes and returns new RemoteSender instance.
|
||||
func NewRemoteSender(keyStorage *util.KeyStorage) *RemoteSender {
|
||||
func NewRemoteSender(keyStorage *util.KeyStorage, cache *cache.ClientCache) *RemoteSender {
|
||||
return &RemoteSender{
|
||||
keyStorage: keyStorage,
|
||||
keyStorage: keyStorage,
|
||||
clientCache: cache,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -108,9 +111,10 @@ func (p *RemotePutPrm) WithObject(v *object.Object) *RemotePutPrm {
|
|||
// PutObject sends object to remote node.
|
||||
func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error {
|
||||
t := &remoteTarget{
|
||||
ctx: ctx,
|
||||
keyStorage: s.keyStorage,
|
||||
addr: p.node,
|
||||
ctx: ctx,
|
||||
keyStorage: s.keyStorage,
|
||||
addr: p.node,
|
||||
clientCache: s.clientCache,
|
||||
}
|
||||
|
||||
if err := t.WriteHeader(object.NewRawFromObject(p.obj)); err != nil {
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/network/cache"
|
||||
objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util"
|
||||
)
|
||||
|
@ -46,6 +47,8 @@ type cfg struct {
|
|||
fmtValidatorOpts []object.FormatValidatorOption
|
||||
|
||||
networkState netmap.State
|
||||
|
||||
clientCache *cache.ClientCache
|
||||
}
|
||||
|
||||
func defaultCfg() *cfg {
|
||||
|
@ -128,3 +131,9 @@ func WithNetworkState(v netmap.State) Option {
|
|||
c.networkState = v
|
||||
}
|
||||
}
|
||||
|
||||
func WithClientCache(v *cache.ClientCache) Option {
|
||||
return func(c *cfg) {
|
||||
c.clientCache = v
|
||||
}
|
||||
}
|
||||
|
|
|
@ -138,11 +138,12 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget {
|
|||
}
|
||||
} else {
|
||||
return &remoteTarget{
|
||||
ctx: p.ctx,
|
||||
keyStorage: p.keyStorage,
|
||||
token: prm.common.SessionToken(),
|
||||
bearer: prm.common.BearerToken(),
|
||||
addr: addr,
|
||||
ctx: p.ctx,
|
||||
keyStorage: p.keyStorage,
|
||||
token: prm.common.SessionToken(),
|
||||
bearer: prm.common.BearerToken(),
|
||||
addr: addr,
|
||||
clientCache: p.clientCache,
|
||||
}
|
||||
}
|
||||
},
|
||||
|
|
Loading…
Reference in a new issue