frostfs-sdk-go/pool/client.go
Ekaterina Lebedeva fe5b28e6bf [#338] pool: Support avg request time for ListContainerStream
Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2025-03-10 19:24:58 +03:00

1303 lines
33 KiB
Go

package pool
import (
"bytes"
"context"
"crypto/ecdsa"
"errors"
"fmt"
"io"
"sync"
"sync/atomic"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/accounting"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/ape"
sdkClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
)
// errPoolClientUnhealthy is returned by clientWrapper.getClient instead of a
// client when the wrapper is currently not marked healthy.
var errPoolClientUnhealthy = errors.New("pool client unhealthy")
// clientStatusMonitor counts errors and keeps per-method statistics for a connection.
type clientStatusMonitor struct {
	// logger is optional: log is a no-op when it is nil.
	logger *zap.Logger
	addr   string
	// healthy stores statusHealthy or statusUnhealthyOnRequest.
	healthy *atomic.Uint32
	// errorThreshold is the value currentErrorCount must reach for
	// incErrorRate to mark the connection unhealthy.
	errorThreshold uint32

	mu sync.RWMutex // protect counters
	// currentErrorCount is reset every time the connection is marked unhealthy.
	currentErrorCount uint32
	// overallErrorCount is only ever incremented by the code in this file.
	overallErrorCount uint64
	// methods holds one entry per MethodIndex below methodLast.
	methods []*MethodStatus
}
// Values stored in clientStatusMonitor.healthy.
const (
	// statusUnhealthyOnRequest is set when communication with the endpoint has
	// failed after dialing, due to an immediate error or accumulated errors.
	// The connection is still available, and the pool should close it before
	// re-establishing the connection.
	statusUnhealthyOnRequest = iota
	// statusHealthy is set when connection is ready to be used by the pool.
	statusHealthy
)
// MethodIndex is the index of a method in the list of statuses kept by
// clientStatusMonitor.
type MethodIndex int

const (
	methodBalanceGet MethodIndex = iota
	methodContainerPut
	methodContainerGet
	methodContainerList
	methodContainerListStream
	methodContainerDelete
	methodEndpointInfo
	methodNetworkInfo
	methodNetMapSnapshot
	methodObjectPut
	methodObjectDelete
	methodObjectGet
	methodObjectHead
	methodObjectRange
	methodObjectPatch
	methodSessionCreate
	methodAPEManagerAddChain
	methodAPEManagerRemoveChain
	methodAPEManagerListChains
	// methodLast is a sentinel following the last real method; it is not a method itself.
	methodLast
)

// methodIndexNames is the lookup table behind MethodIndex.String.
var methodIndexNames = [...]string{
	methodBalanceGet:            "balanceGet",
	methodContainerPut:          "containerPut",
	methodContainerGet:          "containerGet",
	methodContainerList:         "containerList",
	methodContainerListStream:   "containerListStream",
	methodContainerDelete:       "containerDelete",
	methodEndpointInfo:          "endpointInfo",
	methodNetworkInfo:           "networkInfo",
	methodNetMapSnapshot:        "netMapSnapshot",
	methodObjectPut:             "objectPut",
	methodObjectDelete:          "objectDelete",
	methodObjectGet:             "objectGet",
	methodObjectHead:            "objectHead",
	methodObjectRange:           "objectRange",
	methodObjectPatch:           "objectPatch",
	methodSessionCreate:         "sessionCreate",
	methodAPEManagerAddChain:    "apeManagerAddChain",
	methodAPEManagerRemoveChain: "apeManagerRemoveChain",
	methodAPEManagerListChains:  "apeManagerListChains",
	methodLast:                  "it's a system name rather than a method",
}

// String implements fmt.Stringer. Values without a table entry are reported
// as "unknown".
func (m MethodIndex) String() string {
	if m < 0 || int(m) >= len(methodIndexNames) {
		return "unknown"
	}
	if name := methodIndexNames[m]; name != "" {
		return name
	}
	return "unknown"
}
// newClientStatusMonitor builds a monitor for addr that starts in the healthy
// state and holds one named MethodStatus per MethodIndex below methodLast.
func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint32) clientStatusMonitor {
	// methodBalanceGet is the first index, so appending in order keeps every
	// status at the position equal to its MethodIndex.
	stats := make([]*MethodStatus, 0, methodLast)
	for idx := methodBalanceGet; idx < methodLast; idx++ {
		stats = append(stats, &MethodStatus{name: idx.String()})
	}

	var state atomic.Uint32
	state.Store(statusHealthy)

	return clientStatusMonitor{
		logger:         logger,
		addr:           addr,
		healthy:        &state,
		errorThreshold: errorThreshold,
		methods:        stats,
	}
}
// clientWrapper is used by default, alternative implementations are intended for testing purposes only.
type clientWrapper struct {
	// clientMutex guards the client pointer (see getClient, getClientRaw and restart).
	clientMutex sync.RWMutex
	client      *sdkClient.Client
	// dialed records whether the most recent dial succeeded. It is accessed
	// via isDialed/setDialed under the embedded clientStatusMonitor's mu.
	dialed bool
	// prm keeps the parameters the wrapper was created with; restart reuses them.
	prm wrapperPrm

	clientStatusMonitor
}
// wrapperPrm groups the parameters used to create a clientWrapper.
type wrapperPrm struct {
	logger                       *zap.Logger
	address                      string
	key                          ecdsa.PrivateKey
	dialTimeout                  time.Duration
	streamTimeout                time.Duration
	errorThreshold               uint32
	responseInfoCallback         func(sdkClient.ResponseMetaInfo) error
	poolRequestInfoCallback      func(RequestInfo)
	dialOptions                  []grpc.DialOption
	gracefulCloseOnSwitchTimeout time.Duration
}

// setAddress sets the endpoint to connect to in the FrostFS network.
func (p *wrapperPrm) setAddress(address string) {
	p.address = address
}

// setKey sets the private key sdkClient.Client uses by default for protocol communication.
func (p *wrapperPrm) setKey(key ecdsa.PrivateKey) {
	p.key = key
}

// setLogger sets the logger.
func (p *wrapperPrm) setLogger(logger *zap.Logger) {
	p.logger = logger
}

// setDialTimeout sets the timeout for establishing a connection.
func (p *wrapperPrm) setDialTimeout(timeout time.Duration) {
	p.dialTimeout = timeout
}

// setStreamTimeout sets the timeout for individual operations in a streaming RPC.
func (p *wrapperPrm) setStreamTimeout(timeout time.Duration) {
	p.streamTimeout = timeout
}

// setErrorThreshold sets the error count at which the connection is considered
// unhealthy until the Pool.startRebalance routine updates its status.
func (p *wrapperPrm) setErrorThreshold(threshold uint32) {
	p.errorThreshold = threshold
}

// setGracefulCloseOnSwitchTimeout sets the delay after which the previous,
// unhealthy client is closed once it has been scheduled for a graceful close
// during rebalancing.
//
// See also setErrorThreshold.
func (p *wrapperPrm) setGracefulCloseOnSwitchTimeout(timeout time.Duration) {
	p.gracefulCloseOnSwitchTimeout = timeout
}

// setPoolRequestCallback sets the callback invoked after every pool request
// with its address, method and elapsed time.
func (p *wrapperPrm) setPoolRequestCallback(f func(RequestInfo)) {
	p.poolRequestInfoCallback = f
}

// setResponseInfoCallback sets the callback that will be invoked after every response.
func (p *wrapperPrm) setResponseInfoCallback(f func(sdkClient.ResponseMetaInfo) error) {
	p.responseInfoCallback = f
}

// setGRPCDialOptions sets the gRPC dial options used for new gRPC client connections.
func (p *wrapperPrm) setGRPCDialOptions(opts []grpc.DialOption) {
	p.dialOptions = opts
}
// newWrapper creates a clientWrapper that implements the client interface.
// The inner SDK client is initialized but not dialed; the status monitor
// starts out healthy.
func newWrapper(prm wrapperPrm) *clientWrapper {
	cl := new(sdkClient.Client)
	cl.Init(sdkClient.PrmInit{
		Key:                  prm.key,
		ResponseInfoCallback: prm.responseInfoCallback,
	})

	return &clientWrapper{
		client:              cl,
		clientStatusMonitor: newClientStatusMonitor(prm.logger, prm.address, prm.errorThreshold),
		prm:                 prm,
	}
}
// dial establishes a connection to the server from the FrostFS network.
// Returns an error describing failure reason. If failed, the client
// SHOULD NOT be used.
//
// The dialed flag is updated to reflect the outcome. When the wrapper is
// unhealthy, errPoolClientUnhealthy is returned and no dial is attempted.
func (c *clientWrapper) dial(ctx context.Context) error {
	cl, err := c.getClient()
	if err != nil {
		return err
	}

	err = cl.Dial(ctx, sdkClient.PrmDial{
		Endpoint:        c.prm.address,
		DialTimeout:     c.prm.dialTimeout,
		StreamTimeout:   c.prm.streamTimeout,
		GRPCDialOptions: c.prm.dialOptions,
	})
	c.setDialed(err == nil)

	return err
}
// restart creates a fresh inner SDK client, dials it and, on success, swaps
// it in for the current one. On a dial failure the current client pointer is
// left untouched and the error is returned.
func (c *clientWrapper) restart(ctx context.Context) error {
	fresh := new(sdkClient.Client)
	fresh.Init(sdkClient.PrmInit{
		Key:                  c.prm.key,
		ResponseInfoCallback: c.prm.responseInfoCallback,
	})

	// A previously dialed connection has to be closed to avoid a routine /
	// connection leak; the close is scheduled before the new dial is attempted.
	if c.isDialed() {
		c.scheduleGracefulClose()
	}

	err := fresh.Dial(ctx, sdkClient.PrmDial{
		Endpoint:        c.prm.address,
		DialTimeout:     c.prm.dialTimeout,
		StreamTimeout:   c.prm.streamTimeout,
		GRPCDialOptions: c.prm.dialOptions,
	})
	c.setDialed(err == nil)
	if err != nil {
		return err
	}

	c.clientMutex.Lock()
	c.client = fresh
	c.clientMutex.Unlock()

	return nil
}
// isDialed reports the value last stored by setDialed.
func (c *clientWrapper) isDialed() bool {
	c.mu.RLock()
	dialed := c.dialed
	c.mu.RUnlock()

	return dialed
}
// setDialed records whether the latest dial attempt succeeded.
func (c *clientWrapper) setDialed(dialed bool) {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.dialed = dialed
}
// getClient returns the inner SDK client, or errPoolClientUnhealthy when the
// wrapper is not marked healthy.
func (c *clientWrapper) getClient() (*sdkClient.Client, error) {
	c.clientMutex.RLock()
	defer c.clientMutex.RUnlock()

	if !c.isHealthy() {
		return nil, errPoolClientUnhealthy
	}

	return c.client, nil
}
// getClientRaw returns the inner SDK client regardless of the health status.
func (c *clientWrapper) getClientRaw() *sdkClient.Client {
	c.clientMutex.RLock()
	cl := c.client
	c.clientMutex.RUnlock()

	return cl
}
// balanceGet calls sdkClient.BalanceGet for prm.account, records the request
// duration, converts a failed response status into an error and returns the
// amount from the response.
func (c *clientWrapper) balanceGet(ctx context.Context, prm PrmBalanceGet) (accounting.Decimal, error) {
	var zero accounting.Decimal

	cl, err := c.getClient()
	if err != nil {
		return zero, err
	}

	request := sdkClient.PrmBalanceGet{
		Account: prm.account,
	}

	began := time.Now()
	res, err := cl.BalanceGet(ctx, request)
	c.incRequests(time.Since(began), methodBalanceGet)

	// res may be nil when the call itself failed.
	var status apistatus.Status
	if res != nil {
		status = res.Status()
	}

	if err = c.handleError(ctx, status, err); err != nil {
		return zero, fmt.Errorf("balance get on client: %w", err)
	}

	return res.Amount(), nil
}
// containerPut calls sdkClient.ContainerPut, converts a failed response
// status into an error and then waits for the container to appear on the
// network. When prm.WaitParams is nil, defaultWaitParams() is used; the wait
// parameters are validated after the put request has been sent.
func (c *clientWrapper) containerPut(ctx context.Context, prm PrmContainerPut) (cid.ID, error) {
	cl, err := c.getClient()
	if err != nil {
		return cid.ID{}, err
	}

	began := time.Now()
	res, err := cl.ContainerPut(ctx, prm.ClientParams)
	c.incRequests(time.Since(began), methodContainerPut)

	var status apistatus.Status
	if res != nil {
		status = res.Status()
	}

	if err = c.handleError(ctx, status, err); err != nil {
		return cid.ID{}, fmt.Errorf("container put on client: %w", err)
	}

	waitPrm := prm.WaitParams
	if waitPrm == nil {
		waitPrm = defaultWaitParams()
	}

	if err = waitPrm.CheckValidity(); err != nil {
		return cid.ID{}, fmt.Errorf("invalid wait parameters: %w", err)
	}

	newID := res.ID()

	err = waitForContainerPresence(ctx, c, PrmContainerGet{
		ContainerID: newID,
		Session:     prm.ClientParams.Session,
	}, waitPrm)
	if err = c.handleError(ctx, nil, err); err != nil {
		return cid.ID{}, fmt.Errorf("wait container presence on client: %w", err)
	}

	return newID, nil
}
// containerGet calls sdkClient.ContainerGet, records the request duration,
// converts a failed response status into an error and returns the container
// from the response.
func (c *clientWrapper) containerGet(ctx context.Context, prm PrmContainerGet) (container.Container, error) {
	var zero container.Container

	cl, err := c.getClient()
	if err != nil {
		return zero, err
	}

	request := sdkClient.PrmContainerGet{
		ContainerID: &prm.ContainerID,
		Session:     prm.Session,
	}

	began := time.Now()
	res, err := cl.ContainerGet(ctx, request)
	c.incRequests(time.Since(began), methodContainerGet)

	var status apistatus.Status
	if res != nil {
		status = res.Status()
	}

	if err = c.handleError(ctx, status, err); err != nil {
		return zero, fmt.Errorf("container get on client: %w", err)
	}

	return res.Container(), nil
}
// containerList calls sdkClient.ContainerList for prm.OwnerID, records the
// request duration, converts a failed response status into an error and
// returns the container IDs from the response.
func (c *clientWrapper) containerList(ctx context.Context, prm PrmContainerList) ([]cid.ID, error) {
	cl, err := c.getClient()
	if err != nil {
		return nil, err
	}

	request := sdkClient.PrmContainerList{
		OwnerID: prm.OwnerID,
		Session: prm.Session,
	}

	began := time.Now()
	res, err := cl.ContainerList(ctx, request)
	c.incRequests(time.Since(began), methodContainerList)

	var status apistatus.Status
	if res != nil {
		status = res.Status()
	}

	if err = c.handleError(ctx, status, err); err != nil {
		return nil, fmt.Errorf("container list on client: %w", err)
	}

	return res.Containers(), nil
}
// PrmListStream groups parameters of ListContainersStream operation.
type PrmListStream struct {
	// OwnerID is forwarded as the owner of the containers to list.
	OwnerID user.ID
	// Session is forwarded to the SDK request as is.
	Session *session.Container
}
// ResListStream is designed to read list of container identifiers from FrostFS system.
//
// Must be initialized using Pool.ListContainersStream, any other usage is unsafe.
type ResListStream struct {
	// r is the underlying SDK stream reader.
	r *sdkClient.ContainerListReader
	// elapsedTimeCallback receives the duration of every read from r.
	elapsedTimeCallback func(time.Duration)
	// handleError converts the final status/error of the stream (see Read).
	handleError func(context.Context, apistatus.Status, error) error
}
// Read reads another list of the container identifiers into buf and returns
// the count reported by the underlying reader.
//
// Once the underlying reader reports that reading is over, the stream is
// closed: io.EOF is returned when closing succeeds, otherwise the close error
// is passed through the pool's error handler and returned. The duration of
// every underlying read is reported via the elapsed time callback.
func (x *ResListStream) Read(buf []cid.ID) (int, error) {
	start := time.Now()
	n, ok := x.r.Read(buf)
	x.elapsedTimeCallback(time.Since(start))

	if ok {
		return n, nil
	}

	res, err := x.r.Close()
	if err == nil {
		return n, io.EOF
	}

	var status apistatus.Status
	if res != nil {
		status = res.Status()
	}

	// No request context is available at this point. A nil Context must not
	// be passed around, so use the background context: it is never canceled,
	// hence the error handler classifies the error exactly as it did for nil.
	return n, x.handleError(context.Background(), status, err)
}
// Iterate iterates over the list of found container identifiers.
// f can return true to stop iteration earlier.
//
// Only the time spent waiting for the next identifier is reported to the
// elapsed time callback; the time spent inside f is excluded.
//
// Returns the error reported by the underlying reader's Iterate.
func (x *ResListStream) Iterate(f func(cid.ID) bool) error {
	waitingSince := time.Now()

	return x.r.Iterate(func(id cid.ID) bool {
		x.elapsedTimeCallback(time.Since(waitingSince))

		// Restart the clock only after the user callback has returned.
		defer func() { waitingSince = time.Now() }()

		return f(id)
	})
}
// Close ends reading list of the matched containers. The result and the error
// of closing the underlying reader are discarded. Must be called after using
// the ResListStream.
func (x *ResListStream) Close() {
	_, _ = x.r.Close()
}
// containerListStream calls sdkClient.ContainerListInit and wraps the
// resulting reader into a ResListStream. Both the init call and every later
// read on the stream are accounted under methodContainerListStream.
func (c *clientWrapper) containerListStream(ctx context.Context, prm PrmListStream) (ResListStream, error) {
	cl, err := c.getClient()
	if err != nil {
		return ResListStream{}, err
	}

	request := sdkClient.PrmContainerListStream{
		OwnerID: prm.OwnerID,
		Session: prm.Session,
	}

	began := time.Now()
	reader, err := cl.ContainerListInit(ctx, request)
	c.incRequests(time.Since(began), methodContainerListStream)

	if err = c.handleError(ctx, nil, err); err != nil {
		return ResListStream{}, fmt.Errorf("init container listing on client: %w", err)
	}

	reportElapsed := func(elapsed time.Duration) {
		c.incRequests(elapsed, methodContainerListStream)
	}

	return ResListStream{
		r:                   reader,
		elapsedTimeCallback: reportElapsed,
		handleError:         c.handleError,
	}, nil
}
// containerDelete calls sdkClient.ContainerDelete, converts a failed response
// status into an error and then waits for the container to be removed from
// the network. When prm.WaitParams is nil, defaultWaitParams() is used. The
// result of the wait is returned as is.
func (c *clientWrapper) containerDelete(ctx context.Context, prm PrmContainerDelete) error {
	cl, err := c.getClient()
	if err != nil {
		return err
	}

	request := sdkClient.PrmContainerDelete{
		ContainerID: &prm.ContainerID,
		Session:     prm.Session,
	}

	began := time.Now()
	res, err := cl.ContainerDelete(ctx, request)
	c.incRequests(time.Since(began), methodContainerDelete)

	var status apistatus.Status
	if res != nil {
		status = res.Status()
	}

	if err = c.handleError(ctx, status, err); err != nil {
		return fmt.Errorf("container delete on client: %w", err)
	}

	waitPrm := prm.WaitParams
	if waitPrm == nil {
		waitPrm = defaultWaitParams()
	}

	if err = waitPrm.CheckValidity(); err != nil {
		return fmt.Errorf("invalid wait parameters: %w", err)
	}

	return waitForContainerRemoved(ctx, c, PrmContainerGet{
		ContainerID: prm.ContainerID,
		Session:     prm.Session,
	}, waitPrm)
}
// apeManagerAddChain calls sdkClient.APEManagerAddChain for prm.Target and
// prm.Chain, records the request duration and converts a failed response
// status into an error.
func (c *clientWrapper) apeManagerAddChain(ctx context.Context, prm PrmAddAPEChain) error {
	cl, err := c.getClient()
	if err != nil {
		return err
	}

	request := sdkClient.PrmAPEManagerAddChain{
		ChainTarget: prm.Target,
		Chain:       prm.Chain,
	}

	began := time.Now()
	res, err := cl.APEManagerAddChain(ctx, request)
	c.incRequests(time.Since(began), methodAPEManagerAddChain)

	var status apistatus.Status
	if res != nil {
		status = res.Status()
	}

	err = c.handleError(ctx, status, err)
	if err == nil {
		return nil
	}

	return fmt.Errorf("add chain error: %w", err)
}
// apeManagerRemoveChain calls sdkClient.APEManagerRemoveChain for prm.Target
// and prm.ChainID, records the request duration and converts a failed
// response status into an error.
func (c *clientWrapper) apeManagerRemoveChain(ctx context.Context, prm PrmRemoveAPEChain) error {
	cl, err := c.getClient()
	if err != nil {
		return err
	}

	request := sdkClient.PrmAPEManagerRemoveChain{
		ChainTarget: prm.Target,
		ChainID:     prm.ChainID,
	}

	began := time.Now()
	res, err := cl.APEManagerRemoveChain(ctx, request)
	c.incRequests(time.Since(began), methodAPEManagerRemoveChain)

	var status apistatus.Status
	if res != nil {
		status = res.Status()
	}

	err = c.handleError(ctx, status, err)
	if err == nil {
		return nil
	}

	return fmt.Errorf("remove chain error: %w", err)
}
// apeManagerListChains calls sdkClient.APEManagerListChains for prm.Target,
// records the request duration, converts a failed response status into an
// error and returns the chains from the response.
func (c *clientWrapper) apeManagerListChains(ctx context.Context, prm PrmListAPEChains) ([]ape.Chain, error) {
	cl, err := c.getClient()
	if err != nil {
		return nil, err
	}

	request := sdkClient.PrmAPEManagerListChains{
		ChainTarget: prm.Target,
	}

	began := time.Now()
	res, err := cl.APEManagerListChains(ctx, request)
	c.incRequests(time.Since(began), methodAPEManagerListChains)

	var status apistatus.Status
	if res != nil {
		status = res.Status()
	}

	if err = c.handleError(ctx, status, err); err != nil {
		return nil, fmt.Errorf("list chains error: %w", err)
	}

	return res.Chains, nil
}
// endpointInfo requests endpoint information through the healthy client; it
// fails with errPoolClientUnhealthy when the wrapper is not healthy.
func (c *clientWrapper) endpointInfo(ctx context.Context, _ prmEndpointInfo) (netmap.NodeInfo, error) {
	cl, err := c.getClient()
	if err == nil {
		return c.endpointInfoRaw(ctx, cl)
	}

	return netmap.NodeInfo{}, err
}
// healthcheck requests endpoint information using the inner client
// regardless of the current health status.
func (c *clientWrapper) healthcheck(ctx context.Context) (netmap.NodeInfo, error) {
	return c.endpointInfoRaw(ctx, c.getClientRaw())
}
// endpointInfoRaw calls sdkClient.EndpointInfo on cl, records the request
// duration, converts a failed response status into an error and returns the
// node info from the response.
func (c *clientWrapper) endpointInfoRaw(ctx context.Context, cl *sdkClient.Client) (netmap.NodeInfo, error) {
	began := time.Now()
	res, err := cl.EndpointInfo(ctx, sdkClient.PrmEndpointInfo{})
	c.incRequests(time.Since(began), methodEndpointInfo)

	var status apistatus.Status
	if res != nil {
		status = res.Status()
	}

	if err = c.handleError(ctx, status, err); err != nil {
		return netmap.NodeInfo{}, fmt.Errorf("endpoint info on client: %w", err)
	}

	return res.NodeInfo(), nil
}
// networkInfo calls sdkClient.NetworkInfo, records the request duration,
// converts a failed response status into an error and returns the network
// info from the response.
func (c *clientWrapper) networkInfo(ctx context.Context, _ prmNetworkInfo) (netmap.NetworkInfo, error) {
	var zero netmap.NetworkInfo

	cl, err := c.getClient()
	if err != nil {
		return zero, err
	}

	began := time.Now()
	res, err := cl.NetworkInfo(ctx, sdkClient.PrmNetworkInfo{})
	c.incRequests(time.Since(began), methodNetworkInfo)

	var status apistatus.Status
	if res != nil {
		status = res.Status()
	}

	if err = c.handleError(ctx, status, err); err != nil {
		return zero, fmt.Errorf("network info on client: %w", err)
	}

	return res.Info(), nil
}
// netMapSnapshot calls sdkClient.NetMapSnapshot, records the request
// duration, converts a failed response status into an error and returns the
// network map from the response.
func (c *clientWrapper) netMapSnapshot(ctx context.Context, _ prmNetMapSnapshot) (netmap.NetMap, error) {
	var zero netmap.NetMap

	cl, err := c.getClient()
	if err != nil {
		return zero, err
	}

	began := time.Now()
	res, err := cl.NetMapSnapshot(ctx, sdkClient.PrmNetMapSnapshot{})
	c.incRequests(time.Since(began), methodNetMapSnapshot)

	var status apistatus.Status
	if res != nil {
		status = res.Status()
	}

	if err = c.handleError(ctx, status, err); err != nil {
		return zero, fmt.Errorf("network map snapshot on client: %w", err)
	}

	return res.NetMap(), nil
}
// objectPatch patches an object in FrostFS: it opens a patch stream, sends
// the attribute patch, sends the payload patch only if the attribute patch
// succeeded, then closes the stream and returns the ID from the close result.
//
// A successful init, the attribute patch and the payload patch are each
// accounted as a separate methodObjectPatch request.
func (c *clientWrapper) objectPatch(ctx context.Context, prm PrmObjectPatch) (ResPatchObject, error) {
	cl, err := c.getClient()
	if err != nil {
		return ResPatchObject{}, err
	}

	began := time.Now()
	patcher, err := cl.ObjectPatchInit(ctx, sdkClient.PrmObjectPatch{
		Address:        prm.addr,
		Session:        prm.stoken,
		Key:            prm.key,
		BearerToken:    prm.btoken,
		MaxChunkLength: prm.maxPayloadPatchChunkLength,
	})
	if err = c.handleError(ctx, nil, err); err != nil {
		return ResPatchObject{}, fmt.Errorf("init patching on API client: %w", err)
	}
	// Note: a failed init returns above and is therefore not accounted.
	c.incRequests(time.Since(began), methodObjectPatch)

	began = time.Now()
	attrsPatched := patcher.PatchAttributes(ctx, prm.newAttrs, prm.replaceAttrs)
	c.incRequests(time.Since(began), methodObjectPatch)

	if attrsPatched {
		began = time.Now()
		// The result is ignored here; presumably a failure is surfaced by
		// Close below — TODO confirm against the SDK.
		_ = patcher.PatchPayload(ctx, prm.rng, prm.payload)
		c.incRequests(time.Since(began), methodObjectPatch)
	}

	res, err := patcher.Close(ctx)

	var status apistatus.Status
	if res != nil {
		status = res.Status()
	}

	if err = c.handleError(ctx, status, err); err != nil {
		return ResPatchObject{}, fmt.Errorf("client failure: %w", err)
	}

	return ResPatchObject{ObjectID: res.ObjectID()}, nil
}
// objectPut writes an object to FrostFS, dispatching to the client-cut or
// server-cut implementation depending on prm.clientCut. A zero
// prm.bufferMaxSize is replaced with defaultBufferMaxSizeForPut.
func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (ResPutObject, error) {
	if prm.bufferMaxSize == 0 {
		prm.bufferMaxSize = defaultBufferMaxSizeForPut
	}

	if !prm.clientCut {
		return c.objectPutServerCut(ctx, prm)
	}

	return c.objectPutClientCut(ctx, prm)
}
// objectPutServerCut streams an object to the node: it opens a put stream,
// writes the header, then copies the payload in chunks and closes the stream.
//
// The payload is the data embedded in prm.hdr followed by prm.payload, when
// present. The chunk buffer is sized by the header's payload size, capped at
// prm.bufferMaxSize. The init call and every chunk write are accounted as
// methodObjectPut requests.
//
// Returns the stored object ID and epoch from the close result. A payload
// read failure is returned as "read payload: ..." without closing the stream.
func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut) (ResPutObject, error) {
	// Upper bound on consecutive (0, nil) reads tolerated from the payload
	// reader before giving up with io.ErrNoProgress.
	const maxConsecutiveEmptyReads = 100

	cl, err := c.getClient()
	if err != nil {
		return ResPutObject{}, err
	}

	cliPrm := sdkClient.PrmObjectPutInit{
		CopiesNumber: prm.copiesNumber,
		Session:      prm.stoken,
		Key:          prm.key,
		BearerToken:  prm.btoken,
	}

	start := time.Now()
	wObj, err := cl.ObjectPutInit(ctx, cliPrm)
	c.incRequests(time.Since(start), methodObjectPut)
	if err = c.handleError(ctx, nil, err); err != nil {
		return ResPutObject{}, fmt.Errorf("init writing on API client: %w", err)
	}

	if wObj.WriteHeader(ctx, prm.hdr) {
		sz := prm.hdr.PayloadSize()

		if data := prm.hdr.Payload(); len(data) > 0 {
			if prm.payload != nil {
				prm.payload = io.MultiReader(bytes.NewReader(data), prm.payload)
			} else {
				prm.payload = bytes.NewReader(data)
				sz = uint64(len(data))
			}
		}

		if prm.payload != nil {
			if sz == 0 || sz > prm.bufferMaxSize {
				sz = prm.bufferMaxSize
			}

			buf := make([]byte, sz)

			var n, emptyReads int

			for {
				n, err = prm.payload.Read(buf)
				if n > 0 {
					emptyReads = 0

					start = time.Now()
					successWrite := wObj.WritePayloadChunk(ctx, buf[:n])
					c.incRequests(time.Since(start), methodObjectPut)
					if !successWrite {
						// The reason of the failed write is reported by Close below.
						break
					}

					continue
				}

				if errors.Is(err, io.EOF) {
					break
				}

				if err == nil {
					// Per the io.Reader contract (0, nil) means "nothing
					// happened", not a failure: retry, but not forever.
					emptyReads++
					if emptyReads >= maxConsecutiveEmptyReads {
						return ResPutObject{}, fmt.Errorf("read payload: %w", io.ErrNoProgress)
					}

					continue
				}

				return ResPutObject{}, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err))
			}
		}
	}

	res, err := wObj.Close(ctx)

	var st apistatus.Status
	if res != nil {
		st = res.Status()
	}

	if err = c.handleError(ctx, st, err); err != nil { // here err already carries both status and client errors
		return ResPutObject{}, fmt.Errorf("client failure: %w", err)
	}

	return ResPutObject{
		ObjectID: res.StoredObjectID(),
		Epoch:    res.StoredEpoch(),
	}, nil
}
// objectPutClientCut writes an object through the writer created by
// objectPutInitTransformer: it writes the header, then copies the payload in
// chunks and closes the writer.
//
// The payload is the data embedded in prm.hdr followed by prm.payload, when
// present. The chunk buffer is sized by the header's payload size, capped at
// prm.bufferMaxSize. The init call and every chunk write are accounted as
// methodObjectPut requests.
//
// Returns the object ID and epoch from the close result. A payload read
// failure is returned as "read payload: ..." without closing the writer.
func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut) (ResPutObject, error) {
	// Upper bound on consecutive (0, nil) reads tolerated from the payload
	// reader before giving up with io.ErrNoProgress.
	const maxConsecutiveEmptyReads = 100

	putInitPrm := PrmObjectPutClientCutInit{
		PrmObjectPut: prm,
	}

	start := time.Now()
	wObj, err := c.objectPutInitTransformer(putInitPrm)
	c.incRequests(time.Since(start), methodObjectPut)
	if err = c.handleError(ctx, nil, err); err != nil {
		return ResPutObject{}, fmt.Errorf("init writing on API client: %w", err)
	}

	if wObj.WriteHeader(ctx, prm.hdr) {
		sz := prm.hdr.PayloadSize()

		if data := prm.hdr.Payload(); len(data) > 0 {
			if prm.payload != nil {
				prm.payload = io.MultiReader(bytes.NewReader(data), prm.payload)
			} else {
				prm.payload = bytes.NewReader(data)
				sz = uint64(len(data))
			}
		}

		if prm.payload != nil {
			if sz == 0 || sz > prm.bufferMaxSize {
				sz = prm.bufferMaxSize
			}

			buf := make([]byte, sz)

			var n, emptyReads int

			for {
				n, err = prm.payload.Read(buf)
				if n > 0 {
					emptyReads = 0

					start = time.Now()
					successWrite := wObj.WritePayloadChunk(ctx, buf[:n])
					c.incRequests(time.Since(start), methodObjectPut)
					if !successWrite {
						// The reason of the failed write is reported by Close below.
						break
					}

					continue
				}

				if errors.Is(err, io.EOF) {
					break
				}

				if err == nil {
					// Per the io.Reader contract (0, nil) means "nothing
					// happened", not a failure: retry, but not forever.
					emptyReads++
					if emptyReads >= maxConsecutiveEmptyReads {
						return ResPutObject{}, fmt.Errorf("read payload: %w", io.ErrNoProgress)
					}

					continue
				}

				return ResPutObject{}, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err))
			}
		}
	}

	res, err := wObj.Close(ctx)

	var st apistatus.Status
	if res != nil {
		st = res.Status
	}

	if err = c.handleError(ctx, st, err); err != nil { // here err already carries both status and client errors
		return ResPutObject{}, fmt.Errorf("client failure: %w", err)
	}

	return ResPutObject{
		ObjectID: res.OID,
		Epoch:    res.Epoch,
	}, nil
}
// objectDelete calls sdkClient.ObjectDelete for the object at prm.addr,
// records the request duration and converts a failed response status into an
// error.
func (c *clientWrapper) objectDelete(ctx context.Context, prm PrmObjectDelete) error {
	cl, err := c.getClient()
	if err != nil {
		return err
	}

	// Local copies are needed because the SDK parameters take pointers.
	cnrID, objID := prm.addr.Container(), prm.addr.Object()

	request := sdkClient.PrmObjectDelete{
		BearerToken: prm.btoken,
		Session:     prm.stoken,
		ContainerID: &cnrID,
		ObjectID:    &objID,
		Key:         prm.key,
	}

	began := time.Now()
	res, err := cl.ObjectDelete(ctx, request)
	c.incRequests(time.Since(began), methodObjectDelete)

	var status apistatus.Status
	if res != nil {
		status = res.Status()
	}

	err = c.handleError(ctx, status, err)
	if err == nil {
		return nil
	}

	return fmt.Errorf("delete object on client: %w", err)
}
// objectGet opens an object read stream, reads the header into the result
// and returns the result with a payload reader attached.
//
// The header read and, through the payload reader's callback, later payload
// reads are accounted as methodObjectGet requests. If the header cannot be
// read, the stream is closed and a non-nil "read header: ..." error is
// returned.
func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (ResGetObject, error) {
	cl, err := c.getClient()
	if err != nil {
		return ResGetObject{}, err
	}

	prmCnr := prm.addr.Container()
	prmObj := prm.addr.Object()

	cliPrm := sdkClient.PrmObjectGet{
		BearerToken: prm.btoken,
		Session:     prm.stoken,
		ContainerID: &prmCnr,
		ObjectID:    &prmObj,
		Key:         prm.key,
	}

	var res ResGetObject

	rObj, err := cl.ObjectGetInit(ctx, cliPrm)
	if err = c.handleError(ctx, nil, err); err != nil {
		return ResGetObject{}, fmt.Errorf("init object reading on client: %w", err)
	}

	start := time.Now()
	successReadHeader := rObj.ReadHeader(&res.Header)
	c.incRequests(time.Since(start), methodObjectGet)
	if !successReadHeader {
		rObjRes, err := rObj.Close()

		var st apistatus.Status
		if rObjRes != nil {
			st = rObjRes.Status()
		}

		if err = c.handleError(ctx, st, err); err == nil {
			// The header was not read although neither Close nor the status
			// reports a failure. Wrapping a nil error with %w would yield
			// "read header: %!w(<nil>)", so report a real cause instead.
			err = errors.New("missing object header in response")
		}

		return res, fmt.Errorf("read header: %w", err)
	}

	res.Payload = &objectReadCloser{
		reader: rObj,
		elapsedTimeCallback: func(elapsed time.Duration) {
			c.incRequests(elapsed, methodObjectGet)
		},
	}

	return res, nil
}
// objectHead calls sdkClient.ObjectHead for the object at prm.addr, records
// the request duration, converts a failed response status into an error and
// returns the header read from the response. An error is returned when the
// response carries no header.
func (c *clientWrapper) objectHead(ctx context.Context, prm PrmObjectHead) (object.Object, error) {
	cl, err := c.getClient()
	if err != nil {
		return object.Object{}, err
	}

	// Local copies are needed because the SDK parameters take pointers.
	cnrID, objID := prm.addr.Container(), prm.addr.Object()

	request := sdkClient.PrmObjectHead{
		BearerToken: prm.btoken,
		Session:     prm.stoken,
		Raw:         prm.raw,
		ContainerID: &cnrID,
		ObjectID:    &objID,
		Key:         prm.key,
	}

	var hdr object.Object

	began := time.Now()
	res, err := cl.ObjectHead(ctx, request)
	c.incRequests(time.Since(began), methodObjectHead)

	var status apistatus.Status
	if res != nil {
		status = res.Status()
	}

	if err = c.handleError(ctx, status, err); err != nil {
		return hdr, fmt.Errorf("read object header via client: %w", err)
	}

	if ok := res.ReadHeader(&hdr); !ok {
		return hdr, errors.New("missing object header in response")
	}

	return hdr, nil
}
// objectRange opens a payload range read stream for prm.off/prm.ln of the
// object at prm.addr and returns a reader over it. The init call and, via
// the reader's callback, later reads are accounted as methodObjectRange
// requests.
func (c *clientWrapper) objectRange(ctx context.Context, prm PrmObjectRange) (ResObjectRange, error) {
	cl, err := c.getClient()
	if err != nil {
		return ResObjectRange{}, err
	}

	// Local copies are needed because the SDK parameters take pointers.
	cnrID, objID := prm.addr.Container(), prm.addr.Object()

	request := sdkClient.PrmObjectRange{
		BearerToken: prm.btoken,
		Session:     prm.stoken,
		ContainerID: &cnrID,
		ObjectID:    &objID,
		Offset:      prm.off,
		Length:      prm.ln,
		Key:         prm.key,
	}

	began := time.Now()
	reader, err := cl.ObjectRangeInit(ctx, request)
	c.incRequests(time.Since(began), methodObjectRange)

	if err = c.handleError(ctx, nil, err); err != nil {
		return ResObjectRange{}, fmt.Errorf("init payload range reading on client: %w", err)
	}

	reportElapsed := func(elapsed time.Duration) {
		c.incRequests(elapsed, methodObjectRange)
	}

	return ResObjectRange{
		payload:             reader,
		elapsedTimeCallback: reportElapsed,
	}, nil
}
// objectSearch calls sdkClient.ObjectSearchInit and wraps the resulting
// reader into a ResObjectSearch that shares this wrapper's error handler.
// No request statistics are recorded by this method.
func (c *clientWrapper) objectSearch(ctx context.Context, prm PrmObjectSearch) (ResObjectSearch, error) {
	cl, err := c.getClient()
	if err != nil {
		return ResObjectSearch{}, err
	}

	reader, err := cl.ObjectSearchInit(ctx, sdkClient.PrmObjectSearch{
		ContainerID: &prm.cnrID,
		Filters:     prm.filters,
		Session:     prm.stoken,
		BearerToken: prm.btoken,
		Key:         prm.key,
	})
	if err = c.handleError(ctx, nil, err); err != nil {
		return ResObjectSearch{}, fmt.Errorf("init object searching on client: %w", err)
	}

	return ResObjectSearch{
		r:           reader,
		handleError: c.handleError,
	}, nil
}
// sessionCreate calls sdkClient.SessionCreate with prm.exp and prm.key,
// records the request duration, converts a failed response status into an
// error and returns the session ID and public key from the response.
func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession) (resCreateSession, error) {
	cl, err := c.getClient()
	if err != nil {
		return resCreateSession{}, err
	}

	request := sdkClient.PrmSessionCreate{
		Expiration: prm.exp,
		Key:        &prm.key,
	}

	began := time.Now()
	res, err := cl.SessionCreate(ctx, request)
	c.incRequests(time.Since(began), methodSessionCreate)

	var status apistatus.Status
	if res != nil {
		status = res.Status()
	}

	if err = c.handleError(ctx, status, err); err != nil {
		return resCreateSession{}, fmt.Errorf("session creation on client: %w", err)
	}

	created := resCreateSession{
		id:         res.ID(),
		sessionKey: res.PublicKey(),
	}

	return created, nil
}
// isHealthy reports whether the stored status equals statusHealthy.
func (c *clientStatusMonitor) isHealthy() bool {
	status := c.healthy.Load()

	return status == statusHealthy
}
// setHealthy atomically stores statusHealthy; error counters are not touched.
func (c *clientStatusMonitor) setHealthy() {
	c.healthy.Store(statusHealthy)
}
// setUnhealthy atomically stores statusUnhealthyOnRequest; error counters are
// not touched.
func (c *clientStatusMonitor) setUnhealthy() {
	c.healthy.Store(statusUnhealthyOnRequest)
}
// address returns the address the monitor was created with.
func (c *clientStatusMonitor) address() string {
	return c.addr
}
// incErrorRate counts one more error. When the current error count reaches
// errorThreshold the connection is marked unhealthy, the current counter is
// reset and a warning is logged.
func (c *clientStatusMonitor) incErrorRate() {
	c.mu.Lock()
	c.currentErrorCount++
	c.overallErrorCount++

	reached := c.currentErrorCount >= c.errorThreshold
	if reached {
		c.setUnhealthy()
		c.currentErrorCount = 0
	}
	c.mu.Unlock()

	// Logging happens outside of the critical section.
	if !reached {
		return
	}

	c.log(zapcore.WarnLevel, "error threshold reached",
		zap.String("address", c.addr), zap.Uint32("threshold", c.errorThreshold))
}
// incErrorRateToUnhealthy marks the connection unhealthy immediately,
// regardless of the threshold: the current error counter is reset, the
// overall one is incremented and err is logged as the reason.
func (c *clientStatusMonitor) incErrorRateToUnhealthy(err error) {
	func() {
		c.mu.Lock()
		defer c.mu.Unlock()

		c.currentErrorCount = 0
		c.overallErrorCount++
		c.setUnhealthy()
	}()

	c.log(zapcore.WarnLevel, "explicitly mark node unhealthy", zap.String("address", c.addr), zap.Error(err))
}
// log writes msg with fields at the given level; it does nothing when no
// logger is configured.
func (c *clientStatusMonitor) log(level zapcore.Level, msg string, fields ...zap.Field) {
	if c.logger != nil {
		c.logger.Log(level, msg, fields...)
	}
}
// currentErrorRate returns the current error counter, i.e. the number of
// errors counted since it was last reset.
func (c *clientStatusMonitor) currentErrorRate() uint32 {
	c.mu.RLock()
	count := c.currentErrorCount
	c.mu.RUnlock()

	return count
}
// overallErrorRate returns the overall error counter.
func (c *clientStatusMonitor) overallErrorRate() uint64 {
	c.mu.RLock()
	total := c.overallErrorCount
	c.mu.RUnlock()

	return total
}
// methodsStatus returns a snapshot of every method's statistics, in the same
// order as c.methods.
func (c *clientStatusMonitor) methodsStatus() []StatusSnapshot {
	snapshots := make([]StatusSnapshot, 0, len(c.methods))
	for _, status := range c.methods {
		snapshots = append(snapshots, status.Snapshot())
	}

	return snapshots
}
// incRequests records one request of the given method that took elapsed and,
// when a pool request callback is configured, reports it there as well.
func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) {
	c.methods[method].IncRequests(elapsed)

	if c.prm.poolRequestInfoCallback == nil {
		return
	}

	c.prm.poolRequestInfoCallback(RequestInfo{
		Address: c.prm.address,
		Method:  method,
		Elapsed: elapsed,
	})
}
// close closes the inner client. It is a no-op when the wrapper is not
// dialed or holds no client.
func (c *clientWrapper) close() error {
	if !c.isDialed() {
		return nil
	}

	cl := c.getClientRaw()
	if cl == nil {
		return nil
	}

	return cl.Close()
}
// scheduleGracefulClose arranges for the client held at the moment of the
// call to be closed after prm.gracefulCloseOnSwitchTimeout. A close error is
// only logged, at debug level.
func (c *clientWrapper) scheduleGracefulClose() {
	old := c.getClientRaw()
	if old == nil {
		return
	}

	closeOld := func() {
		err := old.Close()
		if err == nil {
			return
		}

		c.log(zap.DebugLevel, "close unhealthy client during rebalance", zap.String("address", c.address()), zap.Error(err))
	}

	time.AfterFunc(c.prm.gracefulCloseOnSwitchTimeout, closeOld)
}
// handleError merges a response status and a call error into a single error
// and updates the error statistics.
//
// A failed status takes part in the accounting first: internal server, wrong
// magic number and signature verification failures are counted, and a node
// under maintenance marks the connection unhealthy at once. In that case err
// is returned if it is set, otherwise the status error.
//
// With a successful (or nil) status, a non-nil err is counted unless
// needCountError rejects it, and is returned unchanged.
func (c *clientStatusMonitor) handleError(ctx context.Context, st apistatus.Status, err error) error {
	stErr := apistatus.ErrFromStatus(st)

	if stErr == nil {
		if err == nil {
			return nil
		}

		if needCountError(ctx, err) {
			if sdkClient.IsErrNodeUnderMaintenance(err) {
				c.incErrorRateToUnhealthy(err)
			} else {
				c.incErrorRate()
			}
		}

		return err
	}

	switch stErr.(type) {
	case *apistatus.ServerInternal,
		*apistatus.WrongMagicNumber,
		*apistatus.SignatureVerification:
		c.incErrorRate()
	case *apistatus.NodeUnderMaintenance:
		c.incErrorRateToUnhealthy(stErr)
	}

	if err != nil {
		return err
	}

	return stErr
}
// needCountError reports whether err should count against the connection's
// error rate. Split-info and EC-info errors are non-status logic errors the
// SDK client may return and are not connection failures. Errors seen after
// ctx was canceled are not counted either. A nil ctx is tolerated.
func needCountError(ctx context.Context, err error) bool {
	var (
		splitInfoErr *object.SplitInfoError
		ecInfoErr    *object.ECInfoError
	)

	switch {
	case errors.As(err, &splitInfoErr):
		return false
	case errors.As(err, &ecInfoErr):
		return false
	case ctx != nil && errors.Is(ctx.Err(), context.Canceled):
		return false
	default:
		return true
	}
}
// clientBuilder is a type alias for client constructors that take the
// endpoint to connect to and return a client for it.
type clientBuilder = func(endpoint string) client
// RequestInfo groups info about pool request. It is the value passed to the
// pool request callback after every accounted request.
type RequestInfo struct {
	// Address is the endpoint address the client wrapper was configured with.
	Address string
	// Method identifies the operation that was accounted.
	Method MethodIndex
	// Elapsed is the measured duration of the request.
	Elapsed time.Duration
}