diff --git a/pkg/services/object/get/assemble.go b/pkg/services/object/get/assemble.go index 29921e8f3..bcbd581c5 100644 --- a/pkg/services/object/get/assemble.go +++ b/pkg/services/object/get/assemble.go @@ -12,6 +12,8 @@ func (exec *execCtx) assemble() { return } + exec.assembling = true + exec.log.Debug("trying to assemble the object...") splitInfo := exec.splitInfo() diff --git a/pkg/services/object/get/exec.go b/pkg/services/object/get/exec.go index 882226855..19fa646ba 100644 --- a/pkg/services/object/get/exec.go +++ b/pkg/services/object/get/exec.go @@ -41,6 +41,12 @@ type execCtx struct { head bool curProcEpoch uint64 + + // true when the processing of the initial request + // is turned to assembling stage. When false, + // initial request can be forwarded during network + // communication. + assembling bool } type execOption func(*execCtx) diff --git a/pkg/services/object/get/prm.go b/pkg/services/object/get/prm.go index 7ee493549..a10fb3003 100644 --- a/pkg/services/object/get/prm.go +++ b/pkg/services/object/get/prm.go @@ -32,6 +32,8 @@ type RangeHashPrm struct { salt []byte } +type RequestForwarder func(client.Client) (*objectSDK.Object, error) + // HeadPrm groups parameters of Head service call. type HeadPrm struct { commonPrm @@ -43,6 +45,8 @@ type commonPrm struct { common *util.CommonPrm client.GetObjectParams + + forwarder RequestForwarder } // ChunkWriter is an interface of target component @@ -100,6 +104,10 @@ func (p *commonPrm) SetCommonParameters(common *util.CommonPrm) { p.common = common } +func (p *commonPrm) SetRequestForwarder(f RequestForwarder) { + p.forwarder = f +} + // SetHeaderWriter sets target component to write the object header. func (p *HeadPrm) SetHeaderWriter(w HeaderWriter) { p.objWriter = &partWriter{ diff --git a/pkg/services/object/get/util.go b/pkg/services/object/get/util.go index 01a55edea..3c992bd93 100644 --- a/pkg/services/object/get/util.go +++ b/pkg/services/object/get/util.go @@ -80,6 +80,10 @@ func (c *clientCacheWrapper) get(addr string) (getClient, error) { } func (c *clientWrapper) getObject(exec *execCtx) (*objectSDK.Object, error) { + if !exec.assembling { + return exec.prm.forwarder(c.client) + } + if exec.headOnly() { return c.client.GetObjectHeader(exec.context(), new(client.ObjectHeaderParams). diff --git a/pkg/services/object/get/v2/service.go b/pkg/services/object/get/v2/service.go index f2e408534..21944428d 100644 --- a/pkg/services/object/get/v2/service.go +++ b/pkg/services/object/get/v2/service.go @@ -96,7 +96,7 @@ func (s *Service) Head(ctx context.Context, req *objectV2.HeadRequest) (*objectV resp := new(objectV2.HeadResponse) resp.SetBody(new(objectV2.HeadResponseBody)) - p, err := s.toHeadPrm(req, resp) + p, err := s.toHeadPrm(ctx, req, resp) if err != nil { return nil, err } diff --git a/pkg/services/object/get/v2/util.go b/pkg/services/object/get/v2/util.go index 92fbd2fcc..c3f13b0ee 100644 --- a/pkg/services/object/get/v2/util.go +++ b/pkg/services/object/get/v2/util.go @@ -1,13 +1,22 @@ package getsvc import ( + "context" "crypto/sha256" "hash" + "io" + "sync" + "github.com/nspcc-dev/neofs-api-go/pkg/client" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-api-go/pkg/token" + rpcclient "github.com/nspcc-dev/neofs-api-go/rpc/client" + signature2 "github.com/nspcc-dev/neofs-api-go/util/signature" objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" "github.com/nspcc-dev/neofs-api-go/v2/refs" + "github.com/nspcc-dev/neofs-api-go/v2/rpc" + "github.com/nspcc-dev/neofs-api-go/v2/session" + "github.com/nspcc-dev/neofs-api-go/v2/signature" "github.com/nspcc-dev/neofs-node/pkg/core/object" objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object" getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get" @@ -16,6 +25,8 @@ import ( "github.com/pkg/errors" ) +var errWrongMessageSeq = errors.New("incorrect message sequence") + func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStream) (*getsvc.Prm, error) { meta := req.GetMetaHeader() @@ -39,6 +50,101 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre p.WithRawFlag(body.GetRaw()) p.SetObjectWriter(&streamObjectWriter{stream}) + if !commonPrm.LocalOnly() { + var onceResign sync.Once + + p.SetRequestForwarder(func(c client.Client) (*objectSDK.Object, error) { + var err error + + // once compose and resign forwarding request + onceResign.Do(func() { + // compose meta header of the local server + metaHdr := new(session.RequestMetaHeader) + metaHdr.SetTTL(meta.GetTTL() - 1) + // TODO: think how to set the other fields + metaHdr.SetOrigin(meta) + + req.SetMetaHeader(metaHdr) + + err = signature.SignServiceMessage(key, req) + }) + + if err != nil { + return nil, err + } + + // code below is copy-pasted from c.GetObject implementation, + // perhaps it is worth highlighting the utility function in neofs-api-go + + // open stream + stream, err := rpc.GetObject(c.Raw(), req, rpcclient.WithContext(stream.Context())) + if err != nil { + return nil, errors.Wrap(err, "stream opening failed") + } + + var ( + headWas bool + payload []byte + obj = new(objectV2.Object) + resp = new(objectV2.GetResponse) + ) + + for { + // receive message from server stream + err := stream.Read(resp) + if err != nil { + if errors.Is(errors.Cause(err), io.EOF) { + if !headWas { + return nil, io.ErrUnexpectedEOF + } + + break + } + + return nil, errors.Wrap(err, "reading the response failed") + } + + // verify response structure + if err := signature.VerifyServiceMessage(resp); err != nil { + return nil, errors.Wrap(err, "response verification failed") + } + + switch v := resp.GetBody().GetObjectPart().(type) { + default: + return nil, errors.Errorf("unexpected object part %T", v) + case *objectV2.GetObjectPartInit: + if headWas { + return nil, errWrongMessageSeq + } + + headWas = true + + obj.SetObjectID(v.GetObjectID()) + obj.SetSignature(v.GetSignature()) + + hdr := v.GetHeader() + obj.SetHeader(hdr) + + payload = make([]byte, 0, hdr.GetPayloadLength()) + case *objectV2.GetObjectPartChunk: + if !headWas { + return nil, errWrongMessageSeq + } + + payload = append(payload, v.GetChunk()...) + case *objectV2.SplitInfo: + si := objectSDK.NewSplitInfoFromV2(v) + return nil, objectSDK.NewSplitInfoError(si) + } + } + + obj.SetPayload(payload) + + // convert the object + return objectSDK.NewFromV2(obj), nil + }) + } + return p, nil } @@ -66,6 +172,77 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get p.SetChunkWriter(&streamObjectRangeWriter{stream}) p.SetRange(objectSDK.NewRangeFromV2(body.GetRange())) + if !commonPrm.LocalOnly() { + var onceResign sync.Once + + p.SetRequestForwarder(func(c client.Client) (*objectSDK.Object, error) { + var err error + + // once compose and resign forwarding request + onceResign.Do(func() { + // compose meta header of the local server + metaHdr := new(session.RequestMetaHeader) + metaHdr.SetTTL(meta.GetTTL() - 1) + // TODO: think how to set the other fields + metaHdr.SetOrigin(meta) + + req.SetMetaHeader(metaHdr) + + err = signature.SignServiceMessage(key, req) + }) + + if err != nil { + return nil, err + } + + // code below is copy-pasted from c.ObjectPayloadRangeData implementation, + // perhaps it is worth highlighting the utility function in neofs-api-go + + // open stream + stream, err := rpc.GetObjectRange(c.Raw(), req, rpcclient.WithContext(stream.Context())) + if err != nil { + return nil, errors.Wrap(err, "could not create Get payload range stream") + } + + payload := make([]byte, body.GetRange().GetLength()) + + resp := new(objectV2.GetRangeResponse) + + for { + // receive message from server stream + err := stream.Read(resp) + if err != nil { + if errors.Is(errors.Cause(err), io.EOF) { + break + } + + return nil, errors.Wrap(err, "reading the response failed") + } + + // verify response structure + if err := signature.VerifyServiceMessage(resp); err != nil { + return nil, errors.Wrapf(err, "could not verify %T", resp) + } + + switch v := resp.GetBody().GetRangePart().(type) { + case nil: + return nil, errors.Errorf("unexpected range type %T", v) + case *objectV2.GetRangePartChunk: + payload = append(payload, v.GetChunk()...) + case *objectV2.SplitInfo: + si := objectSDK.NewSplitInfoFromV2(v) + + return nil, objectSDK.NewSplitInfoError(si) + } + } + + obj := objectSDK.NewRaw() + obj.SetPayload(payload) + + return obj.Object(), nil + }) + } + return p, nil } @@ -132,7 +309,7 @@ func (w *headResponseWriter) WriteHeader(hdr *object.Object) error { return nil } -func (s *Service) toHeadPrm(req *objectV2.HeadRequest, resp *objectV2.HeadResponse) (*getsvc.HeadPrm, error) { +func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp *objectV2.HeadResponse) (*getsvc.HeadPrm, error) { meta := req.GetMetaHeader() key, err := s.keyStorage.GetKey(token.NewSessionTokenFromV2(meta.GetSessionToken())) @@ -158,6 +335,111 @@ func (s *Service) toHeadPrm(req *objectV2.HeadRequest, resp *objectV2.HeadRespon body: resp.GetBody(), }) + if !commonPrm.LocalOnly() { + var onceResign sync.Once + + p.SetRequestForwarder(func(c client.Client) (*objectSDK.Object, error) { + var err error + + // once compose and resign forwarding request + onceResign.Do(func() { + // compose meta header of the local server + metaHdr := new(session.RequestMetaHeader) + metaHdr.SetTTL(meta.GetTTL() - 1) + // TODO: think how to set the other fields + metaHdr.SetOrigin(meta) + + req.SetMetaHeader(metaHdr) + + err = signature.SignServiceMessage(key, req) + }) + + if err != nil { + return nil, err + } + + // code below is copy-pasted from c.GetObjectHeader implementation, + // perhaps it is worth highlighting the utility function in neofs-api-go + + // send Head request + resp, err := rpc.HeadObject(c.Raw(), req, rpcclient.WithContext(ctx)) + if err != nil { + return nil, errors.Wrap(err, "sending the request failed") + } + + // verify response structure + if err := signature.VerifyServiceMessage(resp); err != nil { + return nil, errors.Wrap(err, "response verification failed") + } + + var ( + hdr *objectV2.Header + idSig *refs.Signature + ) + + switch v := resp.GetBody().GetHeaderPart().(type) { + case nil: + return nil, errors.Errorf("unexpected header type %T", v) + case *objectV2.ShortHeader: + if !body.GetMainOnly() { + return nil, errors.Errorf("wrong header part type: expected %T, received %T", + (*objectV2.ShortHeader)(nil), (*objectV2.HeaderWithSignature)(nil), + ) + } + + h := v + + hdr = new(objectV2.Header) + hdr.SetPayloadLength(h.GetPayloadLength()) + hdr.SetVersion(h.GetVersion()) + hdr.SetOwnerID(h.GetOwnerID()) + hdr.SetObjectType(h.GetObjectType()) + hdr.SetCreationEpoch(h.GetCreationEpoch()) + hdr.SetPayloadHash(h.GetPayloadHash()) + hdr.SetHomomorphicHash(h.GetHomomorphicHash()) + case *objectV2.HeaderWithSignature: + if body.GetMainOnly() { + return nil, errors.Errorf("wrong header part type: expected %T, received %T", + (*objectV2.HeaderWithSignature)(nil), (*objectV2.ShortHeader)(nil), + ) + } + + hdrWithSig := v + if hdrWithSig == nil { + return nil, errors.New("nil object part") + } + + hdr = hdrWithSig.GetHeader() + idSig = hdrWithSig.GetSignature() + + if err := signature2.VerifyDataWithSource( + signature.StableMarshalerWrapper{ + SM: p.Address().ObjectID().ToV2(), + }, + func() (key, sig []byte) { + return idSig.GetKey(), idSig.GetSign() + }, + ); err != nil { + return nil, errors.Wrap(err, "incorrect object header signature") + } + case *objectV2.SplitInfo: + si := objectSDK.NewSplitInfoFromV2(v) + + return nil, objectSDK.NewSplitInfoError(si) + } + + obj := new(objectV2.Object) + obj.SetHeader(hdr) + obj.SetSignature(idSig) + + raw := object.NewRawFromV2(obj) + raw.SetID(p.Address().ObjectID()) + + // convert the object + return raw.Object().SDK(), nil + }) + } + return p, nil }