diff --git a/pkg/services/object/get/remote.go b/pkg/services/object/get/remote.go index 7279b869..93d70a5a 100644 --- a/pkg/services/object/get/remote.go +++ b/pkg/services/object/get/remote.go @@ -36,8 +36,14 @@ func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) bool case err == nil: exec.status = statusOK exec.err = nil - exec.collectedObject = obj - exec.writeCollectedObject() + + // both object and err are nil only if the original + // request was forwarded to another node and the object + // has already been streamed to the requesting party + if obj != nil { + exec.collectedObject = obj + exec.writeCollectedObject() + } case errors.As(err, &errRemoved): exec.status = statusINHUMED exec.err = errRemoved diff --git a/pkg/services/object/get/v2/util.go b/pkg/services/object/get/v2/util.go index 7ab7fd03..1c736667 100644 --- a/pkg/services/object/get/v2/util.go +++ b/pkg/services/object/get/v2/util.go @@ -54,12 +54,14 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre return nil, err } + streamWrapper := &streamObjectWriter{stream} + p := new(getsvc.Prm) p.SetCommonParameters(commonPrm) p.WithAddress(addr) p.WithRawFlag(body.GetRaw()) - p.SetObjectWriter(&streamObjectWriter{stream}) + p.SetObjectWriter(streamWrapper) if !commonPrm.LocalOnly() { var onceResign sync.Once @@ -105,8 +107,6 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre var ( headWas bool - payload []byte - obj = new(objectV2.Object) resp = new(objectV2.GetResponse) ) @@ -149,29 +149,30 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre headWas = true + obj := new(objectV2.Object) + obj.SetObjectID(v.GetObjectID()) obj.SetSignature(v.GetSignature()) + obj.SetHeader(v.GetHeader()) - hdr := v.GetHeader() - obj.SetHeader(hdr) - - payload = make([]byte, 0, hdr.GetPayloadLength()) + if err = streamWrapper.WriteHeader(object.NewFromV2(obj)); err != nil { + return nil, fmt.Errorf("could not write object header in Get forwarder: %w", err) + } case *objectV2.GetObjectPartChunk: if !headWas { return nil, errWrongMessageSeq } - payload = append(payload, v.GetChunk()...) + if err = streamWrapper.WriteChunk(v.GetChunk()); err != nil { + return nil, fmt.Errorf("could not write object chunk in Get forwarder: %w", err) + } case *objectV2.SplitInfo: si := object.NewSplitInfoFromV2(v) return nil, object.NewSplitInfoError(si) } } - obj.SetPayload(payload) - - // convert the object - return object.NewFromV2(obj), nil + return nil, nil })) } @@ -203,9 +204,11 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get p := new(getsvc.RangePrm) p.SetCommonParameters(commonPrm) + streamWrapper := &streamObjectRangeWriter{stream} + p.WithAddress(addr) p.WithRawFlag(body.GetRaw()) - p.SetChunkWriter(&streamObjectRangeWriter{stream}) + p.SetChunkWriter(streamWrapper) p.SetRange(object.NewRangeFromV2(body.GetRange())) if !commonPrm.LocalOnly() { @@ -250,9 +253,6 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get return nil, fmt.Errorf("could not create Get payload range stream: %w", err) } - // allocate memory only after receiving a successful response - var payload []byte - resp := new(objectV2.GetRangeResponse) for { @@ -284,11 +284,9 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get case nil: return nil, fmt.Errorf("unexpected range type %T", v) case *objectV2.GetRangePartChunk: - if payload == nil { - payload = make([]byte, 0, body.GetRange().GetLength()) + if err = streamWrapper.WriteChunk(v.GetChunk()); err != nil { + return nil, fmt.Errorf("could not write object chunk in GetRange forwarder: %w", err) } - - payload = append(payload, v.GetChunk()...) case *objectV2.SplitInfo: si := object.NewSplitInfoFromV2(v) @@ -296,10 +294,7 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get } } - obj := object.New() - obj.SetPayload(payload) - - return obj, nil + return nil, nil })) }