package pool

import (
	"bytes"
	"context"
	"crypto/ecdsa"
	"errors"
	"fmt"
	"io"
	"math"
	"math/rand"
	"sort"
	"sync"
	"sync/atomic"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/accounting"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
	sdkClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	frostfsecdsa "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto/ecdsa"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
	"github.com/google/uuid"
	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
	"google.golang.org/grpc"
)

// client represents virtual connection to the single FrostFS network endpoint from which Pool is formed.
// This interface is expected to have exactly one production implementation - clientWrapper.
// Others are expected to be for test purposes only.
type client interface {
	// see clientWrapper.balanceGet.
	balanceGet(context.Context, PrmBalanceGet) (accounting.Decimal, error)
	// see clientWrapper.containerPut.
	containerPut(context.Context, PrmContainerPut) (cid.ID, error)
	// see clientWrapper.containerGet.
	containerGet(context.Context, PrmContainerGet) (container.Container, error)
	// see clientWrapper.containerList.
	containerList(context.Context, PrmContainerList) ([]cid.ID, error)
	// see clientWrapper.containerDelete.
	containerDelete(context.Context, PrmContainerDelete) error
	// see clientWrapper.containerEACL.
	containerEACL(context.Context, PrmContainerEACL) (eacl.Table, error)
	// see clientWrapper.containerSetEACL.
	containerSetEACL(context.Context, PrmContainerSetEACL) error
	// see clientWrapper.endpointInfo.
	endpointInfo(context.Context, prmEndpointInfo) (netmap.NodeInfo, error)
	// see clientWrapper.networkInfo.
	networkInfo(context.Context, prmNetworkInfo) (netmap.NetworkInfo, error)
	// see clientWrapper.netMapSnapshot
	netMapSnapshot(context.Context, prmNetMapSnapshot) (netmap.NetMap, error)
	// see clientWrapper.objectPut.
	objectPut(context.Context, PrmObjectPut) (oid.ID, error)
	// see clientWrapper.objectDelete.
	objectDelete(context.Context, PrmObjectDelete) error
	// see clientWrapper.objectGet.
	objectGet(context.Context, PrmObjectGet) (ResGetObject, error)
	// see clientWrapper.objectHead.
	objectHead(context.Context, PrmObjectHead) (object.Object, error)
	// see clientWrapper.objectRange.
	objectRange(context.Context, PrmObjectRange) (ResObjectRange, error)
	// see clientWrapper.objectSearch.
	objectSearch(context.Context, PrmObjectSearch) (ResObjectSearch, error)
	// see clientWrapper.sessionCreate.
	sessionCreate(context.Context, prmCreateSession) (resCreateSession, error)

	clientStatus

	// see clientWrapper.dial.
	dial(ctx context.Context) error
	// see clientWrapper.restartIfUnhealthy.
	restartIfUnhealthy(ctx context.Context) (bool, bool)
	// see clientWrapper.close.
	close() error
}

// clientStatus provide access to some metrics for connection.
type clientStatus interface {
	// isHealthy checks if the connection can handle requests.
	isHealthy() bool
	// isDialed checks if the connection was created.
	isDialed() bool
	// setUnhealthy marks client as unhealthy.
	setUnhealthy()
	// address return address of endpoint.
	address() string
	// currentErrorRate returns current errors rate.
	// After specific threshold connection is considered as unhealthy.
	// Pool.startRebalance routine can make this connection healthy again.
	currentErrorRate() uint32
	// overallErrorRate returns the number of all happened errors.
	overallErrorRate() uint64
	// methodsStatus returns statistic for all used methods.
	methodsStatus() []statusSnapshot
}

// errPoolClientUnhealthy is an error to indicate that client in pool is unhealthy.
var errPoolClientUnhealthy = errors.New("pool client unhealthy")

// clientStatusMonitor count error rate and other statistics for connection.
type clientStatusMonitor struct {
	logger         *zap.Logger
	addr           string
	healthy        *atomic.Uint32
	errorThreshold uint32

	mu                sync.RWMutex // protect counters
	currentErrorCount uint32
	overallErrorCount uint64
	methods           []*methodStatus
}

// values for healthy status of clientStatusMonitor.
const (
	// statusUnhealthyOnDial is set when dialing to the endpoint is failed,
	// so there is no connection to the endpoint, and pool should not close it
	// before re-establishing connection once again.
	statusUnhealthyOnDial = iota

	// statusUnhealthyOnRequest is set when communication after dialing to the
	// endpoint is failed due to immediate or accumulated errors, connection is
	// available and pool should close it before re-establishing connection once again.
	statusUnhealthyOnRequest

	// statusHealthy is set when connection is ready to be used by the pool.
	statusHealthy
)

// methodStatus provide statistic for specific method.
type methodStatus struct {
	name string
	mu   sync.RWMutex // protect counters
	statusSnapshot
}

// statusSnapshot is statistic for specific method.
type statusSnapshot struct {
	allTime     uint64
	allRequests uint64
}

// MethodIndex index of method in list of statuses in clientStatusMonitor.
type MethodIndex int

const (
	methodBalanceGet MethodIndex = iota
	methodContainerPut
	methodContainerGet
	methodContainerList
	methodContainerDelete
	methodContainerEACL
	methodContainerSetEACL
	methodEndpointInfo
	methodNetworkInfo
	methodNetMapSnapshot
	methodObjectPut
	methodObjectDelete
	methodObjectGet
	methodObjectHead
	methodObjectRange
	methodSessionCreate
	methodLast
)

// String implements fmt.Stringer.
func (m MethodIndex) String() string {
	switch m {
	case methodBalanceGet:
		return "balanceGet"
	case methodContainerPut:
		return "containerPut"
	case methodContainerGet:
		return "containerGet"
	case methodContainerList:
		return "containerList"
	case methodContainerDelete:
		return "containerDelete"
	case methodContainerEACL:
		return "containerEACL"
	case methodContainerSetEACL:
		return "containerSetEACL"
	case methodEndpointInfo:
		return "endpointInfo"
	case methodNetworkInfo:
		return "networkInfo"
	case methodNetMapSnapshot:
		return "netMapSnapshot"
	case methodObjectPut:
		return "objectPut"
	case methodObjectDelete:
		return "objectDelete"
	case methodObjectGet:
		return "objectGet"
	case methodObjectHead:
		return "objectHead"
	case methodObjectRange:
		return "objectRange"
	case methodSessionCreate:
		return "sessionCreate"
	case methodLast:
		return "it's a system name rather than a method"
	default:
		return "unknown"
	}
}

func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint32) clientStatusMonitor {
	methods := make([]*methodStatus, methodLast)
	for i := methodBalanceGet; i < methodLast; i++ {
		methods[i] = &methodStatus{name: i.String()}
	}

	healthy := new(atomic.Uint32)
	healthy.Store(statusHealthy)

	return clientStatusMonitor{
		logger:         logger,
		addr:           addr,
		healthy:        healthy,
		errorThreshold: errorThreshold,
		methods:        methods,
	}
}

func (m *methodStatus) snapshot() statusSnapshot {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.statusSnapshot
}

func (m *methodStatus) incRequests(elapsed time.Duration) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.allTime += uint64(elapsed)
	m.allRequests++
}

// clientWrapper is used by default, alternative implementations are intended for testing purposes only.
type clientWrapper struct {
	clientMutex sync.RWMutex
	client      *sdkClient.Client
	prm         wrapperPrm

	clientStatusMonitor
}

// wrapperPrm is params to create clientWrapper.
type wrapperPrm struct {
	logger                  *zap.Logger
	address                 string
	key                     ecdsa.PrivateKey
	dialTimeout             time.Duration
	streamTimeout           time.Duration
	errorThreshold          uint32
	responseInfoCallback    func(sdkClient.ResponseMetaInfo) error
	poolRequestInfoCallback func(RequestInfo)
	dialOptions             []grpc.DialOption
}

// setAddress sets endpoint to connect in FrostFS network.
func (x *wrapperPrm) setAddress(address string) {
	x.address = address
}

// setKey sets sdkClient.Client private key to be used for the protocol communication by default.
func (x *wrapperPrm) setKey(key ecdsa.PrivateKey) {
	x.key = key
}

// setLogger sets sdkClient.Client logger.
func (x *wrapperPrm) setLogger(logger *zap.Logger) {
	x.logger = logger
}

// setDialTimeout sets the timeout for connection to be established.
func (x *wrapperPrm) setDialTimeout(timeout time.Duration) {
	x.dialTimeout = timeout
}

// setStreamTimeout sets the timeout for individual operations in streaming RPC.
func (x *wrapperPrm) setStreamTimeout(timeout time.Duration) {
	x.streamTimeout = timeout
}

// setErrorThreshold sets threshold after reaching which connection is considered unhealthy
// until Pool.startRebalance routing updates its status.
func (x *wrapperPrm) setErrorThreshold(threshold uint32) {
	x.errorThreshold = threshold
}

// setPoolRequestCallback sets callback that will be invoked after every pool response.
func (x *wrapperPrm) setPoolRequestCallback(f func(RequestInfo)) {
	x.poolRequestInfoCallback = f
}

// setResponseInfoCallback sets callback that will be invoked after every response.
func (x *wrapperPrm) setResponseInfoCallback(f func(sdkClient.ResponseMetaInfo) error) {
	x.responseInfoCallback = f
}

// setGRPCDialOptions sets the gRPC dial options for new gRPC client connection.
func (x *wrapperPrm) setGRPCDialOptions(opts []grpc.DialOption) {
	x.dialOptions = opts
}

// newWrapper creates a clientWrapper that implements the client interface.
func newWrapper(prm wrapperPrm) *clientWrapper {
	var cl sdkClient.Client
	prmInit := sdkClient.PrmInit{
		Key:                  prm.key,
		ResponseInfoCallback: prm.responseInfoCallback,
	}

	cl.Init(prmInit)

	res := &clientWrapper{
		client:              &cl,
		clientStatusMonitor: newClientStatusMonitor(prm.logger, prm.address, prm.errorThreshold),
		prm:                 prm,
	}

	return res
}

// dial establishes a connection to the server from the FrostFS network.
// Returns an error describing failure reason. If failed, the client
// SHOULD NOT be used.
func (c *clientWrapper) dial(ctx context.Context) error {
	cl, err := c.getClient()
	if err != nil {
		return err
	}

	prmDial := sdkClient.PrmDial{
		Endpoint:        c.prm.address,
		DialTimeout:     c.prm.dialTimeout,
		StreamTimeout:   c.prm.streamTimeout,
		GRPCDialOptions: c.prm.dialOptions,
	}

	if err = cl.Dial(ctx, prmDial); err != nil {
		c.setUnhealthyOnDial()
		return err
	}

	return nil
}

// restartIfUnhealthy checks healthy status of client and recreate it if status is unhealthy.
// Return current healthy status and indicating if status was changed by this function call.
func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, changed bool) {
	var wasHealthy bool
	if _, err := c.endpointInfo(ctx, prmEndpointInfo{}); err == nil {
		return true, false
	} else if !errors.Is(err, errPoolClientUnhealthy) {
		wasHealthy = true
	}

	// if connection is dialed before, to avoid routine / connection leak,
	// pool has to close it and then initialize once again.
	if c.isDialed() {
		_ = c.close()
	}

	var cl sdkClient.Client
	prmInit := sdkClient.PrmInit{
		Key:                  c.prm.key,
		ResponseInfoCallback: c.prm.responseInfoCallback,
	}

	cl.Init(prmInit)

	prmDial := sdkClient.PrmDial{
		Endpoint:        c.prm.address,
		DialTimeout:     c.prm.dialTimeout,
		StreamTimeout:   c.prm.streamTimeout,
		GRPCDialOptions: c.prm.dialOptions,
	}

	if err := cl.Dial(ctx, prmDial); err != nil {
		c.setUnhealthyOnDial()
		return false, wasHealthy
	}

	c.clientMutex.Lock()
	c.client = &cl
	c.clientMutex.Unlock()

	if _, err := cl.EndpointInfo(ctx, sdkClient.PrmEndpointInfo{}); err != nil {
		c.setUnhealthy()
		return false, wasHealthy
	}

	c.setHealthy()
	return true, !wasHealthy
}

func (c *clientWrapper) getClient() (*sdkClient.Client, error) {
	c.clientMutex.RLock()
	defer c.clientMutex.RUnlock()
	if c.isHealthy() {
		return c.client, nil
	}
	return nil, errPoolClientUnhealthy
}

// balanceGet invokes sdkClient.BalanceGet parse response status to error and return result as is.
func (c *clientWrapper) balanceGet(ctx context.Context, prm PrmBalanceGet) (accounting.Decimal, error) {
	cl, err := c.getClient()
	if err != nil {
		return accounting.Decimal{}, err
	}

	cliPrm := sdkClient.PrmBalanceGet{
		Account: prm.account,
	}

	start := time.Now()
	res, err := cl.BalanceGet(ctx, cliPrm)
	c.incRequests(time.Since(start), methodBalanceGet)
	var st apistatus.Status
	if res != nil {
		st = res.Status()
	}
	if err = c.handleError(ctx, st, err); err != nil {
		return accounting.Decimal{}, fmt.Errorf("balance get on client: %w", err)
	}

	return res.Amount(), nil
}

// containerPut invokes sdkClient.ContainerPut parse response status to error and return result as is.
// It also waits for the container to appear on the network.
func (c *clientWrapper) containerPut(ctx context.Context, prm PrmContainerPut) (cid.ID, error) {
	cl, err := c.getClient()
	if err != nil {
		return cid.ID{}, err
	}

	start := time.Now()
	res, err := cl.ContainerPut(ctx, prm.ClientParams)
	c.incRequests(time.Since(start), methodContainerPut)
	var st apistatus.Status
	if res != nil {
		st = res.Status()
	}
	if err = c.handleError(ctx, st, err); err != nil {
		return cid.ID{}, fmt.Errorf("container put on client: %w", err)
	}

	if prm.WaitParams == nil {
		prm.WaitParams = defaultWaitParams()
	}
	if err = prm.WaitParams.CheckValidity(); err != nil {
		return cid.ID{}, fmt.Errorf("invalid wait parameters: %w", err)
	}

	idCnr := res.ID()

	getPrm := PrmContainerGet{
		ContainerID: idCnr,
		Session:     prm.ClientParams.Session,
	}

	err = waitForContainerPresence(ctx, c, getPrm, prm.WaitParams)
	if err = c.handleError(ctx, nil, err); err != nil {
		return cid.ID{}, fmt.Errorf("wait container presence on client: %w", err)
	}

	return idCnr, nil
}

// containerGet invokes sdkClient.ContainerGet parse response status to error and return result as is.
func (c *clientWrapper) containerGet(ctx context.Context, prm PrmContainerGet) (container.Container, error) {
	cl, err := c.getClient()
	if err != nil {
		return container.Container{}, err
	}

	cliPrm := sdkClient.PrmContainerGet{
		ContainerID: &prm.ContainerID,
		Session:     prm.Session,
	}

	start := time.Now()
	res, err := cl.ContainerGet(ctx, cliPrm)
	c.incRequests(time.Since(start), methodContainerGet)
	var st apistatus.Status
	if res != nil {
		st = res.Status()
	}
	if err = c.handleError(ctx, st, err); err != nil {
		return container.Container{}, fmt.Errorf("container get on client: %w", err)
	}

	return res.Container(), nil
}

// containerList invokes sdkClient.ContainerList parse response status to error and return result as is.
func (c *clientWrapper) containerList(ctx context.Context, prm PrmContainerList) ([]cid.ID, error) {
	cl, err := c.getClient()
	if err != nil {
		return nil, err
	}

	cliPrm := sdkClient.PrmContainerList{
		Account: prm.OwnerID,
		Session: prm.Session,
	}

	start := time.Now()
	res, err := cl.ContainerList(ctx, cliPrm)
	c.incRequests(time.Since(start), methodContainerList)
	var st apistatus.Status
	if res != nil {
		st = res.Status()
	}
	if err = c.handleError(ctx, st, err); err != nil {
		return nil, fmt.Errorf("container list on client: %w", err)
	}
	return res.Containers(), nil
}

// containerDelete invokes sdkClient.ContainerDelete parse response status to error.
// It also waits for the container to be removed from the network.
func (c *clientWrapper) containerDelete(ctx context.Context, prm PrmContainerDelete) error {
	cl, err := c.getClient()
	if err != nil {
		return err
	}

	cliPrm := sdkClient.PrmContainerDelete{
		ContainerID: &prm.ContainerID,
		Session:     prm.Session,
	}

	start := time.Now()
	res, err := cl.ContainerDelete(ctx, cliPrm)
	c.incRequests(time.Since(start), methodContainerDelete)
	var st apistatus.Status
	if res != nil {
		st = res.Status()
	}
	if err = c.handleError(ctx, st, err); err != nil {
		return fmt.Errorf("container delete on client: %w", err)
	}

	if prm.WaitParams == nil {
		prm.WaitParams = defaultWaitParams()
	}
	if err := prm.WaitParams.CheckValidity(); err != nil {
		return fmt.Errorf("invalid wait parameters: %w", err)
	}

	getPrm := PrmContainerGet{
		ContainerID: prm.ContainerID,
		Session:     prm.Session,
	}

	return waitForContainerRemoved(ctx, c, getPrm, prm.WaitParams)
}

// containerEACL invokes sdkClient.ContainerEACL parse response status to error and return result as is.
func (c *clientWrapper) containerEACL(ctx context.Context, prm PrmContainerEACL) (eacl.Table, error) {
	cl, err := c.getClient()
	if err != nil {
		return eacl.Table{}, err
	}

	cliPrm := sdkClient.PrmContainerEACL{
		ContainerID: &prm.ContainerID,
		Session:     prm.Session,
	}

	start := time.Now()
	res, err := cl.ContainerEACL(ctx, cliPrm)
	c.incRequests(time.Since(start), methodContainerEACL)
	var st apistatus.Status
	if res != nil {
		st = res.Status()
	}
	if err = c.handleError(ctx, st, err); err != nil {
		return eacl.Table{}, fmt.Errorf("get eacl on client: %w", err)
	}

	return res.Table(), nil
}

// containerSetEACL invokes sdkClient.ContainerSetEACL parse response status to error.
// It also waits for the EACL to appear on the network.
func (c *clientWrapper) containerSetEACL(ctx context.Context, prm PrmContainerSetEACL) error {
	cl, err := c.getClient()
	if err != nil {
		return err
	}

	cliPrm := sdkClient.PrmContainerSetEACL{
		Table:   &prm.Table,
		Session: prm.Session,
	}

	start := time.Now()
	res, err := cl.ContainerSetEACL(ctx, cliPrm)
	c.incRequests(time.Since(start), methodContainerSetEACL)
	var st apistatus.Status
	if res != nil {
		st = res.Status()
	}
	if err = c.handleError(ctx, st, err); err != nil {
		return fmt.Errorf("set eacl on client: %w", err)
	}

	if prm.WaitParams == nil {
		prm.WaitParams = defaultWaitParams()
	}
	if err := prm.WaitParams.CheckValidity(); err != nil {
		return fmt.Errorf("invalid wait parameters: %w", err)
	}

	cnrID, _ := prm.Table.CID()
	eaclPrm := PrmContainerEACL{
		ContainerID: cnrID,
		Session:     prm.Session,
	}

	err = waitForEACLPresence(ctx, c, eaclPrm, &prm.Table, prm.WaitParams)
	if err = c.handleError(ctx, nil, err); err != nil {
		return fmt.Errorf("wait eacl presence on client: %w", err)
	}

	return nil
}

// endpointInfo invokes sdkClient.EndpointInfo parse response status to error and return result as is.
func (c *clientWrapper) endpointInfo(ctx context.Context, _ prmEndpointInfo) (netmap.NodeInfo, error) {
	cl, err := c.getClient()
	if err != nil {
		return netmap.NodeInfo{}, err
	}

	start := time.Now()
	res, err := cl.EndpointInfo(ctx, sdkClient.PrmEndpointInfo{})
	c.incRequests(time.Since(start), methodEndpointInfo)
	var st apistatus.Status
	if res != nil {
		st = res.Status()
	}
	if err = c.handleError(ctx, st, err); err != nil {
		return netmap.NodeInfo{}, fmt.Errorf("endpoint info on client: %w", err)
	}

	return res.NodeInfo(), nil
}

// networkInfo invokes sdkClient.NetworkInfo parse response status to error and return result as is.
func (c *clientWrapper) networkInfo(ctx context.Context, _ prmNetworkInfo) (netmap.NetworkInfo, error) {
	cl, err := c.getClient()
	if err != nil {
		return netmap.NetworkInfo{}, err
	}

	start := time.Now()
	res, err := cl.NetworkInfo(ctx, sdkClient.PrmNetworkInfo{})
	c.incRequests(time.Since(start), methodNetworkInfo)
	var st apistatus.Status
	if res != nil {
		st = res.Status()
	}
	if err = c.handleError(ctx, st, err); err != nil {
		return netmap.NetworkInfo{}, fmt.Errorf("network info on client: %w", err)
	}

	return res.Info(), nil
}

// networkInfo invokes sdkClient.NetworkInfo parse response status to error and return result as is.
func (c *clientWrapper) netMapSnapshot(ctx context.Context, _ prmNetMapSnapshot) (netmap.NetMap, error) {
	cl, err := c.getClient()
	if err != nil {
		return netmap.NetMap{}, err
	}

	start := time.Now()
	res, err := cl.NetMapSnapshot(ctx, sdkClient.PrmNetMapSnapshot{})
	c.incRequests(time.Since(start), methodNetMapSnapshot)
	var st apistatus.Status
	if res != nil {
		st = res.Status()
	}
	if err = c.handleError(ctx, st, err); err != nil {
		return netmap.NetMap{}, fmt.Errorf("network map snapshot on client: %w", err)
	}

	return res.NetMap(), nil
}

// objectPut writes object to FrostFS.
func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
	if prm.bufferMaxSize == 0 {
		prm.bufferMaxSize = defaultBufferMaxSizeForPut
	}

	if prm.clientCut {
		return c.objectPutClientCut(ctx, prm)
	}

	return c.objectPutServerCut(ctx, prm)
}

func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
	cl, err := c.getClient()
	if err != nil {
		return oid.ID{}, err
	}

	cliPrm := sdkClient.PrmObjectPutInit{
		CopiesNumber: prm.copiesNumber,
		Session:      prm.stoken,
		Key:          prm.key,
		BearerToken:  prm.btoken,
	}

	start := time.Now()
	wObj, err := cl.ObjectPutInit(ctx, cliPrm)
	c.incRequests(time.Since(start), methodObjectPut)
	if err = c.handleError(ctx, nil, err); err != nil {
		return oid.ID{}, fmt.Errorf("init writing on API client: %w", err)
	}

	if wObj.WriteHeader(ctx, prm.hdr) {
		sz := prm.hdr.PayloadSize()

		if data := prm.hdr.Payload(); len(data) > 0 {
			if prm.payload != nil {
				prm.payload = io.MultiReader(bytes.NewReader(data), prm.payload)
			} else {
				prm.payload = bytes.NewReader(data)
				sz = uint64(len(data))
			}
		}

		if prm.payload != nil {
			if sz == 0 || sz > prm.bufferMaxSize {
				sz = prm.bufferMaxSize
			}

			buf := make([]byte, sz)

			var n int

			for {
				n, err = prm.payload.Read(buf)
				if n > 0 {
					start = time.Now()
					successWrite := wObj.WritePayloadChunk(ctx, buf[:n])
					c.incRequests(time.Since(start), methodObjectPut)
					if !successWrite {
						break
					}

					continue
				}

				if errors.Is(err, io.EOF) {
					break
				}

				return oid.ID{}, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err))
			}
		}
	}

	res, err := wObj.Close(ctx)
	var st apistatus.Status
	if res != nil {
		st = res.Status()
	}
	if err = c.handleError(ctx, st, err); err != nil { // here err already carries both status and client errors
		return oid.ID{}, fmt.Errorf("client failure: %w", err)
	}

	return res.StoredObjectID(), nil
}

func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
	putInitPrm := PrmObjectPutClientCutInit{
		PrmObjectPut: prm,
	}

	start := time.Now()
	wObj, err := c.objectPutInitTransformer(putInitPrm)
	c.incRequests(time.Since(start), methodObjectPut)
	if err = c.handleError(ctx, nil, err); err != nil {
		return oid.ID{}, fmt.Errorf("init writing on API client: %w", err)
	}

	if wObj.WriteHeader(ctx, prm.hdr) {
		sz := prm.hdr.PayloadSize()

		if data := prm.hdr.Payload(); len(data) > 0 {
			if prm.payload != nil {
				prm.payload = io.MultiReader(bytes.NewReader(data), prm.payload)
			} else {
				prm.payload = bytes.NewReader(data)
				sz = uint64(len(data))
			}
		}

		if prm.payload != nil {
			if sz == 0 || sz > prm.bufferMaxSize {
				sz = prm.bufferMaxSize
			}

			buf := make([]byte, sz)

			var n int

			for {
				n, err = prm.payload.Read(buf)
				if n > 0 {
					start = time.Now()
					successWrite := wObj.WritePayloadChunk(ctx, buf[:n])
					c.incRequests(time.Since(start), methodObjectPut)
					if !successWrite {
						break
					}

					continue
				}

				if errors.Is(err, io.EOF) {
					break
				}

				return oid.ID{}, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err))
			}
		}
	}

	res, err := wObj.Close(ctx)
	var st apistatus.Status
	if res != nil {
		st = res.Status
	}
	if err = c.handleError(ctx, st, err); err != nil { // here err already carries both status and client errors
		return oid.ID{}, fmt.Errorf("client failure: %w", err)
	}

	return res.OID, nil
}

// objectDelete invokes sdkClient.ObjectDelete parse response status to error.
func (c *clientWrapper) objectDelete(ctx context.Context, prm PrmObjectDelete) error {
	cl, err := c.getClient()
	if err != nil {
		return err
	}

	cnr := prm.addr.Container()
	obj := prm.addr.Object()

	cliPrm := sdkClient.PrmObjectDelete{
		BearerToken: prm.btoken,
		Session:     prm.stoken,
		ContainerID: &cnr,
		ObjectID:    &obj,
		Key:         prm.key,
	}

	start := time.Now()
	res, err := cl.ObjectDelete(ctx, cliPrm)
	c.incRequests(time.Since(start), methodObjectDelete)
	var st apistatus.Status
	if res != nil {
		st = res.Status()
	}
	if err = c.handleError(ctx, st, err); err != nil {
		return fmt.Errorf("delete object on client: %w", err)
	}
	return nil
}

// objectGet returns reader for object.
func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (ResGetObject, error) {
	cl, err := c.getClient()
	if err != nil {
		return ResGetObject{}, err
	}

	prmCnr := prm.addr.Container()
	prmObj := prm.addr.Object()

	cliPrm := sdkClient.PrmObjectGet{
		BearerToken: prm.btoken,
		Session:     prm.stoken,
		ContainerID: &prmCnr,
		ObjectID:    &prmObj,
		Key:         prm.key,
	}

	var res ResGetObject

	rObj, err := cl.ObjectGetInit(ctx, cliPrm)
	if err = c.handleError(ctx, nil, err); err != nil {
		return ResGetObject{}, fmt.Errorf("init object reading on client: %w", err)
	}

	start := time.Now()
	successReadHeader := rObj.ReadHeader(&res.Header)
	c.incRequests(time.Since(start), methodObjectGet)
	if !successReadHeader {
		rObjRes, err := rObj.Close()
		var st apistatus.Status
		if rObjRes != nil {
			st = rObjRes.Status()
		}
		err = c.handleError(ctx, st, err)
		return res, fmt.Errorf("read header: %w", err)
	}

	res.Payload = &objectReadCloser{
		reader: rObj,
		elapsedTimeCallback: func(elapsed time.Duration) {
			c.incRequests(elapsed, methodObjectGet)
		},
	}

	return res, nil
}

// objectHead invokes sdkClient.ObjectHead parse response status to error and return result as is.
func (c *clientWrapper) objectHead(ctx context.Context, prm PrmObjectHead) (object.Object, error) {
	cl, err := c.getClient()
	if err != nil {
		return object.Object{}, err
	}

	prmCnr := prm.addr.Container()
	prmObj := prm.addr.Object()

	cliPrm := sdkClient.PrmObjectHead{
		BearerToken: prm.btoken,
		Session:     prm.stoken,
		Raw:         prm.raw,
		ContainerID: &prmCnr,
		ObjectID:    &prmObj,
		Key:         prm.key,
	}

	var obj object.Object

	start := time.Now()
	res, err := cl.ObjectHead(ctx, cliPrm)
	c.incRequests(time.Since(start), methodObjectHead)
	var st apistatus.Status
	if res != nil {
		st = res.Status()
	}
	if err = c.handleError(ctx, st, err); err != nil {
		return obj, fmt.Errorf("read object header via client: %w", err)
	}
	if !res.ReadHeader(&obj) {
		return obj, errors.New("missing object header in response")
	}

	return obj, nil
}

// objectRange returns object range reader.
func (c *clientWrapper) objectRange(ctx context.Context, prm PrmObjectRange) (ResObjectRange, error) {
	cl, err := c.getClient()
	if err != nil {
		return ResObjectRange{}, err
	}

	prmCnr := prm.addr.Container()
	prmObj := prm.addr.Object()

	cliPrm := sdkClient.PrmObjectRange{
		BearerToken: prm.btoken,
		Session:     prm.stoken,
		ContainerID: &prmCnr,
		ObjectID:    &prmObj,
		Offset:      prm.off,
		Length:      prm.ln,
		Key:         prm.key,
	}

	start := time.Now()
	res, err := cl.ObjectRangeInit(ctx, cliPrm)
	c.incRequests(time.Since(start), methodObjectRange)
	if err = c.handleError(ctx, nil, err); err != nil {
		return ResObjectRange{}, fmt.Errorf("init payload range reading on client: %w", err)
	}

	return ResObjectRange{
		payload: res,
		elapsedTimeCallback: func(elapsed time.Duration) {
			c.incRequests(elapsed, methodObjectRange)
		},
	}, nil
}

// objectSearch invokes sdkClient.ObjectSearchInit parse response status to error and return result as is.
func (c *clientWrapper) objectSearch(ctx context.Context, prm PrmObjectSearch) (ResObjectSearch, error) {
	cl, err := c.getClient()
	if err != nil {
		return ResObjectSearch{}, err
	}

	cliPrm := sdkClient.PrmObjectSearch{
		ContainerID: &prm.cnrID,
		Filters:     prm.filters,
		Session:     prm.stoken,
		BearerToken: prm.btoken,
		Key:         prm.key,
	}

	res, err := cl.ObjectSearchInit(ctx, cliPrm)
	if err = c.handleError(ctx, nil, err); err != nil {
		return ResObjectSearch{}, fmt.Errorf("init object searching on client: %w", err)
	}

	return ResObjectSearch{r: res}, nil
}

// sessionCreate invokes sdkClient.SessionCreate parse response status to error and return result as is.
func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession) (resCreateSession, error) {
	cl, err := c.getClient()
	if err != nil {
		return resCreateSession{}, err
	}

	cliPrm := sdkClient.PrmSessionCreate{
		Expiration: prm.exp,
		Key:        &prm.key,
	}

	start := time.Now()
	res, err := cl.SessionCreate(ctx, cliPrm)
	c.incRequests(time.Since(start), methodSessionCreate)
	var st apistatus.Status
	if res != nil {
		st = res.Status()
	}
	if err = c.handleError(ctx, st, err); err != nil {
		return resCreateSession{}, fmt.Errorf("session creation on client: %w", err)
	}

	return resCreateSession{
		id:         res.ID(),
		sessionKey: res.PublicKey(),
	}, nil
}

func (c *clientStatusMonitor) isHealthy() bool {
	return c.healthy.Load() == statusHealthy
}

func (c *clientStatusMonitor) isDialed() bool {
	return c.healthy.Load() != statusUnhealthyOnDial
}

func (c *clientStatusMonitor) setHealthy() {
	c.healthy.Store(statusHealthy)
}

func (c *clientStatusMonitor) setUnhealthy() {
	c.healthy.Store(statusUnhealthyOnRequest)
}

func (c *clientStatusMonitor) setUnhealthyOnDial() {
	c.healthy.Store(statusUnhealthyOnDial)
}

func (c *clientStatusMonitor) address() string {
	return c.addr
}

func (c *clientStatusMonitor) incErrorRate() {
	c.mu.Lock()
	c.currentErrorCount++
	c.overallErrorCount++

	thresholdReached := c.currentErrorCount >= c.errorThreshold
	if thresholdReached {
		c.setUnhealthy()
		c.currentErrorCount = 0
	}
	c.mu.Unlock()

	if thresholdReached {
		c.log(zapcore.WarnLevel, "error threshold reached",
			zap.String("address", c.addr), zap.Uint32("threshold", c.errorThreshold))
	}
}

func (c *clientStatusMonitor) log(level zapcore.Level, msg string, fields ...zap.Field) {
	if c.logger == nil {
		return
	}

	c.logger.Log(level, msg, fields...)
}

func (c *clientStatusMonitor) currentErrorRate() uint32 {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.currentErrorCount
}

func (c *clientStatusMonitor) overallErrorRate() uint64 {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.overallErrorCount
}

func (c *clientStatusMonitor) methodsStatus() []statusSnapshot {
	result := make([]statusSnapshot, len(c.methods))
	for i, val := range c.methods {
		result[i] = val.snapshot()
	}

	return result
}

func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) {
	methodStat := c.methods[method]
	methodStat.incRequests(elapsed)
	if c.prm.poolRequestInfoCallback != nil {
		c.prm.poolRequestInfoCallback(RequestInfo{
			Address: c.prm.address,
			Method:  method,
			Elapsed: elapsed,
		})
	}
}

func (c *clientWrapper) close() error {
	if c.client != nil {
		return c.client.Close()
	}
	return nil
}

func (c *clientStatusMonitor) handleError(ctx context.Context, st apistatus.Status, err error) error {
	if err != nil {
		if needCountError(ctx, err) {
			c.incErrorRate()
		}

		return err
	}

	err = apistatus.ErrFromStatus(st)
	switch err.(type) {
	case *apistatus.ServerInternal,
		*apistatus.WrongMagicNumber,
		*apistatus.SignatureVerification,
		*apistatus.NodeUnderMaintenance:
		c.incErrorRate()
	}

	return err
}

func needCountError(ctx context.Context, err error) bool {
	// non-status logic error that could be returned
	// from the SDK client; should not be considered
	// as a connection error
	var siErr *object.SplitInfoError
	if errors.As(err, &siErr) {
		return false
	}
	var eiErr *object.ECInfoError
	if errors.As(err, &eiErr) {
		return false
	}

	if errors.Is(ctx.Err(), context.Canceled) {
		return false
	}

	return true
}

// clientBuilder is a type alias of client constructors which open connection
// to the given endpoint.
type clientBuilder = func(endpoint string) client

// RequestInfo groups info about pool request.
type RequestInfo struct {
	Address string
	Method  MethodIndex
	Elapsed time.Duration
}

// InitParameters contains values used to initialize connection Pool.
type InitParameters struct {
	key                       *ecdsa.PrivateKey
	logger                    *zap.Logger
	nodeDialTimeout           time.Duration
	nodeStreamTimeout         time.Duration
	healthcheckTimeout        time.Duration
	clientRebalanceInterval   time.Duration
	sessionExpirationDuration uint64
	errorThreshold            uint32
	nodeParams                []NodeParam
	requestCallback           func(RequestInfo)
	dialOptions               []grpc.DialOption

	clientBuilder clientBuilder
}

// SetKey specifies default key to be used for the protocol communication by default.
func (x *InitParameters) SetKey(key *ecdsa.PrivateKey) {
	x.key = key
}

// SetLogger specifies logger.
func (x *InitParameters) SetLogger(logger *zap.Logger) {
	x.logger = logger
}

// SetNodeDialTimeout specifies the timeout for connection to be established.
func (x *InitParameters) SetNodeDialTimeout(timeout time.Duration) {
	x.nodeDialTimeout = timeout
}

// SetNodeStreamTimeout specifies the timeout for individual operations in streaming RPC.
func (x *InitParameters) SetNodeStreamTimeout(timeout time.Duration) {
	x.nodeStreamTimeout = timeout
}

// SetHealthcheckTimeout specifies the timeout for request to node to decide if it is alive.
//
// See also Pool.Dial.
func (x *InitParameters) SetHealthcheckTimeout(timeout time.Duration) {
	x.healthcheckTimeout = timeout
}

// SetClientRebalanceInterval specifies the interval for updating nodes health status.
//
// See also Pool.Dial.
func (x *InitParameters) SetClientRebalanceInterval(interval time.Duration) {
	x.clientRebalanceInterval = interval
}

// SetSessionExpirationDuration specifies the session token lifetime in epochs.
func (x *InitParameters) SetSessionExpirationDuration(expirationDuration uint64) {
	x.sessionExpirationDuration = expirationDuration
}

// SetErrorThreshold specifies the number of errors on connection after which node is considered as unhealthy.
func (x *InitParameters) SetErrorThreshold(threshold uint32) {
	x.errorThreshold = threshold
}

// SetRequestCallback makes the pool client to pass RequestInfo for each
// request to f. Nil (default) means ignore RequestInfo.
func (x *InitParameters) SetRequestCallback(f func(RequestInfo)) {
	x.requestCallback = f
}

// AddNode append information about the node to which you want to connect.
func (x *InitParameters) AddNode(nodeParam NodeParam) {
	x.nodeParams = append(x.nodeParams, nodeParam)
}

// SetGRPCDialOptions sets the gRPC dial options for new gRPC client connection.
func (x *InitParameters) SetGRPCDialOptions(opts ...grpc.DialOption) {
	x.dialOptions = opts
}

// setClientBuilder sets clientBuilder used for client construction.
// Wraps setClientBuilderContext without a context.
func (x *InitParameters) setClientBuilder(builder clientBuilder) {
	x.clientBuilder = builder
}

// isMissingClientBuilder checks if client constructor was not specified.
func (x *InitParameters) isMissingClientBuilder() bool {
	return x.clientBuilder == nil
}

type rebalanceParameters struct {
	nodesParams               []*nodesParam
	nodeRequestTimeout        time.Duration
	clientRebalanceInterval   time.Duration
	sessionExpirationDuration uint64
}

type nodesParam struct {
	priority  int
	addresses []string
	weights   []float64
}

// NodeParam groups parameters of remote node.
type NodeParam struct {
	priority int
	address  string
	weight   float64
}

// NewNodeParam creates NodeParam using parameters.
func NewNodeParam(priority int, address string, weight float64) (prm NodeParam) {
	prm.SetPriority(priority)
	prm.SetAddress(address)
	prm.SetWeight(weight)

	return
}

// SetPriority specifies priority of the node.
// Negative value is allowed. In the result node groups
// with the same priority will be sorted by descent.
func (x *NodeParam) SetPriority(priority int) {
	x.priority = priority
}

// Priority returns priority of the node.
func (x *NodeParam) Priority() int {
	return x.priority
}

// SetAddress specifies address of the node.
func (x *NodeParam) SetAddress(address string) {
	x.address = address
}

// Address returns address of the node.
func (x *NodeParam) Address() string {
	return x.address
}

// SetWeight specifies weight of the node.
func (x *NodeParam) SetWeight(weight float64) {
	x.weight = weight
}

// Weight returns weight of the node.
func (x *NodeParam) Weight() float64 {
	return x.weight
}

// WaitParams contains parameters used in polling is a something applied on FrostFS network.
type WaitParams struct {
	Timeout      time.Duration
	PollInterval time.Duration
}

// SetTimeout specifies the time to wait for the operation to complete.
//
// Deprecated: Use WaitParams.Timeout instead.
func (x *WaitParams) SetTimeout(timeout time.Duration) {
	x.Timeout = timeout
}

// SetPollInterval specifies the interval, once it will check the completion of the operation.
//
// Deprecated: Use WaitParams.PollInterval instead.
func (x *WaitParams) SetPollInterval(tick time.Duration) {
	x.PollInterval = tick
}

func defaultWaitParams() *WaitParams {
	return &WaitParams{
		Timeout:      120 * time.Second,
		PollInterval: 5 * time.Second,
	}
}

// checkForPositive panics if any of the wait params isn't positive.
func (x *WaitParams) checkForPositive() {
	if x.Timeout <= 0 || x.PollInterval <= 0 {
		panic("all wait params must be positive")
	}
}

// CheckForValid checks if all wait params are non-negative.
func (x *WaitParams) CheckValidity() error {
	if x.Timeout <= 0 {
		return errors.New("timeout cannot be negative")
	}
	if x.PollInterval <= 0 {
		return errors.New("poll interval cannot be negative")
	}
	return nil
}

type prmContext struct {
	defaultSession bool
	verb           session.ObjectVerb
	cnr            cid.ID

	objSet bool
	objs   []oid.ID
}

func (x *prmContext) useDefaultSession() {
	x.defaultSession = true
}

func (x *prmContext) useContainer(cnr cid.ID) {
	x.cnr = cnr
}

func (x *prmContext) useObjects(ids []oid.ID) {
	x.objs = ids
	x.objSet = true
}

func (x *prmContext) useAddress(addr oid.Address) {
	x.cnr = addr.Container()
	x.objs = []oid.ID{addr.Object()}
	x.objSet = true
}

func (x *prmContext) useVerb(verb session.ObjectVerb) {
	x.verb = verb
}

type prmCommon struct {
	key    *ecdsa.PrivateKey
	btoken *bearer.Token
	stoken *session.Object
}

// UseKey specifies private key to sign the requests.
// If key is not provided, then Pool default key is used.
func (x *prmCommon) UseKey(key *ecdsa.PrivateKey) {
	x.key = key
}

// UseBearer attaches bearer token to be used for the operation.
func (x *prmCommon) UseBearer(token bearer.Token) {
	x.btoken = &token
}

// UseSession specifies session within which operation should be performed.
func (x *prmCommon) UseSession(token session.Object) {
	x.stoken = &token
}

// PrmObjectPut groups parameters of PutObject operation.
type PrmObjectPut struct {
	prmCommon

	hdr object.Object

	payload io.Reader

	copiesNumber []uint32

	clientCut   bool
	networkInfo netmap.NetworkInfo

	withoutHomomorphicHash bool

	bufferMaxSize uint64
}

// SetHeader specifies header of the object.
func (x *PrmObjectPut) SetHeader(hdr object.Object) {
	x.hdr = hdr
}

// SetPayload specifies payload of the object.
func (x *PrmObjectPut) SetPayload(payload io.Reader) {
	x.payload = payload
}

// SetCopiesNumber sets number of object copies that is enough to consider put successful.
// Zero means using default behavior.
func (x *PrmObjectPut) SetCopiesNumber(copiesNumber uint32) {
	x.copiesNumber = []uint32{copiesNumber}
}

// SetCopiesNumberVector sets number of object copies that is enough to consider put successful, provided as array.
// Nil/empty vector means using default behavior.
func (x *PrmObjectPut) SetCopiesNumberVector(copiesNumber []uint32) {
	x.copiesNumber = copiesNumber
}

// SetClientCut enables client cut for objects. It means that full object is prepared on client side
// and retrying is possible. But this leads to additional memory using for buffering object parts.
// Buffer size for every put is MaxObjectSize value from FrostFS network.
// There is limit for total memory allocation for in-flight request and
// can be set by InitParameters.SetMaxClientCutMemory (default value is 1gb).
// Put requests will fail if this limit be reached.
func (x *PrmObjectPut) SetClientCut(clientCut bool) {
	x.clientCut = clientCut
}

// WithoutHomomorphicHash if set to true do not use Tillich-ZĂ©mor hash for payload.
func (x *PrmObjectPut) WithoutHomomorphicHash(v bool) {
	x.withoutHomomorphicHash = v
}

// SetBufferMaxSize sets max buffer size to read payload.
// This value isn't used if object size is set explicitly and less than this value.
// Default value 3MB.
func (x *PrmObjectPut) SetBufferMaxSize(size uint64) {
	x.bufferMaxSize = size
}

func (x *PrmObjectPut) setNetworkInfo(ni netmap.NetworkInfo) {
	x.networkInfo = ni
}

// PrmObjectDelete groups parameters of DeleteObject operation.
type PrmObjectDelete struct {
	prmCommon

	addr oid.Address
}

// SetAddress specifies FrostFS address of the object.
func (x *PrmObjectDelete) SetAddress(addr oid.Address) {
	x.addr = addr
}

// PrmObjectGet groups parameters of GetObject operation.
type PrmObjectGet struct {
	prmCommon

	addr oid.Address
}

// SetAddress specifies FrostFS address of the object.
func (x *PrmObjectGet) SetAddress(addr oid.Address) {
	x.addr = addr
}

// PrmObjectHead groups parameters of HeadObject operation.
type PrmObjectHead struct {
	prmCommon

	addr oid.Address
	raw  bool
}

// SetAddress specifies FrostFS address of the object.
func (x *PrmObjectHead) SetAddress(addr oid.Address) {
	x.addr = addr
}

// MarkRaw marks an intent to read physically stored object.
func (x *PrmObjectHead) MarkRaw() {
	x.raw = true
}

// PrmObjectRange groups parameters of RangeObject operation.
type PrmObjectRange struct {
	prmCommon

	addr    oid.Address
	off, ln uint64
}

// SetAddress specifies FrostFS address of the object.
func (x *PrmObjectRange) SetAddress(addr oid.Address) {
	x.addr = addr
}

// SetOffset sets offset of the payload range to be read.
func (x *PrmObjectRange) SetOffset(offset uint64) {
	x.off = offset
}

// SetLength sets length of the payload range to be read.
func (x *PrmObjectRange) SetLength(length uint64) {
	x.ln = length
}

// PrmObjectSearch groups parameters of SearchObjects operation.
type PrmObjectSearch struct {
	prmCommon

	cnrID   cid.ID
	filters object.SearchFilters
}

// SetContainerID specifies the container in which to look for objects.
func (x *PrmObjectSearch) SetContainerID(cnrID cid.ID) {
	x.cnrID = cnrID
}

// SetFilters specifies filters by which to select objects.
func (x *PrmObjectSearch) SetFilters(filters object.SearchFilters) {
	x.filters = filters
}

// PrmContainerPut groups parameters of PutContainer operation.
type PrmContainerPut struct {
	ClientParams sdkClient.PrmContainerPut

	WaitParams *WaitParams
}

// SetContainer container structure to be used as a parameter of the base
// client's operation.
//
// See git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client.PrmContainerPut.SetContainer.
//
// Deprecated: Use PrmContainerPut.ClientParams.Container instead.
func (x *PrmContainerPut) SetContainer(cnr container.Container) {
	x.ClientParams.SetContainer(cnr)
}

// WithinSession specifies session to be used as a parameter of the base
// client's operation.
//
// See git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client.PrmContainerPut.WithinSession.
//
// Deprecated: Use PrmContainerPut.ClientParams.Session instead.
func (x *PrmContainerPut) WithinSession(s session.Container) {
	x.ClientParams.WithinSession(s)
}

// SetWaitParams specifies timeout params to complete operation.
// If not provided the default one will be used.
// Panics if any of the wait params isn't positive.
//
// Deprecated: Use PrmContainerPut.ClientParams.WaitParams instead.
func (x *PrmContainerPut) SetWaitParams(waitParams WaitParams) {
	waitParams.checkForPositive()
	x.WaitParams = &waitParams
}

// PrmContainerGet groups parameters of GetContainer operation.
type PrmContainerGet struct {
	ContainerID cid.ID

	Session *session.Container
}

// SetContainerID specifies identifier of the container to be read.
//
// Deprecated: Use PrmContainerGet.ContainerID instead.
func (prm *PrmContainerGet) SetContainerID(cnrID cid.ID) {
	prm.ContainerID = cnrID
}

// PrmContainerList groups parameters of ListContainers operation.
type PrmContainerList struct {
	OwnerID user.ID

	Session *session.Container
}

// SetOwnerID specifies identifier of the FrostFS account to list the containers.
//
// Deprecated: Use PrmContainerList.OwnerID instead.
func (x *PrmContainerList) SetOwnerID(ownerID user.ID) {
	x.OwnerID = ownerID
}

// PrmContainerDelete groups parameters of DeleteContainer operation.
type PrmContainerDelete struct {
	ContainerID cid.ID

	Session *session.Container

	WaitParams *WaitParams
}

// SetContainerID specifies identifier of the FrostFS container to be removed.
//
// Deprecated: Use PrmContainerDelete.ContainerID instead.
func (x *PrmContainerDelete) SetContainerID(cnrID cid.ID) {
	x.ContainerID = cnrID
}

// SetSessionToken specifies session within which operation should be performed.
//
// Deprecated: Use PrmContainerDelete.Session instead.
func (x *PrmContainerDelete) SetSessionToken(token session.Container) {
	x.Session = &token
}

// SetWaitParams specifies timeout params to complete operation.
// If not provided the default one will be used.
// Panics if any of the wait params isn't positive.
//
// Deprecated: Use PrmContainerDelete.WaitParams instead.
func (x *PrmContainerDelete) SetWaitParams(waitParams WaitParams) {
	waitParams.checkForPositive()
	x.WaitParams = &waitParams
}

// PrmContainerEACL groups parameters of GetEACL operation.
type PrmContainerEACL struct {
	ContainerID cid.ID

	Session *session.Container
}

// SetContainerID specifies identifier of the FrostFS container to read the eACL table.
//
// Deprecated: Use PrmContainerEACL.ContainerID instead.
func (x *PrmContainerEACL) SetContainerID(cnrID cid.ID) {
	x.ContainerID = cnrID
}

// PrmContainerSetEACL groups parameters of SetEACL operation.
type PrmContainerSetEACL struct {
	Table eacl.Table

	Session *session.Container

	WaitParams *WaitParams
}

// SetTable sets structure of container's extended ACL to be used as a
// parameter of the base client's operation.
//
// See git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client.PrmContainerSetEACL.SetTable.
//
// Deprecated: Use PrmContainerSetEACL.Table instead.
func (x *PrmContainerSetEACL) SetTable(table eacl.Table) {
	x.Table = table
}

// WithinSession specifies session to be used as a parameter of the base
// client's operation.
//
// See git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client.PrmContainerSetEACL.WithinSession.
//
// Deprecated: Use PrmContainerSetEACL.Session instead.
func (x *PrmContainerSetEACL) WithinSession(s session.Container) {
	x.Session = &s
}

// SetWaitParams specifies timeout params to complete operation.
// If not provided the default one will be used.
// Panics if any of the wait params isn't positive.
//
// Deprecated: Use PrmContainerSetEACL.WaitParams instead.
func (x *PrmContainerSetEACL) SetWaitParams(waitParams WaitParams) {
	waitParams.checkForPositive()
	x.WaitParams = &waitParams
}

// PrmBalanceGet groups parameters of Balance operation.
type PrmBalanceGet struct {
	account user.ID
}

// SetAccount specifies identifier of the FrostFS account for which the balance is requested.
func (x *PrmBalanceGet) SetAccount(id user.ID) {
	x.account = id
}

// prmEndpointInfo groups parameters of sessionCreate operation.
type prmCreateSession struct {
	exp uint64
	key ecdsa.PrivateKey
}

// setExp sets number of the last FrostFS epoch in the lifetime of the session after which it will be expired.
func (x *prmCreateSession) setExp(exp uint64) {
	x.exp = exp
}

// useKey specifies owner private key for session token.
// If key is not provided, then Pool default key is used.
func (x *prmCreateSession) useKey(key ecdsa.PrivateKey) {
	x.key = key
}

// prmEndpointInfo groups parameters of endpointInfo operation.
type prmEndpointInfo struct{}

// prmNetworkInfo groups parameters of networkInfo operation.
type prmNetworkInfo struct{}

// prmNetMapSnapshot groups parameters of netMapSnapshot operation.
type prmNetMapSnapshot struct{}

// resCreateSession groups resulting values of sessionCreate operation.
type resCreateSession struct {
	id []byte

	sessionKey []byte
}

// Pool represents virtual connection to the FrostFS network to communicate
// with multiple FrostFS servers without thinking about switching between servers
// due to load balancing proportions or their unavailability.
// It is designed to provide a convenient abstraction from the multiple sdkClient.client types.
//
// Pool can be created and initialized using NewPool function.
// Before executing the FrostFS operations using the Pool, connection to the
// servers MUST BE correctly established (see Dial method).
// Using the Pool before connecting have been established can lead to a panic.
// After the work, the Pool SHOULD BE closed (see Close method): it frees internal
// and system resources which were allocated for the period of work of the Pool.
// Calling Dial/Close methods during the communication process step strongly discouraged
// as it leads to undefined behavior.
//
// Each method which produces a FrostFS API call may return an error.
// Status of underlying server response is casted to built-in error instance.
// Certain statuses can be checked using `sdkClient` and standard `errors` packages.
// Note that package provides some helper functions to work with status returns
// (e.g. sdkClient.IsErrContainerNotFound, sdkClient.IsErrObjectNotFound).
//
// See pool package overview to get some examples.
type Pool struct {
	innerPools      []*innerPool
	key             *ecdsa.PrivateKey
	cancel          context.CancelFunc
	closedCh        chan struct{}
	cache           *sessionCache
	stokenDuration  uint64
	rebalanceParams rebalanceParameters
	clientBuilder   clientBuilder
	logger          *zap.Logger

	maxObjectSize uint64
}

type innerPool struct {
	lock    sync.RWMutex
	sampler *sampler
	clients []client
}

const (
	defaultSessionTokenExpirationDuration = 100 // in blocks
	defaultErrorThreshold                 = 100

	defaultRebalanceInterval  = 15 * time.Second
	defaultHealthcheckTimeout = 4 * time.Second
	defaultDialTimeout        = 5 * time.Second
	defaultStreamTimeout      = 10 * time.Second

	defaultBufferMaxSizeForPut = 3 * 1024 * 1024 // 3 MB
)

// NewPool creates connection pool using parameters.
func NewPool(options InitParameters) (*Pool, error) {
	if options.key == nil {
		return nil, fmt.Errorf("missed required parameter 'Key'")
	}

	nodesParams, err := adjustNodeParams(options.nodeParams)
	if err != nil {
		return nil, err
	}

	cache, err := newCache()
	if err != nil {
		return nil, fmt.Errorf("couldn't create cache: %w", err)
	}

	fillDefaultInitParams(&options, cache)

	pool := &Pool{
		key:            options.key,
		cache:          cache,
		logger:         options.logger,
		stokenDuration: options.sessionExpirationDuration,
		rebalanceParams: rebalanceParameters{
			nodesParams:               nodesParams,
			nodeRequestTimeout:        options.healthcheckTimeout,
			clientRebalanceInterval:   options.clientRebalanceInterval,
			sessionExpirationDuration: options.sessionExpirationDuration,
		},
		clientBuilder: options.clientBuilder,
	}

	return pool, nil
}

// Dial establishes a connection to the servers from the FrostFS network.
// It also starts a routine that checks the health of the nodes and
// updates the weights of the nodes for balancing.
// Returns an error describing failure reason.
//
// If failed, the Pool SHOULD NOT be used.
//
// See also InitParameters.SetClientRebalanceInterval.
func (p *Pool) Dial(ctx context.Context) error {
	inner := make([]*innerPool, len(p.rebalanceParams.nodesParams))
	var atLeastOneHealthy bool

	for i, params := range p.rebalanceParams.nodesParams {
		clients := make([]client, len(params.weights))
		for j, addr := range params.addresses {
			clients[j] = p.clientBuilder(addr)
			if err := clients[j].dial(ctx); err != nil {
				p.log(zap.WarnLevel, "failed to build client", zap.String("address", addr), zap.Error(err))
				continue
			}

			var st session.Object
			err := initSessionForDuration(ctx, &st, clients[j], p.rebalanceParams.sessionExpirationDuration, *p.key, false)
			if err != nil {
				clients[j].setUnhealthy()
				p.log(zap.WarnLevel, "failed to create frostfs session token for client",
					zap.String("address", addr), zap.Error(err))
				continue
			}

			_ = p.cache.Put(formCacheKey(addr, p.key, false), st)
			atLeastOneHealthy = true
		}
		source := rand.NewSource(time.Now().UnixNano())
		sampl := newSampler(params.weights, source)

		inner[i] = &innerPool{
			sampler: sampl,
			clients: clients,
		}
	}

	if !atLeastOneHealthy {
		return fmt.Errorf("at least one node must be healthy")
	}

	ctx, cancel := context.WithCancel(ctx)
	p.cancel = cancel
	p.closedCh = make(chan struct{})
	p.innerPools = inner

	ni, err := p.NetworkInfo(ctx)
	if err != nil {
		return fmt.Errorf("get network info for max object size: %w", err)
	}
	p.maxObjectSize = ni.MaxObjectSize()

	go p.startRebalance(ctx)
	return nil
}

func (p *Pool) log(level zapcore.Level, msg string, fields ...zap.Field) {
	if p.logger == nil {
		return
	}

	p.logger.Log(level, msg, fields...)
}

func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
	if params.sessionExpirationDuration == 0 {
		params.sessionExpirationDuration = defaultSessionTokenExpirationDuration
	}

	if params.errorThreshold == 0 {
		params.errorThreshold = defaultErrorThreshold
	}

	if params.clientRebalanceInterval <= 0 {
		params.clientRebalanceInterval = defaultRebalanceInterval
	}

	if params.healthcheckTimeout <= 0 {
		params.healthcheckTimeout = defaultHealthcheckTimeout
	}

	if params.nodeDialTimeout <= 0 {
		params.nodeDialTimeout = defaultDialTimeout
	}

	if params.nodeStreamTimeout <= 0 {
		params.nodeStreamTimeout = defaultStreamTimeout
	}

	if params.isMissingClientBuilder() {
		params.setClientBuilder(func(addr string) client {
			var prm wrapperPrm
			prm.setAddress(addr)
			prm.setKey(*params.key)
			prm.setLogger(params.logger)
			prm.setDialTimeout(params.nodeDialTimeout)
			prm.setStreamTimeout(params.nodeStreamTimeout)
			prm.setErrorThreshold(params.errorThreshold)
			prm.setPoolRequestCallback(params.requestCallback)
			prm.setGRPCDialOptions(params.dialOptions)
			prm.setResponseInfoCallback(func(info sdkClient.ResponseMetaInfo) error {
				cache.updateEpoch(info.Epoch())
				return nil
			})
			return newWrapper(prm)
		})
	}
}

func adjustNodeParams(nodeParams []NodeParam) ([]*nodesParam, error) {
	if len(nodeParams) == 0 {
		return nil, errors.New("no FrostFS peers configured")
	}

	nodesParamsMap := make(map[int]*nodesParam)
	for _, param := range nodeParams {
		nodes, ok := nodesParamsMap[param.priority]
		if !ok {
			nodes = &nodesParam{priority: param.priority}
		}
		nodes.addresses = append(nodes.addresses, param.address)
		nodes.weights = append(nodes.weights, param.weight)
		nodesParamsMap[param.priority] = nodes
	}

	nodesParams := make([]*nodesParam, 0, len(nodesParamsMap))
	for _, nodes := range nodesParamsMap {
		nodes.weights = adjustWeights(nodes.weights)
		nodesParams = append(nodesParams, nodes)
	}

	sort.Slice(nodesParams, func(i, j int) bool {
		return nodesParams[i].priority < nodesParams[j].priority
	})

	return nodesParams, nil
}

// startRebalance runs loop to monitor connection healthy status.
func (p *Pool) startRebalance(ctx context.Context) {
	ticker := time.NewTimer(p.rebalanceParams.clientRebalanceInterval)
	buffers := make([][]float64, len(p.rebalanceParams.nodesParams))
	for i, params := range p.rebalanceParams.nodesParams {
		buffers[i] = make([]float64, len(params.weights))
	}

	for {
		select {
		case <-ctx.Done():
			close(p.closedCh)
			return
		case <-ticker.C:
			p.updateNodesHealth(ctx, buffers)
			ticker.Reset(p.rebalanceParams.clientRebalanceInterval)
		}
	}
}

func (p *Pool) updateNodesHealth(ctx context.Context, buffers [][]float64) {
	wg := sync.WaitGroup{}
	for i, inner := range p.innerPools {
		wg.Add(1)

		bufferWeights := buffers[i]
		go func(i int, _ *innerPool) {
			defer wg.Done()
			p.updateInnerNodesHealth(ctx, i, bufferWeights)
		}(i, inner)
	}
	wg.Wait()
}

func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights []float64) {
	if i > len(p.innerPools)-1 {
		return
	}
	pool := p.innerPools[i]
	options := p.rebalanceParams

	healthyChanged := new(atomic.Bool)
	wg := sync.WaitGroup{}

	for j, cli := range pool.clients {
		wg.Add(1)
		go func(j int, cli client) {
			defer wg.Done()

			tctx, c := context.WithTimeout(ctx, options.nodeRequestTimeout)
			defer c()

			healthy, changed := cli.restartIfUnhealthy(tctx)
			if healthy {
				bufferWeights[j] = options.nodesParams[i].weights[j]
			} else {
				bufferWeights[j] = 0
				p.cache.DeleteByPrefix(cli.address())
			}

			if changed {
				p.log(zap.DebugLevel, "health has changed",
					zap.String("address", cli.address()), zap.Bool("healthy", healthy))
				healthyChanged.Store(true)
			}
		}(j, cli)
	}
	wg.Wait()

	if healthyChanged.Load() {
		probabilities := adjustWeights(bufferWeights)
		source := rand.NewSource(time.Now().UnixNano())
		pool.lock.Lock()
		pool.sampler = newSampler(probabilities, source)
		pool.lock.Unlock()
	}
}

func adjustWeights(weights []float64) []float64 {
	adjusted := make([]float64, len(weights))
	sum := 0.0
	for _, weight := range weights {
		sum += weight
	}
	if sum > 0 {
		for i, weight := range weights {
			adjusted[i] = weight / sum
		}
	}

	return adjusted
}

func (p *Pool) connection() (client, error) {
	for _, inner := range p.innerPools {
		cp, err := inner.connection()
		if err == nil {
			return cp, nil
		}
	}

	return nil, errors.New("no healthy client")
}

func (p *innerPool) connection() (client, error) {
	p.lock.RLock() // need lock because of using p.sampler
	defer p.lock.RUnlock()
	if len(p.clients) == 1 {
		cp := p.clients[0]
		if cp.isHealthy() {
			return cp, nil
		}
		return nil, errors.New("no healthy client")
	}
	attempts := 3 * len(p.clients)
	for k := 0; k < attempts; k++ {
		i := p.sampler.Next()
		if cp := p.clients[i]; cp.isHealthy() {
			return cp, nil
		}
	}

	return nil, errors.New("no healthy client")
}

func formCacheKey(address string, key *ecdsa.PrivateKey, clientCut bool) string {
	k := keys.PrivateKey{PrivateKey: *key}

	stype := "server"
	if clientCut {
		stype = "client"
	}

	return address + stype + k.String()
}

func (p *Pool) checkSessionTokenErr(err error, address string) bool {
	if err == nil {
		return false
	}

	if sdkClient.IsErrSessionNotFound(err) || sdkClient.IsErrSessionExpired(err) {
		p.cache.DeleteByPrefix(address)
		return true
	}

	return false
}

func initSessionForDuration(ctx context.Context, dst *session.Object, c client, dur uint64, ownerKey ecdsa.PrivateKey, clientCut bool) error {
	ni, err := c.networkInfo(ctx, prmNetworkInfo{})
	if err != nil {
		return err
	}

	epoch := ni.CurrentEpoch()

	var exp uint64
	if math.MaxUint64-epoch < dur {
		exp = math.MaxUint64
	} else {
		exp = epoch + dur
	}
	var prm prmCreateSession
	prm.setExp(exp)
	prm.useKey(ownerKey)

	var (
		id  uuid.UUID
		key frostfsecdsa.PublicKey
	)

	if clientCut {
		id = uuid.New()
		key = frostfsecdsa.PublicKey(ownerKey.PublicKey)
	} else {
		res, err := c.sessionCreate(ctx, prm)
		if err != nil {
			return err
		}
		if err = id.UnmarshalBinary(res.id); err != nil {
			return fmt.Errorf("invalid session token ID: %w", err)
		}
		if err = key.Decode(res.sessionKey); err != nil {
			return fmt.Errorf("invalid public session key: %w", err)
		}
	}

	dst.SetID(id)
	dst.SetAuthKey(&key)
	dst.SetExp(exp)

	return nil
}

type callContext struct {
	client client

	// client endpoint
	endpoint string

	// request signer
	key *ecdsa.PrivateKey

	// flag to open default session if session token is missing
	sessionDefault bool
	sessionTarget  func(session.Object)
	sessionVerb    session.ObjectVerb
	sessionCnr     cid.ID
	sessionObjSet  bool
	sessionObjs    []oid.ID

	sessionClientCut bool
}

func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContext) error {
	cp, err := p.connection()
	if err != nil {
		return err
	}

	ctx.key = cfg.key
	if ctx.key == nil {
		// use pool key if caller didn't specify its own
		ctx.key = p.key
	}

	ctx.endpoint = cp.address()
	ctx.client = cp

	if ctx.sessionTarget != nil && cfg.stoken != nil {
		ctx.sessionTarget(*cfg.stoken)
	}

	// note that we don't override session provided by the caller
	ctx.sessionDefault = cfg.stoken == nil && prmCtx.defaultSession
	if ctx.sessionDefault {
		ctx.sessionVerb = prmCtx.verb
		ctx.sessionCnr = prmCtx.cnr
		ctx.sessionObjSet = prmCtx.objSet
		ctx.sessionObjs = prmCtx.objs
	}

	return err
}

// opens new session or uses cached one.
// Must be called only on initialized callContext with set sessionTarget.
func (p *Pool) openDefaultSession(ctx context.Context, cc *callContext) error {
	cacheKey := formCacheKey(cc.endpoint, cc.key, cc.sessionClientCut)

	tok, ok := p.cache.Get(cacheKey)
	if !ok {
		// init new session
		err := initSessionForDuration(ctx, &tok, cc.client, p.stokenDuration, *cc.key, cc.sessionClientCut)
		if err != nil {
			return fmt.Errorf("session API client: %w", err)
		}

		// cache the opened session
		p.cache.Put(cacheKey, tok)
	}

	tok.ForVerb(cc.sessionVerb)
	tok.BindContainer(cc.sessionCnr)

	if cc.sessionObjSet {
		tok.LimitByObjects(cc.sessionObjs...)
	}

	// sign the token
	if err := tok.Sign(*cc.key); err != nil {
		return fmt.Errorf("sign token of the opened session: %w", err)
	}

	cc.sessionTarget(tok)

	return nil
}

// opens default session (if sessionDefault is set), and calls f. If f returns
// session-related error then cached token is removed.
func (p *Pool) call(ctx context.Context, cc *callContext, f func() error) error {
	var err error

	if cc.sessionDefault {
		err = p.openDefaultSession(ctx, cc)
		if err != nil {
			return fmt.Errorf("open default session: %w", err)
		}
	}

	err = f()
	_ = p.checkSessionTokenErr(err, cc.endpoint)

	return err
}

// fillAppropriateKey use pool key if caller didn't specify its own.
func (p *Pool) fillAppropriateKey(prm *prmCommon) {
	if prm.key == nil {
		prm.key = p.key
	}
}

// PutObject writes an object through a remote server using FrostFS API protocol.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
	cnr, _ := prm.hdr.ContainerID()

	var prmCtx prmContext
	prmCtx.useDefaultSession()
	prmCtx.useVerb(session.VerbObjectPut)
	prmCtx.useContainer(cnr)

	p.fillAppropriateKey(&prm.prmCommon)

	var ctxCall callContext
	ctxCall.sessionClientCut = prm.clientCut
	if err := p.initCallContext(&ctxCall, prm.prmCommon, prmCtx); err != nil {
		return oid.ID{}, fmt.Errorf("init call context: %w", err)
	}

	if ctxCall.sessionDefault {
		ctxCall.sessionTarget = prm.UseSession
		if err := p.openDefaultSession(ctx, &ctxCall); err != nil {
			return oid.ID{}, fmt.Errorf("open default session: %w", err)
		}
	}

	if prm.clientCut {
		var ni netmap.NetworkInfo
		ni.SetCurrentEpoch(p.cache.Epoch())
		ni.SetMaxObjectSize(p.maxObjectSize) // we want to use initial max object size in PayloadSizeLimiter
		prm.setNetworkInfo(ni)
	}

	id, err := ctxCall.client.objectPut(ctx, prm)
	if err != nil {
		// removes session token from cache in case of token error
		p.checkSessionTokenErr(err, ctxCall.endpoint)
		return id, fmt.Errorf("init writing on API client %s: %w", ctxCall.endpoint, err)
	}

	return id, nil
}

// DeleteObject marks an object for deletion from the container using FrostFS API protocol.
// As a marker, a special unit called a tombstone is placed in the container.
// It confirms the user's intent to delete the object, and is itself a container object.
// Explicit deletion is done asynchronously, and is generally not guaranteed.
func (p *Pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error {
	var prmCtx prmContext
	prmCtx.useDefaultSession()
	prmCtx.useVerb(session.VerbObjectDelete)
	prmCtx.useAddress(prm.addr)

	if prm.stoken == nil { // collect phy objects only if we are about to open default session
		var tokens relations.Tokens
		tokens.Bearer = prm.btoken

		relatives, err := relations.ListAllRelations(ctx, p, prm.addr.Container(), prm.addr.Object(), tokens)
		if err != nil {
			return fmt.Errorf("failed to collect relatives: %w", err)
		}

		if len(relatives) != 0 {
			prmCtx.useContainer(prm.addr.Container())
			prmCtx.useObjects(append(relatives, prm.addr.Object()))
		}
	}

	p.fillAppropriateKey(&prm.prmCommon)

	var cc callContext
	cc.sessionTarget = prm.UseSession

	err := p.initCallContext(&cc, prm.prmCommon, prmCtx)
	if err != nil {
		return err
	}

	return p.call(ctx, &cc, func() error {
		if err = cc.client.objectDelete(ctx, prm); err != nil {
			return fmt.Errorf("remove object via client %s: %w", cc.endpoint, err)
		}

		return nil
	})
}

type objectReadCloser struct {
	reader              *sdkClient.ObjectReader
	elapsedTimeCallback func(time.Duration)
}

// Read implements io.Reader of the object payload.
func (x *objectReadCloser) Read(p []byte) (int, error) {
	start := time.Now()
	n, err := x.reader.Read(p)
	x.elapsedTimeCallback(time.Since(start))
	return n, err
}

// Close implements io.Closer of the object payload.
func (x *objectReadCloser) Close() error {
	_, err := x.reader.Close()
	return err
}

// ResGetObject is designed to provide object header nad read one object payload from FrostFS system.
type ResGetObject struct {
	Header object.Object

	Payload io.ReadCloser
}

// GetObject reads object header and initiates reading an object payload through a remote server using FrostFS API protocol.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) GetObject(ctx context.Context, prm PrmObjectGet) (ResGetObject, error) {
	p.fillAppropriateKey(&prm.prmCommon)

	var cc callContext
	cc.sessionTarget = prm.UseSession

	var res ResGetObject

	err := p.initCallContext(&cc, prm.prmCommon, prmContext{})
	if err != nil {
		return res, err
	}

	return res, p.call(ctx, &cc, func() error {
		res, err = cc.client.objectGet(ctx, prm)
		if err != nil {
			return fmt.Errorf("get object via client %s: %w", cc.endpoint, err)
		}
		return nil
	})
}

// HeadObject reads object header through a remote server using FrostFS API protocol.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) HeadObject(ctx context.Context, prm PrmObjectHead) (object.Object, error) {
	p.fillAppropriateKey(&prm.prmCommon)

	var cc callContext
	cc.sessionTarget = prm.UseSession

	var obj object.Object

	err := p.initCallContext(&cc, prm.prmCommon, prmContext{})
	if err != nil {
		return obj, err
	}

	return obj, p.call(ctx, &cc, func() error {
		obj, err = cc.client.objectHead(ctx, prm)
		if err != nil {
			return fmt.Errorf("head object via client %s: %w", cc.endpoint, err)
		}
		return nil
	})
}

// ResObjectRange is designed to read payload range of one object
// from FrostFS system.
//
// Must be initialized using Pool.ObjectRange, any other
// usage is unsafe.
type ResObjectRange struct {
	payload             *sdkClient.ObjectRangeReader
	elapsedTimeCallback func(time.Duration)
}

// Read implements io.Reader of the object payload.
func (x *ResObjectRange) Read(p []byte) (int, error) {
	start := time.Now()
	n, err := x.payload.Read(p)
	x.elapsedTimeCallback(time.Since(start))
	return n, err
}

// Close ends reading the payload range and returns the result of the operation
// along with the final results. Must be called after using the ResObjectRange.
func (x *ResObjectRange) Close() error {
	_, err := x.payload.Close()
	return err
}

// ObjectRange initiates reading an object's payload range through a remote
// server using FrostFS API protocol.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) ObjectRange(ctx context.Context, prm PrmObjectRange) (ResObjectRange, error) {
	p.fillAppropriateKey(&prm.prmCommon)

	var cc callContext
	cc.sessionTarget = prm.UseSession

	var res ResObjectRange

	err := p.initCallContext(&cc, prm.prmCommon, prmContext{})
	if err != nil {
		return res, err
	}

	return res, p.call(ctx, &cc, func() error {
		res, err = cc.client.objectRange(ctx, prm)
		if err != nil {
			return fmt.Errorf("object range via client %s: %w", cc.endpoint, err)
		}
		return nil
	})
}

// ResObjectSearch is designed to read list of object identifiers from FrostFS system.
//
// Must be initialized using Pool.SearchObjects, any other usage is unsafe.
type ResObjectSearch struct {
	r *sdkClient.ObjectListReader
}

// Read reads another list of the object identifiers.
func (x *ResObjectSearch) Read(buf []oid.ID) (int, error) {
	n, ok := x.r.Read(buf)
	if !ok {
		_, err := x.r.Close()
		if err == nil {
			return n, io.EOF
		}

		return n, err
	}

	return n, nil
}

// Iterate iterates over the list of found object identifiers.
// f can return true to stop iteration earlier.
//
// Returns an error if object can't be read.
func (x *ResObjectSearch) Iterate(f func(oid.ID) bool) error {
	return x.r.Iterate(f)
}

// Close ends reading list of the matched objects and returns the result of the operation
// along with the final results. Must be called after using the ResObjectSearch.
func (x *ResObjectSearch) Close() {
	_, _ = x.r.Close()
}

// SearchObjects initiates object selection through a remote server using FrostFS API protocol.
//
// The call only opens the transmission channel, explicit fetching of matched objects
// is done using the ResObjectSearch. Resulting reader must be finally closed.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (ResObjectSearch, error) {
	p.fillAppropriateKey(&prm.prmCommon)

	var cc callContext
	cc.sessionTarget = prm.UseSession

	var res ResObjectSearch

	err := p.initCallContext(&cc, prm.prmCommon, prmContext{})
	if err != nil {
		return res, err
	}

	return res, p.call(ctx, &cc, func() error {
		res, err = cc.client.objectSearch(ctx, prm)
		if err != nil {
			return fmt.Errorf("search object via client %s: %w", cc.endpoint, err)
		}
		return nil
	})
}

// PutContainer sends request to save container in FrostFS and waits for the operation to complete.
//
// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used:
//
//	polling interval: 5s
//	waiting timeout: 120s
//
// Success can be verified by reading by identifier (see GetContainer).
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (cid.ID, error) {
	cp, err := p.connection()
	if err != nil {
		return cid.ID{}, err
	}

	cnrID, err := cp.containerPut(ctx, prm)
	if err != nil {
		return cid.ID{}, fmt.Errorf("put container via client '%s': %w", cp.address(), err)
	}

	return cnrID, nil
}

// GetContainer reads FrostFS container by ID.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) GetContainer(ctx context.Context, prm PrmContainerGet) (container.Container, error) {
	cp, err := p.connection()
	if err != nil {
		return container.Container{}, err
	}

	cnrs, err := cp.containerGet(ctx, prm)
	if err != nil {
		return container.Container{}, fmt.Errorf("get container via client '%s': %w", cp.address(), err)
	}

	return cnrs, nil
}

// ListContainers requests identifiers of the account-owned containers.
func (p *Pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid.ID, error) {
	cp, err := p.connection()
	if err != nil {
		return nil, err
	}

	cnrIDs, err := cp.containerList(ctx, prm)
	if err != nil {
		return []cid.ID{}, fmt.Errorf("list containers via client '%s': %w", cp.address(), err)
	}

	return cnrIDs, nil
}

// DeleteContainer sends request to remove the FrostFS container and waits for the operation to complete.
//
// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used:
//
//	polling interval: 5s
//	waiting timeout: 120s
//
// Success can be verified by reading by identifier (see GetContainer).
func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) error {
	cp, err := p.connection()
	if err != nil {
		return err
	}

	err = cp.containerDelete(ctx, prm)
	if err != nil {
		return fmt.Errorf("delete container via client '%s': %w", cp.address(), err)
	}

	return nil
}

// GetEACL reads eACL table of the FrostFS container.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) GetEACL(ctx context.Context, prm PrmContainerEACL) (eacl.Table, error) {
	cp, err := p.connection()
	if err != nil {
		return eacl.Table{}, err
	}

	eaclResult, err := cp.containerEACL(ctx, prm)
	if err != nil {
		return eacl.Table{}, fmt.Errorf("get EACL via client '%s': %w", cp.address(), err)
	}

	return eaclResult, nil
}

// SetEACL sends request to update eACL table of the FrostFS container and waits for the operation to complete.
//
// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used:
//
//	polling interval: 5s
//	waiting timeout: 120s
//
// Success can be verified by reading by identifier (see GetEACL).
func (p *Pool) SetEACL(ctx context.Context, prm PrmContainerSetEACL) error {
	cp, err := p.connection()
	if err != nil {
		return err
	}

	err = cp.containerSetEACL(ctx, prm)
	if err != nil {
		return fmt.Errorf("set EACL via client '%s': %w", cp.address(), err)
	}

	return nil
}

// Balance requests current balance of the FrostFS account.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (accounting.Decimal, error) {
	cp, err := p.connection()
	if err != nil {
		return accounting.Decimal{}, err
	}

	balance, err := cp.balanceGet(ctx, prm)
	if err != nil {
		return accounting.Decimal{}, fmt.Errorf("get balance via client '%s': %w", cp.address(), err)
	}

	return balance, nil
}

// Statistic returns connection statistics.
func (p Pool) Statistic() Statistic {
	stat := Statistic{}
	for _, inner := range p.innerPools {
		nodes := make([]string, 0, len(inner.clients))
		inner.lock.RLock()
		for _, cl := range inner.clients {
			if cl.isHealthy() {
				nodes = append(nodes, cl.address())
			}
			node := NodeStatistic{
				address:       cl.address(),
				methods:       cl.methodsStatus(),
				overallErrors: cl.overallErrorRate(),
				currentErrors: cl.currentErrorRate(),
			}
			stat.nodes = append(stat.nodes, node)
			stat.overallErrors += node.overallErrors
		}
		inner.lock.RUnlock()
		if len(stat.currentNodes) == 0 {
			stat.currentNodes = nodes
		}
	}

	return stat
}

// waitForContainerPresence waits until the container is found on the FrostFS network.
func waitForContainerPresence(ctx context.Context, cli client, prm PrmContainerGet, waitParams *WaitParams) error {
	return waitFor(ctx, waitParams, func(ctx context.Context) bool {
		_, err := cli.containerGet(ctx, prm)
		return err == nil
	})
}

// waitForEACLPresence waits until the container eacl is applied on the FrostFS network.
func waitForEACLPresence(ctx context.Context, cli client, prm PrmContainerEACL, table *eacl.Table, waitParams *WaitParams) error {
	return waitFor(ctx, waitParams, func(ctx context.Context) bool {
		eaclTable, err := cli.containerEACL(ctx, prm)
		if err == nil {
			return eacl.EqualTables(*table, eaclTable)
		}
		return false
	})
}

// waitForContainerRemoved waits until the container is removed from the FrostFS network.
func waitForContainerRemoved(ctx context.Context, cli client, prm PrmContainerGet, waitParams *WaitParams) error {
	return waitFor(ctx, waitParams, func(ctx context.Context) bool {
		_, err := cli.containerGet(ctx, prm)
		return sdkClient.IsErrContainerNotFound(err)
	})
}

// waitFor await that given condition will be met in waitParams time.
func waitFor(ctx context.Context, params *WaitParams, condition func(context.Context) bool) error {
	wctx, cancel := context.WithTimeout(ctx, params.Timeout)
	defer cancel()
	ticker := time.NewTimer(params.PollInterval)
	defer ticker.Stop()
	wdone := wctx.Done()
	done := ctx.Done()
	for {
		select {
		case <-done:
			return ctx.Err()
		case <-wdone:
			return wctx.Err()
		case <-ticker.C:
			if condition(ctx) {
				return nil
			}
			ticker.Reset(params.PollInterval)
		}
	}
}

// NetworkInfo requests information about the FrostFS network of which the remote server is a part.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
	cp, err := p.connection()
	if err != nil {
		return netmap.NetworkInfo{}, err
	}

	netInfo, err := cp.networkInfo(ctx, prmNetworkInfo{})
	if err != nil {
		return netmap.NetworkInfo{}, fmt.Errorf("get network info via client '%s': %w", cp.address(), err)
	}

	return netInfo, nil
}

// NetMapSnapshot requests information about the FrostFS network map.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) NetMapSnapshot(ctx context.Context) (netmap.NetMap, error) {
	cp, err := p.connection()
	if err != nil {
		return netmap.NetMap{}, err
	}

	netMap, err := cp.netMapSnapshot(ctx, prmNetMapSnapshot{})
	if err != nil {
		return netmap.NetMap{}, fmt.Errorf("get network map via client '%s': %w", cp.address(), err)
	}

	return netMap, nil
}

// Close closes the Pool and releases all the associated resources.
func (p *Pool) Close() {
	p.cancel()
	<-p.closedCh

	// close all clients
	for _, pools := range p.innerPools {
		for _, cli := range pools.clients {
			if cli.isDialed() {
				_ = cli.close()
			}
		}
	}
}

// SyncContainerWithNetwork applies network configuration received via
// the Pool to the container. Changes the container if it does not satisfy
// network configuration.
//
// Pool and container MUST not be nil.
//
// Returns any error that does not allow reading configuration
// from the network.
func SyncContainerWithNetwork(ctx context.Context, cnr *container.Container, p *Pool) error {
	ni, err := p.NetworkInfo(ctx)
	if err != nil {
		return fmt.Errorf("network info: %w", err)
	}

	container.ApplyNetworkConfig(cnr, ni)

	return nil
}

// GetSplitInfo implements relations.Relations.
func (p *Pool) GetSplitInfo(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (*object.SplitInfo, error) {
	var addr oid.Address
	addr.SetContainer(cnrID)
	addr.SetObject(objID)

	var prm PrmObjectHead
	prm.SetAddress(addr)
	if tokens.Bearer != nil {
		prm.UseBearer(*tokens.Bearer)
	}
	if tokens.Session != nil {
		prm.UseSession(*tokens.Session)
	}
	prm.MarkRaw()

	_, err := p.HeadObject(ctx, prm)

	var errSplit *object.SplitInfoError

	switch {
	case errors.As(err, &errSplit):
		return errSplit.SplitInfo(), nil
	case err == nil:
		return nil, relations.ErrNoSplitInfo
	default:
		return nil, fmt.Errorf("failed to get raw object header: %w", err)
	}
}

// ListChildrenByLinker implements relations.Relations.
func (p *Pool) ListChildrenByLinker(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
	var addr oid.Address
	addr.SetContainer(cnrID)
	addr.SetObject(objID)

	var prm PrmObjectHead
	prm.SetAddress(addr)
	if tokens.Bearer != nil {
		prm.UseBearer(*tokens.Bearer)
	}
	if tokens.Session != nil {
		prm.UseSession(*tokens.Session)
	}

	res, err := p.HeadObject(ctx, prm)
	if err != nil {
		return nil, fmt.Errorf("failed to get linking object's header: %w", err)
	}

	return res.Children(), nil
}

// GetLeftSibling implements relations.Relations.
func (p *Pool) GetLeftSibling(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (oid.ID, error) {
	var addr oid.Address
	addr.SetContainer(cnrID)
	addr.SetObject(objID)

	var prm PrmObjectHead
	prm.SetAddress(addr)
	if tokens.Bearer != nil {
		prm.UseBearer(*tokens.Bearer)
	}
	if tokens.Session != nil {
		prm.UseSession(*tokens.Session)
	}

	res, err := p.HeadObject(ctx, prm)
	if err != nil {
		return oid.ID{}, fmt.Errorf("failed to read split chain member's header: %w", err)
	}

	idMember, ok := res.PreviousID()
	if !ok {
		return oid.ID{}, relations.ErrNoLeftSibling
	}
	return idMember, nil
}

// FindSiblingBySplitID implements relations.Relations.
func (p *Pool) FindSiblingBySplitID(ctx context.Context, cnrID cid.ID, splitID *object.SplitID, tokens relations.Tokens) ([]oid.ID, error) {
	var query object.SearchFilters
	query.AddSplitIDFilter(object.MatchStringEqual, splitID)

	var prm PrmObjectSearch
	prm.SetContainerID(cnrID)
	prm.SetFilters(query)
	if tokens.Bearer != nil {
		prm.UseBearer(*tokens.Bearer)
	}
	if tokens.Session != nil {
		prm.UseSession(*tokens.Session)
	}

	res, err := p.SearchObjects(ctx, prm)
	if err != nil {
		return nil, fmt.Errorf("failed to search objects by split ID: %w", err)
	}

	var members []oid.ID
	err = res.Iterate(func(id oid.ID) bool {
		members = append(members, id)
		return false
	})
	if err != nil {
		return nil, fmt.Errorf("failed to iterate found objects: %w", err)
	}

	return members, nil
}

// FindSiblingByParentID implements relations.Relations.
func (p *Pool) FindSiblingByParentID(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
	var query object.SearchFilters
	query.AddParentIDFilter(object.MatchStringEqual, objID)

	var prm PrmObjectSearch
	prm.SetContainerID(cnrID)
	prm.SetFilters(query)
	if tokens.Bearer != nil {
		prm.UseBearer(*tokens.Bearer)
	}
	if tokens.Session != nil {
		prm.UseSession(*tokens.Session)
	}

	resSearch, err := p.SearchObjects(ctx, prm)
	if err != nil {
		return nil, fmt.Errorf("failed to find object children: %w", err)
	}

	var res []oid.ID
	err = resSearch.Iterate(func(id oid.ID) bool {
		res = append(res, id)
		return false
	})
	if err != nil {
		return nil, fmt.Errorf("failed to iterate found objects: %w", err)
	}

	return res, nil
}