forked from TrueCloudLab/frostfs-node
[#431] services/object: Re-sign original read requests during forwarding
In previous implementation node's Object Get/Head/GetRange V2 service handlers created a new request for each RPC. Now original requests are re-signed according to API specification. Logical handler abstracts from this version-dependent logic through `RequestForwarder` callback. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
e6b30aed36
commit
36088949fc
6 changed files with 304 additions and 2 deletions
|
@ -12,6 +12,8 @@ func (exec *execCtx) assemble() {
|
|||
return
|
||||
}
|
||||
|
||||
exec.assembling = true
|
||||
|
||||
exec.log.Debug("trying to assemble the object...")
|
||||
|
||||
splitInfo := exec.splitInfo()
|
||||
|
|
|
@ -41,6 +41,12 @@ type execCtx struct {
|
|||
head bool
|
||||
|
||||
curProcEpoch uint64
|
||||
|
||||
// true when the processing of the initial request
|
||||
// is turned to assembling stage. When false,
|
||||
// initial request can be forwarded during network
|
||||
// communication.
|
||||
assembling bool
|
||||
}
|
||||
|
||||
type execOption func(*execCtx)
|
||||
|
|
|
@ -32,6 +32,8 @@ type RangeHashPrm struct {
|
|||
salt []byte
|
||||
}
|
||||
|
||||
type RequestForwarder func(client.Client) (*objectSDK.Object, error)
|
||||
|
||||
// HeadPrm groups parameters of Head service call.
|
||||
type HeadPrm struct {
|
||||
commonPrm
|
||||
|
@ -43,6 +45,8 @@ type commonPrm struct {
|
|||
common *util.CommonPrm
|
||||
|
||||
client.GetObjectParams
|
||||
|
||||
forwarder RequestForwarder
|
||||
}
|
||||
|
||||
// ChunkWriter is an interface of target component
|
||||
|
@ -100,6 +104,10 @@ func (p *commonPrm) SetCommonParameters(common *util.CommonPrm) {
|
|||
p.common = common
|
||||
}
|
||||
|
||||
func (p *commonPrm) SetRequestForwarder(f RequestForwarder) {
|
||||
p.forwarder = f
|
||||
}
|
||||
|
||||
// SetHeaderWriter sets target component to write the object header.
|
||||
func (p *HeadPrm) SetHeaderWriter(w HeaderWriter) {
|
||||
p.objWriter = &partWriter{
|
||||
|
|
|
@ -80,6 +80,10 @@ func (c *clientCacheWrapper) get(addr string) (getClient, error) {
|
|||
}
|
||||
|
||||
func (c *clientWrapper) getObject(exec *execCtx) (*objectSDK.Object, error) {
|
||||
if !exec.assembling {
|
||||
return exec.prm.forwarder(c.client)
|
||||
}
|
||||
|
||||
if exec.headOnly() {
|
||||
return c.client.GetObjectHeader(exec.context(),
|
||||
new(client.ObjectHeaderParams).
|
||||
|
|
|
@ -96,7 +96,7 @@ func (s *Service) Head(ctx context.Context, req *objectV2.HeadRequest) (*objectV
|
|||
resp := new(objectV2.HeadResponse)
|
||||
resp.SetBody(new(objectV2.HeadResponseBody))
|
||||
|
||||
p, err := s.toHeadPrm(req, resp)
|
||||
p, err := s.toHeadPrm(ctx, req, resp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -1,13 +1,22 @@
|
|||
package getsvc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"hash"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/token"
|
||||
rpcclient "github.com/nspcc-dev/neofs-api-go/rpc/client"
|
||||
signature2 "github.com/nspcc-dev/neofs-api-go/util/signature"
|
||||
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
|
||||
"github.com/nspcc-dev/neofs-api-go/v2/refs"
|
||||
"github.com/nspcc-dev/neofs-api-go/v2/rpc"
|
||||
"github.com/nspcc-dev/neofs-api-go/v2/session"
|
||||
"github.com/nspcc-dev/neofs-api-go/v2/signature"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object"
|
||||
getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get"
|
||||
|
@ -16,6 +25,8 @@ import (
|
|||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
var errWrongMessageSeq = errors.New("incorrect message sequence")
|
||||
|
||||
func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStream) (*getsvc.Prm, error) {
|
||||
meta := req.GetMetaHeader()
|
||||
|
||||
|
@ -39,6 +50,101 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre
|
|||
p.WithRawFlag(body.GetRaw())
|
||||
p.SetObjectWriter(&streamObjectWriter{stream})
|
||||
|
||||
if !commonPrm.LocalOnly() {
|
||||
var onceResign sync.Once
|
||||
|
||||
p.SetRequestForwarder(func(c client.Client) (*objectSDK.Object, error) {
|
||||
var err error
|
||||
|
||||
// once compose and resign forwarding request
|
||||
onceResign.Do(func() {
|
||||
// compose meta header of the local server
|
||||
metaHdr := new(session.RequestMetaHeader)
|
||||
metaHdr.SetTTL(meta.GetTTL() - 1)
|
||||
// TODO: think how to set the other fields
|
||||
metaHdr.SetOrigin(meta)
|
||||
|
||||
req.SetMetaHeader(metaHdr)
|
||||
|
||||
err = signature.SignServiceMessage(key, req)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// code below is copy-pasted from c.GetObject implementation,
|
||||
// perhaps it is worth highlighting the utility function in neofs-api-go
|
||||
|
||||
// open stream
|
||||
stream, err := rpc.GetObject(c.Raw(), req, rpcclient.WithContext(stream.Context()))
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "stream opening failed")
|
||||
}
|
||||
|
||||
var (
|
||||
headWas bool
|
||||
payload []byte
|
||||
obj = new(objectV2.Object)
|
||||
resp = new(objectV2.GetResponse)
|
||||
)
|
||||
|
||||
for {
|
||||
// receive message from server stream
|
||||
err := stream.Read(resp)
|
||||
if err != nil {
|
||||
if errors.Is(errors.Cause(err), io.EOF) {
|
||||
if !headWas {
|
||||
return nil, io.ErrUnexpectedEOF
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
return nil, errors.Wrap(err, "reading the response failed")
|
||||
}
|
||||
|
||||
// verify response structure
|
||||
if err := signature.VerifyServiceMessage(resp); err != nil {
|
||||
return nil, errors.Wrap(err, "response verification failed")
|
||||
}
|
||||
|
||||
switch v := resp.GetBody().GetObjectPart().(type) {
|
||||
default:
|
||||
return nil, errors.Errorf("unexpected object part %T", v)
|
||||
case *objectV2.GetObjectPartInit:
|
||||
if headWas {
|
||||
return nil, errWrongMessageSeq
|
||||
}
|
||||
|
||||
headWas = true
|
||||
|
||||
obj.SetObjectID(v.GetObjectID())
|
||||
obj.SetSignature(v.GetSignature())
|
||||
|
||||
hdr := v.GetHeader()
|
||||
obj.SetHeader(hdr)
|
||||
|
||||
payload = make([]byte, 0, hdr.GetPayloadLength())
|
||||
case *objectV2.GetObjectPartChunk:
|
||||
if !headWas {
|
||||
return nil, errWrongMessageSeq
|
||||
}
|
||||
|
||||
payload = append(payload, v.GetChunk()...)
|
||||
case *objectV2.SplitInfo:
|
||||
si := objectSDK.NewSplitInfoFromV2(v)
|
||||
return nil, objectSDK.NewSplitInfoError(si)
|
||||
}
|
||||
}
|
||||
|
||||
obj.SetPayload(payload)
|
||||
|
||||
// convert the object
|
||||
return objectSDK.NewFromV2(obj), nil
|
||||
})
|
||||
}
|
||||
|
||||
return p, nil
|
||||
}
|
||||
|
||||
|
@ -66,6 +172,77 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get
|
|||
p.SetChunkWriter(&streamObjectRangeWriter{stream})
|
||||
p.SetRange(objectSDK.NewRangeFromV2(body.GetRange()))
|
||||
|
||||
if !commonPrm.LocalOnly() {
|
||||
var onceResign sync.Once
|
||||
|
||||
p.SetRequestForwarder(func(c client.Client) (*objectSDK.Object, error) {
|
||||
var err error
|
||||
|
||||
// once compose and resign forwarding request
|
||||
onceResign.Do(func() {
|
||||
// compose meta header of the local server
|
||||
metaHdr := new(session.RequestMetaHeader)
|
||||
metaHdr.SetTTL(meta.GetTTL() - 1)
|
||||
// TODO: think how to set the other fields
|
||||
metaHdr.SetOrigin(meta)
|
||||
|
||||
req.SetMetaHeader(metaHdr)
|
||||
|
||||
err = signature.SignServiceMessage(key, req)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// code below is copy-pasted from c.ObjectPayloadRangeData implementation,
|
||||
// perhaps it is worth highlighting the utility function in neofs-api-go
|
||||
|
||||
// open stream
|
||||
stream, err := rpc.GetObjectRange(c.Raw(), req, rpcclient.WithContext(stream.Context()))
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not create Get payload range stream")
|
||||
}
|
||||
|
||||
payload := make([]byte, body.GetRange().GetLength())
|
||||
|
||||
resp := new(objectV2.GetRangeResponse)
|
||||
|
||||
for {
|
||||
// receive message from server stream
|
||||
err := stream.Read(resp)
|
||||
if err != nil {
|
||||
if errors.Is(errors.Cause(err), io.EOF) {
|
||||
break
|
||||
}
|
||||
|
||||
return nil, errors.Wrap(err, "reading the response failed")
|
||||
}
|
||||
|
||||
// verify response structure
|
||||
if err := signature.VerifyServiceMessage(resp); err != nil {
|
||||
return nil, errors.Wrapf(err, "could not verify %T", resp)
|
||||
}
|
||||
|
||||
switch v := resp.GetBody().GetRangePart().(type) {
|
||||
case nil:
|
||||
return nil, errors.Errorf("unexpected range type %T", v)
|
||||
case *objectV2.GetRangePartChunk:
|
||||
payload = append(payload, v.GetChunk()...)
|
||||
case *objectV2.SplitInfo:
|
||||
si := objectSDK.NewSplitInfoFromV2(v)
|
||||
|
||||
return nil, objectSDK.NewSplitInfoError(si)
|
||||
}
|
||||
}
|
||||
|
||||
obj := objectSDK.NewRaw()
|
||||
obj.SetPayload(payload)
|
||||
|
||||
return obj.Object(), nil
|
||||
})
|
||||
}
|
||||
|
||||
return p, nil
|
||||
}
|
||||
|
||||
|
@ -132,7 +309,7 @@ func (w *headResponseWriter) WriteHeader(hdr *object.Object) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) toHeadPrm(req *objectV2.HeadRequest, resp *objectV2.HeadResponse) (*getsvc.HeadPrm, error) {
|
||||
func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp *objectV2.HeadResponse) (*getsvc.HeadPrm, error) {
|
||||
meta := req.GetMetaHeader()
|
||||
|
||||
key, err := s.keyStorage.GetKey(token.NewSessionTokenFromV2(meta.GetSessionToken()))
|
||||
|
@ -158,6 +335,111 @@ func (s *Service) toHeadPrm(req *objectV2.HeadRequest, resp *objectV2.HeadRespon
|
|||
body: resp.GetBody(),
|
||||
})
|
||||
|
||||
if !commonPrm.LocalOnly() {
|
||||
var onceResign sync.Once
|
||||
|
||||
p.SetRequestForwarder(func(c client.Client) (*objectSDK.Object, error) {
|
||||
var err error
|
||||
|
||||
// once compose and resign forwarding request
|
||||
onceResign.Do(func() {
|
||||
// compose meta header of the local server
|
||||
metaHdr := new(session.RequestMetaHeader)
|
||||
metaHdr.SetTTL(meta.GetTTL() - 1)
|
||||
// TODO: think how to set the other fields
|
||||
metaHdr.SetOrigin(meta)
|
||||
|
||||
req.SetMetaHeader(metaHdr)
|
||||
|
||||
err = signature.SignServiceMessage(key, req)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// code below is copy-pasted from c.GetObjectHeader implementation,
|
||||
// perhaps it is worth highlighting the utility function in neofs-api-go
|
||||
|
||||
// send Head request
|
||||
resp, err := rpc.HeadObject(c.Raw(), req, rpcclient.WithContext(ctx))
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "sending the request failed")
|
||||
}
|
||||
|
||||
// verify response structure
|
||||
if err := signature.VerifyServiceMessage(resp); err != nil {
|
||||
return nil, errors.Wrap(err, "response verification failed")
|
||||
}
|
||||
|
||||
var (
|
||||
hdr *objectV2.Header
|
||||
idSig *refs.Signature
|
||||
)
|
||||
|
||||
switch v := resp.GetBody().GetHeaderPart().(type) {
|
||||
case nil:
|
||||
return nil, errors.Errorf("unexpected header type %T", v)
|
||||
case *objectV2.ShortHeader:
|
||||
if !body.GetMainOnly() {
|
||||
return nil, errors.Errorf("wrong header part type: expected %T, received %T",
|
||||
(*objectV2.ShortHeader)(nil), (*objectV2.HeaderWithSignature)(nil),
|
||||
)
|
||||
}
|
||||
|
||||
h := v
|
||||
|
||||
hdr = new(objectV2.Header)
|
||||
hdr.SetPayloadLength(h.GetPayloadLength())
|
||||
hdr.SetVersion(h.GetVersion())
|
||||
hdr.SetOwnerID(h.GetOwnerID())
|
||||
hdr.SetObjectType(h.GetObjectType())
|
||||
hdr.SetCreationEpoch(h.GetCreationEpoch())
|
||||
hdr.SetPayloadHash(h.GetPayloadHash())
|
||||
hdr.SetHomomorphicHash(h.GetHomomorphicHash())
|
||||
case *objectV2.HeaderWithSignature:
|
||||
if body.GetMainOnly() {
|
||||
return nil, errors.Errorf("wrong header part type: expected %T, received %T",
|
||||
(*objectV2.HeaderWithSignature)(nil), (*objectV2.ShortHeader)(nil),
|
||||
)
|
||||
}
|
||||
|
||||
hdrWithSig := v
|
||||
if hdrWithSig == nil {
|
||||
return nil, errors.New("nil object part")
|
||||
}
|
||||
|
||||
hdr = hdrWithSig.GetHeader()
|
||||
idSig = hdrWithSig.GetSignature()
|
||||
|
||||
if err := signature2.VerifyDataWithSource(
|
||||
signature.StableMarshalerWrapper{
|
||||
SM: p.Address().ObjectID().ToV2(),
|
||||
},
|
||||
func() (key, sig []byte) {
|
||||
return idSig.GetKey(), idSig.GetSign()
|
||||
},
|
||||
); err != nil {
|
||||
return nil, errors.Wrap(err, "incorrect object header signature")
|
||||
}
|
||||
case *objectV2.SplitInfo:
|
||||
si := objectSDK.NewSplitInfoFromV2(v)
|
||||
|
||||
return nil, objectSDK.NewSplitInfoError(si)
|
||||
}
|
||||
|
||||
obj := new(objectV2.Object)
|
||||
obj.SetHeader(hdr)
|
||||
obj.SetSignature(idSig)
|
||||
|
||||
raw := object.NewRawFromV2(obj)
|
||||
raw.SetID(p.Address().ObjectID())
|
||||
|
||||
// convert the object
|
||||
return raw.Object().SDK(), nil
|
||||
})
|
||||
}
|
||||
|
||||
return p, nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue