forked from TrueCloudLab/frostfs-node
[#501] object/put: relay requests for signed objects
Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
parent
a422f42ca9
commit
3468491224
5 changed files with 99 additions and 5 deletions
|
@ -25,6 +25,8 @@ type distributedTarget struct {
|
||||||
|
|
||||||
nodeTargetInitializer func(*network.Address) transformer.ObjectTarget
|
nodeTargetInitializer func(*network.Address) transformer.ObjectTarget
|
||||||
|
|
||||||
|
relay func(*network.Address) error
|
||||||
|
|
||||||
fmt *object.FormatValidator
|
fmt *object.FormatValidator
|
||||||
|
|
||||||
log *logger.Logger
|
log *logger.Logger
|
||||||
|
@ -67,6 +69,13 @@ func (t *distributedTarget) Close() (*transformer.AccessIdentifiers, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *distributedTarget) sendObject(addr *network.Address) error {
|
func (t *distributedTarget) sendObject(addr *network.Address) error {
|
||||||
|
if t.relay != nil {
|
||||||
|
err := t.relay(addr)
|
||||||
|
if err == nil || !errors.Is(err, errLocalAddress) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
target := t.nodeTargetInitializer(addr)
|
target := t.nodeTargetInitializer(addr)
|
||||||
|
|
||||||
if err := target.WriteHeader(t.obj); err != nil {
|
if err := target.WriteHeader(t.obj); err != nil {
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package putsvc
|
package putsvc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
||||||
|
@ -12,6 +13,8 @@ type PutInitPrm struct {
|
||||||
hdr *object.RawObject
|
hdr *object.RawObject
|
||||||
|
|
||||||
traverseOpts []placement.Option
|
traverseOpts []placement.Option
|
||||||
|
|
||||||
|
relay func(client.Client) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type PutChunkPrm struct {
|
type PutChunkPrm struct {
|
||||||
|
@ -42,6 +45,14 @@ func (p *PutInitPrm) WithObject(v *object.RawObject) *PutInitPrm {
|
||||||
return p
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *PutInitPrm) WithRelay(f func(client.Client) error) *PutInitPrm {
|
||||||
|
if p != nil {
|
||||||
|
p.relay = f
|
||||||
|
}
|
||||||
|
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
func (p *PutChunkPrm) WithChunk(v []byte) *PutChunkPrm {
|
func (p *PutChunkPrm) WithChunk(v []byte) *PutChunkPrm {
|
||||||
if p != nil {
|
if p != nil {
|
||||||
p.chunk = v
|
p.chunk = v
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
"github.com/nspcc-dev/neofs-node/pkg/network"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||||
|
@ -18,6 +19,8 @@ type Streamer struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
|
|
||||||
target transformer.ObjectTarget
|
target transformer.ObjectTarget
|
||||||
|
|
||||||
|
relay func(client.Client) error
|
||||||
}
|
}
|
||||||
|
|
||||||
var errNotInit = errors.New("stream not initialized")
|
var errNotInit = errors.New("stream not initialized")
|
||||||
|
@ -48,6 +51,8 @@ func (p *Streamer) initTarget(prm *PutInitPrm) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if prm.hdr.Signature() != nil {
|
if prm.hdr.Signature() != nil {
|
||||||
|
p.relay = prm.relay
|
||||||
|
|
||||||
// prepare untrusted-Put object target
|
// prepare untrusted-Put object target
|
||||||
p.target = &validatingTarget{
|
p.target = &validatingTarget{
|
||||||
nextTarget: p.newCommonTarget(prm),
|
nextTarget: p.newCommonTarget(prm),
|
||||||
|
@ -128,7 +133,25 @@ func (p *Streamer) preparePrm(prm *PutInitPrm) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var errLocalAddress = errors.New("can't relay to local address")
|
||||||
|
|
||||||
func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget {
|
func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget {
|
||||||
|
var relay func(*network.Address) error
|
||||||
|
if p.relay != nil {
|
||||||
|
relay = func(addr *network.Address) error {
|
||||||
|
if network.IsLocalAddress(p.localAddrSrc, addr) {
|
||||||
|
return errLocalAddress
|
||||||
|
}
|
||||||
|
|
||||||
|
c, err := p.clientConstructor.Get(addr)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not create SDK client %s: %w", addr, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return p.relay(c)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return &distributedTarget{
|
return &distributedTarget{
|
||||||
traverseOpts: prm.traverseOpts,
|
traverseOpts: prm.traverseOpts,
|
||||||
workerPool: p.workerPool,
|
workerPool: p.workerPool,
|
||||||
|
@ -147,6 +170,7 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget {
|
||||||
clientConstructor: p.clientConstructor,
|
clientConstructor: p.clientConstructor,
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
relay: relay,
|
||||||
fmt: p.fmtValidator,
|
fmt: p.fmtValidator,
|
||||||
log: p.log,
|
log: p.log,
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,12 +3,18 @@ package putsvc
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
||||||
"github.com/nspcc-dev/neofs-api-go/v2/object"
|
"github.com/nspcc-dev/neofs-api-go/v2/object"
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/v2/rpc"
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/v2/signature"
|
||||||
putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put"
|
putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put"
|
||||||
)
|
)
|
||||||
|
|
||||||
type streamer struct {
|
type streamer struct {
|
||||||
stream *putsvc.Streamer
|
stream *putsvc.Streamer
|
||||||
|
saveChunks bool
|
||||||
|
init *object.PutRequest
|
||||||
|
chunks []*object.PutRequest
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *streamer) Send(req *object.PutRequest) (err error) {
|
func (s *streamer) Send(req *object.PutRequest) (err error) {
|
||||||
|
@ -16,7 +22,7 @@ func (s *streamer) Send(req *object.PutRequest) (err error) {
|
||||||
case *object.PutObjectPartInit:
|
case *object.PutObjectPartInit:
|
||||||
var initPrm *putsvc.PutInitPrm
|
var initPrm *putsvc.PutInitPrm
|
||||||
|
|
||||||
initPrm, err = toInitPrm(v, req)
|
initPrm, err = s.toInitPrm(v, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -24,10 +30,19 @@ func (s *streamer) Send(req *object.PutRequest) (err error) {
|
||||||
if err = s.stream.Init(initPrm); err != nil {
|
if err = s.stream.Init(initPrm); err != nil {
|
||||||
err = fmt.Errorf("(%T) could not init object put stream: %w", s, err)
|
err = fmt.Errorf("(%T) could not init object put stream: %w", s, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.saveChunks = v.GetSignature() != nil
|
||||||
|
if s.saveChunks {
|
||||||
|
s.init = req
|
||||||
|
}
|
||||||
case *object.PutObjectPartChunk:
|
case *object.PutObjectPartChunk:
|
||||||
if err = s.stream.SendChunk(toChunkPrm(v)); err != nil {
|
if err = s.stream.SendChunk(toChunkPrm(v)); err != nil {
|
||||||
err = fmt.Errorf("(%T) could not send payload chunk: %w", s, err)
|
err = fmt.Errorf("(%T) could not send payload chunk: %w", s, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if s.saveChunks {
|
||||||
|
s.chunks = append(s.chunks, req)
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
err = fmt.Errorf("(%T) invalid object put stream part type %T", s, v)
|
err = fmt.Errorf("(%T) invalid object put stream part type %T", s, v)
|
||||||
}
|
}
|
||||||
|
@ -43,3 +58,37 @@ func (s *streamer) CloseAndRecv() (*object.PutResponse, error) {
|
||||||
|
|
||||||
return fromPutResponse(resp), nil
|
return fromPutResponse(resp), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *streamer) relayRequest(c client.Client) error {
|
||||||
|
// open stream
|
||||||
|
resp := new(object.PutResponse)
|
||||||
|
|
||||||
|
stream, err := rpc.PutObject(c.Raw(), resp)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("stream opening failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// send init part
|
||||||
|
err = stream.Write(s.init)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("sending the initial message to stream failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range s.chunks {
|
||||||
|
if err := stream.Write(s.chunks[i]); err != nil {
|
||||||
|
return fmt.Errorf("sending the chunk %d failed: %w", i, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// close object stream and receive response from remote node
|
||||||
|
err = stream.Close()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("closing the stream failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// verify response structure
|
||||||
|
if err := signature.VerifyServiceMessage(resp); err != nil {
|
||||||
|
return fmt.Errorf("response verification failed: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -7,7 +7,7 @@ import (
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
func toInitPrm(part *objectV2.PutObjectPartInit, req *objectV2.PutRequest) (*putsvc.PutInitPrm, error) {
|
func (s *streamer) toInitPrm(part *objectV2.PutObjectPartInit, req *objectV2.PutRequest) (*putsvc.PutInitPrm, error) {
|
||||||
oV2 := new(objectV2.Object)
|
oV2 := new(objectV2.Object)
|
||||||
oV2.SetObjectID(part.GetObjectID())
|
oV2.SetObjectID(part.GetObjectID())
|
||||||
oV2.SetSignature(part.GetSignature())
|
oV2.SetSignature(part.GetSignature())
|
||||||
|
@ -22,6 +22,7 @@ func toInitPrm(part *objectV2.PutObjectPartInit, req *objectV2.PutRequest) (*put
|
||||||
WithObject(
|
WithObject(
|
||||||
object.NewRawFromV2(oV2),
|
object.NewRawFromV2(oV2),
|
||||||
).
|
).
|
||||||
|
WithRelay(s.relayRequest).
|
||||||
WithCommonPrm(commonPrm), nil
|
WithCommonPrm(commonPrm), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue