forked from TrueCloudLab/frostfs-node
[#108] object/head: Export remote header retrieval utility
Export remote head functionality in headsvc package. Refactor head service to use RemoteHeader. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
5ad0df7794
commit
5017ff0e4a
3 changed files with 65 additions and 35 deletions
|
@ -5,7 +5,6 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
"github.com/nspcc-dev/neofs-node/pkg/network"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
||||||
|
@ -105,25 +104,24 @@ loop:
|
||||||
if err := h.workerPool.Submit(func() {
|
if err := h.workerPool.Submit(func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
var header interface {
|
|
||||||
head(context.Context, *Prm, func(*object.Object)) error
|
|
||||||
}
|
|
||||||
|
|
||||||
if network.IsLocalAddress(h.localAddrSrc, addr) {
|
if network.IsLocalAddress(h.localAddrSrc, addr) {
|
||||||
header = &localHeader{
|
if err := h.localHeader.head(ctx, prm, h.w.write); err != nil {
|
||||||
storage: h.localStore,
|
// TODO: log error
|
||||||
}
|
|
||||||
} else {
|
|
||||||
header = &remoteHeader{
|
|
||||||
keyStorage: h.keyStorage,
|
|
||||||
node: addr,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := header.head(ctx, prm, h.w.write); err != nil {
|
head, err := h.remoteHeader.Head(ctx, &RemoteHeadPrm{
|
||||||
|
commonHeadPrm: prm,
|
||||||
|
node: addr,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
// TODO: log error
|
// TODO: log error
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
h.w.write(head)
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
wg.Done()
|
wg.Done()
|
||||||
// TODO: log error
|
// TODO: log error
|
||||||
|
|
|
@ -4,53 +4,85 @@ import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
||||||
|
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
"github.com/nspcc-dev/neofs-node/pkg/network"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
type remoteHeader struct {
|
// RemoteHeader represents utility for getting
|
||||||
|
// the object header from a remote host.
|
||||||
|
type RemoteHeader struct {
|
||||||
keyStorage *util.KeyStorage
|
keyStorage *util.KeyStorage
|
||||||
|
}
|
||||||
|
|
||||||
|
// RemoteHeadPrm groups remote header operation parameters.
|
||||||
|
type RemoteHeadPrm struct {
|
||||||
|
commonHeadPrm *Prm
|
||||||
|
|
||||||
node *network.Address
|
node *network.Address
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *remoteHeader) head(ctx context.Context, prm *Prm, handler func(*object.Object)) error {
|
// NewRemoteHeader creates, initializes and returns new RemoteHeader instance.
|
||||||
key, err := h.keyStorage.GetKey(prm.common.SessionToken())
|
func NewRemoteHeader(keyStorage *util.KeyStorage) *RemoteHeader {
|
||||||
if err != nil {
|
return &RemoteHeader{
|
||||||
return errors.Wrapf(err, "(%T) could not receive private key", h)
|
keyStorage: keyStorage,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithNodeAddress sets network address of the remote node.
|
||||||
|
func (p *RemoteHeadPrm) WithNodeAddress(v *network.Address) *RemoteHeadPrm {
|
||||||
|
if p != nil {
|
||||||
|
p.node = v
|
||||||
}
|
}
|
||||||
|
|
||||||
addr, err := h.node.IPAddrString()
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithObjectAddress sets object address.
|
||||||
|
func (p *RemoteHeadPrm) WithObjectAddress(v *objectSDK.Address) *RemoteHeadPrm {
|
||||||
|
if p != nil {
|
||||||
|
p.commonHeadPrm = new(Prm).WithAddress(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
// Head requests object header from the remote node.
|
||||||
|
func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*object.Object, error) {
|
||||||
|
key, err := h.keyStorage.GetKey(prm.commonHeadPrm.common.SessionToken())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, errors.Wrapf(err, "(%T) could not receive private key", h)
|
||||||
|
}
|
||||||
|
|
||||||
|
addr, err := prm.node.IPAddrString()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
c, err := client.New(key,
|
c, err := client.New(key,
|
||||||
client.WithAddress(addr),
|
client.WithAddress(addr),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrapf(err, "(%T) could not create SDK client %s", h, addr)
|
return nil, errors.Wrapf(err, "(%T) could not create SDK client %s", h, addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
p := new(client.ObjectHeaderParams).
|
p := new(client.ObjectHeaderParams).
|
||||||
WithAddress(prm.addr)
|
WithAddress(prm.commonHeadPrm.addr)
|
||||||
|
|
||||||
if prm.short {
|
if prm.commonHeadPrm.short {
|
||||||
p = p.WithMainFields()
|
p = p.WithMainFields()
|
||||||
}
|
}
|
||||||
|
|
||||||
hdr, err := c.GetObjectHeader(ctx, p,
|
hdr, err := c.GetObjectHeader(ctx, p,
|
||||||
client.WithTTL(1), // FIXME: use constant
|
client.WithTTL(1), // FIXME: use constant
|
||||||
client.WithSession(prm.common.SessionToken()),
|
client.WithSession(prm.commonHeadPrm.common.SessionToken()),
|
||||||
client.WithBearer(prm.common.BearerToken()),
|
client.WithBearer(prm.commonHeadPrm.common.BearerToken()),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrapf(err, "(%T) could not head object in %s", h, addr)
|
return nil, errors.Wrapf(err, "(%T) could not head object in %s", h, addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
handler(object.NewFromSDK(hdr))
|
return object.NewFromSDK(hdr), nil
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,10 +24,6 @@ type Service struct {
|
||||||
type Option func(*cfg)
|
type Option func(*cfg)
|
||||||
|
|
||||||
type cfg struct {
|
type cfg struct {
|
||||||
keyStorage *objutil.KeyStorage
|
|
||||||
|
|
||||||
localStore *localstore.Storage
|
|
||||||
|
|
||||||
cnrSrc container.Source
|
cnrSrc container.Source
|
||||||
|
|
||||||
netMapSrc netmap.Source
|
netMapSrc netmap.Source
|
||||||
|
@ -37,6 +33,10 @@ type cfg struct {
|
||||||
localAddrSrc network.LocalAddressSource
|
localAddrSrc network.LocalAddressSource
|
||||||
|
|
||||||
rightChildSearcher RelationSearcher
|
rightChildSearcher RelationSearcher
|
||||||
|
|
||||||
|
localHeader localHeader
|
||||||
|
|
||||||
|
remoteHeader RemoteHeader
|
||||||
}
|
}
|
||||||
|
|
||||||
var ErrNotFound = errors.New("object header not found")
|
var ErrNotFound = errors.New("object header not found")
|
||||||
|
@ -95,13 +95,13 @@ func (s *Service) Head(ctx context.Context, prm *Prm) (*Response, error) {
|
||||||
|
|
||||||
func WithKeyStorage(v *objutil.KeyStorage) Option {
|
func WithKeyStorage(v *objutil.KeyStorage) Option {
|
||||||
return func(c *cfg) {
|
return func(c *cfg) {
|
||||||
c.keyStorage = v
|
c.remoteHeader.keyStorage = v
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func WithLocalStorage(v *localstore.Storage) Option {
|
func WithLocalStorage(v *localstore.Storage) Option {
|
||||||
return func(c *cfg) {
|
return func(c *cfg) {
|
||||||
c.localStore = v
|
c.localHeader.storage = v
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue