frostfs-node/pkg/services/object/put/remote.go
Leonard Lyubich 106884fc40 [#428] client: Hide client cache behind interface in dependent packages
Replace usage of `cache.ClientCache` type with interface with similar
signature. This will further allow overloading clients without affecting the
logic of dependent packages.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
2021-03-24 10:11:52 +03:00

127 lines
2.9 KiB
Go

package putsvc
import (
"context"
"github.com/nspcc-dev/neofs-api-go/pkg/client"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer"
"github.com/pkg/errors"
)
// remoteTarget holds everything needed to put a single object to one remote
// node: WriteHeader remembers the object and Close performs the transfer.
type remoteTarget struct {
// NOTE(review): presumably embedded so that remoteTarget satisfies
// transformer.ObjectTarget; only WriteHeader and Close are overridden
// in this file — confirm.
transformer.ObjectTarget
// ctx is the context passed to the client's PutObject call in Close.
ctx context.Context
// keyStorage provides the private key used to sign the remote call.
keyStorage *util.KeyStorage
// commonPrm supplies the session token and the base remote call options.
commonPrm *util.CommonPrm
// addr is the network address of the remote node.
addr *network.Address
// obj is the object to send; it is set by WriteHeader.
obj *object.Object
// clientConstructor returns a client for a given address string.
clientConstructor ClientConstructor
}
// RemoteSender represents utility for
// sending an object to a remote host.
//
// Instances are created with NewRemoteSender; PutObject performs the send.
type RemoteSender struct {
// keyStorage is handed to every remote target created by PutObject.
keyStorage *util.KeyStorage
// clientConstructor is used by the remote target to obtain a client
// for the destination address.
clientConstructor ClientConstructor
}
// RemotePutPrm groups remote put operation parameters.
//
// Fields are filled through the WithNodeAddress and WithObject setters.
type RemotePutPrm struct {
// node is the network address of the remote node to send the object to.
node *network.Address
// obj is the object to be transferred.
obj *object.Object
}
// WriteHeader remembers the object built from the given raw object so that
// it can be sent later by Close. It never fails.
func (t *remoteTarget) WriteHeader(obj *object.RawObject) error {
	stored := obj.Object()
	t.obj = stored

	return nil
}
// Close sends the object remembered by WriteHeader to the remote node and
// returns access identifiers carrying the ID reported by the put call.
//
// Steps, in order:
//  1. fetch the private key for the session token of the common parameters;
//  2. convert the node address to a string with IPAddrString;
//  3. obtain a client for that address from the client constructor;
//  4. call PutObject on the client with the common remote call options
//     extended by TTL 1 and the fetched key.
//
// Every failure is returned wrapped with the receiver type as a prefix, so
// the caller can tell which step failed.
func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) {
	key, err := t.keyStorage.GetKey(t.commonPrm.SessionToken())
	if err != nil {
		return nil, errors.Wrapf(err, "(%T) could not receive private key", t)
	}

	addr, err := t.addr.IPAddrString()
	if err != nil {
		// Wrapped like the other branches: a bare error here gave the caller
		// no hint that the failure came from address conversion.
		return nil, errors.Wrapf(err, "(%T) could not get IP address string", t)
	}

	c, err := t.clientConstructor.Get(addr)
	if err != nil {
		return nil, errors.Wrapf(err, "(%T) could not create SDK client %s", t, addr)
	}

	// NOTE(review): TTL 1 presumably keeps the receiving node from
	// forwarding the request further — confirm.
	callOpts := append(
		t.commonPrm.RemoteCallOptions(),
		client.WithTTL(1), // FIXME: use constant
		client.WithKey(key),
	)

	putPrm := new(client.PutObjectParams).WithObject(t.obj.SDK())

	id, err := c.PutObject(t.ctx, putPrm, callOpts...)
	if err != nil {
		return nil, errors.Wrapf(err, "(%T) could not put object to %s", t, addr)
	}

	return new(transformer.AccessIdentifiers).
		WithSelfID(id), nil
}
// NewRemoteSender builds a RemoteSender that uses the given key storage and
// client constructor for every subsequent PutObject call.
func NewRemoteSender(keyStorage *util.KeyStorage, cons ClientConstructor) *RemoteSender {
	sender := new(RemoteSender)
	sender.keyStorage = keyStorage
	sender.clientConstructor = cons

	return sender
}
// WithNodeAddress stores the network address of the remote node and returns
// the receiver to allow call chaining. A nil receiver is returned unchanged.
func (p *RemotePutPrm) WithNodeAddress(v *network.Address) *RemotePutPrm {
	if p == nil {
		return p
	}

	p.node = v

	return p
}
// WithObject stores the object to be transferred and returns the receiver to
// allow call chaining. A nil receiver is returned unchanged.
func (p *RemotePutPrm) WithObject(v *object.Object) *RemotePutPrm {
	if p == nil {
		return p
	}

	p.obj = v

	return p
}
// PutObject sends the object from p to the remote node whose address is set
// in p.
//
// It builds a one-shot remote target from the sender's key storage and client
// constructor, writes the object header into it and then closes it; the
// close step performs the transfer. The identifiers returned by the close
// step are discarded. Errors from either step are returned wrapped with the
// sender type as a prefix.
//
// NOTE(review): the target's commonPrm is left nil here while Close calls
// methods on it — confirm those methods tolerate a nil receiver.
func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error {
	target := new(remoteTarget)
	target.ctx = ctx
	target.keyStorage = s.keyStorage
	target.addr = p.node
	target.clientConstructor = s.clientConstructor

	err := target.WriteHeader(object.NewRawFromObject(p.obj))
	if err != nil {
		return errors.Wrapf(err, "(%T) could not send object header", s)
	}

	if _, err = target.Close(); err != nil {
		return errors.Wrapf(err, "(%T) could not send object", s)
	}

	return nil
}