package client

import (
	"bytes"
	"context"
	"crypto/ecdsa"
	"errors"
	"fmt"
	"io"

	v2object "github.com/nspcc-dev/neofs-api-go/v2/object"
	v2refs "github.com/nspcc-dev/neofs-api-go/v2/refs"
	rpcapi "github.com/nspcc-dev/neofs-api-go/v2/rpc"
	"github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
	v2session "github.com/nspcc-dev/neofs-api-go/v2/session"
	"github.com/nspcc-dev/neofs-api-go/v2/signature"
	apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
	cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
	"github.com/nspcc-dev/neofs-sdk-go/object"
	signer "github.com/nspcc-dev/neofs-sdk-go/util/signature"
)

// PutObjectParams groups the parameters of the PutObject operation.
type PutObjectParams struct {
	// obj is the object to be saved; its header is sent in the initial
	// stream message and its payload is streamed first.
	obj *object.Object

	// r is an optional source of payload data that is streamed
	// after the payload of obj.
	r io.Reader
}

// ObjectAddressWriter is an interface of the
// component that writes the object address.
//
// It is accepted by DeleteObjectParams.WithTombstoneAddressTarget.
type ObjectAddressWriter interface {
	// SetAddress receives the object address to be written.
	SetAddress(*object.Address)
}

// DeleteObjectParams groups the parameters of the DeleteObject operation.
type DeleteObjectParams struct {
	// addr is the address of the object to be removed.
	addr *object.Address

	// tombTgt is the target component for the tombstone address
	// (see WithTombstoneAddressTarget).
	tombTgt ObjectAddressWriter
}

// GetObjectParams groups the parameters of the GetObject operation.
type GetObjectParams struct {
	// addr is the address of the object to be read.
	addr *object.Address

	// raw is the value of the raw flag of the request body.
	raw bool

	// w is an optional destination of the payload chunks. If it is nil,
	// the payload is accumulated in the resulting object.
	w io.Writer

	// readerHandler, if set, is called with the payload reader
	// right after the header has been received.
	readerHandler ReaderHandler
}

// ObjectHeaderParams groups the parameters of the HeadObject operation.
type ObjectHeaderParams struct {
	// addr is the address of the object whose header is requested.
	addr *object.Address

	// raw is the value of the raw flag of the request body.
	raw bool

	// short requests only the main header fields when set
	// (it is sent as the "main only" flag of the request body).
	short bool
}

// RangeDataParams groups the parameters of the ObjectPayloadRangeData operation.
type RangeDataParams struct {
	// addr is the address of the object to read the payload range from.
	addr *object.Address

	// raw is the value of the raw flag of the request body.
	raw bool

	// r is the requested payload range.
	r *object.Range

	// w is an optional destination of the received range chunks. If it is
	// nil, the data is accumulated in the operation result.
	w io.Writer
}

// RangeChecksumParams groups the parameters of the HashObjectPayloadRanges operation.
type RangeChecksumParams struct {
	// tz selects the Tillich-Zemor checksum type when set;
	// otherwise SHA256 is requested.
	tz bool

	// addr is the address of the object whose payload ranges are hashed.
	addr *object.Address

	// rs is the list of payload ranges to hash.
	rs []*object.Range

	// salt is the salt value passed in the request body.
	salt []byte
}

// SearchObjectParams groups the parameters of the SearchObjects operation.
type SearchObjectParams struct {
	// cid is the identifier of the container to search in.
	cid *cid.ID

	// filters is the set of search filters passed in the request body.
	filters object.SearchFilters
}

// putObjectV2Reader wraps the payload reader of PutObject and exposes
// nothing but the Read method of the underlying reader.
//
// NOTE(review): presumably this hides optional interfaces of the wrapped
// reader (e.g. io.WriterTo) so that io.CopyBuffer always goes through the
// supplied chunk-sized buffer — confirm.
type putObjectV2Reader struct {
	r io.Reader
}

// putObjectV2Writer is an io.Writer that sends every written slice
// as a signed payload chunk message of the Put object stream.
type putObjectV2Writer struct {
	// key is the private key used to sign each chunk request.
	key *ecdsa.PrivateKey

	// chunkPart is the chunk message part that receives the written data.
	chunkPart *v2object.PutObjectPartChunk

	// req is the request message that is re-signed and sent on every write.
	req *v2object.PutRequest

	// stream is the Put object stream the requests are written to.
	stream *rpcapi.PutRequestWriter
}

// checksumType enumerates the checksum types supported by the client.
type checksumType int

const (
	// the zero value is reserved and does not denote a valid checksum type
	_ checksumType = iota
	// checksumSHA256 is converted to v2refs.SHA256.
	checksumSHA256
	// checksumTZ is converted to v2refs.TillichZemor.
	checksumTZ
)

// chunkSize is the size (3 MiB) of the buffer used to copy
// the object payload into the Put object stream.
const chunkSize = 3 * (1 << 20)

// TZSize equals 64.
//
// NOTE(review): presumably the byte length of a Tillich-Zemor hash — confirm.
const TZSize = 64

// searchQueryVersion is the version of the search query
// set in the body of the Search request.
const searchQueryVersion uint32 = 1

// rangesToV2 converts every range of rs to its API v2 message
// and returns the results in the same order. The returned slice
// is never nil.
func rangesToV2(rs []*object.Range) []*v2object.Range {
	converted := make([]*v2object.Range, len(rs))

	for i, r := range rs {
		converted[i] = r.ToV2()
	}

	return converted
}

// toV2 maps the checksum type to the corresponding API v2 enum value.
//
// Panics if t is neither checksumSHA256 nor checksumTZ.
func (t checksumType) toV2() v2refs.ChecksumType {
	if t == checksumSHA256 {
		return v2refs.SHA256
	}

	if t == checksumTZ {
		return v2refs.TillichZemor
	}

	panic(fmt.Sprintf("invalid checksum type %d", t))
}

// Read implements io.Reader by delegating to the wrapped reader.
func (w *putObjectV2Reader) Read(p []byte) (int, error) {
	return w.r.Read(p)
}

// Write implements io.Writer: it places p into the chunk part of the shared
// request, re-signs the request and sends it to the stream.
//
// Returns len(p) on success and 0 with an error if signing or sending failed.
func (w *putObjectV2Writer) Write(p []byte) (int, error) {
	// the request is reused between writes, so the verification header
	// is reset before the message is signed again
	w.chunkPart.SetChunk(p)
	w.req.SetVerificationHeader(nil)

	err := signature.SignServiceMessage(w.key, w.req)
	if err != nil {
		return 0, fmt.Errorf("could not sign chunk request message: %w", err)
	}

	err = w.stream.Write(w.req)
	if err != nil {
		return 0, fmt.Errorf("could not send chunk request message: %w", err)
	}

	return len(p), nil
}

// WithObject sets the object to be put and returns the receiver.
// A nil receiver is returned as is.
func (p *PutObjectParams) WithObject(v *object.Object) *PutObjectParams {
	if p == nil {
		return nil
	}

	p.obj = v

	return p
}

// Object returns the object to be put, or nil if the receiver is nil.
func (p *PutObjectParams) Object() *object.Object {
	if p == nil {
		return nil
	}

	return p.obj
}

// WithPayloadReader sets the reader of additional payload data
// and returns the receiver. A nil receiver is returned as is.
func (p *PutObjectParams) WithPayloadReader(v io.Reader) *PutObjectParams {
	if p == nil {
		return nil
	}

	p.r = v

	return p
}

// PayloadReader returns the reader of additional payload data,
// or nil if the receiver is nil.
func (p *PutObjectParams) PayloadReader() io.Reader {
	if p == nil {
		return nil
	}

	return p.r
}

// ObjectPutRes groups the results of the PutObject operation.
type ObjectPutRes struct {
	statusRes

	// id is the identifier of the saved object taken from the response.
	id *object.ID
}

// setID stores the identifier of the saved object.
func (x *ObjectPutRes) setID(id *object.ID) {
	x.id = id
}

// ID returns the identifier of the saved object.
func (x ObjectPutRes) ID() *object.ID {
	return x.id
}

// PutObject puts object through NeoFS API call.
//
// The object ID, signature and header are sent in the initial stream
// message. The payload is then streamed in separately signed chunk
// messages: first the payload of the object itself and, if a payload
// reader is set, the data read from it afterwards.
//
// Any client's internal or transport errors are returned as `error`.
// If WithNeoFSErrorParsing option has been provided, unsuccessful
// NeoFS status codes are returned as `error`, otherwise, are included
// in the returned result structure.
func (c *Client) PutObject(ctx context.Context, p *PutObjectParams, opts ...CallOption) (*ObjectPutRes, error) {
	callOpts := c.defaultCallOptions()

	// apply non-nil call options on top of the defaults
	for i := range opts {
		if opts[i] != nil {
			opts[i](callOpts)
		}
	}

	// create request
	req := new(v2object.PutRequest)

	// initialize request body
	body := new(v2object.PutRequestBody)
	req.SetBody(body)

	// address of the object for the session token context
	v2Addr := new(v2refs.Address)
	v2Addr.SetObjectID(p.obj.ID().ToV2())
	v2Addr.SetContainerID(p.obj.ContainerID().ToV2())

	// set meta header
	meta := v2MetaHeaderFromOpts(callOpts)

	if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{
		addr: v2Addr,
		verb: v2session.ObjectVerbPut,
	}); err != nil {
		return nil, fmt.Errorf("could not attach session token: %w", err)
	}

	req.SetMetaHeader(meta)

	// initialize init part
	initPart := new(v2object.PutObjectPartInit)
	body.SetObjectPart(initPart)

	obj := p.obj.ToV2()

	// set init part fields
	initPart.SetObjectID(obj.GetObjectID())
	initPart.SetSignature(obj.GetSignature())
	initPart.SetHeader(obj.GetHeader())

	// sign the request
	if err := signature.SignServiceMessage(callOpts.key, req); err != nil {
		return nil, fmt.Errorf("signing the request failed: %w", err)
	}

	// open stream
	resp := new(v2object.PutResponse)

	stream, err := rpcapi.PutObject(c.Raw(), resp, client.WithContext(ctx))
	if err != nil {
		return nil, fmt.Errorf("stream opening failed: %w", err)
	}

	// send init part
	err = stream.Write(req)
	if err != nil {
		return nil, fmt.Errorf("sending the initial message to stream failed: %w", err)
	}

	// create payload bytes reader: payload of the object itself
	// followed by the data of the optional payload reader
	var rPayload io.Reader = bytes.NewReader(obj.GetPayload())
	if p.r != nil {
		rPayload = io.MultiReader(rPayload, p.r)
	}

	// create v2 payload stream writer; the same request message is reused
	// for all chunks, only its object part is replaced with the chunk part
	chunkPart := new(v2object.PutObjectPartChunk)
	body.SetObjectPart(chunkPart)

	w := &putObjectV2Writer{
		key:       callOpts.key,
		chunkPart: chunkPart,
		req:       req,
		stream:    stream,
	}

	r := &putObjectV2Reader{r: rPayload}

	// copy payload from reader to stream writer through a buffer of chunkSize
	// bytes; io.EOF returned by the copy is not treated as a failure
	// NOTE(review): presumably the stream reports io.EOF when the server has
	// already finished the exchange, the response is then read on Close — confirm
	_, err = io.CopyBuffer(w, r, make([]byte, chunkSize))
	if err != nil && !errors.Is(err, io.EOF) {
		return nil, fmt.Errorf("payload streaming failed: %w", err)
	}

	// close object stream and receive response from remote node
	err = stream.Close()
	if err != nil {
		return nil, fmt.Errorf("closing the stream failed: %w", err)
	}

	var (
		res     = new(ObjectPutRes)
		procPrm processResponseV2Prm
		procRes processResponseV2Res
	)

	procPrm.callOpts = callOpts
	procPrm.resp = resp

	procRes.statusRes = res

	// process response in general; a true result stops further processing:
	// either with a client error or with the result as it is at this point
	if c.processResponseV2(&procRes, procPrm) {
		if procRes.cliErr != nil {
			return nil, procRes.cliErr
		}

		return res, nil
	}

	// convert object identifier
	id := object.NewIDFromV2(resp.GetBody().GetObjectID())

	res.setID(id)

	return res, nil
}

// WithAddress sets the address of the object to be removed
// and returns the receiver. A nil receiver is returned as is.
func (p *DeleteObjectParams) WithAddress(v *object.Address) *DeleteObjectParams {
	if p == nil {
		return nil
	}

	p.addr = v

	return p
}

// Address returns the address of the object to be removed,
// or nil if the receiver is nil.
func (p *DeleteObjectParams) Address() *object.Address {
	if p == nil {
		return nil
	}

	return p.addr
}

// WithTombstoneAddressTarget sets the component that the tombstone
// address is intended to be written to and returns the receiver.
// A nil receiver is returned as is.
func (p *DeleteObjectParams) WithTombstoneAddressTarget(v ObjectAddressWriter) *DeleteObjectParams {
	if p == nil {
		return nil
	}

	p.tombTgt = v

	return p
}

// TombstoneAddressTarget returns the component set as the target of the
// tombstone address, or nil if the receiver is nil.
func (p *DeleteObjectParams) TombstoneAddressTarget() ObjectAddressWriter {
	if p == nil {
		return nil
	}

	return p.tombTgt
}

// ObjectDeleteRes groups the results of the DeleteObject operation.
type ObjectDeleteRes struct {
	statusRes

	// tombAddr is the tombstone address taken from the response.
	tombAddr *object.Address
}

// TombstoneAddress returns the address of the tombstone
// received in the response.
func (x ObjectDeleteRes) TombstoneAddress() *object.Address {
	return x.tombAddr
}

// setTombstoneAddress stores the address of the tombstone.
func (x *ObjectDeleteRes) setTombstoneAddress(addr *object.Address) {
	x.tombAddr = addr
}

// DeleteObject removes object by address.
//
// The tombstone address received in the response is available through
// ObjectDeleteRes.TombstoneAddress. This method does not write the address
// to the target set via WithTombstoneAddressTarget.
//
// Any client's internal or transport errors are returned as `error`.
// If WithNeoFSErrorParsing option has been provided, unsuccessful
// NeoFS status codes are returned as `error`, otherwise, are included
// in the returned result structure.
func (c *Client) DeleteObject(ctx context.Context, p *DeleteObjectParams, opts ...CallOption) (*ObjectDeleteRes, error) {
	callOpts := c.defaultCallOptions()

	// apply non-nil call options on top of the defaults
	for i := range opts {
		if opts[i] != nil {
			opts[i](callOpts)
		}
	}

	// create request
	req := new(v2object.DeleteRequest)

	// initialize request body
	body := new(v2object.DeleteRequestBody)
	req.SetBody(body)

	// set meta header
	meta := v2MetaHeaderFromOpts(callOpts)

	if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{
		addr: p.addr.ToV2(),
		verb: v2session.ObjectVerbDelete,
	}); err != nil {
		return nil, fmt.Errorf("could not attach session token: %w", err)
	}

	req.SetMetaHeader(meta)

	// fill body fields
	body.SetAddress(p.addr.ToV2())

	// sign the request
	if err := signature.SignServiceMessage(callOpts.key, req); err != nil {
		return nil, fmt.Errorf("signing the request failed: %w", err)
	}

	// send request
	resp, err := rpcapi.DeleteObject(c.Raw(), req, client.WithContext(ctx))
	if err != nil {
		return nil, fmt.Errorf("sending the request failed: %w", err)
	}

	var (
		res     = new(ObjectDeleteRes)
		procPrm processResponseV2Prm
		procRes processResponseV2Res
	)

	procPrm.callOpts = callOpts
	procPrm.resp = resp

	procRes.statusRes = res

	// process response in general; a true result stops further processing:
	// either with a client error or with the result as it is at this point
	if c.processResponseV2(&procRes, procPrm) {
		if procRes.cliErr != nil {
			return nil, procRes.cliErr
		}

		return res, nil
	}

	// convert the tombstone address from the response body
	addrv2 := resp.GetBody().GetTombstone()

	res.setTombstoneAddress(object.NewAddressFromV2(addrv2))

	return res, nil
}

// WithAddress sets the address of the object to be read
// and returns the receiver. A nil receiver is returned as is.
func (p *GetObjectParams) WithAddress(v *object.Address) *GetObjectParams {
	if p == nil {
		return nil
	}

	p.addr = v

	return p
}

// Address returns the address of the object to be read,
// or nil if the receiver is nil.
func (p *GetObjectParams) Address() *object.Address {
	if p == nil {
		return nil
	}

	return p.addr
}

// WithPayloadWriter sets the destination of the payload chunks
// and returns the receiver. A nil receiver is returned as is.
func (p *GetObjectParams) WithPayloadWriter(w io.Writer) *GetObjectParams {
	if p == nil {
		return nil
	}

	p.w = w

	return p
}

// PayloadWriter returns the destination of the payload chunks,
// or nil if the receiver is nil.
func (p *GetObjectParams) PayloadWriter() io.Writer {
	if p == nil {
		return nil
	}

	return p.w
}

// WithRawFlag sets the raw flag of the request and returns the receiver.
// A nil receiver is returned as is.
func (p *GetObjectParams) WithRawFlag(v bool) *GetObjectParams {
	if p == nil {
		return nil
	}

	p.raw = v

	return p
}

// RawFlag returns the raw flag of the request.
// It is false for a nil receiver.
func (p *GetObjectParams) RawFlag() bool {
	return p != nil && p.raw
}

// ReaderHandler is a function over io.Reader.
//
// GetObject calls it with a reader of the object payload
// (see GetObjectParams.WithPayloadReaderHandler).
type ReaderHandler func(io.Reader)

// WithPayloadReaderHandler sets the handler of the payload reader
// and returns the receiver. A nil receiver is returned as is.
//
// If provided, payload reader is composed after receiving the header.
// In this case payload writer set via WithPayloadWriter is ignored.
//
// Handler should not be nil.
func (p *GetObjectParams) WithPayloadReaderHandler(f ReaderHandler) *GetObjectParams {
	if p == nil {
		return nil
	}

	p.readerHandler = f

	return p
}

// objectPayloadReader is a wrapper over the Object Get stream
// that provides io.Reader over the payload chunks.
type objectPayloadReader struct {
	// stream is the source of the response messages.
	stream interface {
		Read(*v2object.GetResponse) error
	}

	// resp is the response message reused between the stream reads.
	resp v2object.GetResponse

	// tail holds the bytes of the last received chunk
	// that have not been returned to the caller yet.
	tail []byte
}

// Read implements io.Reader over the payload chunks of the Object Get stream.
//
// The bytes left from the previous chunk are returned first. If they do not
// fill p, one more message is received from the stream: its chunk is copied
// to the rest of p and the bytes that do not fit are kept for the next call.
// At most one stream message is received per call.
//
// Returns io.EOF when the stream is finished, errWrongMessageSeq if the
// received message is not a chunk part, and a wrapped error if reading or
// verification of the response failed. Bytes copied from the tail are
// reported together with the error.
func (x *objectPayloadReader) Read(p []byte) (read int, err error) {
	// read remaining tail
	read = copy(p, x.tail)

	x.tail = x.tail[read:]

	// p is completely filled (or empty): no need to touch the stream
	if len(p)-read == 0 {
		return
	}

	// receive message from server stream
	err = x.stream.Read(&x.resp)
	if err != nil {
		if errors.Is(err, io.EOF) {
			// return bare io.EOF, as expected from io.Reader implementations
			err = io.EOF
			return
		}

		err = fmt.Errorf("reading the response failed: %w", err)
		return
	}

	// get chunk part message
	part := x.resp.GetBody().GetObjectPart()

	chunkPart, ok := part.(*v2object.GetObjectPartChunk)
	if !ok {
		err = errWrongMessageSeq
		return
	}

	// verify response structure
	if err = signature.VerifyServiceMessage(&x.resp); err != nil {
		err = fmt.Errorf("response verification failed: %w", err)
		return
	}

	// read new chunk
	chunk := chunkPart.GetChunk()

	tailOffset := copy(p[read:], chunk)

	read += tailOffset

	// save the tail: the previous tail is fully consumed at this point,
	// so only the unread part of the new chunk is kept
	x.tail = append(x.tail, chunk[tailOffset:]...)

	return
}

// errWrongMessageSeq is returned when a stream message arrives out of the
// expected order, has an unexpected type, or the stream ends without messages.
var errWrongMessageSeq = errors.New("incorrect message sequence")

// ObjectGetRes groups the results of the GetObject operation:
// the status and the received object.
type ObjectGetRes struct {
	statusRes
	objectRes
}

// objectRes is the object part of an operation result;
// it is embedded into ObjectGetRes and ObjectHeadRes.
type objectRes struct {
	obj *object.Object
}

// setObject stores the received object.
func (x *objectRes) setObject(obj *object.Object) {
	x.obj = obj
}

// Object returns the received object.
func (x objectRes) Object() *object.Object {
	return x.obj
}

// writeUnexpectedMessageTypeErr sets the status of res to an internal server
// error that reports the unexpected type of the message val.
func writeUnexpectedMessageTypeErr(res resCommon, val interface{}) {
	cause := fmt.Errorf("unexpected message type %T", val)

	// specific API status should be used
	var internalErr apistatus.ServerInternal

	apistatus.WriteInternalServerErr(&internalErr, cause)
	res.setStatus(internalErr)
}

// GetObject receives object through NeoFS API call.
//
// The payload is delivered in one of three ways:
//   - if a payload reader handler is set, it is called right after the header
//     message with a reader over the rest of the stream; processing of the
//     stream stops when the handler returns and the resulting object carries
//     no payload;
//   - otherwise, if a payload writer is set, the chunks are written to it and
//     the resulting object carries no payload;
//   - otherwise the payload is accumulated in the resulting object.
//
// If the server responds with split information, an error created by
// object.NewSplitInfoError is returned.
//
// Any client's internal or transport errors are returned as `error`.
// If WithNeoFSErrorParsing option has been provided, unsuccessful
// NeoFS status codes are returned as `error`, otherwise, are included
// in the returned result structure.
func (c *Client) GetObject(ctx context.Context, p *GetObjectParams, opts ...CallOption) (*ObjectGetRes, error) {
	callOpts := c.defaultCallOptions()

	// apply non-nil call options on top of the defaults
	for i := range opts {
		if opts[i] != nil {
			opts[i](callOpts)
		}
	}

	// create request
	req := new(v2object.GetRequest)

	// initialize request body
	body := new(v2object.GetRequestBody)
	req.SetBody(body)

	// set meta header
	meta := v2MetaHeaderFromOpts(callOpts)

	if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{
		addr: p.addr.ToV2(),
		verb: v2session.ObjectVerbGet,
	}); err != nil {
		return nil, fmt.Errorf("could not attach session token: %w", err)
	}

	req.SetMetaHeader(meta)

	// fill body fields
	body.SetAddress(p.addr.ToV2())
	body.SetRaw(p.raw)

	// sign the request
	if err := signature.SignServiceMessage(callOpts.key, req); err != nil {
		return nil, fmt.Errorf("signing the request failed: %w", err)
	}

	// open stream
	stream, err := rpcapi.GetObject(c.Raw(), req, client.WithContext(ctx))
	if err != nil {
		return nil, fmt.Errorf("stream opening failed: %w", err)
	}

	var (
		// headWas tells whether the init (header) message has been received
		headWas bool
		payload []byte
		obj     = new(v2object.Object)
		resp    = new(v2object.GetResponse)

		// messageWas tells whether at least one message has been received
		messageWas bool

		res     = new(ObjectGetRes)
		procPrm processResponseV2Prm
		procRes processResponseV2Res
	)

	procPrm.callOpts = callOpts
	procPrm.resp = resp

	procRes.statusRes = res

loop:
	for {
		// receive message from server stream
		err := stream.Read(resp)
		if err != nil {
			if errors.Is(err, io.EOF) {
				// a stream that ends without any message is a protocol violation
				if !messageWas {
					return nil, errWrongMessageSeq
				}

				break
			}

			return nil, fmt.Errorf("reading the response failed: %w", err)
		}

		messageWas = true

		// process response in general; a true result stops further processing:
		// either with a client error or with the result as it is at this point
		if c.processResponseV2(&procRes, procPrm) {
			if procRes.cliErr != nil {
				return nil, procRes.cliErr
			}

			return res, nil
		}

		switch v := resp.GetBody().GetObjectPart().(type) {
		default:
			return nil, errWrongMessageSeq
		case *v2object.GetObjectPartInit:
			// the header must come exactly once
			if headWas {
				return nil, errWrongMessageSeq
			}

			headWas = true

			obj.SetObjectID(v.GetObjectID())
			obj.SetSignature(v.GetSignature())

			hdr := v.GetHeader()
			obj.SetHeader(hdr)

			// hand the rest of the stream over to the handler (called
			// synchronously) and stop reading the stream here
			if p.readerHandler != nil {
				p.readerHandler(&objectPayloadReader{
					stream: stream,
				})

				break loop
			}

			// pre-allocate the payload buffer only if chunks are accumulated
			if p.w == nil {
				payload = make([]byte, 0, hdr.GetPayloadLength())
			}
		case *v2object.GetObjectPartChunk:
			// chunks must follow the header
			if !headWas {
				return nil, errWrongMessageSeq
			}

			if p.w != nil {
				if _, err := p.w.Write(v.GetChunk()); err != nil {
					return nil, fmt.Errorf("could not write payload chunk: %w", err)
				}
			} else {
				payload = append(payload, v.GetChunk()...)
			}
		case *v2object.SplitInfo:
			// split information is expected instead of the header, not after it
			if headWas {
				return nil, errWrongMessageSeq
			}

			si := object.NewSplitInfoFromV2(v)
			return nil, object.NewSplitInfoError(si)
		}
	}

	obj.SetPayload(payload)

	// convert the object
	res.setObject(object.NewFromV2(obj))

	return res, nil
}

// WithAddress sets the address of the object whose header is requested
// and returns the receiver. A nil receiver is returned as is.
func (p *ObjectHeaderParams) WithAddress(v *object.Address) *ObjectHeaderParams {
	if p == nil {
		return nil
	}

	p.addr = v

	return p
}

// Address returns the address of the object whose header is requested,
// or nil if the receiver is nil.
func (p *ObjectHeaderParams) Address() *object.Address {
	if p == nil {
		return nil
	}

	return p.addr
}

// WithAllFields makes the request ask for all header fields
// and returns the receiver. A nil receiver is returned as is.
func (p *ObjectHeaderParams) WithAllFields() *ObjectHeaderParams {
	if p == nil {
		return nil
	}

	p.short = false

	return p
}

// AllFields returns true if the parameters are set to return all header
// fields, and false if they are set to return only the main fields of the
// header. It is false for a nil receiver.
func (p *ObjectHeaderParams) AllFields() bool {
	return p != nil && !p.short
}

// WithMainFields makes the request ask for the main header fields only
// and returns the receiver. A nil receiver is returned as is.
func (p *ObjectHeaderParams) WithMainFields() *ObjectHeaderParams {
	if p == nil {
		return nil
	}

	p.short = true

	return p
}

// WithRawFlag sets the raw flag of the request and returns the receiver.
// A nil receiver is returned as is.
func (p *ObjectHeaderParams) WithRawFlag(v bool) *ObjectHeaderParams {
	if p == nil {
		return nil
	}

	p.raw = v

	return p
}

// RawFlag returns the raw flag of the request.
// It is false for a nil receiver.
func (p *ObjectHeaderParams) RawFlag() bool {
	return p != nil && p.raw
}

// ObjectHeadRes groups the results of the HeadObject operation:
// the status and the object built from the received header.
type ObjectHeadRes struct {
	statusRes
	objectRes
}

// HeadObject receives object's header through NeoFS API call.
//
// The resulting object carries the received header and, for the full header,
// the received signature; its ID is taken from the requested address.
// If the type of the received header part does not match the requested one
// (short or full) or the part is missing, an internal server error status is
// written to the result. If the server responds with split information,
// an error created by object.NewSplitInfoError is returned.
//
// Any client's internal or transport errors are returned as `error`.
// If WithNeoFSErrorParsing option has been provided, unsuccessful
// NeoFS status codes are returned as `error`, otherwise, are included
// in the returned result structure.
func (c *Client) HeadObject(ctx context.Context, p *ObjectHeaderParams, opts ...CallOption) (*ObjectHeadRes, error) {
	callOpts := c.defaultCallOptions()

	// apply non-nil call options on top of the defaults
	for i := range opts {
		if opts[i] != nil {
			opts[i](callOpts)
		}
	}

	// create request
	req := new(v2object.HeadRequest)

	// initialize request body
	body := new(v2object.HeadRequestBody)
	req.SetBody(body)

	// set meta header
	meta := v2MetaHeaderFromOpts(callOpts)

	if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{
		addr: p.addr.ToV2(),
		verb: v2session.ObjectVerbHead,
	}); err != nil {
		return nil, fmt.Errorf("could not attach session token: %w", err)
	}

	req.SetMetaHeader(meta)

	// fill body fields
	body.SetAddress(p.addr.ToV2())
	body.SetMainOnly(p.short)
	body.SetRaw(p.raw)

	// sign the request
	if err := signature.SignServiceMessage(callOpts.key, req); err != nil {
		return nil, fmt.Errorf("signing the request failed: %w", err)
	}

	// send Head request
	resp, err := rpcapi.HeadObject(c.Raw(), req, client.WithContext(ctx))
	if err != nil {
		return nil, fmt.Errorf("sending the request failed: %w", err)
	}

	var (
		res     = new(ObjectHeadRes)
		procPrm processResponseV2Prm
		procRes processResponseV2Res
	)

	procPrm.callOpts = callOpts
	procPrm.resp = resp

	procRes.statusRes = res

	// process response in general; a true result stops further processing:
	// either with a client error or with the result as it is at this point
	if c.processResponseV2(&procRes, procPrm) {
		if procRes.cliErr != nil {
			return nil, procRes.cliErr
		}

		return res, nil
	}

	var (
		hdr   *v2object.Header
		idSig *v2refs.Signature
	)

	switch v := resp.GetBody().GetHeaderPart().(type) {
	case nil:
		// the response carries no header part at all
		writeUnexpectedMessageTypeErr(res, v)
		return res, nil
	case *v2object.ShortHeader:
		// short header is acceptable only if it was requested
		if !p.short {
			writeUnexpectedMessageTypeErr(res, v)
			return res, nil
		}

		h := v

		// build a regular header from the fields of the short one
		hdr = new(v2object.Header)
		hdr.SetPayloadLength(h.GetPayloadLength())
		hdr.SetVersion(h.GetVersion())
		hdr.SetOwnerID(h.GetOwnerID())
		hdr.SetObjectType(h.GetObjectType())
		hdr.SetCreationEpoch(h.GetCreationEpoch())
		hdr.SetPayloadHash(h.GetPayloadHash())
		hdr.SetHomomorphicHash(h.GetHomomorphicHash())
	case *v2object.HeaderWithSignature:
		// full header is acceptable only if the short one was not requested
		if p.short {
			writeUnexpectedMessageTypeErr(res, v)
			return res, nil
		}

		hdr = v.GetHeader()
		idSig = v.GetSignature()
	case *v2object.SplitInfo:
		si := object.NewSplitInfoFromV2(v)

		return nil, object.NewSplitInfoError(si)
	}

	obj := new(v2object.Object)
	obj.SetHeader(hdr)
	obj.SetSignature(idSig)

	// the object ID is taken from the requested address
	raw := object.NewRawFromV2(obj)
	raw.SetID(p.addr.ObjectID())

	res.setObject(raw.Object())

	return res, nil
}

// WithAddress sets the address of the object to read the payload range from
// and returns the receiver. A nil receiver is returned as is.
func (p *RangeDataParams) WithAddress(v *object.Address) *RangeDataParams {
	if p == nil {
		return nil
	}

	p.addr = v

	return p
}

// Address returns the address of the object to read the payload range from,
// or nil if the receiver is nil.
func (p *RangeDataParams) Address() *object.Address {
	if p == nil {
		return nil
	}

	return p.addr
}

// WithRaw sets the raw flag of the request and returns the receiver.
// A nil receiver is returned as is.
func (p *RangeDataParams) WithRaw(v bool) *RangeDataParams {
	if p == nil {
		return nil
	}

	p.raw = v

	return p
}

// Raw returns the raw flag of the request.
// It is false for a nil receiver.
func (p *RangeDataParams) Raw() bool {
	return p != nil && p.raw
}

// WithRange sets the requested payload range and returns the receiver.
// A nil receiver is returned as is.
func (p *RangeDataParams) WithRange(v *object.Range) *RangeDataParams {
	if p == nil {
		return nil
	}

	p.r = v

	return p
}

// Range returns the requested payload range,
// or nil if the receiver is nil.
func (p *RangeDataParams) Range() *object.Range {
	if p == nil {
		return nil
	}

	return p.r
}

// WithDataWriter sets the destination of the received range data
// and returns the receiver. A nil receiver is returned as is.
func (p *RangeDataParams) WithDataWriter(v io.Writer) *RangeDataParams {
	if p == nil {
		return nil
	}

	p.w = v

	return p
}

// DataWriter returns the destination of the received range data,
// or nil if the receiver is nil.
func (p *RangeDataParams) DataWriter() io.Writer {
	if p == nil {
		return nil
	}

	return p.w
}

// ObjectRangeRes groups the results of the ObjectPayloadRangeData operation.
type ObjectRangeRes struct {
	statusRes

	// data is the payload range data accumulated from the received chunks.
	data []byte
}

// setData stores the received payload range data.
func (x *ObjectRangeRes) setData(data []byte) {
	x.data = data
}

// Data returns the received payload range data.
func (x ObjectRangeRes) Data() []byte {
	return x.data
}

// ObjectPayloadRangeData receives object's range payload data through NeoFS API call.
//
// If a data writer is set, the received chunks are written to it and the
// result carries no data. Otherwise the chunks are accumulated in a buffer
// pre-allocated for the requested range length and returned in the result.
//
// If the server responds with split information, an error created by
// object.NewSplitInfoError is returned.
//
// Any client's internal or transport errors are returned as `error`.
// If WithNeoFSErrorParsing option has been provided, unsuccessful
// NeoFS status codes are returned as `error`, otherwise, are included
// in the returned result structure.
func (c *Client) ObjectPayloadRangeData(ctx context.Context, p *RangeDataParams, opts ...CallOption) (*ObjectRangeRes, error) {
	callOpts := c.defaultCallOptions()

	// apply non-nil call options on top of the defaults
	for i := range opts {
		if opts[i] != nil {
			opts[i](callOpts)
		}
	}

	// create request
	req := new(v2object.GetRangeRequest)

	// initialize request body
	body := new(v2object.GetRangeRequestBody)
	req.SetBody(body)

	// set meta header
	meta := v2MetaHeaderFromOpts(callOpts)

	if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{
		addr: p.addr.ToV2(),
		verb: v2session.ObjectVerbRange,
	}); err != nil {
		return nil, fmt.Errorf("could not attach session token: %w", err)
	}

	req.SetMetaHeader(meta)

	// fill body fields
	body.SetAddress(p.addr.ToV2())
	body.SetRange(p.r.ToV2())
	body.SetRaw(p.raw)

	// sign the request
	if err := signature.SignServiceMessage(callOpts.key, req); err != nil {
		return nil, fmt.Errorf("signing the request failed: %w", err)
	}

	// open stream
	stream, err := rpcapi.GetObjectRange(c.Raw(), req, client.WithContext(ctx))
	if err != nil {
		return nil, fmt.Errorf("could not create Get payload range stream: %w", err)
	}

	// pre-allocate the buffer only if the chunks are accumulated in memory:
	// with a data writer set the buffer would never be used
	var payload []byte
	if p.w == nil {
		payload = make([]byte, 0, p.r.GetLength())
	}

	var (
		resp = new(v2object.GetRangeResponse)

		// chunkWas tells whether at least one chunk has been received,
		// messageWas tells whether at least one message has been received
		chunkWas, messageWas bool

		res     = new(ObjectRangeRes)
		procPrm processResponseV2Prm
		procRes processResponseV2Res
	)

	procPrm.callOpts = callOpts
	procPrm.resp = resp

	procRes.statusRes = res

	for {
		// receive message from server stream
		err := stream.Read(resp)
		if err != nil {
			if errors.Is(err, io.EOF) {
				// a stream that ends without any message is a protocol violation
				if !messageWas {
					return nil, errWrongMessageSeq
				}

				break
			}

			return nil, fmt.Errorf("reading the response failed: %w", err)
		}

		messageWas = true

		// process response in general; a true result stops further processing:
		// either with a client error or with the result as it is at this point
		if c.processResponseV2(&procRes, procPrm) {
			if procRes.cliErr != nil {
				return nil, procRes.cliErr
			}

			return res, nil
		}

		switch v := resp.GetBody().GetRangePart().(type) {
		case nil:
			// the response carries no range part at all
			writeUnexpectedMessageTypeErr(res, v)
			return res, nil
		case *v2object.GetRangePartChunk:
			chunkWas = true

			if p.w != nil {
				if _, err = p.w.Write(v.GetChunk()); err != nil {
					return nil, fmt.Errorf("could not write payload chunk: %w", err)
				}
			} else {
				payload = append(payload, v.GetChunk()...)
			}
		case *v2object.SplitInfo:
			// split information is expected instead of the chunks, not after them
			if chunkWas {
				return nil, errWrongMessageSeq
			}

			si := object.NewSplitInfoFromV2(v)

			return nil, object.NewSplitInfoError(si)
		}
	}

	res.setData(payload)

	return res, nil
}

// WithAddress sets the address of the object whose payload ranges are hashed
// and returns the receiver. A nil receiver is returned as is.
func (p *RangeChecksumParams) WithAddress(v *object.Address) *RangeChecksumParams {
	if p == nil {
		return nil
	}

	p.addr = v

	return p
}

// Address returns the address of the object whose payload ranges are hashed,
// or nil if the receiver is nil.
func (p *RangeChecksumParams) Address() *object.Address {
	if p == nil {
		return nil
	}

	return p.addr
}

// WithRangeList sets the list of payload ranges to hash
// and returns the receiver. A nil receiver is returned as is.
func (p *RangeChecksumParams) WithRangeList(rs ...*object.Range) *RangeChecksumParams {
	if p == nil {
		return nil
	}

	p.rs = rs

	return p
}

// RangeList returns the list of payload ranges to hash,
// or nil if the receiver is nil.
func (p *RangeChecksumParams) RangeList() []*object.Range {
	if p == nil {
		return nil
	}

	return p.rs
}

// WithSalt sets the salt of the request and returns the receiver.
// A nil receiver is returned as is.
func (p *RangeChecksumParams) WithSalt(v []byte) *RangeChecksumParams {
	if p == nil {
		return nil
	}

	p.salt = v

	return p
}

// Salt returns the salt of the request,
// or nil if the receiver is nil.
func (p *RangeChecksumParams) Salt() []byte {
	if p == nil {
		return nil
	}

	return p.salt
}

// TZ switches the requested checksum type to Tillich-Zemor
// (SHA256 is requested otherwise) and returns the receiver.
//
// Like the other setters of RangeChecksumParams, it is safe to call
// on a nil receiver, which is returned as is.
func (p *RangeChecksumParams) TZ() *RangeChecksumParams {
	if p != nil {
		p.tz = true
	}

	return p
}

// ObjectRangeHashRes groups the results of the HashObjectPayloadRanges operation.
type ObjectRangeHashRes struct {
	statusRes

	// hashes is the hash list taken from the response body.
	hashes [][]byte
}

// setHashes stores the received list of hashes.
func (x *ObjectRangeHashRes) setHashes(v [][]byte) {
	x.hashes = v
}

// Hashes returns the received list of hashes.
func (x ObjectRangeHashRes) Hashes() [][]byte {
	return x.hashes
}

// HashObjectPayloadRanges receives range hash of the object
// payload data through NeoFS API call.
//
// The checksum type of the request is Tillich-Zemor if it was selected
// via RangeChecksumParams.TZ, and SHA256 otherwise.
//
// Any client's internal or transport errors are returned as `error`.
// If WithNeoFSErrorParsing option has been provided, unsuccessful
// NeoFS status codes are returned as `error`, otherwise, are included
// in the returned result structure.
func (c *Client) HashObjectPayloadRanges(ctx context.Context, p *RangeChecksumParams, opts ...CallOption) (*ObjectRangeHashRes, error) {
	callOpts := c.defaultCallOptions()

	// apply non-nil call options on top of the defaults
	for i := range opts {
		if opts[i] != nil {
			opts[i](callOpts)
		}
	}

	// create request
	req := new(v2object.GetRangeHashRequest)

	// initialize request body
	body := new(v2object.GetRangeHashRequestBody)
	req.SetBody(body)

	// set meta header
	meta := v2MetaHeaderFromOpts(callOpts)

	if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{
		addr: p.addr.ToV2(),
		verb: v2session.ObjectVerbRangeHash,
	}); err != nil {
		return nil, fmt.Errorf("could not attach session token: %w", err)
	}

	req.SetMetaHeader(meta)

	// fill body fields
	body.SetAddress(p.addr.ToV2())
	body.SetSalt(p.salt)

	// SHA256 is the default checksum type
	typ := checksumSHA256
	if p.tz {
		typ = checksumTZ
	}

	typV2 := typ.toV2()
	body.SetType(typV2)

	rsV2 := rangesToV2(p.rs)
	body.SetRanges(rsV2)

	// sign the request
	if err := signature.SignServiceMessage(callOpts.key, req); err != nil {
		return nil, fmt.Errorf("signing the request failed: %w", err)
	}

	// send request
	resp, err := rpcapi.HashObjectRange(c.Raw(), req, client.WithContext(ctx))
	if err != nil {
		return nil, fmt.Errorf("sending the request failed: %w", err)
	}

	var (
		res     = new(ObjectRangeHashRes)
		procPrm processResponseV2Prm
		procRes processResponseV2Res
	)

	procPrm.callOpts = callOpts
	procPrm.resp = resp

	procRes.statusRes = res

	// process response in general; a true result stops further processing:
	// either with a client error or with the result as it is at this point
	if c.processResponseV2(&procRes, procPrm) {
		if procRes.cliErr != nil {
			return nil, procRes.cliErr
		}

		return res, nil
	}

	res.setHashes(resp.GetBody().GetHashList())

	return res, nil
}

// WithContainerID sets the identifier of the container to search in
// and returns the receiver. A nil receiver is returned as is.
func (p *SearchObjectParams) WithContainerID(v *cid.ID) *SearchObjectParams {
	if p == nil {
		return nil
	}

	p.cid = v

	return p
}

// ContainerID returns the identifier of the container to search in,
// or nil if the receiver is nil.
func (p *SearchObjectParams) ContainerID() *cid.ID {
	if p == nil {
		return nil
	}

	return p.cid
}

// WithSearchFilters sets the search filters of the request
// and returns the receiver. A nil receiver is returned as is.
func (p *SearchObjectParams) WithSearchFilters(v object.SearchFilters) *SearchObjectParams {
	if p == nil {
		return nil
	}

	p.filters = v

	return p
}

// SearchFilters returns the search filters of the request,
// or nil if the receiver is nil.
func (p *SearchObjectParams) SearchFilters() object.SearchFilters {
	if p == nil {
		return nil
	}

	return p.filters
}

// ObjectSearchRes groups the results of the SearchObjects operation.
type ObjectSearchRes struct {
	statusRes

	// ids is the list of object identifiers collected
	// from all received response messages.
	ids []*object.ID
}

// setIDList stores the list of found object identifiers.
func (x *ObjectSearchRes) setIDList(v []*object.ID) {
	x.ids = v
}

// IDList returns the list of found object identifiers.
func (x ObjectSearchRes) IDList() []*object.ID {
	return x.ids
}

// SearchObjects searches for the objects through NeoFS API call.
//
// The identifiers from all response messages of the stream are collected
// into a single list that is returned in the result.
//
// Any client's internal or transport errors are returned as `error`.
// If WithNeoFSErrorParsing option has been provided, unsuccessful
// NeoFS status codes are returned as `error`, otherwise, are included
// in the returned result structure.
func (c *Client) SearchObjects(ctx context.Context, p *SearchObjectParams, opts ...CallOption) (*ObjectSearchRes, error) {
	callOpts := c.defaultCallOptions()

	// apply non-nil call options on top of the defaults
	for i := range opts {
		if opts[i] != nil {
			opts[i](callOpts)
		}
	}

	// create request
	req := new(v2object.SearchRequest)

	// initialize request body
	body := new(v2object.SearchRequestBody)
	req.SetBody(body)

	// address for the session token context: only the container ID is set
	v2Addr := new(v2refs.Address)
	v2Addr.SetContainerID(p.cid.ToV2())

	// set meta header
	meta := v2MetaHeaderFromOpts(callOpts)

	if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{
		addr: v2Addr,
		verb: v2session.ObjectVerbSearch,
	}); err != nil {
		return nil, fmt.Errorf("could not attach session token: %w", err)
	}

	req.SetMetaHeader(meta)

	// fill body fields
	body.SetContainerID(v2Addr.GetContainerID())
	body.SetVersion(searchQueryVersion)
	body.SetFilters(p.filters.ToV2())

	// sign the request
	if err := signature.SignServiceMessage(callOpts.key, req); err != nil {
		return nil, fmt.Errorf("signing the request failed: %w", err)
	}

	// create search stream
	stream, err := rpcapi.SearchObjects(c.Raw(), req, client.WithContext(ctx))
	if err != nil {
		return nil, fmt.Errorf("stream opening failed: %w", err)
	}

	var (
		searchResult []*object.ID
		resp         = new(v2object.SearchResponse)

		// messageWas tells whether at least one message has been received
		messageWas bool

		res     = new(ObjectSearchRes)
		procPrm processResponseV2Prm
		procRes processResponseV2Res
	)

	procPrm.callOpts = callOpts
	procPrm.resp = resp

	procRes.statusRes = res

	for {
		// receive message from server stream
		err := stream.Read(resp)
		if err != nil {
			if errors.Is(err, io.EOF) {
				// a stream that ends without any message is a protocol violation
				if !messageWas {
					return nil, errWrongMessageSeq
				}

				break
			}

			return nil, fmt.Errorf("reading the response failed: %w", err)
		}

		messageWas = true

		// process response in general; a true result stops further processing:
		// either with a client error or with the result as it is at this point
		if c.processResponseV2(&procRes, procPrm) {
			if procRes.cliErr != nil {
				return nil, procRes.cliErr
			}

			return res, nil
		}

		// collect the identifiers of the current message
		chunk := resp.GetBody().GetIDList()
		for i := range chunk {
			searchResult = append(searchResult, object.NewIDFromV2(chunk[i]))
		}
	}

	res.setIDList(searchResult)

	return res, nil
}

// attachV2SessionToken sets the session token in the request meta header hdr
// according to the session of the call options.
//
// Nothing is done if the call options carry no session. A session that
// already has a signature is attached as is. Otherwise a new token is built:
// its ID, owner and session key are taken from the session, its object
// context (address and verb) and lifetime are taken from info, and its body
// is signed with the key of the call options.
//
// Returns an error if signing of the token body failed.
func (c *Client) attachV2SessionToken(opts *callOptions, hdr *v2session.RequestMetaHeader, info v2SessionReqInfo) error {
	if opts.session == nil {
		return nil
	}

	// Do not resign already prepared session token
	if opts.session.Signature() != nil {
		hdr.SetSessionToken(opts.session.ToV2())
		return nil
	}

	// object context of the token: what is done and with which object
	opCtx := new(v2session.ObjectSessionContext)
	opCtx.SetAddress(info.addr)
	opCtx.SetVerb(info.verb)

	lt := new(v2session.TokenLifetime)
	lt.SetIat(info.iat)
	lt.SetNbf(info.nbf)
	lt.SetExp(info.exp)

	body := new(v2session.SessionTokenBody)
	body.SetID(opts.session.ID())
	body.SetOwnerID(opts.session.OwnerID().ToV2())
	body.SetSessionKey(opts.session.SessionKey())
	body.SetContext(opCtx)
	body.SetLifetime(lt)

	token := new(v2session.SessionToken)
	token.SetBody(body)

	// sign the token body; the handler stores the resulting
	// key and signature in the token
	signWrapper := signature.StableMarshalerWrapper{SM: token.GetBody()}

	err := signer.SignDataWithHandler(opts.key, signWrapper, func(key []byte, sig []byte) {
		sessionTokenSignature := new(v2refs.Signature)
		sessionTokenSignature.SetKey(key)
		sessionTokenSignature.SetSign(sig)
		token.SetSignature(sessionTokenSignature)
	})
	if err != nil {
		return err
	}

	hdr.SetSessionToken(token)

	return nil
}