From 2624347d9b8031f1fde6b40991dc94d551be64ff Mon Sep 17 00:00:00 2001
From: Leonard Lyubich <leonard@nspcc.ru>
Date: Mon, 7 Feb 2022 23:27:56 +0300
Subject: [PATCH] [#131] client: Change interface of object PUT and GET ops

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
---
 client/common.go     | 104 +++++++--
 client/object.go     | 504 -------------------------------------------
 client/object_get.go | 313 +++++++++++++++++++++++++++
 client/object_put.go | 199 +++++++++++++++++
 go.mod               |   1 -
 pool/mock_test.go    |  32 ++-
 pool/pool.go         | 237 +++++++++++++++++---
 pool/pool_test.go    |  10 +-
 8 files changed, 822 insertions(+), 578 deletions(-)
 create mode 100644 client/object_get.go
 create mode 100644 client/object_put.go

diff --git a/client/common.go b/client/common.go
index 7afab982..a113e4a8 100644
--- a/client/common.go
+++ b/client/common.go
@@ -151,15 +151,26 @@ type contextCall struct {
 	req interface {
 		GetMetaHeader() *v2session.RequestMetaHeader
 		SetMetaHeader(*v2session.RequestMetaHeader)
+		SetVerificationHeader(*v2session.RequestVerificationHeader)
 	}
 
 	// function to send a request (unary) and receive a response
 	call func() (responseV2, error)
 
+	// function to send the request (req field)
+	wReq func() error
+
+	// function to recv the response (resp field)
+	rResp func() error
+
+	// function to close the message stream
+	closer func() error
+
 	// function of writing response fields to the resulting structure (optional)
 	result func(v2 responseV2)
 }
 
+// sets needed fields of the request meta header.
 func (x contextCall) prepareRequest() {
 	meta := x.req.GetMetaHeader()
 	if meta == nil {
@@ -178,6 +189,29 @@ func (x contextCall) prepareRequest() {
 	meta.SetNetworkMagic(x.netMagic)
 }
 
+// prepares, signs and writes the request. Result means success.
+// If failed, contextCall.err contains the reason.
+func (x *contextCall) writeRequest() bool {
+	x.prepareRequest()
+
+	x.req.SetVerificationHeader(nil)
+
+	// sign the request
+	x.err = signature.SignServiceMessage(&x.key, x.req)
+	if x.err != nil {
+		x.err = fmt.Errorf("sign request: %w", x.err)
+		return false
+	}
+
+	x.err = x.wReq()
+	if x.err != nil {
+		x.err = fmt.Errorf("write request: %w", x.err)
+		return false
+	}
+
+	return true
+}
+
 // performs common actions of response processing and writes any problem as a result status or client error
 // (in both cases returns false).
 //
@@ -220,32 +254,32 @@ func (x *contextCall) processResponse() bool {
 
 	x.statusRes.setStatus(st)
 
-	return successfulStatus
+	return successfulStatus || !x.resolveAPIFailures
 }
 
-// goes through all stages of sending a request and processing a response. Returns true if successful.
-func (x *contextCall) processCall() bool {
-	// prepare the request
-	x.prepareRequest()
-
-	// sign the request
-	x.err = signature.SignServiceMessage(&x.key, x.req)
-	if x.err != nil {
-		x.err = fmt.Errorf("sign request: %w", x.err)
-		return false
+// reads response (if rResp is set) and processes it. Result means success.
+// If failed, contextCall.err contains the reason.
+func (x *contextCall) readResponse() bool {
+	if x.rResp != nil {
+		x.err = x.rResp()
+		if x.err != nil {
+			x.err = fmt.Errorf("read response: %w", x.err)
+			return false
+		}
 	}
 
-	// perform RPC
-	x.resp, x.err = x.call()
-	if x.err != nil {
-		x.err = fmt.Errorf("transport error: %w", x.err)
-		return false
-	}
+	return x.processResponse()
+}
 
-	// process the response
-	ok := x.processResponse()
-	if !ok {
-		return false
+// closes the message stream (if closer is set) and writes the results (if resuls is set).
+// Return means success. If failed, contextCall.err contains the reason.
+func (x *contextCall) close() bool {
+	if x.closer != nil {
+		x.err = x.closer()
+		if x.err != nil {
+			x.err = fmt.Errorf("close RPC: %w", x.err)
+			return false
+		}
 	}
 
 	// write response to resulting structure
@@ -256,6 +290,34 @@ func (x *contextCall) processCall() bool {
 	return true
 }
 
+// goes through all stages of sending a request and processing a response. Returns true if successful.
+// If failed, contextCall.err contains the reason.
+func (x *contextCall) processCall() bool {
+	// set request writer
+	x.wReq = func() error {
+		var err error
+		x.resp, err = x.call()
+		return err
+	}
+
+	// write request
+	ok := x.writeRequest()
+	if !ok {
+		return false
+	}
+
+	// read response
+	ok = x.readResponse()
+	if !ok {
+		return false
+	}
+
+	// close and write response to resulting structure
+	ok = x.close()
+
+	return ok
+}
+
 // initializes static cross-call parameters inherited from client.
 func (c *Client) initCallContext(ctx *contextCall) {
 	ctx.key = *c.opts.key
diff --git a/client/object.go b/client/object.go
index ac500962..70062d69 100644
--- a/client/object.go
+++ b/client/object.go
@@ -1,9 +1,7 @@
 package client
 
 import (
-	"bytes"
 	"context"
-	"crypto/ecdsa"
 	"errors"
 	"fmt"
 	"io"
@@ -22,12 +20,6 @@ import (
 	signer "github.com/nspcc-dev/neofs-sdk-go/util/signature"
 )
 
-type PutObjectParams struct {
-	obj *object.Object
-
-	r io.Reader
-}
-
 // ObjectAddressWriter is an interface of the
 // component that writes the object address.
 type ObjectAddressWriter interface {
@@ -40,16 +32,6 @@ type DeleteObjectParams struct {
 	tombTgt ObjectAddressWriter
 }
 
-type GetObjectParams struct {
-	addr *address.Address
-
-	raw bool
-
-	w io.Writer
-
-	readerHandler ReaderHandler
-}
-
 type ObjectHeaderParams struct {
 	addr *address.Address
 
@@ -84,20 +66,6 @@ type SearchObjectParams struct {
 	filters object.SearchFilters
 }
 
-type putObjectV2Reader struct {
-	r io.Reader
-}
-
-type putObjectV2Writer struct {
-	key *ecdsa.PrivateKey
-
-	chunkPart *v2object.PutObjectPartChunk
-
-	req *v2object.PutRequest
-
-	stream *rpcapi.PutRequestWriter
-}
-
 type checksumType int
 
 const (
@@ -106,8 +74,6 @@ const (
 	checksumTZ
 )
 
-const chunkSize = 3 * (1 << 20)
-
 const TZSize = 64
 
 const searchQueryVersion uint32 = 1
@@ -133,199 +99,6 @@ func (t checksumType) toV2() v2refs.ChecksumType {
 	}
 }
 
-func (w *putObjectV2Reader) Read(p []byte) (int, error) {
-	return w.r.Read(p)
-}
-
-func (w *putObjectV2Writer) Write(p []byte) (int, error) {
-	w.chunkPart.SetChunk(p)
-
-	w.req.SetVerificationHeader(nil)
-
-	if err := signature.SignServiceMessage(w.key, w.req); err != nil {
-		return 0, fmt.Errorf("could not sign chunk request message: %w", err)
-	}
-
-	if err := w.stream.Write(w.req); err != nil {
-		return 0, fmt.Errorf("could not send chunk request message: %w", err)
-	}
-
-	return len(p), nil
-}
-
-func (p *PutObjectParams) WithObject(v *object.Object) *PutObjectParams {
-	if p != nil {
-		p.obj = v
-	}
-
-	return p
-}
-
-func (p *PutObjectParams) Object() *object.Object {
-	if p != nil {
-		return p.obj
-	}
-
-	return nil
-}
-
-func (p *PutObjectParams) WithPayloadReader(v io.Reader) *PutObjectParams {
-	if p != nil {
-		p.r = v
-	}
-
-	return p
-}
-
-func (p *PutObjectParams) PayloadReader() io.Reader {
-	if p != nil {
-		return p.r
-	}
-
-	return nil
-}
-
-type ObjectPutRes struct {
-	statusRes
-
-	id *oid.ID
-}
-
-func (x *ObjectPutRes) setID(id *oid.ID) {
-	x.id = id
-}
-
-func (x ObjectPutRes) ID() *oid.ID {
-	return x.id
-}
-
-// PutObject puts object through NeoFS API call.
-//
-// Any client's internal or transport errors are returned as `error`.
-// If WithNeoFSErrorParsing option has been provided, unsuccessful
-// NeoFS status codes are returned as `error`, otherwise, are included
-// in the returned result structure.
-func (c *Client) PutObject(ctx context.Context, p *PutObjectParams, opts ...CallOption) (*ObjectPutRes, error) {
-	callOpts := c.defaultCallOptions()
-
-	for i := range opts {
-		if opts[i] != nil {
-			opts[i](callOpts)
-		}
-	}
-
-	// create request
-	req := new(v2object.PutRequest)
-
-	// initialize request body
-	body := new(v2object.PutRequestBody)
-	req.SetBody(body)
-
-	v2Addr := new(v2refs.Address)
-	v2Addr.SetObjectID(p.obj.ID().ToV2())
-	v2Addr.SetContainerID(p.obj.ContainerID().ToV2())
-
-	// set meta header
-	meta := v2MetaHeaderFromOpts(callOpts)
-
-	if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{
-		addr: v2Addr,
-		verb: v2session.ObjectVerbPut,
-	}); err != nil {
-		return nil, fmt.Errorf("could not attach session token: %w", err)
-	}
-
-	req.SetMetaHeader(meta)
-
-	// initialize init part
-	initPart := new(v2object.PutObjectPartInit)
-	body.SetObjectPart(initPart)
-
-	obj := p.obj.ToV2()
-
-	// set init part fields
-	initPart.SetObjectID(obj.GetObjectID())
-	initPart.SetSignature(obj.GetSignature())
-	initPart.SetHeader(obj.GetHeader())
-
-	// sign the request
-	if err := signature.SignServiceMessage(callOpts.key, req); err != nil {
-		return nil, fmt.Errorf("signing the request failed: %w", err)
-	}
-
-	// open stream
-	resp := new(v2object.PutResponse)
-
-	stream, err := rpcapi.PutObject(c.Raw(), resp, client.WithContext(ctx))
-	if err != nil {
-		return nil, fmt.Errorf("stream opening failed: %w", err)
-	}
-
-	// send init part
-	err = stream.Write(req)
-	if err != nil {
-		return nil, fmt.Errorf("sending the initial message to stream failed: %w", err)
-	}
-
-	// create payload bytes reader
-	var rPayload io.Reader = bytes.NewReader(obj.GetPayload())
-	if p.r != nil {
-		rPayload = io.MultiReader(rPayload, p.r)
-	}
-
-	// create v2 payload stream writer
-	chunkPart := new(v2object.PutObjectPartChunk)
-	body.SetObjectPart(chunkPart)
-
-	w := &putObjectV2Writer{
-		key:       callOpts.key,
-		chunkPart: chunkPart,
-		req:       req,
-		stream:    stream,
-	}
-
-	r := &putObjectV2Reader{r: rPayload}
-
-	// copy payload from reader to stream writer
-	_, err = io.CopyBuffer(w, r, make([]byte, chunkSize))
-	if err != nil && !errors.Is(err, io.EOF) {
-		return nil, fmt.Errorf("payload streaming failed: %w", err)
-	}
-
-	// close object stream and receive response from remote node
-	err = stream.Close()
-	if err != nil {
-		return nil, fmt.Errorf("closing the stream failed: %w", err)
-	}
-
-	var (
-		res     = new(ObjectPutRes)
-		procPrm processResponseV2Prm
-		procRes processResponseV2Res
-	)
-
-	procPrm.callOpts = callOpts
-	procPrm.resp = resp
-
-	procRes.statusRes = res
-
-	// process response in general
-	if c.processResponseV2(&procRes, procPrm) {
-		if procRes.cliErr != nil {
-			return nil, procRes.cliErr
-		}
-
-		return res, nil
-	}
-
-	// convert object identifier
-	id := oid.NewIDFromV2(resp.GetBody().GetObjectID())
-
-	res.setID(id)
-
-	return res, nil
-}
-
 func (p *DeleteObjectParams) WithAddress(v *address.Address) *DeleteObjectParams {
 	if p != nil {
 		p.addr = v
@@ -451,132 +224,6 @@ func (c *Client) DeleteObject(ctx context.Context, p *DeleteObjectParams, opts .
 	return res, nil
 }
 
-func (p *GetObjectParams) WithAddress(v *address.Address) *GetObjectParams {
-	if p != nil {
-		p.addr = v
-	}
-
-	return p
-}
-
-func (p *GetObjectParams) Address() *address.Address {
-	if p != nil {
-		return p.addr
-	}
-
-	return nil
-}
-
-func (p *GetObjectParams) WithPayloadWriter(w io.Writer) *GetObjectParams {
-	if p != nil {
-		p.w = w
-	}
-
-	return p
-}
-
-func (p *GetObjectParams) PayloadWriter() io.Writer {
-	if p != nil {
-		return p.w
-	}
-
-	return nil
-}
-
-func (p *GetObjectParams) WithRawFlag(v bool) *GetObjectParams {
-	if p != nil {
-		p.raw = v
-	}
-
-	return p
-}
-
-func (p *GetObjectParams) RawFlag() bool {
-	if p != nil {
-		return p.raw
-	}
-
-	return false
-}
-
-// ReaderHandler is a function over io.Reader.
-type ReaderHandler func(io.Reader)
-
-// WithPayloadReaderHandler sets handler of the payload reader.
-//
-// If provided, payload reader is composed after receiving the header.
-// In this case payload writer set via WithPayloadWriter is ignored.
-//
-// Handler should not be nil.
-func (p *GetObjectParams) WithPayloadReaderHandler(f ReaderHandler) *GetObjectParams {
-	if p != nil {
-		p.readerHandler = f
-	}
-
-	return p
-}
-
-// wrapper over the Object Get stream that provides io.Reader.
-type objectPayloadReader struct {
-	stream interface {
-		Read(*v2object.GetResponse) error
-	}
-
-	resp v2object.GetResponse
-
-	tail []byte
-}
-
-func (x *objectPayloadReader) Read(p []byte) (read int, err error) {
-	// read remaining tail
-	read = copy(p, x.tail)
-
-	x.tail = x.tail[read:]
-
-	if len(p)-read == 0 {
-		return
-	}
-
-	// receive message from server stream
-	err = x.stream.Read(&x.resp)
-	if err != nil {
-		if errors.Is(err, io.EOF) {
-			err = io.EOF
-			return
-		}
-
-		err = fmt.Errorf("reading the response failed: %w", err)
-		return
-	}
-
-	// get chunk part message
-	part := x.resp.GetBody().GetObjectPart()
-
-	chunkPart, ok := part.(*v2object.GetObjectPartChunk)
-	if !ok {
-		err = errWrongMessageSeq
-		return
-	}
-
-	// verify response structure
-	if err = signature.VerifyServiceMessage(&x.resp); err != nil {
-		err = fmt.Errorf("response verification failed: %w", err)
-		return
-	}
-
-	// read new chunk
-	chunk := chunkPart.GetChunk()
-
-	tailOffset := copy(p[read:], chunk)
-
-	read += tailOffset
-
-	// save the tail
-	x.tail = append(x.tail, chunk[tailOffset:]...)
-
-	return
-}
-
 var errWrongMessageSeq = errors.New("incorrect message sequence")
 
 type ObjectGetRes struct {
@@ -604,157 +251,6 @@ func writeUnexpectedMessageTypeErr(res resCommon, val interface{}) {
 	res.setStatus(st)
 }
 
-// GetObject receives object through NeoFS API call.
-//
-// Any client's internal or transport errors are returned as `error`.
-// If WithNeoFSErrorParsing option has been provided, unsuccessful
-// NeoFS status codes are returned as `error`, otherwise, are included
-// in the returned result structure.
-func (c *Client) GetObject(ctx context.Context, p *GetObjectParams, opts ...CallOption) (*ObjectGetRes, error) {
-	callOpts := c.defaultCallOptions()
-
-	for i := range opts {
-		if opts[i] != nil {
-			opts[i](callOpts)
-		}
-	}
-
-	// create request
-	req := new(v2object.GetRequest)
-
-	// initialize request body
-	body := new(v2object.GetRequestBody)
-	req.SetBody(body)
-
-	// set meta header
-	meta := v2MetaHeaderFromOpts(callOpts)
-
-	if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{
-		addr: p.addr.ToV2(),
-		verb: v2session.ObjectVerbGet,
-	}); err != nil {
-		return nil, fmt.Errorf("could not attach session token: %w", err)
-	}
-
-	req.SetMetaHeader(meta)
-
-	// fill body fields
-	body.SetAddress(p.addr.ToV2())
-	body.SetRaw(p.raw)
-
-	// sign the request
-	if err := signature.SignServiceMessage(callOpts.key, req); err != nil {
-		return nil, fmt.Errorf("signing the request failed: %w", err)
-	}
-
-	// open stream
-	stream, err := rpcapi.GetObject(c.Raw(), req, client.WithContext(ctx))
-	if err != nil {
-		return nil, fmt.Errorf("stream opening failed: %w", err)
-	}
-
-	var (
-		headWas bool
-		payload []byte
-		obj     = new(v2object.Object)
-		resp    = new(v2object.GetResponse)
-
-		messageWas bool
-
-		res     = new(ObjectGetRes)
-		procPrm processResponseV2Prm
-		procRes processResponseV2Res
-	)
-
-	procPrm.callOpts = callOpts
-	procPrm.resp = resp
-
-	procRes.statusRes = res
-
-loop:
-	for {
-		// receive message from server stream
-		err := stream.Read(resp)
-		if err != nil {
-			if errors.Is(err, io.EOF) {
-				if !messageWas {
-					return nil, errWrongMessageSeq
-				}
-
-				break
-			}
-
-			return nil, fmt.Errorf("reading the response failed: %w", err)
-		}
-
-		messageWas = true
-
-		// process response in general
-		if c.processResponseV2(&procRes, procPrm) {
-			if procRes.cliErr != nil {
-				return nil, procRes.cliErr
-			}
-
-			return res, nil
-		}
-
-		switch v := resp.GetBody().GetObjectPart().(type) {
-		default:
-			return nil, errWrongMessageSeq
-		case *v2object.GetObjectPartInit:
-			if headWas {
-				return nil, errWrongMessageSeq
-			}
-
-			headWas = true
-
-			obj.SetObjectID(v.GetObjectID())
-			obj.SetSignature(v.GetSignature())
-
-			hdr := v.GetHeader()
-			obj.SetHeader(hdr)
-
-			if p.readerHandler != nil {
-				p.readerHandler(&objectPayloadReader{
-					stream: stream,
-				})
-
-				break loop
-			}
-
-			if p.w == nil {
-				payload = make([]byte, 0, hdr.GetPayloadLength())
-			}
-		case *v2object.GetObjectPartChunk:
-			if !headWas {
-				return nil, errWrongMessageSeq
-			}
-
-			if p.w != nil {
-				if _, err := p.w.Write(v.GetChunk()); err != nil {
-					return nil, fmt.Errorf("could not write payload chunk: %w", err)
-				}
-			} else {
-				payload = append(payload, v.GetChunk()...)
-			}
-		case *v2object.SplitInfo:
-			if headWas {
-				return nil, errWrongMessageSeq
-			}
-
-			si := object.NewSplitInfoFromV2(v)
-			return nil, object.NewSplitInfoError(si)
-		}
-	}
-
-	obj.SetPayload(payload)
-
-	// convert the object
-	res.setObject(object.NewFromV2(obj))
-
-	return res, nil
-}
-
 func (p *ObjectHeaderParams) WithAddress(v *address.Address) *ObjectHeaderParams {
 	if p != nil {
 		p.addr = v
diff --git a/client/object_get.go b/client/object_get.go
new file mode 100644
index 00000000..504596b5
--- /dev/null
+++ b/client/object_get.go
@@ -0,0 +1,313 @@
+package client
+
+import (
+	"context"
+	"crypto/ecdsa"
+	"errors"
+	"fmt"
+	"io"
+
+	v2object "github.com/nspcc-dev/neofs-api-go/v2/object"
+	v2refs "github.com/nspcc-dev/neofs-api-go/v2/refs"
+	rpcapi "github.com/nspcc-dev/neofs-api-go/v2/rpc"
+	"github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
+	v2session "github.com/nspcc-dev/neofs-api-go/v2/session"
+	apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
+	cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
+	"github.com/nspcc-dev/neofs-sdk-go/object"
+	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
+	"github.com/nspcc-dev/neofs-sdk-go/session"
+	"github.com/nspcc-dev/neofs-sdk-go/token"
+)
+
+// PrmObjectGet groups parameters of ObjectGetInit operation.
+type PrmObjectGet struct {
+	raw bool
+
+	local bool
+
+	sessionSet bool
+	session    session.Token
+
+	bearerSet bool
+	bearer    token.BearerToken
+
+	cnrSet bool
+	cnr    cid.ID
+
+	objSet bool
+	obj    oid.ID
+}
+
+// MarkRaw marks an intent to read physically stored object.
+func (x *PrmObjectGet) MarkRaw() {
+	x.raw = true
+}
+
+// MarkLocal tells the server to execute the operation locally.
+func (x *PrmObjectGet) MarkLocal() {
+	x.local = true
+}
+
+// WithinSession specifies session within which object should be read.
+func (x *PrmObjectGet) WithinSession(t session.Token) {
+	x.session = t
+	x.sessionSet = true
+}
+
+// WithBearerToken attaches bearer token to be used for the operation.
+func (x *PrmObjectGet) WithBearerToken(t token.BearerToken) {
+	x.bearer = t
+	x.bearerSet = true
+}
+
+// FromContainer specifies NeoFS container of the object.
+// Required parameter.
+func (x *PrmObjectGet) FromContainer(id cid.ID) {
+	x.cnr = id
+	x.cnrSet = true
+}
+
+// ByID specifies identifier of the requested object.
+// Required parameter.
+func (x *PrmObjectGet) ByID(id oid.ID) {
+	x.obj = id
+	x.objSet = true
+}
+
+// ResObjectGet groups the final result values of ObjectGetInit operation.
+type ResObjectGet struct {
+	statusRes
+}
+
+// ObjectReader is designed to read one object from NeoFS system.
+//
+// Must be initialized using Client.ObjectGetInit, any other
+// usage is unsafe.
+type ObjectReader struct {
+	cancelCtxStream context.CancelFunc
+
+	ctxCall contextCall
+
+	// initially bound to contextCall
+	bodyResp v2object.GetResponseBody
+
+	tailPayload []byte
+}
+
+// UseKey specifies private key to sign the requests.
+// If key is not provided, then Client default key is used.
+func (x *ObjectReader) UseKey(key ecdsa.PrivateKey) {
+	x.ctxCall.key = key
+}
+
+// ReadHeader reads header of the object. Result means success.
+// Failure reason can be received via Close.
+func (x *ObjectReader) ReadHeader(dst *object.Object) bool {
+	if !x.ctxCall.writeRequest() {
+		return false
+	} else if !x.ctxCall.readResponse() {
+		return false
+	}
+
+	var partInit *v2object.GetObjectPartInit
+
+	switch v := x.bodyResp.GetObjectPart().(type) {
+	default:
+		x.ctxCall.err = fmt.Errorf("unexpected message instead of heading part: %T", v)
+		return false
+	case *v2object.SplitInfo:
+		x.ctxCall.err = object.NewSplitInfoError(object.NewSplitInfoFromV2(v))
+		return false
+	case *v2object.GetObjectPartInit:
+		partInit = v
+	}
+
+	var objv2 v2object.Object
+
+	objv2.SetObjectID(partInit.GetObjectID())
+	objv2.SetHeader(partInit.GetHeader())
+	objv2.SetSignature(partInit.GetSignature())
+
+	*dst = *object.NewFromV2(&objv2) // need smth better
+
+	return true
+}
+
+func (x *ObjectReader) readChunk(buf []byte) (int, bool) {
+	var read int
+
+	// read remaining tail
+	read = copy(buf, x.tailPayload)
+
+	x.tailPayload = x.tailPayload[read:]
+
+	if len(buf) == read {
+		return read, true
+	}
+
+	// receive next message
+	ok := x.ctxCall.readResponse()
+	if !ok {
+		return read, false
+	}
+
+	// get chunk part message
+	part := x.bodyResp.GetObjectPart()
+
+	var partChunk *v2object.GetObjectPartChunk
+
+	partChunk, ok = part.(*v2object.GetObjectPartChunk)
+	if !ok {
+		x.ctxCall.err = fmt.Errorf("unexpected message instead of chunk part: %T", part)
+		return read, false
+	}
+
+	// read new chunk
+	chunk := partChunk.GetChunk()
+
+	tailOffset := copy(buf[read:], chunk)
+
+	read += tailOffset
+
+	// save the tail
+	x.tailPayload = append(x.tailPayload, chunk[tailOffset:]...)
+
+	return read, true
+}
+
+// ReadChunk reads another chunk of the object payload. Works similar to
+// io.Reader.Read but returns success flag instead of error.
+//
+// Failure reason can be received via Close.
+func (x *ObjectReader) ReadChunk(buf []byte) (int, bool) {
+	return x.readChunk(buf)
+}
+
+func (x *ObjectReader) close(ignoreEOF bool) (*ResObjectGet, error) {
+	defer x.cancelCtxStream()
+
+	if x.ctxCall.err != nil {
+		if !errors.Is(x.ctxCall.err, io.EOF) {
+			return nil, x.ctxCall.err
+		} else if !ignoreEOF {
+			return nil, io.EOF
+		}
+	}
+
+	return x.ctxCall.statusRes.(*ResObjectGet), nil
+}
+
+// Close ends reading the object and returns the result of the operation
+// along with the final results. Must be called after using the ObjectReader.
+//
+// Exactly one return value is non-nil. By default, server status is returned in res structure.
+// Any client's internal or transport errors are returned as Go built-in error.
+// If Client is tuned to resolve NeoFS API statuses, then NeoFS failures
+// codes are returned as error.
+//
+// Return errors:
+//   *object.SplitInfoError (returned on virtual objects with PrmObjectGet.MakeRaw).
+//
+// Return statuses:
+//   global (see Client docs).
+func (x *ObjectReader) Close() (*ResObjectGet, error) {
+	return x.close(true)
+}
+
+func (x *ObjectReader) Read(p []byte) (int, error) {
+	n, ok := x.readChunk(p)
+	if !ok {
+		res, err := x.close(false)
+		if err != nil {
+			return n, err
+		} else if !x.ctxCall.resolveAPIFailures {
+			return n, apistatus.ErrFromStatus(res.Status())
+		}
+	}
+
+	return n, nil
+}
+
+// ObjectGetInit initiates reading an object through a remote server using NeoFS API protocol.
+//
+// The call only opens the transmission channel, explicit fetching is done using the ObjectWriter.
+// Exactly one return value is non-nil. Resulting reader must be finally closed.
+//
+// Immediately panics if parameters are set incorrectly (see PrmObjectGet docs).
+// Context is required and must not be nil. It is used for network communication.
+func (c *Client) ObjectGetInit(ctx context.Context, prm PrmObjectGet) (*ObjectReader, error) {
+	// check parameters
+	switch {
+	case ctx == nil:
+		panic(panicMsgMissingContext)
+	case !prm.cnrSet:
+		panic(panicMsgMissingContainer)
+	case !prm.objSet:
+		panic("missing object")
+	}
+
+	var addr v2refs.Address
+
+	addr.SetContainerID(prm.cnr.ToV2())
+	addr.SetObjectID(prm.obj.ToV2())
+
+	// form request body
+	var body v2object.GetRequestBody
+
+	body.SetRaw(prm.raw)
+	body.SetAddress(&addr)
+
+	// form meta header
+	var meta v2session.RequestMetaHeader
+
+	if prm.local {
+		meta.SetTTL(1)
+	}
+
+	if prm.bearerSet {
+		meta.SetBearerToken(prm.bearer.ToV2())
+	}
+
+	if prm.sessionSet {
+		meta.SetSessionToken(prm.session.ToV2())
+	}
+
+	// form request
+	var req v2object.GetRequest
+
+	req.SetBody(&body)
+	req.SetMetaHeader(&meta)
+
+	// init reader
+	var (
+		r      ObjectReader
+		resp   v2object.GetResponse
+		stream *rpcapi.GetResponseReader
+	)
+
+	ctx, r.cancelCtxStream = context.WithCancel(ctx)
+
+	resp.SetBody(&r.bodyResp)
+
+	// init call context
+	c.initCallContext(&r.ctxCall)
+	r.ctxCall.req = &req
+	r.ctxCall.statusRes = new(ResObjectGet)
+	r.ctxCall.resp = &resp
+	r.ctxCall.wReq = func() error {
+		var err error
+
+		stream, err = rpcapi.GetObject(c.Raw(), &req, client.WithContext(ctx))
+		if err != nil {
+			return fmt.Errorf("open stream: %w", err)
+		}
+
+		return nil
+	}
+	r.ctxCall.rResp = func() error {
+		return stream.Read(&resp)
+	}
+
+	return &r, nil
+}
diff --git a/client/object_put.go b/client/object_put.go
new file mode 100644
index 00000000..68a67d33
--- /dev/null
+++ b/client/object_put.go
@@ -0,0 +1,199 @@
+package client
+
+import (
+	"context"
+	"crypto/ecdsa"
+	"fmt"
+
+	v2object "github.com/nspcc-dev/neofs-api-go/v2/object"
+	rpcapi "github.com/nspcc-dev/neofs-api-go/v2/rpc"
+	"github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
+	v2session "github.com/nspcc-dev/neofs-api-go/v2/session"
+	"github.com/nspcc-dev/neofs-sdk-go/object"
+	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
+	"github.com/nspcc-dev/neofs-sdk-go/session"
+	"github.com/nspcc-dev/neofs-sdk-go/token"
+)
+
+// PrmObjectPutInit groups parameters of ObjectPutInit operation.
+//
+// At the moment the operation is not parameterized, however,
+// the structure is still declared for backward compatibility.
+type PrmObjectPutInit struct{}
+
+// ResObjectPut groups the final result values of ObjectPutInit operation.
+type ResObjectPut struct {
+	statusRes
+
+	resp v2object.PutResponse
+}
+
+// ReadStoredObject reads identifier of the saved object.
+// Returns false if ID is missing (not read).
+func (x *ResObjectPut) ReadStoredObject(id *oid.ID) bool {
+	idv2 := x.resp.GetBody().GetObjectID()
+	if idv2 == nil {
+		return false
+	}
+
+	*id = *oid.NewIDFromV2(idv2) // need smth better
+
+	return true
+}
+
+// ObjectWriter is designed to write one object to NeoFS system.
+//
+// Must be initialized using Client.ObjectPutInit, any other
+// usage is unsafe.
+type ObjectWriter struct {
+	cancelCtxStream context.CancelFunc
+
+	ctxCall contextCall
+
+	// initially bound tp contextCall
+	metaHdr v2session.RequestMetaHeader
+
+	// initially bound to contextCall
+	partInit v2object.PutObjectPartInit
+
+	chunkCalled bool
+
+	partChunk v2object.PutObjectPartChunk
+}
+
+// UseKey specifies private key to sign the requests.
+// If key is not provided, then Client default key is used.
+func (x *ObjectWriter) UseKey(key ecdsa.PrivateKey) {
+	x.ctxCall.key = key
+}
+
+// WithBearerToken attaches bearer token to be used for the operation.
+// Should be called once before any writing steps.
+func (x *ObjectWriter) WithBearerToken(t token.BearerToken) {
+	x.metaHdr.SetBearerToken(t.ToV2())
+}
+
+// WithinSession specifies session within which object should be stored.
+// Should be called once before any writing steps.
+func (x *ObjectWriter) WithinSession(t session.Token) {
+	x.metaHdr.SetSessionToken(t.ToV2())
+}
+
+// MarkLocal tells the server to execute the operation locally.
+func (x *ObjectWriter) MarkLocal() {
+	x.metaHdr.SetTTL(1)
+}
+
+// WriteHeader writes header of the object. Result means success.
+// Failure reason can be received via Close.
+func (x *ObjectWriter) WriteHeader(hdr object.Object) bool {
+	v2Hdr := hdr.ToV2()
+
+	x.partInit.SetObjectID(v2Hdr.GetObjectID())
+	x.partInit.SetHeader(v2Hdr.GetHeader())
+	x.partInit.SetSignature(v2Hdr.GetSignature())
+
+	return x.ctxCall.writeRequest()
+}
+
+// WritePayloadChunk writes chunk of the object payload. Result means success.
+// Failure reason can be received via Close.
+func (x *ObjectWriter) WritePayloadChunk(chunk []byte) bool {
+	if !x.chunkCalled {
+		x.chunkCalled = true
+		x.ctxCall.req.(*v2object.PutRequest).GetBody().SetObjectPart(&x.partChunk)
+	}
+
+	for ln := len(chunk); ln > 0; ln = len(chunk) {
+		if ln > 512 {
+			ln = 512
+		}
+
+		x.partChunk.SetChunk(chunk[:ln])
+
+		if !x.ctxCall.writeRequest() {
+			return false
+		}
+
+		chunk = chunk[ln:]
+	}
+
+	return true
+}
+
+// Close ends writing the object and returns the result of the operation
+// along with the final results. Must be called after using the ObjectWriter.
+//
+// Exactly one return value is non-nil. By default, server status is returned in res structure.
+// Any client's internal or transport errors are returned as Go built-in error.
+// If Client is tuned to resolve NeoFS API statuses, then NeoFS failures
+// codes are returned as error.
+//
+// Return statuses:
+//   global (see Client docs).
+func (x *ObjectWriter) Close() (*ResObjectPut, error) {
+	defer x.cancelCtxStream()
+
+	if x.ctxCall.err != nil {
+		return nil, x.ctxCall.err
+	}
+
+	if !x.ctxCall.close() {
+		return nil, x.ctxCall.err
+	}
+
+	if !x.ctxCall.processResponse() {
+		return nil, x.ctxCall.err
+	}
+
+	return x.ctxCall.statusRes.(*ResObjectPut), nil
+}
+
+// ObjectPutInit initiates writing an object through a remote server using NeoFS API protocol.
+//
+// The call only opens the transmission channel, explicit recording is done using the ObjectWriter.
+// Exactly one return value is non-nil. Resulting writer must be finally closed.
+//
+// Context is required and must not be nil. It is used for network communication.
+func (c *Client) ObjectPutInit(ctx context.Context, _ PrmObjectPutInit) (*ObjectWriter, error) {
+	// check parameters
+	if ctx == nil {
+		panic(panicMsgMissingContext)
+	}
+
+	// open stream
+	var (
+		res ResObjectPut
+		w   ObjectWriter
+	)
+
+	ctx, w.cancelCtxStream = context.WithCancel(ctx)
+
+	stream, err := rpcapi.PutObject(c.Raw(), &res.resp, client.WithContext(ctx))
+	if err != nil {
+		return nil, fmt.Errorf("open stream: %w", err)
+	}
+
+	// form request body
+	var body v2object.PutRequestBody
+
+	// form request
+	var req v2object.PutRequest
+
+	req.SetBody(&body)
+
+	req.SetMetaHeader(&w.metaHdr)
+	body.SetObjectPart(&w.partInit)
+
+	// init call context
+	c.initCallContext(&w.ctxCall)
+	w.ctxCall.req = &req
+	w.ctxCall.statusRes = &res
+	w.ctxCall.resp = &res.resp
+	w.ctxCall.wReq = func() error {
+		return stream.Write(&req)
+	}
+	w.ctxCall.closer = stream.Close
+
+	return &w, nil
+}
diff --git a/go.mod b/go.mod
index 50dc751b..81e062c0 100644
--- a/go.mod
+++ b/go.mod
@@ -11,7 +11,6 @@ require (
 	github.com/nspcc-dev/hrw v1.0.9
 	github.com/nspcc-dev/neo-go v0.98.0
 	github.com/nspcc-dev/neofs-api-go/v2 v2.11.2-0.20220127135316-32dd0bb3f9c5
-	github.com/nspcc-dev/neofs-crypto v0.3.0
 	github.com/stretchr/testify v1.7.0
 	go.uber.org/zap v1.18.1
 	google.golang.org/grpc v1.41.0
diff --git a/pool/mock_test.go b/pool/mock_test.go
index 7918531f..af0b461d 100644
--- a/pool/mock_test.go
+++ b/pool/mock_test.go
@@ -224,15 +224,11 @@ func (mr *MockClientMockRecorder) GetContainer(arg0, arg1 interface{}) *gomock.C
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetContainer", reflect.TypeOf((*MockClient)(nil).GetContainer), varargs...)
 }
 
-// GetObject mocks base method.
-func (m *MockClient) GetObject(arg0 context.Context, arg1 *client0.GetObjectParams, arg2 ...client0.CallOption) (*client0.ObjectGetRes, error) {
+// ObjectGetInitmocks base method.
+func (m *MockClient) ObjectGetInit(arg0 context.Context, arg1 client0.PrmObjectGet) (*client0.ObjectReader, error) {
 	m.ctrl.T.Helper()
-	varargs := []interface{}{arg0, arg1}
-	for _, a := range arg2 {
-		varargs = append(varargs, a)
-	}
-	ret := m.ctrl.Call(m, "GetObject", varargs...)
-	ret0, _ := ret[0].(*client0.ObjectGetRes)
+	ret := m.ctrl.Call(m, "GetObject", arg0, arg1)
+	ret0, _ := ret[0].(*client0.ObjectReader)
 	ret1, _ := ret[1].(error)
 	return ret0, ret1
 }
@@ -241,7 +237,7 @@ func (m *MockClient) GetObject(arg0 context.Context, arg1 *client0.GetObjectPara
 func (mr *MockClientMockRecorder) GetObject(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
 	mr.mock.ctrl.T.Helper()
 	varargs := append([]interface{}{arg0, arg1}, arg2...)
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetObject", reflect.TypeOf((*MockClient)(nil).GetObject), varargs...)
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ObjectGetInit", reflect.TypeOf((*MockClient)(nil).ObjectGetInit), varargs...)
 }
 
 // HashObjectPayloadRanges mocks base method.
@@ -355,24 +351,20 @@ func (mr *MockClientMockRecorder) PutContainer(arg0, arg1 interface{}) *gomock.C
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutContainer", reflect.TypeOf((*MockClient)(nil).PutContainer), varargs...)
 }
 
-// PutObject mocks base method.
-func (m *MockClient) PutObject(arg0 context.Context, arg1 *client0.PutObjectParams, arg2 ...client0.CallOption) (*client0.ObjectPutRes, error) {
+// ObjectPutInitmocks base method.
+func (m *MockClient) ObjectPutInit(arg0 context.Context, arg1 client0.PrmObjectPutInit) (*client0.ObjectWriter, error) {
 	m.ctrl.T.Helper()
-	varargs := []interface{}{arg0, arg1}
-	for _, a := range arg2 {
-		varargs = append(varargs, a)
-	}
-	ret := m.ctrl.Call(m, "PutObject", varargs...)
-	ret0, _ := ret[0].(*client0.ObjectPutRes)
+	ret := m.ctrl.Call(m, "PutObject", arg0)
+	ret0, _ := ret[0].(*client0.ObjectWriter)
 	ret1, _ := ret[1].(error)
 	return ret0, ret1
 }
 
 // PutObject indicates an expected call of PutObject.
-func (mr *MockClientMockRecorder) PutObject(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
+func (mr *MockClientMockRecorder) PutObject(arg0, arg1 interface{}) *gomock.Call {
 	mr.mock.ctrl.T.Helper()
-	varargs := append([]interface{}{arg0, arg1}, arg2...)
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutObject", reflect.TypeOf((*MockClient)(nil).PutObject), varargs...)
+	varargs := append([]interface{}{arg0, arg1})
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ObjectPutInit", reflect.TypeOf((*MockClient)(nil).ObjectPutInit), varargs...)
 }
 
 // Raw mocks base method.
diff --git a/pool/pool.go b/pool/pool.go
index 22a4b7d8..3fe99248 100644
--- a/pool/pool.go
+++ b/pool/pool.go
@@ -1,6 +1,7 @@
 package pool
 
 import (
+	"bytes"
 	"context"
 	"crypto/ecdsa"
 	"crypto/sha256"
@@ -22,6 +23,7 @@ import (
 	cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
 	"github.com/nspcc-dev/neofs-sdk-go/eacl"
 	"github.com/nspcc-dev/neofs-sdk-go/object"
+	"github.com/nspcc-dev/neofs-sdk-go/object/address"
 	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
 	"github.com/nspcc-dev/neofs-sdk-go/owner"
 	"github.com/nspcc-dev/neofs-sdk-go/session"
@@ -41,9 +43,9 @@ type Client interface {
 	AnnounceContainerUsedSpace(context.Context, client.AnnounceSpacePrm) (*client.AnnounceSpaceRes, error)
 	EndpointInfo(context.Context, client.EndpointInfoPrm) (*client.EndpointInfoRes, error)
 	NetworkInfo(context.Context, client.NetworkInfoPrm) (*client.NetworkInfoRes, error)
-	PutObject(context.Context, *client.PutObjectParams, ...client.CallOption) (*client.ObjectPutRes, error)
+	ObjectPutInit(context.Context, client.PrmObjectPutInit) (*client.ObjectWriter, error)
 	DeleteObject(context.Context, *client.DeleteObjectParams, ...client.CallOption) (*client.ObjectDeleteRes, error)
-	GetObject(context.Context, *client.GetObjectParams, ...client.CallOption) (*client.ObjectGetRes, error)
+	ObjectGetInit(context.Context, client.PrmObjectGet) (*client.ObjectReader, error)
 	HeadObject(context.Context, *client.ObjectHeaderParams, ...client.CallOption) (*client.ObjectHeadRes, error)
 	ObjectPayloadRangeData(context.Context, *client.RangeDataParams, ...client.CallOption) (*client.ObjectRangeRes, error)
 	HashObjectPayloadRanges(context.Context, *client.RangeChecksumParams, ...client.CallOption) (*client.ObjectRangeHashRes, error)
@@ -159,9 +161,9 @@ type Pool interface {
 }
 
 type Object interface {
-	PutObject(ctx context.Context, params *client.PutObjectParams, opts ...CallOption) (*oid.ID, error)
+	PutObject(ctx context.Context, hdr object.Object, payload io.Reader, opts ...CallOption) (*oid.ID, error)
 	DeleteObject(ctx context.Context, params *client.DeleteObjectParams, opts ...CallOption) error
-	GetObject(ctx context.Context, params *client.GetObjectParams, opts ...CallOption) (*object.Object, error)
+	GetObject(ctx context.Context, addr address.Address, opts ...CallOption) (*ResGetObject, error)
 	GetObjectHeader(ctx context.Context, params *client.ObjectHeaderParams, opts ...CallOption) (*object.Object, error)
 	ObjectPayloadRangeData(ctx context.Context, params *client.RangeDataParams, opts ...CallOption) ([]byte, error)
 	ObjectPayloadRangeSHA256(ctx context.Context, params *client.RangeChecksumParams, opts ...CallOption) ([][32]byte, error)
@@ -282,9 +284,9 @@ func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) {
 
 	for i, params := range options.nodesParams {
 		clientPacks := make([]*clientPack, len(params.weights))
-		for j, address := range params.addresses {
+		for j, addr := range params.addresses {
 			c, err := options.clientBuilder(client.WithDefaultPrivateKey(options.Key),
-				client.WithURIAddress(address, nil),
+				client.WithURIAddress(addr, nil),
 				client.WithDialTimeout(options.NodeConnectionTimeout),
 				client.WithNeoFSErrorParsing())
 			if err != nil {
@@ -294,14 +296,14 @@ func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) {
 			cliRes, err := createSessionTokenForDuration(ctx, c, options.SessionExpirationDuration)
 			if err != nil && options.Logger != nil {
 				options.Logger.Warn("failed to create neofs session token for client",
-					zap.String("address", address),
+					zap.String("address", addr),
 					zap.Error(err))
 			} else if err == nil {
 				healthy, atLeastOneHealthy = true, true
 				st := sessionTokenForOwner(ownerID, cliRes)
-				_ = cache.Put(formCacheKey(address, options.Key), st)
+				_ = cache.Put(formCacheKey(addr, options.Key), st)
 			}
-			clientPacks[j] = &clientPack{client: c, healthy: healthy, address: address}
+			clientPacks[j] = &clientPack{client: c, healthy: healthy, address: addr}
 		}
 		source := rand.NewSource(time.Now().UnixNano())
 		sampler := NewSampler(params.weights, source)
@@ -448,8 +450,8 @@ func (p *pool) Connection() (Client, *session.Token, error) {
 		return nil, nil, err
 	}
 
-	token := p.cache.Get(formCacheKey(cp.address, p.key))
-	return cp.client, token, nil
+	tok := p.cache.Get(formCacheKey(cp.address, p.key))
+	return cp.client, tok, nil
 }
 
 func (p *pool) connection() (*clientPack, error) {
@@ -585,7 +587,68 @@ func (p *pool) removeSessionTokenAfterThreshold(cfg *callConfig) error {
 	return nil
 }
 
-func (p *pool) PutObject(ctx context.Context, params *client.PutObjectParams, opts ...CallOption) (*oid.ID, error) {
+type callContext struct {
+	// context for RPC
+	ctxBase context.Context
+
+	client Client
+
+	// client endpoint
+	endpoint string
+
+	// request signer
+	key *ecdsa.PrivateKey
+
+	// flag to open default session if session token is missing
+	sessionDefault bool
+	sessionToken   *session.Token
+}
+
+func (p *pool) prepareCallContext(ctx *callContext, cfg *callConfig) error {
+	cp, err := p.connection()
+	if err != nil {
+		return err
+	}
+
+	ctx.endpoint = cp.address
+	ctx.client = cp.client
+
+	ctx.key = cfg.key
+	if ctx.key == nil {
+		ctx.key = p.key
+	}
+
+	ctx.sessionDefault = cfg.useDefaultSession
+	ctx.sessionToken = cfg.stoken
+
+	if ctx.sessionToken == nil && ctx.sessionDefault {
+		cacheKey := formCacheKey(ctx.endpoint, ctx.key)
+
+		ctx.sessionToken = p.cache.Get(cacheKey)
+		if ctx.sessionToken == nil {
+			var cliPrm client.CreateSessionPrm
+
+			cliPrm.SetExp(math.MaxUint32)
+
+			cliRes, err := ctx.client.CreateSession(ctx.ctxBase, cliPrm)
+			if err != nil {
+				return fmt.Errorf("default session: %w", err)
+			}
+
+			ctx.sessionToken = sessionTokenForOwner(owner.NewIDFromPublicKey(&ctx.key.PublicKey), cliRes)
+
+			_ = p.cache.Put(cacheKey, ctx.sessionToken)
+		}
+	}
+
+	if ctx.sessionToken != nil && ctx.sessionToken.Signature() == nil {
+		err = ctx.sessionToken.Sign(ctx.key)
+	}
+
+	return err
+}
+
+func (p *pool) PutObject(ctx context.Context, hdr object.Object, payload io.Reader, opts ...CallOption) (*oid.ID, error) {
 	cfg := cfgFromOpts(append(opts, useDefaultSession())...)
 
 	// Put object is different from other object service methods. Put request
@@ -603,21 +666,90 @@ func (p *pool) PutObject(ctx context.Context, params *client.PutObjectParams, op
 		return nil, err
 	}
 
-	cp, options, err := p.conn(ctx, cfg)
+	var ctxCall callContext
+
+	ctxCall.ctxBase = ctx
+
+	err = p.prepareCallContext(&ctxCall, cfg)
 	if err != nil {
 		return nil, err
 	}
 
-	res, err := cp.client.PutObject(ctx, params, options...)
+	var prm client.PrmObjectPutInit
 
-	// removes session token from cache in case of token error
-	_ = p.checkSessionTokenErr(err, cp.address)
-
-	if err != nil { // here err already carries both status and client errors
-		return nil, err
+	wObj, err := ctxCall.client.ObjectPutInit(ctx, prm)
+	if err != nil {
+		return nil, fmt.Errorf("init writing on API client: %w", err)
 	}
 
-	return res.ID(), nil
+	wObj.UseKey(*ctxCall.key)
+
+	if ctxCall.sessionToken != nil {
+		wObj.WithinSession(*ctxCall.sessionToken)
+	}
+
+	if cfg.btoken != nil {
+		wObj.WithBearerToken(*cfg.btoken)
+	}
+
+	if wObj.WriteHeader(hdr) {
+		sz := hdr.PayloadSize()
+
+		if data := hdr.Payload(); len(data) > 0 {
+			if payload != nil {
+				payload = io.MultiReader(bytes.NewReader(data), payload)
+			} else {
+				payload = bytes.NewReader(data)
+				sz = uint64(len(data))
+			}
+		}
+
+		if payload != nil {
+			const defaultBufferSizePut = 4096 // configure?
+
+			if sz == 0 || sz > defaultBufferSizePut {
+				sz = defaultBufferSizePut
+			}
+
+			buf := make([]byte, sz)
+
+			var n int
+			var ok bool
+
+			for {
+				n, err = payload.Read(buf)
+				if n > 0 {
+					ok = wObj.WritePayloadChunk(buf[:n])
+					if !ok {
+						break
+					}
+
+					continue
+				}
+
+				if errors.Is(err, io.EOF) {
+					break
+				}
+
+				return nil, fmt.Errorf("read payload: %w", err)
+			}
+		}
+	}
+
+	res, err := wObj.Close()
+	if err != nil { // here err already carries both status and client errors
+		// removes session token from cache in case of token error
+		p.checkSessionTokenErr(err, ctxCall.endpoint)
+		return nil, fmt.Errorf("client failure: %w", err)
+	}
+
+	var id oid.ID
+
+	if !res.ReadStoredObject(&id) {
+		return nil, errors.New("missing ID of the stored object")
+	}
+
+	return &id, nil
 }
 
 func (p *pool) DeleteObject(ctx context.Context, params *client.DeleteObjectParams, opts ...CallOption) error {
@@ -639,25 +771,74 @@ func (p *pool) DeleteObject(ctx context.Context, params *client.DeleteObjectPara
 	return err
 }
 
-func (p *pool) GetObject(ctx context.Context, params *client.GetObjectParams, opts ...CallOption) (*object.Object, error) {
+type objectReadCloser client.ObjectReader
+
+func (x *objectReadCloser) Read(p []byte) (int, error) {
+	return (*client.ObjectReader)(x).Read(p)
+}
+
+func (x *objectReadCloser) Close() error {
+	_, err := (*client.ObjectReader)(x).Close()
+	return err
+}
+
+type ResGetObject struct {
+	Header object.Object
+
+	Payload io.ReadCloser
+}
+
+func (p *pool) GetObject(ctx context.Context, addr address.Address, opts ...CallOption) (*ResGetObject, error) {
 	cfg := cfgFromOpts(append(opts, useDefaultSession())...)
-	cp, options, err := p.conn(ctx, cfg)
+
+	var ctxCall callContext
+
+	ctxCall.ctxBase = ctx
+
+	err := p.prepareCallContext(&ctxCall, cfg)
 	if err != nil {
 		return nil, err
 	}
 
-	res, err := cp.client.GetObject(ctx, params, options...)
+	var prm client.PrmObjectGet
 
-	if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
-		opts = append(opts, retry())
-		return p.GetObject(ctx, params, opts...)
+	if cnr := addr.ContainerID(); cnr != nil {
+		prm.FromContainer(*cnr)
 	}
 
-	if err != nil { // here err already carries both status and client errors
+	if obj := addr.ObjectID(); obj != nil {
+		prm.ByID(*obj)
+	}
+
+	if ctxCall.sessionToken != nil {
+		prm.WithinSession(*ctxCall.sessionToken)
+	}
+
+	if cfg.btoken != nil {
+		prm.WithBearerToken(*cfg.btoken)
+	}
+
+	rObj, err := ctxCall.client.ObjectGetInit(ctx, prm)
+	if err != nil {
 		return nil, err
 	}
 
-	return res.Object(), nil
+	rObj.UseKey(*ctxCall.key)
+
+	var res ResGetObject
+
+	if !rObj.ReadHeader(&res.Header) {
+		_, err = rObj.Close()
+		if p.checkSessionTokenErr(err, ctxCall.endpoint) && !cfg.isRetry {
+			return p.GetObject(ctx, addr, append(opts, retry())...)
+		}
+
+		return nil, fmt.Errorf("read header: %w", err)
+	}
+
+	res.Payload = (*objectReadCloser)(rObj)
+
+	return &res, nil
 }
 
 func (p *pool) GetObjectHeader(ctx context.Context, params *client.ObjectHeaderParams, opts ...CallOption) (*object.Object, error) {
diff --git a/pool/pool_test.go b/pool/pool_test.go
index ba792c70..969bdde2 100644
--- a/pool/pool_test.go
+++ b/pool/pool_test.go
@@ -15,6 +15,8 @@ import (
 	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
 	"github.com/nspcc-dev/neofs-sdk-go/client"
 	"github.com/nspcc-dev/neofs-sdk-go/netmap"
+	"github.com/nspcc-dev/neofs-sdk-go/object"
+	"github.com/nspcc-dev/neofs-sdk-go/object/address"
 	"github.com/nspcc-dev/neofs-sdk-go/owner"
 	"github.com/nspcc-dev/neofs-sdk-go/session"
 	"github.com/stretchr/testify/require"
@@ -329,7 +331,7 @@ func TestSessionCache(t *testing.T) {
 		}).MaxTimes(3)
 
 		mockClient.EXPECT().GetObject(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("session token does not exist"))
-		mockClient.EXPECT().PutObject(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil)
+		mockClient.EXPECT().PutObject(gomock.Any(), gomock.Any()).Return(nil, nil)
 
 		return mockClient, nil
 	}
@@ -355,7 +357,7 @@ func TestSessionCache(t *testing.T) {
 	require.NoError(t, err)
 	require.Contains(t, tokens, st)
 
-	_, err = pool.GetObject(ctx, nil, retry())
+	_, err = pool.GetObject(ctx, address.Address{}, retry())
 	require.Error(t, err)
 
 	// cache must not contain session token
@@ -363,7 +365,7 @@ func TestSessionCache(t *testing.T) {
 	require.NoError(t, err)
 	require.Nil(t, st)
 
-	_, err = pool.PutObject(ctx, nil)
+	_, err = pool.PutObject(ctx, object.Object{}, nil)
 	require.NoError(t, err)
 
 	// cache must contain session token
@@ -481,7 +483,7 @@ func TestSessionCacheWithKey(t *testing.T) {
 	require.NoError(t, err)
 	require.Contains(t, tokens, st)
 
-	_, err = pool.GetObject(ctx, nil, WithKey(newPrivateKey(t)))
+	_, err = pool.GetObject(ctx, address.Address{}, WithKey(newPrivateKey(t)))
 	require.NoError(t, err)
 	require.Len(t, tokens, 2)
 }