From 34684912240aa2c422180d9404a0da930249f9a0 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 27 May 2021 17:25:29 +0300 Subject: [PATCH] [#501] object/put: relay requests for signed objects Signed-off-by: Evgenii Stratonikov --- pkg/services/object/put/distributed.go | 9 +++++ pkg/services/object/put/prm.go | 11 ++++++ pkg/services/object/put/streamer.go | 28 +++++++++++++- pkg/services/object/put/v2/streamer.go | 53 +++++++++++++++++++++++++- pkg/services/object/put/v2/util.go | 3 +- 5 files changed, 99 insertions(+), 5 deletions(-) diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go index 0b35f284..e302cf1a 100644 --- a/pkg/services/object/put/distributed.go +++ b/pkg/services/object/put/distributed.go @@ -25,6 +25,8 @@ type distributedTarget struct { nodeTargetInitializer func(*network.Address) transformer.ObjectTarget + relay func(*network.Address) error + fmt *object.FormatValidator log *logger.Logger @@ -67,6 +69,13 @@ func (t *distributedTarget) Close() (*transformer.AccessIdentifiers, error) { } func (t *distributedTarget) sendObject(addr *network.Address) error { + if t.relay != nil { + err := t.relay(addr) + if err == nil || !errors.Is(err, errLocalAddress) { + return err + } + } + target := t.nodeTargetInitializer(addr) if err := target.WriteHeader(t.obj); err != nil { diff --git a/pkg/services/object/put/prm.go b/pkg/services/object/put/prm.go index 55c52d12..f7a766c5 100644 --- a/pkg/services/object/put/prm.go +++ b/pkg/services/object/put/prm.go @@ -1,6 +1,7 @@ package putsvc import ( + "github.com/nspcc-dev/neofs-api-go/pkg/client" "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" @@ -12,6 +13,8 @@ type PutInitPrm struct { hdr *object.RawObject traverseOpts []placement.Option + + relay func(client.Client) error } type PutChunkPrm struct { @@ -42,6 +45,14 @@ func (p *PutInitPrm) WithObject(v *object.RawObject) *PutInitPrm { return p } +func (p *PutInitPrm) WithRelay(f func(client.Client) error) *PutInitPrm { + if p != nil { + p.relay = f + } + + return p +} + func (p *PutChunkPrm) WithChunk(v []byte) *PutChunkPrm { if p != nil { p.chunk = v diff --git a/pkg/services/object/put/streamer.go b/pkg/services/object/put/streamer.go index 902999e0..3ccb8000 100644 --- a/pkg/services/object/put/streamer.go +++ b/pkg/services/object/put/streamer.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" + "github.com/nspcc-dev/neofs-api-go/pkg/client" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" @@ -18,6 +19,8 @@ type Streamer struct { ctx context.Context target transformer.ObjectTarget + + relay func(client.Client) error } var errNotInit = errors.New("stream not initialized") @@ -48,6 +51,8 @@ func (p *Streamer) initTarget(prm *PutInitPrm) error { } if prm.hdr.Signature() != nil { + p.relay = prm.relay + // prepare untrusted-Put object target p.target = &validatingTarget{ nextTarget: p.newCommonTarget(prm), @@ -128,7 +133,25 @@ func (p *Streamer) preparePrm(prm *PutInitPrm) error { return nil } +var errLocalAddress = errors.New("can't relay to local address") + func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget { + var relay func(*network.Address) error + if p.relay != nil { + relay = func(addr *network.Address) error { + if network.IsLocalAddress(p.localAddrSrc, addr) { + return errLocalAddress + } + + c, err := p.clientConstructor.Get(addr) + if err != nil { + return fmt.Errorf("could not create SDK client %s: %w", addr, err) + } + + return p.relay(c) + } + } + return &distributedTarget{ traverseOpts: prm.traverseOpts, workerPool: p.workerPool, @@ -147,8 +170,9 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget { clientConstructor: p.clientConstructor, } }, - fmt: p.fmtValidator, - log: p.log, + relay: relay, + fmt: p.fmtValidator, + log: p.log, } } diff --git a/pkg/services/object/put/v2/streamer.go b/pkg/services/object/put/v2/streamer.go index f4044e34..ef75ebc0 100644 --- a/pkg/services/object/put/v2/streamer.go +++ b/pkg/services/object/put/v2/streamer.go @@ -3,12 +3,18 @@ package putsvc import ( "fmt" + "github.com/nspcc-dev/neofs-api-go/pkg/client" "github.com/nspcc-dev/neofs-api-go/v2/object" + "github.com/nspcc-dev/neofs-api-go/v2/rpc" + "github.com/nspcc-dev/neofs-api-go/v2/signature" putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put" ) type streamer struct { - stream *putsvc.Streamer + stream *putsvc.Streamer + saveChunks bool + init *object.PutRequest + chunks []*object.PutRequest } func (s *streamer) Send(req *object.PutRequest) (err error) { @@ -16,7 +22,7 @@ func (s *streamer) Send(req *object.PutRequest) (err error) { case *object.PutObjectPartInit: var initPrm *putsvc.PutInitPrm - initPrm, err = toInitPrm(v, req) + initPrm, err = s.toInitPrm(v, req) if err != nil { return err } @@ -24,10 +30,19 @@ func (s *streamer) Send(req *object.PutRequest) (err error) { if err = s.stream.Init(initPrm); err != nil { err = fmt.Errorf("(%T) could not init object put stream: %w", s, err) } + + s.saveChunks = v.GetSignature() != nil + if s.saveChunks { + s.init = req + } case *object.PutObjectPartChunk: if err = s.stream.SendChunk(toChunkPrm(v)); err != nil { err = fmt.Errorf("(%T) could not send payload chunk: %w", s, err) } + + if s.saveChunks { + s.chunks = append(s.chunks, req) + } default: err = fmt.Errorf("(%T) invalid object put stream part type %T", s, v) } @@ -43,3 +58,37 @@ func (s *streamer) CloseAndRecv() (*object.PutResponse, error) { return fromPutResponse(resp), nil } + +func (s *streamer) relayRequest(c client.Client) error { + // open stream + resp := new(object.PutResponse) + + stream, err := rpc.PutObject(c.Raw(), resp) + if err != nil { + return fmt.Errorf("stream opening failed: %w", err) + } + + // send init part + err = stream.Write(s.init) + if err != nil { + return fmt.Errorf("sending the initial message to stream failed: %w", err) + } + + for i := range s.chunks { + if err := stream.Write(s.chunks[i]); err != nil { + return fmt.Errorf("sending the chunk %d failed: %w", i, err) + } + } + + // close object stream and receive response from remote node + err = stream.Close() + if err != nil { + return fmt.Errorf("closing the stream failed: %w", err) + } + + // verify response structure + if err := signature.VerifyServiceMessage(resp); err != nil { + return fmt.Errorf("response verification failed: %w", err) + } + return nil +} diff --git a/pkg/services/object/put/v2/util.go b/pkg/services/object/put/v2/util.go index b4235325..7caab6df 100644 --- a/pkg/services/object/put/v2/util.go +++ b/pkg/services/object/put/v2/util.go @@ -7,7 +7,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/services/object/util" ) -func toInitPrm(part *objectV2.PutObjectPartInit, req *objectV2.PutRequest) (*putsvc.PutInitPrm, error) { +func (s *streamer) toInitPrm(part *objectV2.PutObjectPartInit, req *objectV2.PutRequest) (*putsvc.PutInitPrm, error) { oV2 := new(objectV2.Object) oV2.SetObjectID(part.GetObjectID()) oV2.SetSignature(part.GetSignature()) @@ -22,6 +22,7 @@ func toInitPrm(part *objectV2.PutObjectPartInit, req *objectV2.PutRequest) (*put WithObject( object.NewRawFromV2(oV2), ). + WithRelay(s.relayRequest). WithCommonPrm(commonPrm), nil }