diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go index 0b35f284d..e302cf1a5 100644 --- a/pkg/services/object/put/distributed.go +++ b/pkg/services/object/put/distributed.go @@ -25,6 +25,8 @@ type distributedTarget struct { nodeTargetInitializer func(*network.Address) transformer.ObjectTarget + relay func(*network.Address) error + fmt *object.FormatValidator log *logger.Logger @@ -67,6 +69,13 @@ func (t *distributedTarget) Close() (*transformer.AccessIdentifiers, error) { } func (t *distributedTarget) sendObject(addr *network.Address) error { + if t.relay != nil { + err := t.relay(addr) + if err == nil || !errors.Is(err, errLocalAddress) { + return err + } + } + target := t.nodeTargetInitializer(addr) if err := target.WriteHeader(t.obj); err != nil { diff --git a/pkg/services/object/put/prm.go b/pkg/services/object/put/prm.go index 55c52d12f..f7a766c51 100644 --- a/pkg/services/object/put/prm.go +++ b/pkg/services/object/put/prm.go @@ -1,6 +1,7 @@ package putsvc import ( + "github.com/nspcc-dev/neofs-api-go/pkg/client" "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" @@ -12,6 +13,8 @@ type PutInitPrm struct { hdr *object.RawObject traverseOpts []placement.Option + + relay func(client.Client) error } type PutChunkPrm struct { @@ -42,6 +45,14 @@ func (p *PutInitPrm) WithObject(v *object.RawObject) *PutInitPrm { return p } +func (p *PutInitPrm) WithRelay(f func(client.Client) error) *PutInitPrm { + if p != nil { + p.relay = f + } + + return p +} + func (p *PutChunkPrm) WithChunk(v []byte) *PutChunkPrm { if p != nil { p.chunk = v diff --git a/pkg/services/object/put/streamer.go b/pkg/services/object/put/streamer.go index 902999e04..3ccb80006 100644 --- a/pkg/services/object/put/streamer.go +++ b/pkg/services/object/put/streamer.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" + "github.com/nspcc-dev/neofs-api-go/pkg/client" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" @@ -18,6 +19,8 @@ type Streamer struct { ctx context.Context target transformer.ObjectTarget + + relay func(client.Client) error } var errNotInit = errors.New("stream not initialized") @@ -48,6 +51,8 @@ func (p *Streamer) initTarget(prm *PutInitPrm) error { } if prm.hdr.Signature() != nil { + p.relay = prm.relay + // prepare untrusted-Put object target p.target = &validatingTarget{ nextTarget: p.newCommonTarget(prm), @@ -128,7 +133,25 @@ func (p *Streamer) preparePrm(prm *PutInitPrm) error { return nil } +var errLocalAddress = errors.New("can't relay to local address") + func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget { + var relay func(*network.Address) error + if p.relay != nil { + relay = func(addr *network.Address) error { + if network.IsLocalAddress(p.localAddrSrc, addr) { + return errLocalAddress + } + + c, err := p.clientConstructor.Get(addr) + if err != nil { + return fmt.Errorf("could not create SDK client %s: %w", addr, err) + } + + return p.relay(c) + } + } + return &distributedTarget{ traverseOpts: prm.traverseOpts, workerPool: p.workerPool, @@ -147,8 +170,9 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget { clientConstructor: p.clientConstructor, } }, - fmt: p.fmtValidator, - log: p.log, + relay: relay, + fmt: p.fmtValidator, + log: p.log, } } diff --git a/pkg/services/object/put/v2/streamer.go b/pkg/services/object/put/v2/streamer.go index f4044e34d..ef75ebc04 100644 --- a/pkg/services/object/put/v2/streamer.go +++ b/pkg/services/object/put/v2/streamer.go @@ -3,12 +3,18 @@ package putsvc import ( "fmt" + "github.com/nspcc-dev/neofs-api-go/pkg/client" "github.com/nspcc-dev/neofs-api-go/v2/object" + "github.com/nspcc-dev/neofs-api-go/v2/rpc" + "github.com/nspcc-dev/neofs-api-go/v2/signature" putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put" ) type streamer struct { - stream *putsvc.Streamer + stream *putsvc.Streamer + saveChunks bool + init *object.PutRequest + chunks []*object.PutRequest } func (s *streamer) Send(req *object.PutRequest) (err error) { @@ -16,7 +22,7 @@ func (s *streamer) Send(req *object.PutRequest) (err error) { case *object.PutObjectPartInit: var initPrm *putsvc.PutInitPrm - initPrm, err = toInitPrm(v, req) + initPrm, err = s.toInitPrm(v, req) if err != nil { return err } @@ -24,10 +30,19 @@ func (s *streamer) Send(req *object.PutRequest) (err error) { if err = s.stream.Init(initPrm); err != nil { err = fmt.Errorf("(%T) could not init object put stream: %w", s, err) } + + s.saveChunks = v.GetSignature() != nil + if s.saveChunks { + s.init = req + } case *object.PutObjectPartChunk: if err = s.stream.SendChunk(toChunkPrm(v)); err != nil { err = fmt.Errorf("(%T) could not send payload chunk: %w", s, err) } + + if s.saveChunks { + s.chunks = append(s.chunks, req) + } default: err = fmt.Errorf("(%T) invalid object put stream part type %T", s, v) } @@ -43,3 +58,37 @@ func (s *streamer) CloseAndRecv() (*object.PutResponse, error) { return fromPutResponse(resp), nil } + +func (s *streamer) relayRequest(c client.Client) error { + // open stream + resp := new(object.PutResponse) + + stream, err := rpc.PutObject(c.Raw(), resp) + if err != nil { + return fmt.Errorf("stream opening failed: %w", err) + } + + // send init part + err = stream.Write(s.init) + if err != nil { + return fmt.Errorf("sending the initial message to stream failed: %w", err) + } + + for i := range s.chunks { + if err := stream.Write(s.chunks[i]); err != nil { + return fmt.Errorf("sending the chunk %d failed: %w", i, err) + } + } + + // close object stream and receive response from remote node + err = stream.Close() + if err != nil { + return fmt.Errorf("closing the stream failed: %w", err) + } + + // verify response structure + if err := signature.VerifyServiceMessage(resp); err != nil { + return fmt.Errorf("response verification failed: %w", err) + } + return nil +} diff --git a/pkg/services/object/put/v2/util.go b/pkg/services/object/put/v2/util.go index b42353258..7caab6df5 100644 --- a/pkg/services/object/put/v2/util.go +++ b/pkg/services/object/put/v2/util.go @@ -7,7 +7,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/services/object/util" ) -func toInitPrm(part *objectV2.PutObjectPartInit, req *objectV2.PutRequest) (*putsvc.PutInitPrm, error) { +func (s *streamer) toInitPrm(part *objectV2.PutObjectPartInit, req *objectV2.PutRequest) (*putsvc.PutInitPrm, error) { oV2 := new(objectV2.Object) oV2.SetObjectID(part.GetObjectID()) oV2.SetSignature(part.GetSignature()) @@ -22,6 +22,7 @@ func toInitPrm(part *objectV2.PutObjectPartInit, req *objectV2.PutRequest) (*put WithObject( object.NewRawFromV2(oV2), ). + WithRelay(s.relayRequest). WithCommonPrm(commonPrm), nil }