3031 lines
81 KiB
Go
3031 lines
81 KiB
Go
package pool
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/ecdsa"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"math"
|
|
"math/rand"
|
|
"sort"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/accounting"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
|
sdkClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
frostfsecdsa "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto/ecdsa"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
|
"github.com/google/uuid"
|
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
|
"go.uber.org/zap"
|
|
"go.uber.org/zap/zapcore"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
// client represents virtual connection to the single FrostFS network endpoint from which Pool is formed.
|
|
// This interface is expected to have exactly one production implementation - clientWrapper.
|
|
// Others are expected to be for test purposes only.
|
|
type client interface {
|
|
// see clientWrapper.balanceGet.
|
|
balanceGet(context.Context, PrmBalanceGet) (accounting.Decimal, error)
|
|
// see clientWrapper.containerPut.
|
|
containerPut(context.Context, PrmContainerPut) (cid.ID, error)
|
|
// see clientWrapper.containerGet.
|
|
containerGet(context.Context, PrmContainerGet) (container.Container, error)
|
|
// see clientWrapper.containerList.
|
|
containerList(context.Context, PrmContainerList) ([]cid.ID, error)
|
|
// see clientWrapper.containerDelete.
|
|
containerDelete(context.Context, PrmContainerDelete) error
|
|
// see clientWrapper.containerEACL.
|
|
containerEACL(context.Context, PrmContainerEACL) (eacl.Table, error)
|
|
// see clientWrapper.containerSetEACL.
|
|
containerSetEACL(context.Context, PrmContainerSetEACL) error
|
|
// see clientWrapper.endpointInfo.
|
|
endpointInfo(context.Context, prmEndpointInfo) (netmap.NodeInfo, error)
|
|
// see clientWrapper.networkInfo.
|
|
networkInfo(context.Context, prmNetworkInfo) (netmap.NetworkInfo, error)
|
|
// see clientWrapper.netMapSnapshot
|
|
netMapSnapshot(context.Context, prmNetMapSnapshot) (netmap.NetMap, error)
|
|
// see clientWrapper.objectPut.
|
|
objectPut(context.Context, PrmObjectPut) (oid.ID, error)
|
|
// see clientWrapper.objectDelete.
|
|
objectDelete(context.Context, PrmObjectDelete) error
|
|
// see clientWrapper.objectGet.
|
|
objectGet(context.Context, PrmObjectGet) (ResGetObject, error)
|
|
// see clientWrapper.objectHead.
|
|
objectHead(context.Context, PrmObjectHead) (object.Object, error)
|
|
// see clientWrapper.objectRange.
|
|
objectRange(context.Context, PrmObjectRange) (ResObjectRange, error)
|
|
// see clientWrapper.objectSearch.
|
|
objectSearch(context.Context, PrmObjectSearch) (ResObjectSearch, error)
|
|
// see clientWrapper.sessionCreate.
|
|
sessionCreate(context.Context, prmCreateSession) (resCreateSession, error)
|
|
|
|
clientStatus
|
|
|
|
// see clientWrapper.dial.
|
|
dial(ctx context.Context) error
|
|
// see clientWrapper.restartIfUnhealthy.
|
|
restartIfUnhealthy(ctx context.Context) (bool, bool)
|
|
// see clientWrapper.close.
|
|
close() error
|
|
}
|
|
|
|
// clientStatus provide access to some metrics for connection.
|
|
type clientStatus interface {
|
|
// isHealthy checks if the connection can handle requests.
|
|
isHealthy() bool
|
|
// isDialed checks if the connection was created.
|
|
isDialed() bool
|
|
// setUnhealthy marks client as unhealthy.
|
|
setUnhealthy()
|
|
// address return address of endpoint.
|
|
address() string
|
|
// currentErrorRate returns current errors rate.
|
|
// After specific threshold connection is considered as unhealthy.
|
|
// Pool.startRebalance routine can make this connection healthy again.
|
|
currentErrorRate() uint32
|
|
// overallErrorRate returns the number of all happened errors.
|
|
overallErrorRate() uint64
|
|
// methodsStatus returns statistic for all used methods.
|
|
methodsStatus() []statusSnapshot
|
|
}
|
|
|
|
// errPoolClientUnhealthy signals that a pool client is currently marked as
// unhealthy and therefore must not be used for requests.
var errPoolClientUnhealthy error = errors.New("pool client unhealthy")
|
|
|
|
// clientStatusMonitor count error rate and other statistics for connection.
|
|
type clientStatusMonitor struct {
|
|
logger *zap.Logger
|
|
addr string
|
|
healthy *atomic.Uint32
|
|
errorThreshold uint32
|
|
|
|
mu sync.RWMutex // protect counters
|
|
currentErrorCount uint32
|
|
overallErrorCount uint64
|
|
methods []*methodStatus
|
|
}
|
|
|
|
// Possible values of clientStatusMonitor.healthy.
const (
	// statusUnhealthyOnDial: dialing the endpoint failed, so no connection
	// exists and the pool should not close it before trying to establish the
	// connection once again.
	statusUnhealthyOnDial = iota

	// statusUnhealthyOnRequest: communication with an already dialed endpoint
	// failed because of immediate or accumulated errors. The connection is
	// available, and the pool should close it before establishing it again.
	statusUnhealthyOnRequest

	// statusHealthy: the connection is ready to be used by the pool.
	statusHealthy
)
|
|
|
|
// methodStatus provide statistic for specific method.
|
|
type methodStatus struct {
|
|
name string
|
|
mu sync.RWMutex // protect counters
|
|
statusSnapshot
|
|
}
|
|
|
|
// statusSnapshot holds the statistics collected for one specific method.
type statusSnapshot struct {
	// allTime accumulates the elapsed time of all requests
	// (methodStatus.incRequests adds the time.Duration value as uint64).
	allTime uint64

	// allRequests is the number of recorded requests.
	allRequests uint64
}
|
|
|
|
// MethodIndex is the position of a method in the list of per-method statuses
// kept by clientStatusMonitor.
type MethodIndex int
|
|
|
|
const (
|
|
methodBalanceGet MethodIndex = iota
|
|
methodContainerPut
|
|
methodContainerGet
|
|
methodContainerList
|
|
methodContainerDelete
|
|
methodContainerEACL
|
|
methodContainerSetEACL
|
|
methodEndpointInfo
|
|
methodNetworkInfo
|
|
methodNetMapSnapshot
|
|
methodObjectPut
|
|
methodObjectDelete
|
|
methodObjectGet
|
|
methodObjectHead
|
|
methodObjectRange
|
|
methodSessionCreate
|
|
methodLast
|
|
)
|
|
|
|
// String implements fmt.Stringer.
|
|
func (m MethodIndex) String() string {
|
|
switch m {
|
|
case methodBalanceGet:
|
|
return "balanceGet"
|
|
case methodContainerPut:
|
|
return "containerPut"
|
|
case methodContainerGet:
|
|
return "containerGet"
|
|
case methodContainerList:
|
|
return "containerList"
|
|
case methodContainerDelete:
|
|
return "containerDelete"
|
|
case methodContainerEACL:
|
|
return "containerEACL"
|
|
case methodContainerSetEACL:
|
|
return "containerSetEACL"
|
|
case methodEndpointInfo:
|
|
return "endpointInfo"
|
|
case methodNetworkInfo:
|
|
return "networkInfo"
|
|
case methodNetMapSnapshot:
|
|
return "netMapSnapshot"
|
|
case methodObjectPut:
|
|
return "objectPut"
|
|
case methodObjectDelete:
|
|
return "objectDelete"
|
|
case methodObjectGet:
|
|
return "objectGet"
|
|
case methodObjectHead:
|
|
return "objectHead"
|
|
case methodObjectRange:
|
|
return "objectRange"
|
|
case methodSessionCreate:
|
|
return "sessionCreate"
|
|
case methodLast:
|
|
return "it's a system name rather than a method"
|
|
default:
|
|
return "unknown"
|
|
}
|
|
}
|
|
|
|
func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint32) clientStatusMonitor {
|
|
methods := make([]*methodStatus, methodLast)
|
|
for i := methodBalanceGet; i < methodLast; i++ {
|
|
methods[i] = &methodStatus{name: i.String()}
|
|
}
|
|
|
|
healthy := new(atomic.Uint32)
|
|
healthy.Store(statusHealthy)
|
|
|
|
return clientStatusMonitor{
|
|
logger: logger,
|
|
addr: addr,
|
|
healthy: healthy,
|
|
errorThreshold: errorThreshold,
|
|
methods: methods,
|
|
}
|
|
}
|
|
|
|
func (m *methodStatus) snapshot() statusSnapshot {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
return m.statusSnapshot
|
|
}
|
|
|
|
func (m *methodStatus) incRequests(elapsed time.Duration) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
m.allTime += uint64(elapsed)
|
|
m.allRequests++
|
|
}
|
|
|
|
// clientWrapper is used by default, alternative implementations are intended for testing purposes only.
|
|
type clientWrapper struct {
|
|
clientMutex sync.RWMutex
|
|
client *sdkClient.Client
|
|
prm wrapperPrm
|
|
|
|
clientStatusMonitor
|
|
}
|
|
|
|
// wrapperPrm is params to create clientWrapper.
|
|
type wrapperPrm struct {
|
|
logger *zap.Logger
|
|
address string
|
|
key ecdsa.PrivateKey
|
|
dialTimeout time.Duration
|
|
streamTimeout time.Duration
|
|
errorThreshold uint32
|
|
responseInfoCallback func(sdkClient.ResponseMetaInfo) error
|
|
poolRequestInfoCallback func(RequestInfo)
|
|
dialOptions []grpc.DialOption
|
|
}
|
|
|
|
// setAddress sets endpoint to connect in FrostFS network.
|
|
func (x *wrapperPrm) setAddress(address string) {
|
|
x.address = address
|
|
}
|
|
|
|
// setKey sets sdkClient.Client private key to be used for the protocol communication by default.
|
|
func (x *wrapperPrm) setKey(key ecdsa.PrivateKey) {
|
|
x.key = key
|
|
}
|
|
|
|
// setLogger sets sdkClient.Client logger.
|
|
func (x *wrapperPrm) setLogger(logger *zap.Logger) {
|
|
x.logger = logger
|
|
}
|
|
|
|
// setDialTimeout sets the timeout for connection to be established.
|
|
func (x *wrapperPrm) setDialTimeout(timeout time.Duration) {
|
|
x.dialTimeout = timeout
|
|
}
|
|
|
|
// setStreamTimeout sets the timeout for individual operations in streaming RPC.
|
|
func (x *wrapperPrm) setStreamTimeout(timeout time.Duration) {
|
|
x.streamTimeout = timeout
|
|
}
|
|
|
|
// setErrorThreshold sets threshold after reaching which connection is considered unhealthy
|
|
// until Pool.startRebalance routing updates its status.
|
|
func (x *wrapperPrm) setErrorThreshold(threshold uint32) {
|
|
x.errorThreshold = threshold
|
|
}
|
|
|
|
// setPoolRequestCallback sets callback that will be invoked after every pool response.
|
|
func (x *wrapperPrm) setPoolRequestCallback(f func(RequestInfo)) {
|
|
x.poolRequestInfoCallback = f
|
|
}
|
|
|
|
// setResponseInfoCallback sets callback that will be invoked after every response.
|
|
func (x *wrapperPrm) setResponseInfoCallback(f func(sdkClient.ResponseMetaInfo) error) {
|
|
x.responseInfoCallback = f
|
|
}
|
|
|
|
// setGRPCDialOptions sets the gRPC dial options for new gRPC client connection.
|
|
func (x *wrapperPrm) setGRPCDialOptions(opts []grpc.DialOption) {
|
|
x.dialOptions = opts
|
|
}
|
|
|
|
// newWrapper creates a clientWrapper that implements the client interface.
|
|
func newWrapper(prm wrapperPrm) *clientWrapper {
|
|
var cl sdkClient.Client
|
|
prmInit := sdkClient.PrmInit{
|
|
Key: prm.key,
|
|
ResponseInfoCallback: prm.responseInfoCallback,
|
|
}
|
|
|
|
cl.Init(prmInit)
|
|
|
|
res := &clientWrapper{
|
|
client: &cl,
|
|
clientStatusMonitor: newClientStatusMonitor(prm.logger, prm.address, prm.errorThreshold),
|
|
prm: prm,
|
|
}
|
|
|
|
return res
|
|
}
|
|
|
|
// dial establishes a connection to the server from the FrostFS network.
|
|
// Returns an error describing failure reason. If failed, the client
|
|
// SHOULD NOT be used.
|
|
func (c *clientWrapper) dial(ctx context.Context) error {
|
|
cl, err := c.getClient()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
prmDial := sdkClient.PrmDial{
|
|
Endpoint: c.prm.address,
|
|
DialTimeout: c.prm.dialTimeout,
|
|
StreamTimeout: c.prm.streamTimeout,
|
|
GRPCDialOptions: c.prm.dialOptions,
|
|
}
|
|
|
|
if err = cl.Dial(ctx, prmDial); err != nil {
|
|
c.setUnhealthyOnDial()
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// restartIfUnhealthy checks healthy status of client and recreate it if status is unhealthy.
|
|
// Return current healthy status and indicating if status was changed by this function call.
|
|
func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, changed bool) {
|
|
var wasHealthy bool
|
|
if _, err := c.endpointInfo(ctx, prmEndpointInfo{}); err == nil {
|
|
return true, false
|
|
} else if !errors.Is(err, errPoolClientUnhealthy) {
|
|
wasHealthy = true
|
|
}
|
|
|
|
// if connection is dialed before, to avoid routine / connection leak,
|
|
// pool has to close it and then initialize once again.
|
|
if c.isDialed() {
|
|
_ = c.close()
|
|
}
|
|
|
|
var cl sdkClient.Client
|
|
prmInit := sdkClient.PrmInit{
|
|
Key: c.prm.key,
|
|
ResponseInfoCallback: c.prm.responseInfoCallback,
|
|
}
|
|
|
|
cl.Init(prmInit)
|
|
|
|
prmDial := sdkClient.PrmDial{
|
|
Endpoint: c.prm.address,
|
|
DialTimeout: c.prm.dialTimeout,
|
|
StreamTimeout: c.prm.streamTimeout,
|
|
GRPCDialOptions: c.prm.dialOptions,
|
|
}
|
|
|
|
if err := cl.Dial(ctx, prmDial); err != nil {
|
|
c.setUnhealthyOnDial()
|
|
return false, wasHealthy
|
|
}
|
|
|
|
c.clientMutex.Lock()
|
|
c.client = &cl
|
|
c.clientMutex.Unlock()
|
|
|
|
if _, err := cl.EndpointInfo(ctx, sdkClient.PrmEndpointInfo{}); err != nil {
|
|
c.setUnhealthy()
|
|
return false, wasHealthy
|
|
}
|
|
|
|
c.setHealthy()
|
|
return true, !wasHealthy
|
|
}
|
|
|
|
func (c *clientWrapper) getClient() (*sdkClient.Client, error) {
|
|
c.clientMutex.RLock()
|
|
defer c.clientMutex.RUnlock()
|
|
if c.isHealthy() {
|
|
return c.client, nil
|
|
}
|
|
return nil, errPoolClientUnhealthy
|
|
}
|
|
|
|
// balanceGet invokes sdkClient.BalanceGet parse response status to error and return result as is.
|
|
func (c *clientWrapper) balanceGet(ctx context.Context, prm PrmBalanceGet) (accounting.Decimal, error) {
|
|
cl, err := c.getClient()
|
|
if err != nil {
|
|
return accounting.Decimal{}, err
|
|
}
|
|
|
|
cliPrm := sdkClient.PrmBalanceGet{
|
|
Account: prm.account,
|
|
}
|
|
|
|
start := time.Now()
|
|
res, err := cl.BalanceGet(ctx, cliPrm)
|
|
c.incRequests(time.Since(start), methodBalanceGet)
|
|
var st apistatus.Status
|
|
if res != nil {
|
|
st = res.Status()
|
|
}
|
|
if err = c.handleError(ctx, st, err); err != nil {
|
|
return accounting.Decimal{}, fmt.Errorf("balance get on client: %w", err)
|
|
}
|
|
|
|
return res.Amount(), nil
|
|
}
|
|
|
|
// containerPut invokes sdkClient.ContainerPut parse response status to error and return result as is.
|
|
// It also waits for the container to appear on the network.
|
|
func (c *clientWrapper) containerPut(ctx context.Context, prm PrmContainerPut) (cid.ID, error) {
|
|
cl, err := c.getClient()
|
|
if err != nil {
|
|
return cid.ID{}, err
|
|
}
|
|
|
|
start := time.Now()
|
|
res, err := cl.ContainerPut(ctx, prm.ClientParams)
|
|
c.incRequests(time.Since(start), methodContainerPut)
|
|
var st apistatus.Status
|
|
if res != nil {
|
|
st = res.Status()
|
|
}
|
|
if err = c.handleError(ctx, st, err); err != nil {
|
|
return cid.ID{}, fmt.Errorf("container put on client: %w", err)
|
|
}
|
|
|
|
if prm.WaitParams == nil {
|
|
prm.WaitParams = defaultWaitParams()
|
|
}
|
|
if err = prm.WaitParams.CheckValidity(); err != nil {
|
|
return cid.ID{}, fmt.Errorf("invalid wait parameters: %w", err)
|
|
}
|
|
|
|
idCnr := res.ID()
|
|
|
|
getPrm := PrmContainerGet{
|
|
ContainerID: idCnr,
|
|
Session: prm.ClientParams.Session,
|
|
}
|
|
|
|
err = waitForContainerPresence(ctx, c, getPrm, prm.WaitParams)
|
|
if err = c.handleError(ctx, nil, err); err != nil {
|
|
return cid.ID{}, fmt.Errorf("wait container presence on client: %w", err)
|
|
}
|
|
|
|
return idCnr, nil
|
|
}
|
|
|
|
// containerGet invokes sdkClient.ContainerGet parse response status to error and return result as is.
|
|
func (c *clientWrapper) containerGet(ctx context.Context, prm PrmContainerGet) (container.Container, error) {
|
|
cl, err := c.getClient()
|
|
if err != nil {
|
|
return container.Container{}, err
|
|
}
|
|
|
|
cliPrm := sdkClient.PrmContainerGet{
|
|
ContainerID: &prm.ContainerID,
|
|
Session: prm.Session,
|
|
}
|
|
|
|
start := time.Now()
|
|
res, err := cl.ContainerGet(ctx, cliPrm)
|
|
c.incRequests(time.Since(start), methodContainerGet)
|
|
var st apistatus.Status
|
|
if res != nil {
|
|
st = res.Status()
|
|
}
|
|
if err = c.handleError(ctx, st, err); err != nil {
|
|
return container.Container{}, fmt.Errorf("container get on client: %w", err)
|
|
}
|
|
|
|
return res.Container(), nil
|
|
}
|
|
|
|
// containerList invokes sdkClient.ContainerList parse response status to error and return result as is.
|
|
func (c *clientWrapper) containerList(ctx context.Context, prm PrmContainerList) ([]cid.ID, error) {
|
|
cl, err := c.getClient()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
cliPrm := sdkClient.PrmContainerList{
|
|
Account: prm.OwnerID,
|
|
Session: prm.Session,
|
|
}
|
|
|
|
start := time.Now()
|
|
res, err := cl.ContainerList(ctx, cliPrm)
|
|
c.incRequests(time.Since(start), methodContainerList)
|
|
var st apistatus.Status
|
|
if res != nil {
|
|
st = res.Status()
|
|
}
|
|
if err = c.handleError(ctx, st, err); err != nil {
|
|
return nil, fmt.Errorf("container list on client: %w", err)
|
|
}
|
|
return res.Containers(), nil
|
|
}
|
|
|
|
// containerDelete invokes sdkClient.ContainerDelete parse response status to error.
|
|
// It also waits for the container to be removed from the network.
|
|
func (c *clientWrapper) containerDelete(ctx context.Context, prm PrmContainerDelete) error {
|
|
cl, err := c.getClient()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
cliPrm := sdkClient.PrmContainerDelete{
|
|
ContainerID: &prm.ContainerID,
|
|
Session: prm.Session,
|
|
}
|
|
|
|
start := time.Now()
|
|
res, err := cl.ContainerDelete(ctx, cliPrm)
|
|
c.incRequests(time.Since(start), methodContainerDelete)
|
|
var st apistatus.Status
|
|
if res != nil {
|
|
st = res.Status()
|
|
}
|
|
if err = c.handleError(ctx, st, err); err != nil {
|
|
return fmt.Errorf("container delete on client: %w", err)
|
|
}
|
|
|
|
if prm.WaitParams == nil {
|
|
prm.WaitParams = defaultWaitParams()
|
|
}
|
|
if err := prm.WaitParams.CheckValidity(); err != nil {
|
|
return fmt.Errorf("invalid wait parameters: %w", err)
|
|
}
|
|
|
|
getPrm := PrmContainerGet{
|
|
ContainerID: prm.ContainerID,
|
|
Session: prm.Session,
|
|
}
|
|
|
|
return waitForContainerRemoved(ctx, c, getPrm, prm.WaitParams)
|
|
}
|
|
|
|
// containerEACL invokes sdkClient.ContainerEACL parse response status to error and return result as is.
|
|
func (c *clientWrapper) containerEACL(ctx context.Context, prm PrmContainerEACL) (eacl.Table, error) {
|
|
cl, err := c.getClient()
|
|
if err != nil {
|
|
return eacl.Table{}, err
|
|
}
|
|
|
|
cliPrm := sdkClient.PrmContainerEACL{
|
|
ContainerID: &prm.ContainerID,
|
|
Session: prm.Session,
|
|
}
|
|
|
|
start := time.Now()
|
|
res, err := cl.ContainerEACL(ctx, cliPrm)
|
|
c.incRequests(time.Since(start), methodContainerEACL)
|
|
var st apistatus.Status
|
|
if res != nil {
|
|
st = res.Status()
|
|
}
|
|
if err = c.handleError(ctx, st, err); err != nil {
|
|
return eacl.Table{}, fmt.Errorf("get eacl on client: %w", err)
|
|
}
|
|
|
|
return res.Table(), nil
|
|
}
|
|
|
|
// containerSetEACL invokes sdkClient.ContainerSetEACL parse response status to error.
|
|
// It also waits for the EACL to appear on the network.
|
|
func (c *clientWrapper) containerSetEACL(ctx context.Context, prm PrmContainerSetEACL) error {
|
|
cl, err := c.getClient()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
cliPrm := sdkClient.PrmContainerSetEACL{
|
|
Table: &prm.Table,
|
|
Session: prm.Session,
|
|
}
|
|
|
|
start := time.Now()
|
|
res, err := cl.ContainerSetEACL(ctx, cliPrm)
|
|
c.incRequests(time.Since(start), methodContainerSetEACL)
|
|
var st apistatus.Status
|
|
if res != nil {
|
|
st = res.Status()
|
|
}
|
|
if err = c.handleError(ctx, st, err); err != nil {
|
|
return fmt.Errorf("set eacl on client: %w", err)
|
|
}
|
|
|
|
if prm.WaitParams == nil {
|
|
prm.WaitParams = defaultWaitParams()
|
|
}
|
|
if err := prm.WaitParams.CheckValidity(); err != nil {
|
|
return fmt.Errorf("invalid wait parameters: %w", err)
|
|
}
|
|
|
|
cnrID, _ := prm.Table.CID()
|
|
eaclPrm := PrmContainerEACL{
|
|
ContainerID: cnrID,
|
|
Session: prm.Session,
|
|
}
|
|
|
|
err = waitForEACLPresence(ctx, c, eaclPrm, &prm.Table, prm.WaitParams)
|
|
if err = c.handleError(ctx, nil, err); err != nil {
|
|
return fmt.Errorf("wait eacl presence on client: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// endpointInfo invokes sdkClient.EndpointInfo parse response status to error and return result as is.
|
|
func (c *clientWrapper) endpointInfo(ctx context.Context, _ prmEndpointInfo) (netmap.NodeInfo, error) {
|
|
cl, err := c.getClient()
|
|
if err != nil {
|
|
return netmap.NodeInfo{}, err
|
|
}
|
|
|
|
start := time.Now()
|
|
res, err := cl.EndpointInfo(ctx, sdkClient.PrmEndpointInfo{})
|
|
c.incRequests(time.Since(start), methodEndpointInfo)
|
|
var st apistatus.Status
|
|
if res != nil {
|
|
st = res.Status()
|
|
}
|
|
if err = c.handleError(ctx, st, err); err != nil {
|
|
return netmap.NodeInfo{}, fmt.Errorf("endpoint info on client: %w", err)
|
|
}
|
|
|
|
return res.NodeInfo(), nil
|
|
}
|
|
|
|
// networkInfo invokes sdkClient.NetworkInfo parse response status to error and return result as is.
|
|
func (c *clientWrapper) networkInfo(ctx context.Context, _ prmNetworkInfo) (netmap.NetworkInfo, error) {
|
|
cl, err := c.getClient()
|
|
if err != nil {
|
|
return netmap.NetworkInfo{}, err
|
|
}
|
|
|
|
start := time.Now()
|
|
res, err := cl.NetworkInfo(ctx, sdkClient.PrmNetworkInfo{})
|
|
c.incRequests(time.Since(start), methodNetworkInfo)
|
|
var st apistatus.Status
|
|
if res != nil {
|
|
st = res.Status()
|
|
}
|
|
if err = c.handleError(ctx, st, err); err != nil {
|
|
return netmap.NetworkInfo{}, fmt.Errorf("network info on client: %w", err)
|
|
}
|
|
|
|
return res.Info(), nil
|
|
}
|
|
|
|
// networkInfo invokes sdkClient.NetworkInfo parse response status to error and return result as is.
|
|
func (c *clientWrapper) netMapSnapshot(ctx context.Context, _ prmNetMapSnapshot) (netmap.NetMap, error) {
|
|
cl, err := c.getClient()
|
|
if err != nil {
|
|
return netmap.NetMap{}, err
|
|
}
|
|
|
|
start := time.Now()
|
|
res, err := cl.NetMapSnapshot(ctx, sdkClient.PrmNetMapSnapshot{})
|
|
c.incRequests(time.Since(start), methodNetMapSnapshot)
|
|
var st apistatus.Status
|
|
if res != nil {
|
|
st = res.Status()
|
|
}
|
|
if err = c.handleError(ctx, st, err); err != nil {
|
|
return netmap.NetMap{}, fmt.Errorf("network map snapshot on client: %w", err)
|
|
}
|
|
|
|
return res.NetMap(), nil
|
|
}
|
|
|
|
// objectPut writes object to FrostFS.
|
|
func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
|
|
if prm.bufferMaxSize == 0 {
|
|
prm.bufferMaxSize = defaultBufferMaxSizeForPut
|
|
}
|
|
|
|
if prm.clientCut {
|
|
return c.objectPutClientCut(ctx, prm)
|
|
}
|
|
|
|
return c.objectPutServerCut(ctx, prm)
|
|
}
|
|
|
|
func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
|
|
cl, err := c.getClient()
|
|
if err != nil {
|
|
return oid.ID{}, err
|
|
}
|
|
|
|
cliPrm := sdkClient.PrmObjectPutInit{
|
|
CopiesNumber: prm.copiesNumber,
|
|
Session: prm.stoken,
|
|
Key: prm.key,
|
|
BearerToken: prm.btoken,
|
|
}
|
|
|
|
start := time.Now()
|
|
wObj, err := cl.ObjectPutInit(ctx, cliPrm)
|
|
c.incRequests(time.Since(start), methodObjectPut)
|
|
if err = c.handleError(ctx, nil, err); err != nil {
|
|
return oid.ID{}, fmt.Errorf("init writing on API client: %w", err)
|
|
}
|
|
|
|
if wObj.WriteHeader(ctx, prm.hdr) {
|
|
sz := prm.hdr.PayloadSize()
|
|
|
|
if data := prm.hdr.Payload(); len(data) > 0 {
|
|
if prm.payload != nil {
|
|
prm.payload = io.MultiReader(bytes.NewReader(data), prm.payload)
|
|
} else {
|
|
prm.payload = bytes.NewReader(data)
|
|
sz = uint64(len(data))
|
|
}
|
|
}
|
|
|
|
if prm.payload != nil {
|
|
if sz == 0 || sz > prm.bufferMaxSize {
|
|
sz = prm.bufferMaxSize
|
|
}
|
|
|
|
buf := make([]byte, sz)
|
|
|
|
var n int
|
|
|
|
for {
|
|
n, err = prm.payload.Read(buf)
|
|
if n > 0 {
|
|
start = time.Now()
|
|
successWrite := wObj.WritePayloadChunk(ctx, buf[:n])
|
|
c.incRequests(time.Since(start), methodObjectPut)
|
|
if !successWrite {
|
|
break
|
|
}
|
|
|
|
continue
|
|
}
|
|
|
|
if errors.Is(err, io.EOF) {
|
|
break
|
|
}
|
|
|
|
return oid.ID{}, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err))
|
|
}
|
|
}
|
|
}
|
|
|
|
res, err := wObj.Close(ctx)
|
|
var st apistatus.Status
|
|
if res != nil {
|
|
st = res.Status()
|
|
}
|
|
if err = c.handleError(ctx, st, err); err != nil { // here err already carries both status and client errors
|
|
return oid.ID{}, fmt.Errorf("client failure: %w", err)
|
|
}
|
|
|
|
return res.StoredObjectID(), nil
|
|
}
|
|
|
|
func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
|
|
putInitPrm := PrmObjectPutClientCutInit{
|
|
PrmObjectPut: prm,
|
|
}
|
|
|
|
start := time.Now()
|
|
wObj, err := c.objectPutInitTransformer(putInitPrm)
|
|
c.incRequests(time.Since(start), methodObjectPut)
|
|
if err = c.handleError(ctx, nil, err); err != nil {
|
|
return oid.ID{}, fmt.Errorf("init writing on API client: %w", err)
|
|
}
|
|
|
|
if wObj.WriteHeader(ctx, prm.hdr) {
|
|
sz := prm.hdr.PayloadSize()
|
|
|
|
if data := prm.hdr.Payload(); len(data) > 0 {
|
|
if prm.payload != nil {
|
|
prm.payload = io.MultiReader(bytes.NewReader(data), prm.payload)
|
|
} else {
|
|
prm.payload = bytes.NewReader(data)
|
|
sz = uint64(len(data))
|
|
}
|
|
}
|
|
|
|
if prm.payload != nil {
|
|
if sz == 0 || sz > prm.bufferMaxSize {
|
|
sz = prm.bufferMaxSize
|
|
}
|
|
|
|
buf := make([]byte, sz)
|
|
|
|
var n int
|
|
|
|
for {
|
|
n, err = prm.payload.Read(buf)
|
|
if n > 0 {
|
|
start = time.Now()
|
|
successWrite := wObj.WritePayloadChunk(ctx, buf[:n])
|
|
c.incRequests(time.Since(start), methodObjectPut)
|
|
if !successWrite {
|
|
break
|
|
}
|
|
|
|
continue
|
|
}
|
|
|
|
if errors.Is(err, io.EOF) {
|
|
break
|
|
}
|
|
|
|
return oid.ID{}, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err))
|
|
}
|
|
}
|
|
}
|
|
|
|
res, err := wObj.Close(ctx)
|
|
var st apistatus.Status
|
|
if res != nil {
|
|
st = res.Status
|
|
}
|
|
if err = c.handleError(ctx, st, err); err != nil { // here err already carries both status and client errors
|
|
return oid.ID{}, fmt.Errorf("client failure: %w", err)
|
|
}
|
|
|
|
return res.OID, nil
|
|
}
|
|
|
|
// objectDelete invokes sdkClient.ObjectDelete parse response status to error.
|
|
func (c *clientWrapper) objectDelete(ctx context.Context, prm PrmObjectDelete) error {
|
|
cl, err := c.getClient()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
cnr := prm.addr.Container()
|
|
obj := prm.addr.Object()
|
|
|
|
cliPrm := sdkClient.PrmObjectDelete{
|
|
BearerToken: prm.btoken,
|
|
Session: prm.stoken,
|
|
ContainerID: &cnr,
|
|
ObjectID: &obj,
|
|
Key: prm.key,
|
|
}
|
|
|
|
start := time.Now()
|
|
res, err := cl.ObjectDelete(ctx, cliPrm)
|
|
c.incRequests(time.Since(start), methodObjectDelete)
|
|
var st apistatus.Status
|
|
if res != nil {
|
|
st = res.Status()
|
|
}
|
|
if err = c.handleError(ctx, st, err); err != nil {
|
|
return fmt.Errorf("delete object on client: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// objectGet returns reader for object.
|
|
func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (ResGetObject, error) {
|
|
cl, err := c.getClient()
|
|
if err != nil {
|
|
return ResGetObject{}, err
|
|
}
|
|
|
|
prmCnr := prm.addr.Container()
|
|
prmObj := prm.addr.Object()
|
|
|
|
cliPrm := sdkClient.PrmObjectGet{
|
|
BearerToken: prm.btoken,
|
|
Session: prm.stoken,
|
|
ContainerID: &prmCnr,
|
|
ObjectID: &prmObj,
|
|
Key: prm.key,
|
|
}
|
|
|
|
var res ResGetObject
|
|
|
|
rObj, err := cl.ObjectGetInit(ctx, cliPrm)
|
|
if err = c.handleError(ctx, nil, err); err != nil {
|
|
return ResGetObject{}, fmt.Errorf("init object reading on client: %w", err)
|
|
}
|
|
|
|
start := time.Now()
|
|
successReadHeader := rObj.ReadHeader(&res.Header)
|
|
c.incRequests(time.Since(start), methodObjectGet)
|
|
if !successReadHeader {
|
|
rObjRes, err := rObj.Close()
|
|
var st apistatus.Status
|
|
if rObjRes != nil {
|
|
st = rObjRes.Status()
|
|
}
|
|
err = c.handleError(ctx, st, err)
|
|
return res, fmt.Errorf("read header: %w", err)
|
|
}
|
|
|
|
res.Payload = &objectReadCloser{
|
|
reader: rObj,
|
|
elapsedTimeCallback: func(elapsed time.Duration) {
|
|
c.incRequests(elapsed, methodObjectGet)
|
|
},
|
|
}
|
|
|
|
return res, nil
|
|
}
|
|
|
|
// objectHead invokes sdkClient.ObjectHead parse response status to error and return result as is.
|
|
func (c *clientWrapper) objectHead(ctx context.Context, prm PrmObjectHead) (object.Object, error) {
|
|
cl, err := c.getClient()
|
|
if err != nil {
|
|
return object.Object{}, err
|
|
}
|
|
|
|
prmCnr := prm.addr.Container()
|
|
prmObj := prm.addr.Object()
|
|
|
|
cliPrm := sdkClient.PrmObjectHead{
|
|
BearerToken: prm.btoken,
|
|
Session: prm.stoken,
|
|
Raw: prm.raw,
|
|
ContainerID: &prmCnr,
|
|
ObjectID: &prmObj,
|
|
Key: prm.key,
|
|
}
|
|
|
|
var obj object.Object
|
|
|
|
start := time.Now()
|
|
res, err := cl.ObjectHead(ctx, cliPrm)
|
|
c.incRequests(time.Since(start), methodObjectHead)
|
|
var st apistatus.Status
|
|
if res != nil {
|
|
st = res.Status()
|
|
}
|
|
if err = c.handleError(ctx, st, err); err != nil {
|
|
return obj, fmt.Errorf("read object header via client: %w", err)
|
|
}
|
|
if !res.ReadHeader(&obj) {
|
|
return obj, errors.New("missing object header in response")
|
|
}
|
|
|
|
return obj, nil
|
|
}
|
|
|
|
// objectRange returns object range reader.
|
|
func (c *clientWrapper) objectRange(ctx context.Context, prm PrmObjectRange) (ResObjectRange, error) {
|
|
cl, err := c.getClient()
|
|
if err != nil {
|
|
return ResObjectRange{}, err
|
|
}
|
|
|
|
prmCnr := prm.addr.Container()
|
|
prmObj := prm.addr.Object()
|
|
|
|
cliPrm := sdkClient.PrmObjectRange{
|
|
BearerToken: prm.btoken,
|
|
Session: prm.stoken,
|
|
ContainerID: &prmCnr,
|
|
ObjectID: &prmObj,
|
|
Offset: prm.off,
|
|
Length: prm.ln,
|
|
Key: prm.key,
|
|
}
|
|
|
|
start := time.Now()
|
|
res, err := cl.ObjectRangeInit(ctx, cliPrm)
|
|
c.incRequests(time.Since(start), methodObjectRange)
|
|
if err = c.handleError(ctx, nil, err); err != nil {
|
|
return ResObjectRange{}, fmt.Errorf("init payload range reading on client: %w", err)
|
|
}
|
|
|
|
return ResObjectRange{
|
|
payload: res,
|
|
elapsedTimeCallback: func(elapsed time.Duration) {
|
|
c.incRequests(elapsed, methodObjectRange)
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
// objectSearch invokes sdkClient.ObjectSearchInit parse response status to error and return result as is.
|
|
func (c *clientWrapper) objectSearch(ctx context.Context, prm PrmObjectSearch) (ResObjectSearch, error) {
|
|
cl, err := c.getClient()
|
|
if err != nil {
|
|
return ResObjectSearch{}, err
|
|
}
|
|
|
|
cliPrm := sdkClient.PrmObjectSearch{
|
|
ContainerID: &prm.cnrID,
|
|
Filters: prm.filters,
|
|
Session: prm.stoken,
|
|
BearerToken: prm.btoken,
|
|
Key: prm.key,
|
|
}
|
|
|
|
res, err := cl.ObjectSearchInit(ctx, cliPrm)
|
|
if err = c.handleError(ctx, nil, err); err != nil {
|
|
return ResObjectSearch{}, fmt.Errorf("init object searching on client: %w", err)
|
|
}
|
|
|
|
return ResObjectSearch{r: res}, nil
|
|
}
|
|
|
|
// sessionCreate invokes sdkClient.SessionCreate parse response status to error and return result as is.
|
|
func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession) (resCreateSession, error) {
|
|
cl, err := c.getClient()
|
|
if err != nil {
|
|
return resCreateSession{}, err
|
|
}
|
|
|
|
cliPrm := sdkClient.PrmSessionCreate{
|
|
Expiration: prm.exp,
|
|
Key: &prm.key,
|
|
}
|
|
|
|
start := time.Now()
|
|
res, err := cl.SessionCreate(ctx, cliPrm)
|
|
c.incRequests(time.Since(start), methodSessionCreate)
|
|
var st apistatus.Status
|
|
if res != nil {
|
|
st = res.Status()
|
|
}
|
|
if err = c.handleError(ctx, st, err); err != nil {
|
|
return resCreateSession{}, fmt.Errorf("session creation on client: %w", err)
|
|
}
|
|
|
|
return resCreateSession{
|
|
id: res.ID(),
|
|
sessionKey: res.PublicKey(),
|
|
}, nil
|
|
}
|
|
|
|
func (c *clientStatusMonitor) isHealthy() bool {
|
|
return c.healthy.Load() == statusHealthy
|
|
}
|
|
|
|
func (c *clientStatusMonitor) isDialed() bool {
|
|
return c.healthy.Load() != statusUnhealthyOnDial
|
|
}
|
|
|
|
func (c *clientStatusMonitor) setHealthy() {
|
|
c.healthy.Store(statusHealthy)
|
|
}
|
|
|
|
func (c *clientStatusMonitor) setUnhealthy() {
|
|
c.healthy.Store(statusUnhealthyOnRequest)
|
|
}
|
|
|
|
func (c *clientStatusMonitor) setUnhealthyOnDial() {
|
|
c.healthy.Store(statusUnhealthyOnDial)
|
|
}
|
|
|
|
func (c *clientStatusMonitor) address() string {
|
|
return c.addr
|
|
}
|
|
|
|
func (c *clientStatusMonitor) incErrorRate() {
|
|
c.mu.Lock()
|
|
c.currentErrorCount++
|
|
c.overallErrorCount++
|
|
|
|
thresholdReached := c.currentErrorCount >= c.errorThreshold
|
|
if thresholdReached {
|
|
c.setUnhealthy()
|
|
c.currentErrorCount = 0
|
|
}
|
|
c.mu.Unlock()
|
|
|
|
if thresholdReached {
|
|
c.log(zapcore.WarnLevel, "error threshold reached",
|
|
zap.String("address", c.addr), zap.Uint32("threshold", c.errorThreshold))
|
|
}
|
|
}
|
|
|
|
func (c *clientStatusMonitor) log(level zapcore.Level, msg string, fields ...zap.Field) {
|
|
if c.logger == nil {
|
|
return
|
|
}
|
|
|
|
c.logger.Log(level, msg, fields...)
|
|
}
|
|
|
|
func (c *clientStatusMonitor) currentErrorRate() uint32 {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
return c.currentErrorCount
|
|
}
|
|
|
|
func (c *clientStatusMonitor) overallErrorRate() uint64 {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
return c.overallErrorCount
|
|
}
|
|
|
|
func (c *clientStatusMonitor) methodsStatus() []statusSnapshot {
|
|
result := make([]statusSnapshot, len(c.methods))
|
|
for i, val := range c.methods {
|
|
result[i] = val.snapshot()
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) {
|
|
methodStat := c.methods[method]
|
|
methodStat.incRequests(elapsed)
|
|
if c.prm.poolRequestInfoCallback != nil {
|
|
c.prm.poolRequestInfoCallback(RequestInfo{
|
|
Address: c.prm.address,
|
|
Method: method,
|
|
Elapsed: elapsed,
|
|
})
|
|
}
|
|
}
|
|
|
|
func (c *clientWrapper) close() error {
|
|
if c.client != nil {
|
|
return c.client.Close()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *clientStatusMonitor) handleError(ctx context.Context, st apistatus.Status, err error) error {
|
|
if err != nil {
|
|
if needCountError(ctx, err) {
|
|
c.incErrorRate()
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
err = apistatus.ErrFromStatus(st)
|
|
switch err.(type) {
|
|
case *apistatus.ServerInternal,
|
|
*apistatus.WrongMagicNumber,
|
|
*apistatus.SignatureVerification,
|
|
*apistatus.NodeUnderMaintenance:
|
|
c.incErrorRate()
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func needCountError(ctx context.Context, err error) bool {
|
|
// non-status logic error that could be returned
|
|
// from the SDK client; should not be considered
|
|
// as a connection error
|
|
var siErr *object.SplitInfoError
|
|
if errors.As(err, &siErr) {
|
|
return false
|
|
}
|
|
|
|
if errors.Is(ctx.Err(), context.Canceled) {
|
|
return false
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
// clientBuilder is a type alias of client constructors which open connection
|
|
// to the given endpoint.
|
|
type clientBuilder = func(endpoint string) client
|
|
|
|
// RequestInfo groups info about pool request.
|
|
type RequestInfo struct {
|
|
Address string
|
|
Method MethodIndex
|
|
Elapsed time.Duration
|
|
}
|
|
|
|
// InitParameters contains values used to initialize connection Pool.
|
|
type InitParameters struct {
|
|
key *ecdsa.PrivateKey
|
|
logger *zap.Logger
|
|
nodeDialTimeout time.Duration
|
|
nodeStreamTimeout time.Duration
|
|
healthcheckTimeout time.Duration
|
|
clientRebalanceInterval time.Duration
|
|
sessionExpirationDuration uint64
|
|
errorThreshold uint32
|
|
nodeParams []NodeParam
|
|
requestCallback func(RequestInfo)
|
|
dialOptions []grpc.DialOption
|
|
|
|
clientBuilder clientBuilder
|
|
}
|
|
|
|
// SetKey specifies default key to be used for the protocol communication by default.
|
|
func (x *InitParameters) SetKey(key *ecdsa.PrivateKey) {
|
|
x.key = key
|
|
}
|
|
|
|
// SetLogger specifies logger.
|
|
func (x *InitParameters) SetLogger(logger *zap.Logger) {
|
|
x.logger = logger
|
|
}
|
|
|
|
// SetNodeDialTimeout specifies the timeout for connection to be established.
|
|
func (x *InitParameters) SetNodeDialTimeout(timeout time.Duration) {
|
|
x.nodeDialTimeout = timeout
|
|
}
|
|
|
|
// SetNodeStreamTimeout specifies the timeout for individual operations in streaming RPC.
|
|
func (x *InitParameters) SetNodeStreamTimeout(timeout time.Duration) {
|
|
x.nodeStreamTimeout = timeout
|
|
}
|
|
|
|
// SetHealthcheckTimeout specifies the timeout for request to node to decide if it is alive.
|
|
//
|
|
// See also Pool.Dial.
|
|
func (x *InitParameters) SetHealthcheckTimeout(timeout time.Duration) {
|
|
x.healthcheckTimeout = timeout
|
|
}
|
|
|
|
// SetClientRebalanceInterval specifies the interval for updating nodes health status.
|
|
//
|
|
// See also Pool.Dial.
|
|
func (x *InitParameters) SetClientRebalanceInterval(interval time.Duration) {
|
|
x.clientRebalanceInterval = interval
|
|
}
|
|
|
|
// SetSessionExpirationDuration specifies the session token lifetime in epochs.
|
|
func (x *InitParameters) SetSessionExpirationDuration(expirationDuration uint64) {
|
|
x.sessionExpirationDuration = expirationDuration
|
|
}
|
|
|
|
// SetErrorThreshold specifies the number of errors on connection after which node is considered as unhealthy.
|
|
func (x *InitParameters) SetErrorThreshold(threshold uint32) {
|
|
x.errorThreshold = threshold
|
|
}
|
|
|
|
// SetRequestCallback makes the pool client to pass RequestInfo for each
|
|
// request to f. Nil (default) means ignore RequestInfo.
|
|
func (x *InitParameters) SetRequestCallback(f func(RequestInfo)) {
|
|
x.requestCallback = f
|
|
}
|
|
|
|
// AddNode append information about the node to which you want to connect.
|
|
func (x *InitParameters) AddNode(nodeParam NodeParam) {
|
|
x.nodeParams = append(x.nodeParams, nodeParam)
|
|
}
|
|
|
|
// SetGRPCDialOptions sets the gRPC dial options for new gRPC client connection.
|
|
func (x *InitParameters) SetGRPCDialOptions(opts ...grpc.DialOption) {
|
|
x.dialOptions = opts
|
|
}
|
|
|
|
// setClientBuilder sets clientBuilder used for client construction.
|
|
// Wraps setClientBuilderContext without a context.
|
|
func (x *InitParameters) setClientBuilder(builder clientBuilder) {
|
|
x.clientBuilder = builder
|
|
}
|
|
|
|
// isMissingClientBuilder checks if client constructor was not specified.
|
|
func (x *InitParameters) isMissingClientBuilder() bool {
|
|
return x.clientBuilder == nil
|
|
}
|
|
|
|
type rebalanceParameters struct {
|
|
nodesParams []*nodesParam
|
|
nodeRequestTimeout time.Duration
|
|
clientRebalanceInterval time.Duration
|
|
sessionExpirationDuration uint64
|
|
}
|
|
|
|
// nodesParam describes a group of nodes sharing one priority. The addresses
// and weights slices are filled in parallel: weights[i] belongs to addresses[i].
type nodesParam struct {
	priority  int
	addresses []string
	weights   []float64
}
|
|
|
|
// NodeParam describes a remote node: its priority, address and weight.
type NodeParam struct {
	priority int
	address  string
	weight   float64
}

// NewNodeParam builds a NodeParam from the given values.
func NewNodeParam(priority int, address string, weight float64) (prm NodeParam) {
	prm = NodeParam{
		priority: priority,
		address:  address,
		weight:   weight,
	}

	return prm
}

// SetPriority sets the priority of the node. Negative values are allowed.
// Nodes with the same priority form one group, and adjustNodeParams orders
// the groups by ascending priority value.
func (x *NodeParam) SetPriority(priority int) {
	x.priority = priority
}

// Priority returns the priority of the node.
func (x *NodeParam) Priority() int {
	return x.priority
}

// SetAddress sets the address of the node.
func (x *NodeParam) SetAddress(address string) {
	x.address = address
}

// Address returns the address of the node.
func (x *NodeParam) Address() string {
	return x.address
}

// SetWeight sets the weight of the node.
func (x *NodeParam) SetWeight(weight float64) {
	x.weight = weight
}

// Weight returns the weight of the node.
func (x *NodeParam) Weight() float64 {
	return x.weight
}
|
|
|
|
// WaitParams contains the parameters used when polling to find out whether
// an operation has been applied on the FrostFS network.
type WaitParams struct {
	// Timeout is the total time to wait for the operation to complete.
	Timeout time.Duration
	// PollInterval is the period between completion checks.
	PollInterval time.Duration
}

// SetTimeout specifies the time to wait for the operation to complete.
//
// Deprecated: Use WaitParams.Timeout instead.
func (x *WaitParams) SetTimeout(timeout time.Duration) {
	x.Timeout = timeout
}

// SetPollInterval specifies the interval at which completion of the
// operation is checked.
//
// Deprecated: Use WaitParams.PollInterval instead.
func (x *WaitParams) SetPollInterval(tick time.Duration) {
	x.PollInterval = tick
}

// defaultWaitParams returns the wait params used when none are provided:
// a 120 second timeout polled every 5 seconds.
func defaultWaitParams() *WaitParams {
	return &WaitParams{
		Timeout:      120 * time.Second,
		PollInterval: 5 * time.Second,
	}
}

// checkForPositive panics if any of the wait params isn't positive.
func (x *WaitParams) checkForPositive() {
	if x.Timeout <= 0 || x.PollInterval <= 0 {
		panic("all wait params must be positive")
	}
}

// CheckValidity checks that all wait params are strictly positive.
// Zero is rejected as well as negative values, which is why the returned
// errors say "must be positive".
func (x *WaitParams) CheckValidity() error {
	if x.Timeout <= 0 {
		return errors.New("timeout must be positive")
	}
	if x.PollInterval <= 0 {
		return errors.New("poll interval must be positive")
	}
	return nil
}
|
|
|
|
type prmContext struct {
|
|
defaultSession bool
|
|
verb session.ObjectVerb
|
|
cnr cid.ID
|
|
|
|
objSet bool
|
|
objs []oid.ID
|
|
}
|
|
|
|
func (x *prmContext) useDefaultSession() {
|
|
x.defaultSession = true
|
|
}
|
|
|
|
func (x *prmContext) useContainer(cnr cid.ID) {
|
|
x.cnr = cnr
|
|
}
|
|
|
|
func (x *prmContext) useObjects(ids []oid.ID) {
|
|
x.objs = ids
|
|
x.objSet = true
|
|
}
|
|
|
|
func (x *prmContext) useAddress(addr oid.Address) {
|
|
x.cnr = addr.Container()
|
|
x.objs = []oid.ID{addr.Object()}
|
|
x.objSet = true
|
|
}
|
|
|
|
func (x *prmContext) useVerb(verb session.ObjectVerb) {
|
|
x.verb = verb
|
|
}
|
|
|
|
type prmCommon struct {
|
|
key *ecdsa.PrivateKey
|
|
btoken *bearer.Token
|
|
stoken *session.Object
|
|
}
|
|
|
|
// UseKey specifies private key to sign the requests.
|
|
// If key is not provided, then Pool default key is used.
|
|
func (x *prmCommon) UseKey(key *ecdsa.PrivateKey) {
|
|
x.key = key
|
|
}
|
|
|
|
// UseBearer attaches bearer token to be used for the operation.
|
|
func (x *prmCommon) UseBearer(token bearer.Token) {
|
|
x.btoken = &token
|
|
}
|
|
|
|
// UseSession specifies session within which operation should be performed.
|
|
func (x *prmCommon) UseSession(token session.Object) {
|
|
x.stoken = &token
|
|
}
|
|
|
|
// PrmObjectPut groups parameters of PutObject operation.
|
|
type PrmObjectPut struct {
|
|
prmCommon
|
|
|
|
hdr object.Object
|
|
|
|
payload io.Reader
|
|
|
|
copiesNumber []uint32
|
|
|
|
clientCut bool
|
|
networkInfo netmap.NetworkInfo
|
|
|
|
withoutHomomorphicHash bool
|
|
|
|
bufferMaxSize uint64
|
|
}
|
|
|
|
// SetHeader specifies header of the object.
|
|
func (x *PrmObjectPut) SetHeader(hdr object.Object) {
|
|
x.hdr = hdr
|
|
}
|
|
|
|
// SetPayload specifies payload of the object.
|
|
func (x *PrmObjectPut) SetPayload(payload io.Reader) {
|
|
x.payload = payload
|
|
}
|
|
|
|
// SetCopiesNumber sets number of object copies that is enough to consider put successful.
|
|
// Zero means using default behavior.
|
|
func (x *PrmObjectPut) SetCopiesNumber(copiesNumber uint32) {
|
|
x.copiesNumber = []uint32{copiesNumber}
|
|
}
|
|
|
|
// SetCopiesNumberVector sets number of object copies that is enough to consider put successful, provided as array.
|
|
// Nil/empty vector means using default behavior.
|
|
func (x *PrmObjectPut) SetCopiesNumberVector(copiesNumber []uint32) {
|
|
x.copiesNumber = copiesNumber
|
|
}
|
|
|
|
// SetClientCut enables client cut for objects. It means that full object is prepared on client side
|
|
// and retrying is possible. But this leads to additional memory using for buffering object parts.
|
|
// Buffer size for every put is MaxObjectSize value from FrostFS network.
|
|
// There is limit for total memory allocation for in-flight request and
|
|
// can be set by InitParameters.SetMaxClientCutMemory (default value is 1gb).
|
|
// Put requests will fail if this limit be reached.
|
|
func (x *PrmObjectPut) SetClientCut(clientCut bool) {
|
|
x.clientCut = clientCut
|
|
}
|
|
|
|
// WithoutHomomorphicHash if set to true do not use Tillich-Zémor hash for payload.
|
|
func (x *PrmObjectPut) WithoutHomomorphicHash(v bool) {
|
|
x.withoutHomomorphicHash = v
|
|
}
|
|
|
|
// SetBufferMaxSize sets max buffer size to read payload.
|
|
// This value isn't used if object size is set explicitly and less than this value.
|
|
// Default value 3MB.
|
|
func (x *PrmObjectPut) SetBufferMaxSize(size uint64) {
|
|
x.bufferMaxSize = size
|
|
}
|
|
|
|
func (x *PrmObjectPut) setNetworkInfo(ni netmap.NetworkInfo) {
|
|
x.networkInfo = ni
|
|
}
|
|
|
|
// PrmObjectDelete groups parameters of DeleteObject operation.
|
|
type PrmObjectDelete struct {
|
|
prmCommon
|
|
|
|
addr oid.Address
|
|
}
|
|
|
|
// SetAddress specifies FrostFS address of the object.
|
|
func (x *PrmObjectDelete) SetAddress(addr oid.Address) {
|
|
x.addr = addr
|
|
}
|
|
|
|
// PrmObjectGet groups parameters of GetObject operation.
|
|
type PrmObjectGet struct {
|
|
prmCommon
|
|
|
|
addr oid.Address
|
|
}
|
|
|
|
// SetAddress specifies FrostFS address of the object.
|
|
func (x *PrmObjectGet) SetAddress(addr oid.Address) {
|
|
x.addr = addr
|
|
}
|
|
|
|
// PrmObjectHead groups parameters of HeadObject operation.
|
|
type PrmObjectHead struct {
|
|
prmCommon
|
|
|
|
addr oid.Address
|
|
raw bool
|
|
}
|
|
|
|
// SetAddress specifies FrostFS address of the object.
|
|
func (x *PrmObjectHead) SetAddress(addr oid.Address) {
|
|
x.addr = addr
|
|
}
|
|
|
|
// MarkRaw marks an intent to read physically stored object.
|
|
func (x *PrmObjectHead) MarkRaw() {
|
|
x.raw = true
|
|
}
|
|
|
|
// PrmObjectRange groups parameters of RangeObject operation.
|
|
type PrmObjectRange struct {
|
|
prmCommon
|
|
|
|
addr oid.Address
|
|
off, ln uint64
|
|
}
|
|
|
|
// SetAddress specifies FrostFS address of the object.
|
|
func (x *PrmObjectRange) SetAddress(addr oid.Address) {
|
|
x.addr = addr
|
|
}
|
|
|
|
// SetOffset sets offset of the payload range to be read.
|
|
func (x *PrmObjectRange) SetOffset(offset uint64) {
|
|
x.off = offset
|
|
}
|
|
|
|
// SetLength sets length of the payload range to be read.
|
|
func (x *PrmObjectRange) SetLength(length uint64) {
|
|
x.ln = length
|
|
}
|
|
|
|
// PrmObjectSearch groups parameters of SearchObjects operation.
|
|
type PrmObjectSearch struct {
|
|
prmCommon
|
|
|
|
cnrID cid.ID
|
|
filters object.SearchFilters
|
|
}
|
|
|
|
// SetContainerID specifies the container in which to look for objects.
|
|
func (x *PrmObjectSearch) SetContainerID(cnrID cid.ID) {
|
|
x.cnrID = cnrID
|
|
}
|
|
|
|
// SetFilters specifies filters by which to select objects.
|
|
func (x *PrmObjectSearch) SetFilters(filters object.SearchFilters) {
|
|
x.filters = filters
|
|
}
|
|
|
|
// PrmContainerPut groups parameters of PutContainer operation.
|
|
type PrmContainerPut struct {
|
|
ClientParams sdkClient.PrmContainerPut
|
|
|
|
WaitParams *WaitParams
|
|
}
|
|
|
|
// SetContainer container structure to be used as a parameter of the base
|
|
// client's operation.
|
|
//
|
|
// See git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client.PrmContainerPut.SetContainer.
|
|
//
|
|
// Deprecated: Use PrmContainerPut.ClientParams.Container instead.
|
|
func (x *PrmContainerPut) SetContainer(cnr container.Container) {
|
|
x.ClientParams.SetContainer(cnr)
|
|
}
|
|
|
|
// WithinSession specifies session to be used as a parameter of the base
|
|
// client's operation.
|
|
//
|
|
// See git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client.PrmContainerPut.WithinSession.
|
|
//
|
|
// Deprecated: Use PrmContainerPut.ClientParams.Session instead.
|
|
func (x *PrmContainerPut) WithinSession(s session.Container) {
|
|
x.ClientParams.WithinSession(s)
|
|
}
|
|
|
|
// SetWaitParams specifies timeout params to complete operation.
|
|
// If not provided the default one will be used.
|
|
// Panics if any of the wait params isn't positive.
|
|
//
|
|
// Deprecated: Use PrmContainerPut.ClientParams.WaitParams instead.
|
|
func (x *PrmContainerPut) SetWaitParams(waitParams WaitParams) {
|
|
waitParams.checkForPositive()
|
|
x.WaitParams = &waitParams
|
|
}
|
|
|
|
// PrmContainerGet groups parameters of GetContainer operation.
|
|
type PrmContainerGet struct {
|
|
ContainerID cid.ID
|
|
|
|
Session *session.Container
|
|
}
|
|
|
|
// SetContainerID specifies identifier of the container to be read.
|
|
//
|
|
// Deprecated: Use PrmContainerGet.ContainerID instead.
|
|
func (prm *PrmContainerGet) SetContainerID(cnrID cid.ID) {
|
|
prm.ContainerID = cnrID
|
|
}
|
|
|
|
// PrmContainerList groups parameters of ListContainers operation.
|
|
type PrmContainerList struct {
|
|
OwnerID user.ID
|
|
|
|
Session *session.Container
|
|
}
|
|
|
|
// SetOwnerID specifies identifier of the FrostFS account to list the containers.
|
|
//
|
|
// Deprecated: Use PrmContainerList.OwnerID instead.
|
|
func (x *PrmContainerList) SetOwnerID(ownerID user.ID) {
|
|
x.OwnerID = ownerID
|
|
}
|
|
|
|
// PrmContainerDelete groups parameters of DeleteContainer operation.
|
|
type PrmContainerDelete struct {
|
|
ContainerID cid.ID
|
|
|
|
Session *session.Container
|
|
|
|
WaitParams *WaitParams
|
|
}
|
|
|
|
// SetContainerID specifies identifier of the FrostFS container to be removed.
|
|
//
|
|
// Deprecated: Use PrmContainerDelete.ContainerID instead.
|
|
func (x *PrmContainerDelete) SetContainerID(cnrID cid.ID) {
|
|
x.ContainerID = cnrID
|
|
}
|
|
|
|
// SetSessionToken specifies session within which operation should be performed.
|
|
//
|
|
// Deprecated: Use PrmContainerDelete.Session instead.
|
|
func (x *PrmContainerDelete) SetSessionToken(token session.Container) {
|
|
x.Session = &token
|
|
}
|
|
|
|
// SetWaitParams specifies timeout params to complete operation.
|
|
// If not provided the default one will be used.
|
|
// Panics if any of the wait params isn't positive.
|
|
//
|
|
// Deprecated: Use PrmContainerDelete.WaitParams instead.
|
|
func (x *PrmContainerDelete) SetWaitParams(waitParams WaitParams) {
|
|
waitParams.checkForPositive()
|
|
x.WaitParams = &waitParams
|
|
}
|
|
|
|
// PrmContainerEACL groups parameters of GetEACL operation.
|
|
type PrmContainerEACL struct {
|
|
ContainerID cid.ID
|
|
|
|
Session *session.Container
|
|
}
|
|
|
|
// SetContainerID specifies identifier of the FrostFS container to read the eACL table.
|
|
//
|
|
// Deprecated: Use PrmContainerEACL.ContainerID instead.
|
|
func (x *PrmContainerEACL) SetContainerID(cnrID cid.ID) {
|
|
x.ContainerID = cnrID
|
|
}
|
|
|
|
// PrmContainerSetEACL groups parameters of SetEACL operation.
|
|
type PrmContainerSetEACL struct {
|
|
Table eacl.Table
|
|
|
|
Session *session.Container
|
|
|
|
WaitParams *WaitParams
|
|
}
|
|
|
|
// SetTable sets structure of container's extended ACL to be used as a
|
|
// parameter of the base client's operation.
|
|
//
|
|
// See git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client.PrmContainerSetEACL.SetTable.
|
|
//
|
|
// Deprecated: Use PrmContainerSetEACL.Table instead.
|
|
func (x *PrmContainerSetEACL) SetTable(table eacl.Table) {
|
|
x.Table = table
|
|
}
|
|
|
|
// WithinSession specifies session to be used as a parameter of the base
|
|
// client's operation.
|
|
//
|
|
// See git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client.PrmContainerSetEACL.WithinSession.
|
|
//
|
|
// Deprecated: Use PrmContainerSetEACL.Session instead.
|
|
func (x *PrmContainerSetEACL) WithinSession(s session.Container) {
|
|
x.Session = &s
|
|
}
|
|
|
|
// SetWaitParams specifies timeout params to complete operation.
|
|
// If not provided the default one will be used.
|
|
// Panics if any of the wait params isn't positive.
|
|
//
|
|
// Deprecated: Use PrmContainerSetEACL.WaitParams instead.
|
|
func (x *PrmContainerSetEACL) SetWaitParams(waitParams WaitParams) {
|
|
waitParams.checkForPositive()
|
|
x.WaitParams = &waitParams
|
|
}
|
|
|
|
// PrmBalanceGet groups parameters of Balance operation.
|
|
type PrmBalanceGet struct {
|
|
account user.ID
|
|
}
|
|
|
|
// SetAccount specifies identifier of the FrostFS account for which the balance is requested.
|
|
func (x *PrmBalanceGet) SetAccount(id user.ID) {
|
|
x.account = id
|
|
}
|
|
|
|
// prmEndpointInfo groups parameters of sessionCreate operation.
|
|
type prmCreateSession struct {
|
|
exp uint64
|
|
key ecdsa.PrivateKey
|
|
}
|
|
|
|
// setExp sets number of the last FrostFS epoch in the lifetime of the session after which it will be expired.
|
|
func (x *prmCreateSession) setExp(exp uint64) {
|
|
x.exp = exp
|
|
}
|
|
|
|
// useKey specifies owner private key for session token.
|
|
// If key is not provided, then Pool default key is used.
|
|
func (x *prmCreateSession) useKey(key ecdsa.PrivateKey) {
|
|
x.key = key
|
|
}
|
|
|
|
// prmEndpointInfo groups the parameters of the endpointInfo operation.
// It currently carries no fields.
type prmEndpointInfo struct{}

// prmNetworkInfo groups the parameters of the networkInfo operation.
// It currently carries no fields.
type prmNetworkInfo struct{}

// prmNetMapSnapshot groups the parameters of the netMapSnapshot operation.
// It currently carries no fields.
type prmNetMapSnapshot struct{}
|
|
|
|
// resCreateSession groups the resulting values of the sessionCreate operation.
type resCreateSession struct {
	// id is the session ID taken from the response.
	id []byte

	// sessionKey is the session public key taken from the response.
	sessionKey []byte
}
|
|
|
|
// Pool represents virtual connection to the FrostFS network to communicate
|
|
// with multiple FrostFS servers without thinking about switching between servers
|
|
// due to load balancing proportions or their unavailability.
|
|
// It is designed to provide a convenient abstraction from the multiple sdkClient.client types.
|
|
//
|
|
// Pool can be created and initialized using NewPool function.
|
|
// Before executing the FrostFS operations using the Pool, connection to the
|
|
// servers MUST BE correctly established (see Dial method).
|
|
// Using the Pool before connecting have been established can lead to a panic.
|
|
// After the work, the Pool SHOULD BE closed (see Close method): it frees internal
|
|
// and system resources which were allocated for the period of work of the Pool.
|
|
// Calling Dial/Close methods during the communication process step strongly discouraged
|
|
// as it leads to undefined behavior.
|
|
//
|
|
// Each method which produces a FrostFS API call may return an error.
|
|
// Status of underlying server response is casted to built-in error instance.
|
|
// Certain statuses can be checked using `sdkClient` and standard `errors` packages.
|
|
// Note that package provides some helper functions to work with status returns
|
|
// (e.g. sdkClient.IsErrContainerNotFound, sdkClient.IsErrObjectNotFound).
|
|
//
|
|
// See pool package overview to get some examples.
|
|
type Pool struct {
|
|
innerPools []*innerPool
|
|
key *ecdsa.PrivateKey
|
|
cancel context.CancelFunc
|
|
closedCh chan struct{}
|
|
cache *sessionCache
|
|
stokenDuration uint64
|
|
rebalanceParams rebalanceParameters
|
|
clientBuilder clientBuilder
|
|
logger *zap.Logger
|
|
|
|
maxObjectSize uint64
|
|
}
|
|
|
|
type innerPool struct {
|
|
lock sync.RWMutex
|
|
sampler *sampler
|
|
clients []client
|
|
}
|
|
|
|
// Default values applied when the corresponding InitParameters or put
// parameters are left unset.
const (
	// defaultSessionTokenExpirationDuration is the session token lifetime in
	// epochs (it is added to the current epoch to get the expiration epoch).
	defaultSessionTokenExpirationDuration = 100
	// defaultErrorThreshold is the default error count threshold.
	defaultErrorThreshold = 100

	defaultRebalanceInterval  = 15 * time.Second
	defaultHealthcheckTimeout = 4 * time.Second
	defaultDialTimeout        = 5 * time.Second
	defaultStreamTimeout      = 10 * time.Second

	// defaultBufferMaxSizeForPut is 3 MB.
	defaultBufferMaxSizeForPut = 3 * 1024 * 1024
)
|
|
|
|
// NewPool creates connection pool using parameters.
|
|
func NewPool(options InitParameters) (*Pool, error) {
|
|
if options.key == nil {
|
|
return nil, fmt.Errorf("missed required parameter 'Key'")
|
|
}
|
|
|
|
nodesParams, err := adjustNodeParams(options.nodeParams)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
cache, err := newCache()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("couldn't create cache: %w", err)
|
|
}
|
|
|
|
fillDefaultInitParams(&options, cache)
|
|
|
|
pool := &Pool{
|
|
key: options.key,
|
|
cache: cache,
|
|
logger: options.logger,
|
|
stokenDuration: options.sessionExpirationDuration,
|
|
rebalanceParams: rebalanceParameters{
|
|
nodesParams: nodesParams,
|
|
nodeRequestTimeout: options.healthcheckTimeout,
|
|
clientRebalanceInterval: options.clientRebalanceInterval,
|
|
sessionExpirationDuration: options.sessionExpirationDuration,
|
|
},
|
|
clientBuilder: options.clientBuilder,
|
|
}
|
|
|
|
return pool, nil
|
|
}
|
|
|
|
// Dial establishes a connection to the servers from the FrostFS network.
|
|
// It also starts a routine that checks the health of the nodes and
|
|
// updates the weights of the nodes for balancing.
|
|
// Returns an error describing failure reason.
|
|
//
|
|
// If failed, the Pool SHOULD NOT be used.
|
|
//
|
|
// See also InitParameters.SetClientRebalanceInterval.
|
|
func (p *Pool) Dial(ctx context.Context) error {
|
|
inner := make([]*innerPool, len(p.rebalanceParams.nodesParams))
|
|
var atLeastOneHealthy bool
|
|
|
|
for i, params := range p.rebalanceParams.nodesParams {
|
|
clients := make([]client, len(params.weights))
|
|
for j, addr := range params.addresses {
|
|
clients[j] = p.clientBuilder(addr)
|
|
if err := clients[j].dial(ctx); err != nil {
|
|
p.log(zap.WarnLevel, "failed to build client", zap.String("address", addr), zap.Error(err))
|
|
continue
|
|
}
|
|
|
|
var st session.Object
|
|
err := initSessionForDuration(ctx, &st, clients[j], p.rebalanceParams.sessionExpirationDuration, *p.key, false)
|
|
if err != nil {
|
|
clients[j].setUnhealthy()
|
|
p.log(zap.WarnLevel, "failed to create frostfs session token for client",
|
|
zap.String("address", addr), zap.Error(err))
|
|
continue
|
|
}
|
|
|
|
_ = p.cache.Put(formCacheKey(addr, p.key, false), st)
|
|
atLeastOneHealthy = true
|
|
}
|
|
source := rand.NewSource(time.Now().UnixNano())
|
|
sampl := newSampler(params.weights, source)
|
|
|
|
inner[i] = &innerPool{
|
|
sampler: sampl,
|
|
clients: clients,
|
|
}
|
|
}
|
|
|
|
if !atLeastOneHealthy {
|
|
return fmt.Errorf("at least one node must be healthy")
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
p.cancel = cancel
|
|
p.closedCh = make(chan struct{})
|
|
p.innerPools = inner
|
|
|
|
ni, err := p.NetworkInfo(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("get network info for max object size: %w", err)
|
|
}
|
|
p.maxObjectSize = ni.MaxObjectSize()
|
|
|
|
go p.startRebalance(ctx)
|
|
return nil
|
|
}
|
|
|
|
func (p *Pool) log(level zapcore.Level, msg string, fields ...zap.Field) {
|
|
if p.logger == nil {
|
|
return
|
|
}
|
|
|
|
p.logger.Log(level, msg, fields...)
|
|
}
|
|
|
|
func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
|
|
if params.sessionExpirationDuration == 0 {
|
|
params.sessionExpirationDuration = defaultSessionTokenExpirationDuration
|
|
}
|
|
|
|
if params.errorThreshold == 0 {
|
|
params.errorThreshold = defaultErrorThreshold
|
|
}
|
|
|
|
if params.clientRebalanceInterval <= 0 {
|
|
params.clientRebalanceInterval = defaultRebalanceInterval
|
|
}
|
|
|
|
if params.healthcheckTimeout <= 0 {
|
|
params.healthcheckTimeout = defaultHealthcheckTimeout
|
|
}
|
|
|
|
if params.nodeDialTimeout <= 0 {
|
|
params.nodeDialTimeout = defaultDialTimeout
|
|
}
|
|
|
|
if params.nodeStreamTimeout <= 0 {
|
|
params.nodeStreamTimeout = defaultStreamTimeout
|
|
}
|
|
|
|
if params.isMissingClientBuilder() {
|
|
params.setClientBuilder(func(addr string) client {
|
|
var prm wrapperPrm
|
|
prm.setAddress(addr)
|
|
prm.setKey(*params.key)
|
|
prm.setLogger(params.logger)
|
|
prm.setDialTimeout(params.nodeDialTimeout)
|
|
prm.setStreamTimeout(params.nodeStreamTimeout)
|
|
prm.setErrorThreshold(params.errorThreshold)
|
|
prm.setPoolRequestCallback(params.requestCallback)
|
|
prm.setGRPCDialOptions(params.dialOptions)
|
|
prm.setResponseInfoCallback(func(info sdkClient.ResponseMetaInfo) error {
|
|
cache.updateEpoch(info.Epoch())
|
|
return nil
|
|
})
|
|
return newWrapper(prm)
|
|
})
|
|
}
|
|
}
|
|
|
|
func adjustNodeParams(nodeParams []NodeParam) ([]*nodesParam, error) {
|
|
if len(nodeParams) == 0 {
|
|
return nil, errors.New("no FrostFS peers configured")
|
|
}
|
|
|
|
nodesParamsMap := make(map[int]*nodesParam)
|
|
for _, param := range nodeParams {
|
|
nodes, ok := nodesParamsMap[param.priority]
|
|
if !ok {
|
|
nodes = &nodesParam{priority: param.priority}
|
|
}
|
|
nodes.addresses = append(nodes.addresses, param.address)
|
|
nodes.weights = append(nodes.weights, param.weight)
|
|
nodesParamsMap[param.priority] = nodes
|
|
}
|
|
|
|
nodesParams := make([]*nodesParam, 0, len(nodesParamsMap))
|
|
for _, nodes := range nodesParamsMap {
|
|
nodes.weights = adjustWeights(nodes.weights)
|
|
nodesParams = append(nodesParams, nodes)
|
|
}
|
|
|
|
sort.Slice(nodesParams, func(i, j int) bool {
|
|
return nodesParams[i].priority < nodesParams[j].priority
|
|
})
|
|
|
|
return nodesParams, nil
|
|
}
|
|
|
|
// startRebalance runs loop to monitor connection healthy status.
|
|
func (p *Pool) startRebalance(ctx context.Context) {
|
|
ticker := time.NewTimer(p.rebalanceParams.clientRebalanceInterval)
|
|
buffers := make([][]float64, len(p.rebalanceParams.nodesParams))
|
|
for i, params := range p.rebalanceParams.nodesParams {
|
|
buffers[i] = make([]float64, len(params.weights))
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
close(p.closedCh)
|
|
return
|
|
case <-ticker.C:
|
|
p.updateNodesHealth(ctx, buffers)
|
|
ticker.Reset(p.rebalanceParams.clientRebalanceInterval)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *Pool) updateNodesHealth(ctx context.Context, buffers [][]float64) {
|
|
wg := sync.WaitGroup{}
|
|
for i, inner := range p.innerPools {
|
|
wg.Add(1)
|
|
|
|
bufferWeights := buffers[i]
|
|
go func(i int, _ *innerPool) {
|
|
defer wg.Done()
|
|
p.updateInnerNodesHealth(ctx, i, bufferWeights)
|
|
}(i, inner)
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights []float64) {
|
|
if i > len(p.innerPools)-1 {
|
|
return
|
|
}
|
|
pool := p.innerPools[i]
|
|
options := p.rebalanceParams
|
|
|
|
healthyChanged := new(atomic.Bool)
|
|
wg := sync.WaitGroup{}
|
|
|
|
for j, cli := range pool.clients {
|
|
wg.Add(1)
|
|
go func(j int, cli client) {
|
|
defer wg.Done()
|
|
|
|
tctx, c := context.WithTimeout(ctx, options.nodeRequestTimeout)
|
|
defer c()
|
|
|
|
healthy, changed := cli.restartIfUnhealthy(tctx)
|
|
if healthy {
|
|
bufferWeights[j] = options.nodesParams[i].weights[j]
|
|
} else {
|
|
bufferWeights[j] = 0
|
|
p.cache.DeleteByPrefix(cli.address())
|
|
}
|
|
|
|
if changed {
|
|
p.log(zap.DebugLevel, "health has changed",
|
|
zap.String("address", cli.address()), zap.Bool("healthy", healthy))
|
|
healthyChanged.Store(true)
|
|
}
|
|
}(j, cli)
|
|
}
|
|
wg.Wait()
|
|
|
|
if healthyChanged.Load() {
|
|
probabilities := adjustWeights(bufferWeights)
|
|
source := rand.NewSource(time.Now().UnixNano())
|
|
pool.lock.Lock()
|
|
pool.sampler = newSampler(probabilities, source)
|
|
pool.lock.Unlock()
|
|
}
|
|
}
|
|
|
|
// adjustWeights returns a new slice holding the weights divided by their sum.
// If the sum is not greater than zero, the result consists of zeros only.
// The input slice is not modified.
func adjustWeights(weights []float64) []float64 {
	var total float64
	for i := range weights {
		total += weights[i]
	}

	normalized := make([]float64, len(weights))
	if !(total > 0) {
		return normalized
	}

	for i := range weights {
		normalized[i] = weights[i] / total
	}

	return normalized
}
|
|
|
|
func (p *Pool) connection() (client, error) {
|
|
for _, inner := range p.innerPools {
|
|
cp, err := inner.connection()
|
|
if err == nil {
|
|
return cp, nil
|
|
}
|
|
}
|
|
|
|
return nil, errors.New("no healthy client")
|
|
}
|
|
|
|
func (p *innerPool) connection() (client, error) {
|
|
p.lock.RLock() // need lock because of using p.sampler
|
|
defer p.lock.RUnlock()
|
|
if len(p.clients) == 1 {
|
|
cp := p.clients[0]
|
|
if cp.isHealthy() {
|
|
return cp, nil
|
|
}
|
|
return nil, errors.New("no healthy client")
|
|
}
|
|
attempts := 3 * len(p.clients)
|
|
for k := 0; k < attempts; k++ {
|
|
i := p.sampler.Next()
|
|
if cp := p.clients[i]; cp.isHealthy() {
|
|
return cp, nil
|
|
}
|
|
}
|
|
|
|
return nil, errors.New("no healthy client")
|
|
}
|
|
|
|
func formCacheKey(address string, key *ecdsa.PrivateKey, clientCut bool) string {
|
|
k := keys.PrivateKey{PrivateKey: *key}
|
|
|
|
stype := "server"
|
|
if clientCut {
|
|
stype = "client"
|
|
}
|
|
|
|
return address + stype + k.String()
|
|
}
|
|
|
|
func (p *Pool) checkSessionTokenErr(err error, address string) bool {
|
|
if err == nil {
|
|
return false
|
|
}
|
|
|
|
if sdkClient.IsErrSessionNotFound(err) || sdkClient.IsErrSessionExpired(err) {
|
|
p.cache.DeleteByPrefix(address)
|
|
return true
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
func initSessionForDuration(ctx context.Context, dst *session.Object, c client, dur uint64, ownerKey ecdsa.PrivateKey, clientCut bool) error {
|
|
ni, err := c.networkInfo(ctx, prmNetworkInfo{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
epoch := ni.CurrentEpoch()
|
|
|
|
var exp uint64
|
|
if math.MaxUint64-epoch < dur {
|
|
exp = math.MaxUint64
|
|
} else {
|
|
exp = epoch + dur
|
|
}
|
|
var prm prmCreateSession
|
|
prm.setExp(exp)
|
|
prm.useKey(ownerKey)
|
|
|
|
var (
|
|
id uuid.UUID
|
|
key frostfsecdsa.PublicKey
|
|
)
|
|
|
|
if clientCut {
|
|
id = uuid.New()
|
|
key = frostfsecdsa.PublicKey(ownerKey.PublicKey)
|
|
} else {
|
|
res, err := c.sessionCreate(ctx, prm)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err = id.UnmarshalBinary(res.id); err != nil {
|
|
return fmt.Errorf("invalid session token ID: %w", err)
|
|
}
|
|
if err = key.Decode(res.sessionKey); err != nil {
|
|
return fmt.Errorf("invalid public session key: %w", err)
|
|
}
|
|
}
|
|
|
|
dst.SetID(id)
|
|
dst.SetAuthKey(&key)
|
|
dst.SetExp(exp)
|
|
|
|
return nil
|
|
}
|
|
|
|
type callContext struct {
|
|
client client
|
|
|
|
// client endpoint
|
|
endpoint string
|
|
|
|
// request signer
|
|
key *ecdsa.PrivateKey
|
|
|
|
// flag to open default session if session token is missing
|
|
sessionDefault bool
|
|
sessionTarget func(session.Object)
|
|
sessionVerb session.ObjectVerb
|
|
sessionCnr cid.ID
|
|
sessionObjSet bool
|
|
sessionObjs []oid.ID
|
|
|
|
sessionClientCut bool
|
|
}
|
|
|
|
func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContext) error {
|
|
cp, err := p.connection()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
ctx.key = cfg.key
|
|
if ctx.key == nil {
|
|
// use pool key if caller didn't specify its own
|
|
ctx.key = p.key
|
|
}
|
|
|
|
ctx.endpoint = cp.address()
|
|
ctx.client = cp
|
|
|
|
if ctx.sessionTarget != nil && cfg.stoken != nil {
|
|
ctx.sessionTarget(*cfg.stoken)
|
|
}
|
|
|
|
// note that we don't override session provided by the caller
|
|
ctx.sessionDefault = cfg.stoken == nil && prmCtx.defaultSession
|
|
if ctx.sessionDefault {
|
|
ctx.sessionVerb = prmCtx.verb
|
|
ctx.sessionCnr = prmCtx.cnr
|
|
ctx.sessionObjSet = prmCtx.objSet
|
|
ctx.sessionObjs = prmCtx.objs
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// opens new session or uses cached one.
|
|
// Must be called only on initialized callContext with set sessionTarget.
|
|
func (p *Pool) openDefaultSession(ctx context.Context, cc *callContext) error {
|
|
cacheKey := formCacheKey(cc.endpoint, cc.key, cc.sessionClientCut)
|
|
|
|
tok, ok := p.cache.Get(cacheKey)
|
|
if !ok {
|
|
// init new session
|
|
err := initSessionForDuration(ctx, &tok, cc.client, p.stokenDuration, *cc.key, cc.sessionClientCut)
|
|
if err != nil {
|
|
return fmt.Errorf("session API client: %w", err)
|
|
}
|
|
|
|
// cache the opened session
|
|
p.cache.Put(cacheKey, tok)
|
|
}
|
|
|
|
tok.ForVerb(cc.sessionVerb)
|
|
tok.BindContainer(cc.sessionCnr)
|
|
|
|
if cc.sessionObjSet {
|
|
tok.LimitByObjects(cc.sessionObjs...)
|
|
}
|
|
|
|
// sign the token
|
|
if err := tok.Sign(*cc.key); err != nil {
|
|
return fmt.Errorf("sign token of the opened session: %w", err)
|
|
}
|
|
|
|
cc.sessionTarget(tok)
|
|
|
|
return nil
|
|
}
|
|
|
|
// opens default session (if sessionDefault is set), and calls f. If f returns
|
|
// session-related error then cached token is removed.
|
|
func (p *Pool) call(ctx context.Context, cc *callContext, f func() error) error {
|
|
var err error
|
|
|
|
if cc.sessionDefault {
|
|
err = p.openDefaultSession(ctx, cc)
|
|
if err != nil {
|
|
return fmt.Errorf("open default session: %w", err)
|
|
}
|
|
}
|
|
|
|
err = f()
|
|
_ = p.checkSessionTokenErr(err, cc.endpoint)
|
|
|
|
return err
|
|
}
|
|
|
|
// fillAppropriateKey use pool key if caller didn't specify its own.
|
|
func (p *Pool) fillAppropriateKey(prm *prmCommon) {
|
|
if prm.key == nil {
|
|
prm.key = p.key
|
|
}
|
|
}
|
|
|
|
// PutObject writes an object through a remote server using FrostFS API protocol.
|
|
//
|
|
// Main return value MUST NOT be processed on an erroneous return.
|
|
func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
|
|
cnr, _ := prm.hdr.ContainerID()
|
|
|
|
var prmCtx prmContext
|
|
prmCtx.useDefaultSession()
|
|
prmCtx.useVerb(session.VerbObjectPut)
|
|
prmCtx.useContainer(cnr)
|
|
|
|
p.fillAppropriateKey(&prm.prmCommon)
|
|
|
|
var ctxCall callContext
|
|
ctxCall.sessionClientCut = prm.clientCut
|
|
if err := p.initCallContext(&ctxCall, prm.prmCommon, prmCtx); err != nil {
|
|
return oid.ID{}, fmt.Errorf("init call context: %w", err)
|
|
}
|
|
|
|
if ctxCall.sessionDefault {
|
|
ctxCall.sessionTarget = prm.UseSession
|
|
if err := p.openDefaultSession(ctx, &ctxCall); err != nil {
|
|
return oid.ID{}, fmt.Errorf("open default session: %w", err)
|
|
}
|
|
}
|
|
|
|
if prm.clientCut {
|
|
var ni netmap.NetworkInfo
|
|
ni.SetCurrentEpoch(p.cache.Epoch())
|
|
ni.SetMaxObjectSize(p.maxObjectSize) // we want to use initial max object size in PayloadSizeLimiter
|
|
prm.setNetworkInfo(ni)
|
|
}
|
|
|
|
id, err := ctxCall.client.objectPut(ctx, prm)
|
|
if err != nil {
|
|
// removes session token from cache in case of token error
|
|
p.checkSessionTokenErr(err, ctxCall.endpoint)
|
|
return id, fmt.Errorf("init writing on API client %s: %w", ctxCall.endpoint, err)
|
|
}
|
|
|
|
return id, nil
|
|
}
|
|
|
|
// DeleteObject marks an object for deletion from the container using FrostFS API protocol.
|
|
// As a marker, a special unit called a tombstone is placed in the container.
|
|
// It confirms the user's intent to delete the object, and is itself a container object.
|
|
// Explicit deletion is done asynchronously, and is generally not guaranteed.
|
|
func (p *Pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error {
|
|
var prmCtx prmContext
|
|
prmCtx.useDefaultSession()
|
|
prmCtx.useVerb(session.VerbObjectDelete)
|
|
prmCtx.useAddress(prm.addr)
|
|
|
|
if prm.stoken == nil { // collect phy objects only if we are about to open default session
|
|
var tokens relations.Tokens
|
|
tokens.Bearer = prm.btoken
|
|
|
|
relatives, err := relations.ListAllRelations(ctx, p, prm.addr.Container(), prm.addr.Object(), tokens)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to collect relatives: %w", err)
|
|
}
|
|
|
|
if len(relatives) != 0 {
|
|
prmCtx.useContainer(prm.addr.Container())
|
|
prmCtx.useObjects(append(relatives, prm.addr.Object()))
|
|
}
|
|
}
|
|
|
|
p.fillAppropriateKey(&prm.prmCommon)
|
|
|
|
var cc callContext
|
|
cc.sessionTarget = prm.UseSession
|
|
|
|
err := p.initCallContext(&cc, prm.prmCommon, prmCtx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return p.call(ctx, &cc, func() error {
|
|
if err = cc.client.objectDelete(ctx, prm); err != nil {
|
|
return fmt.Errorf("remove object via client %s: %w", cc.endpoint, err)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
type objectReadCloser struct {
|
|
reader *sdkClient.ObjectReader
|
|
elapsedTimeCallback func(time.Duration)
|
|
}
|
|
|
|
// Read implements io.Reader of the object payload.
|
|
func (x *objectReadCloser) Read(p []byte) (int, error) {
|
|
start := time.Now()
|
|
n, err := x.reader.Read(p)
|
|
x.elapsedTimeCallback(time.Since(start))
|
|
return n, err
|
|
}
|
|
|
|
// Close implements io.Closer of the object payload.
|
|
func (x *objectReadCloser) Close() error {
|
|
_, err := x.reader.Close()
|
|
return err
|
|
}
|
|
|
|
// ResGetObject is designed to provide object header nad read one object payload from FrostFS system.
|
|
type ResGetObject struct {
|
|
Header object.Object
|
|
|
|
Payload io.ReadCloser
|
|
}
|
|
|
|
// GetObject reads object header and initiates reading an object payload through a remote server using FrostFS API protocol.
|
|
//
|
|
// Main return value MUST NOT be processed on an erroneous return.
|
|
func (p *Pool) GetObject(ctx context.Context, prm PrmObjectGet) (ResGetObject, error) {
|
|
p.fillAppropriateKey(&prm.prmCommon)
|
|
|
|
var cc callContext
|
|
cc.sessionTarget = prm.UseSession
|
|
|
|
var res ResGetObject
|
|
|
|
err := p.initCallContext(&cc, prm.prmCommon, prmContext{})
|
|
if err != nil {
|
|
return res, err
|
|
}
|
|
|
|
return res, p.call(ctx, &cc, func() error {
|
|
res, err = cc.client.objectGet(ctx, prm)
|
|
if err != nil {
|
|
return fmt.Errorf("get object via client %s: %w", cc.endpoint, err)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// HeadObject reads object header through a remote server using FrostFS API protocol.
|
|
//
|
|
// Main return value MUST NOT be processed on an erroneous return.
|
|
func (p *Pool) HeadObject(ctx context.Context, prm PrmObjectHead) (object.Object, error) {
|
|
p.fillAppropriateKey(&prm.prmCommon)
|
|
|
|
var cc callContext
|
|
cc.sessionTarget = prm.UseSession
|
|
|
|
var obj object.Object
|
|
|
|
err := p.initCallContext(&cc, prm.prmCommon, prmContext{})
|
|
if err != nil {
|
|
return obj, err
|
|
}
|
|
|
|
return obj, p.call(ctx, &cc, func() error {
|
|
obj, err = cc.client.objectHead(ctx, prm)
|
|
if err != nil {
|
|
return fmt.Errorf("head object via client %s: %w", cc.endpoint, err)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// ResObjectRange is designed to read payload range of one object
|
|
// from FrostFS system.
|
|
//
|
|
// Must be initialized using Pool.ObjectRange, any other
|
|
// usage is unsafe.
|
|
type ResObjectRange struct {
|
|
payload *sdkClient.ObjectRangeReader
|
|
elapsedTimeCallback func(time.Duration)
|
|
}
|
|
|
|
// Read implements io.Reader of the object payload.
|
|
func (x *ResObjectRange) Read(p []byte) (int, error) {
|
|
start := time.Now()
|
|
n, err := x.payload.Read(p)
|
|
x.elapsedTimeCallback(time.Since(start))
|
|
return n, err
|
|
}
|
|
|
|
// Close ends reading the payload range and returns the result of the operation
|
|
// along with the final results. Must be called after using the ResObjectRange.
|
|
func (x *ResObjectRange) Close() error {
|
|
_, err := x.payload.Close()
|
|
return err
|
|
}
|
|
|
|
// ObjectRange initiates reading an object's payload range through a remote
|
|
// server using FrostFS API protocol.
|
|
//
|
|
// Main return value MUST NOT be processed on an erroneous return.
|
|
func (p *Pool) ObjectRange(ctx context.Context, prm PrmObjectRange) (ResObjectRange, error) {
|
|
p.fillAppropriateKey(&prm.prmCommon)
|
|
|
|
var cc callContext
|
|
cc.sessionTarget = prm.UseSession
|
|
|
|
var res ResObjectRange
|
|
|
|
err := p.initCallContext(&cc, prm.prmCommon, prmContext{})
|
|
if err != nil {
|
|
return res, err
|
|
}
|
|
|
|
return res, p.call(ctx, &cc, func() error {
|
|
res, err = cc.client.objectRange(ctx, prm)
|
|
if err != nil {
|
|
return fmt.Errorf("object range via client %s: %w", cc.endpoint, err)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// ResObjectSearch is designed to read list of object identifiers from FrostFS system.
|
|
//
|
|
// Must be initialized using Pool.SearchObjects, any other usage is unsafe.
|
|
type ResObjectSearch struct {
|
|
r *sdkClient.ObjectListReader
|
|
}
|
|
|
|
// Read reads another list of the object identifiers.
|
|
func (x *ResObjectSearch) Read(buf []oid.ID) (int, error) {
|
|
n, ok := x.r.Read(buf)
|
|
if !ok {
|
|
_, err := x.r.Close()
|
|
if err == nil {
|
|
return n, io.EOF
|
|
}
|
|
|
|
return n, err
|
|
}
|
|
|
|
return n, nil
|
|
}
|
|
|
|
// Iterate iterates over the list of found object identifiers.
|
|
// f can return true to stop iteration earlier.
|
|
//
|
|
// Returns an error if object can't be read.
|
|
func (x *ResObjectSearch) Iterate(f func(oid.ID) bool) error {
|
|
return x.r.Iterate(f)
|
|
}
|
|
|
|
// Close ends reading list of the matched objects and returns the result of the operation
|
|
// along with the final results. Must be called after using the ResObjectSearch.
|
|
func (x *ResObjectSearch) Close() {
|
|
_, _ = x.r.Close()
|
|
}
|
|
|
|
// SearchObjects initiates object selection through a remote server using FrostFS API protocol.
|
|
//
|
|
// The call only opens the transmission channel, explicit fetching of matched objects
|
|
// is done using the ResObjectSearch. Resulting reader must be finally closed.
|
|
//
|
|
// Main return value MUST NOT be processed on an erroneous return.
|
|
func (p *Pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (ResObjectSearch, error) {
|
|
p.fillAppropriateKey(&prm.prmCommon)
|
|
|
|
var cc callContext
|
|
cc.sessionTarget = prm.UseSession
|
|
|
|
var res ResObjectSearch
|
|
|
|
err := p.initCallContext(&cc, prm.prmCommon, prmContext{})
|
|
if err != nil {
|
|
return res, err
|
|
}
|
|
|
|
return res, p.call(ctx, &cc, func() error {
|
|
res, err = cc.client.objectSearch(ctx, prm)
|
|
if err != nil {
|
|
return fmt.Errorf("search object via client %s: %w", cc.endpoint, err)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// PutContainer sends request to save container in FrostFS and waits for the operation to complete.
|
|
//
|
|
// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used:
|
|
//
|
|
// polling interval: 5s
|
|
// waiting timeout: 120s
|
|
//
|
|
// Success can be verified by reading by identifier (see GetContainer).
|
|
//
|
|
// Main return value MUST NOT be processed on an erroneous return.
|
|
func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (cid.ID, error) {
|
|
cp, err := p.connection()
|
|
if err != nil {
|
|
return cid.ID{}, err
|
|
}
|
|
|
|
cnrID, err := cp.containerPut(ctx, prm)
|
|
if err != nil {
|
|
return cid.ID{}, fmt.Errorf("put container via client '%s': %w", cp.address(), err)
|
|
}
|
|
|
|
return cnrID, nil
|
|
}
|
|
|
|
// GetContainer reads FrostFS container by ID.
|
|
//
|
|
// Main return value MUST NOT be processed on an erroneous return.
|
|
func (p *Pool) GetContainer(ctx context.Context, prm PrmContainerGet) (container.Container, error) {
|
|
cp, err := p.connection()
|
|
if err != nil {
|
|
return container.Container{}, err
|
|
}
|
|
|
|
cnrs, err := cp.containerGet(ctx, prm)
|
|
if err != nil {
|
|
return container.Container{}, fmt.Errorf("get container via client '%s': %w", cp.address(), err)
|
|
}
|
|
|
|
return cnrs, nil
|
|
}
|
|
|
|
// ListContainers requests identifiers of the account-owned containers.
|
|
func (p *Pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid.ID, error) {
|
|
cp, err := p.connection()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
cnrIDs, err := cp.containerList(ctx, prm)
|
|
if err != nil {
|
|
return []cid.ID{}, fmt.Errorf("list containers via client '%s': %w", cp.address(), err)
|
|
}
|
|
|
|
return cnrIDs, nil
|
|
}
|
|
|
|
// DeleteContainer sends request to remove the FrostFS container and waits for the operation to complete.
|
|
//
|
|
// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used:
|
|
//
|
|
// polling interval: 5s
|
|
// waiting timeout: 120s
|
|
//
|
|
// Success can be verified by reading by identifier (see GetContainer).
|
|
func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) error {
|
|
cp, err := p.connection()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = cp.containerDelete(ctx, prm)
|
|
if err != nil {
|
|
return fmt.Errorf("delete container via client '%s': %w", cp.address(), err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetEACL reads eACL table of the FrostFS container.
|
|
//
|
|
// Main return value MUST NOT be processed on an erroneous return.
|
|
func (p *Pool) GetEACL(ctx context.Context, prm PrmContainerEACL) (eacl.Table, error) {
|
|
cp, err := p.connection()
|
|
if err != nil {
|
|
return eacl.Table{}, err
|
|
}
|
|
|
|
eaclResult, err := cp.containerEACL(ctx, prm)
|
|
if err != nil {
|
|
return eacl.Table{}, fmt.Errorf("get EACL via client '%s': %w", cp.address(), err)
|
|
}
|
|
|
|
return eaclResult, nil
|
|
}
|
|
|
|
// SetEACL sends request to update eACL table of the FrostFS container and waits for the operation to complete.
|
|
//
|
|
// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used:
|
|
//
|
|
// polling interval: 5s
|
|
// waiting timeout: 120s
|
|
//
|
|
// Success can be verified by reading by identifier (see GetEACL).
|
|
func (p *Pool) SetEACL(ctx context.Context, prm PrmContainerSetEACL) error {
|
|
cp, err := p.connection()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = cp.containerSetEACL(ctx, prm)
|
|
if err != nil {
|
|
return fmt.Errorf("set EACL via client '%s': %w", cp.address(), err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Balance requests current balance of the FrostFS account.
|
|
//
|
|
// Main return value MUST NOT be processed on an erroneous return.
|
|
func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (accounting.Decimal, error) {
|
|
cp, err := p.connection()
|
|
if err != nil {
|
|
return accounting.Decimal{}, err
|
|
}
|
|
|
|
balance, err := cp.balanceGet(ctx, prm)
|
|
if err != nil {
|
|
return accounting.Decimal{}, fmt.Errorf("get balance via client '%s': %w", cp.address(), err)
|
|
}
|
|
|
|
return balance, nil
|
|
}
|
|
|
|
// Statistic returns connection statistics.
|
|
func (p Pool) Statistic() Statistic {
|
|
stat := Statistic{}
|
|
for _, inner := range p.innerPools {
|
|
nodes := make([]string, 0, len(inner.clients))
|
|
inner.lock.RLock()
|
|
for _, cl := range inner.clients {
|
|
if cl.isHealthy() {
|
|
nodes = append(nodes, cl.address())
|
|
}
|
|
node := NodeStatistic{
|
|
address: cl.address(),
|
|
methods: cl.methodsStatus(),
|
|
overallErrors: cl.overallErrorRate(),
|
|
currentErrors: cl.currentErrorRate(),
|
|
}
|
|
stat.nodes = append(stat.nodes, node)
|
|
stat.overallErrors += node.overallErrors
|
|
}
|
|
inner.lock.RUnlock()
|
|
if len(stat.currentNodes) == 0 {
|
|
stat.currentNodes = nodes
|
|
}
|
|
}
|
|
|
|
return stat
|
|
}
|
|
|
|
// waitForContainerPresence waits until the container is found on the FrostFS network.
|
|
func waitForContainerPresence(ctx context.Context, cli client, prm PrmContainerGet, waitParams *WaitParams) error {
|
|
return waitFor(ctx, waitParams, func(ctx context.Context) bool {
|
|
_, err := cli.containerGet(ctx, prm)
|
|
return err == nil
|
|
})
|
|
}
|
|
|
|
// waitForEACLPresence waits until the container eacl is applied on the FrostFS network.
|
|
func waitForEACLPresence(ctx context.Context, cli client, prm PrmContainerEACL, table *eacl.Table, waitParams *WaitParams) error {
|
|
return waitFor(ctx, waitParams, func(ctx context.Context) bool {
|
|
eaclTable, err := cli.containerEACL(ctx, prm)
|
|
if err == nil {
|
|
return eacl.EqualTables(*table, eaclTable)
|
|
}
|
|
return false
|
|
})
|
|
}
|
|
|
|
// waitForContainerRemoved waits until the container is removed from the FrostFS network.
|
|
func waitForContainerRemoved(ctx context.Context, cli client, prm PrmContainerGet, waitParams *WaitParams) error {
|
|
return waitFor(ctx, waitParams, func(ctx context.Context) bool {
|
|
_, err := cli.containerGet(ctx, prm)
|
|
return sdkClient.IsErrContainerNotFound(err)
|
|
})
|
|
}
|
|
|
|
// waitFor await that given condition will be met in waitParams time.
|
|
func waitFor(ctx context.Context, params *WaitParams, condition func(context.Context) bool) error {
|
|
wctx, cancel := context.WithTimeout(ctx, params.Timeout)
|
|
defer cancel()
|
|
ticker := time.NewTimer(params.PollInterval)
|
|
defer ticker.Stop()
|
|
wdone := wctx.Done()
|
|
done := ctx.Done()
|
|
for {
|
|
select {
|
|
case <-done:
|
|
return ctx.Err()
|
|
case <-wdone:
|
|
return wctx.Err()
|
|
case <-ticker.C:
|
|
if condition(ctx) {
|
|
return nil
|
|
}
|
|
ticker.Reset(params.PollInterval)
|
|
}
|
|
}
|
|
}
|
|
|
|
// NetworkInfo requests information about the FrostFS network of which the remote server is a part.
|
|
//
|
|
// Main return value MUST NOT be processed on an erroneous return.
|
|
func (p *Pool) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
|
|
cp, err := p.connection()
|
|
if err != nil {
|
|
return netmap.NetworkInfo{}, err
|
|
}
|
|
|
|
netInfo, err := cp.networkInfo(ctx, prmNetworkInfo{})
|
|
if err != nil {
|
|
return netmap.NetworkInfo{}, fmt.Errorf("get network info via client '%s': %w", cp.address(), err)
|
|
}
|
|
|
|
return netInfo, nil
|
|
}
|
|
|
|
// NetMapSnapshot requests information about the FrostFS network map.
|
|
//
|
|
// Main return value MUST NOT be processed on an erroneous return.
|
|
func (p *Pool) NetMapSnapshot(ctx context.Context) (netmap.NetMap, error) {
|
|
cp, err := p.connection()
|
|
if err != nil {
|
|
return netmap.NetMap{}, err
|
|
}
|
|
|
|
netMap, err := cp.netMapSnapshot(ctx, prmNetMapSnapshot{})
|
|
if err != nil {
|
|
return netmap.NetMap{}, fmt.Errorf("get network map via client '%s': %w", cp.address(), err)
|
|
}
|
|
|
|
return netMap, nil
|
|
}
|
|
|
|
// Close closes the Pool and releases all the associated resources.
|
|
func (p *Pool) Close() {
|
|
p.cancel()
|
|
<-p.closedCh
|
|
|
|
// close all clients
|
|
for _, pools := range p.innerPools {
|
|
for _, cli := range pools.clients {
|
|
if cli.isDialed() {
|
|
_ = cli.close()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// SyncContainerWithNetwork applies network configuration received via
|
|
// the Pool to the container. Changes the container if it does not satisfy
|
|
// network configuration.
|
|
//
|
|
// Pool and container MUST not be nil.
|
|
//
|
|
// Returns any error that does not allow reading configuration
|
|
// from the network.
|
|
func SyncContainerWithNetwork(ctx context.Context, cnr *container.Container, p *Pool) error {
|
|
ni, err := p.NetworkInfo(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("network info: %w", err)
|
|
}
|
|
|
|
container.ApplyNetworkConfig(cnr, ni)
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetSplitInfo implements relations.Relations.
|
|
func (p *Pool) GetSplitInfo(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (*object.SplitInfo, error) {
|
|
var addr oid.Address
|
|
addr.SetContainer(cnrID)
|
|
addr.SetObject(objID)
|
|
|
|
var prm PrmObjectHead
|
|
prm.SetAddress(addr)
|
|
if tokens.Bearer != nil {
|
|
prm.UseBearer(*tokens.Bearer)
|
|
}
|
|
if tokens.Session != nil {
|
|
prm.UseSession(*tokens.Session)
|
|
}
|
|
prm.MarkRaw()
|
|
|
|
_, err := p.HeadObject(ctx, prm)
|
|
|
|
var errSplit *object.SplitInfoError
|
|
|
|
switch {
|
|
case errors.As(err, &errSplit):
|
|
return errSplit.SplitInfo(), nil
|
|
case err == nil:
|
|
return nil, relations.ErrNoSplitInfo
|
|
default:
|
|
return nil, fmt.Errorf("failed to get raw object header: %w", err)
|
|
}
|
|
}
|
|
|
|
// ListChildrenByLinker implements relations.Relations.
|
|
func (p *Pool) ListChildrenByLinker(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
|
|
var addr oid.Address
|
|
addr.SetContainer(cnrID)
|
|
addr.SetObject(objID)
|
|
|
|
var prm PrmObjectHead
|
|
prm.SetAddress(addr)
|
|
if tokens.Bearer != nil {
|
|
prm.UseBearer(*tokens.Bearer)
|
|
}
|
|
if tokens.Session != nil {
|
|
prm.UseSession(*tokens.Session)
|
|
}
|
|
|
|
res, err := p.HeadObject(ctx, prm)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get linking object's header: %w", err)
|
|
}
|
|
|
|
return res.Children(), nil
|
|
}
|
|
|
|
// GetLeftSibling implements relations.Relations.
|
|
func (p *Pool) GetLeftSibling(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (oid.ID, error) {
|
|
var addr oid.Address
|
|
addr.SetContainer(cnrID)
|
|
addr.SetObject(objID)
|
|
|
|
var prm PrmObjectHead
|
|
prm.SetAddress(addr)
|
|
if tokens.Bearer != nil {
|
|
prm.UseBearer(*tokens.Bearer)
|
|
}
|
|
if tokens.Session != nil {
|
|
prm.UseSession(*tokens.Session)
|
|
}
|
|
|
|
res, err := p.HeadObject(ctx, prm)
|
|
if err != nil {
|
|
return oid.ID{}, fmt.Errorf("failed to read split chain member's header: %w", err)
|
|
}
|
|
|
|
idMember, ok := res.PreviousID()
|
|
if !ok {
|
|
return oid.ID{}, relations.ErrNoLeftSibling
|
|
}
|
|
return idMember, nil
|
|
}
|
|
|
|
// FindSiblingBySplitID implements relations.Relations.
|
|
func (p *Pool) FindSiblingBySplitID(ctx context.Context, cnrID cid.ID, splitID *object.SplitID, tokens relations.Tokens) ([]oid.ID, error) {
|
|
var query object.SearchFilters
|
|
query.AddSplitIDFilter(object.MatchStringEqual, splitID)
|
|
|
|
var prm PrmObjectSearch
|
|
prm.SetContainerID(cnrID)
|
|
prm.SetFilters(query)
|
|
if tokens.Bearer != nil {
|
|
prm.UseBearer(*tokens.Bearer)
|
|
}
|
|
if tokens.Session != nil {
|
|
prm.UseSession(*tokens.Session)
|
|
}
|
|
|
|
res, err := p.SearchObjects(ctx, prm)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to search objects by split ID: %w", err)
|
|
}
|
|
|
|
var members []oid.ID
|
|
err = res.Iterate(func(id oid.ID) bool {
|
|
members = append(members, id)
|
|
return false
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to iterate found objects: %w", err)
|
|
}
|
|
|
|
return members, nil
|
|
}
|
|
|
|
// FindSiblingByParentID implements relations.Relations.
|
|
func (p *Pool) FindSiblingByParentID(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
|
|
var query object.SearchFilters
|
|
query.AddParentIDFilter(object.MatchStringEqual, objID)
|
|
|
|
var prm PrmObjectSearch
|
|
prm.SetContainerID(cnrID)
|
|
prm.SetFilters(query)
|
|
if tokens.Bearer != nil {
|
|
prm.UseBearer(*tokens.Bearer)
|
|
}
|
|
if tokens.Session != nil {
|
|
prm.UseSession(*tokens.Session)
|
|
}
|
|
|
|
resSearch, err := p.SearchObjects(ctx, prm)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to find object children: %w", err)
|
|
}
|
|
|
|
var res []oid.ID
|
|
err = resSearch.Iterate(func(id oid.ID) bool {
|
|
res = append(res, id)
|
|
return false
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to iterate found objects: %w", err)
|
|
}
|
|
|
|
return res, nil
|
|
}
|