frostfs-sdk-go/pool/pool.go
Alex Vanin a86c08b3ee [#130] pool: Replace expiration value with expiration duration
Based on the applications that use pool, most of them simply
set session token duration to MaxUint64 value. It is completely
understandable, because epochs are incrementing and expiration
value will be surpassed sooner or later, unless it is MaxUint64.

As an alternative I suggest specifying duration instead of
absolute epoch values. Now apps can set duration of 100-200
epochs and pool automatically calculated expiration epoch
base on the network info.

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
2022-02-09 10:10:00 +03:00

1002 lines
27 KiB
Go

package pool
import (
"context"
"crypto/ecdsa"
"crypto/sha256"
"errors"
"fmt"
"io"
"math"
"math/rand"
"sort"
"strings"
"sync"
"time"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
apiclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
"github.com/nspcc-dev/neofs-sdk-go/accounting"
"github.com/nspcc-dev/neofs-sdk-go/client"
"github.com/nspcc-dev/neofs-sdk-go/container"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/eacl"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/owner"
"github.com/nspcc-dev/neofs-sdk-go/session"
"github.com/nspcc-dev/neofs-sdk-go/token"
"go.uber.org/zap"
)
// Client is a wrapper for client.Client to generate mock.
type Client interface {
GetBalance(context.Context, client.GetBalancePrm) (*client.GetBalanceRes, error)
PutContainer(context.Context, client.ContainerPutPrm) (*client.ContainerPutRes, error)
GetContainer(context.Context, client.ContainerGetPrm) (*client.ContainerGetRes, error)
ListContainers(context.Context, client.ContainerListPrm) (*client.ContainerListRes, error)
DeleteContainer(context.Context, client.ContainerDeletePrm) (*client.ContainerDeleteRes, error)
EACL(context.Context, client.EACLPrm) (*client.EACLRes, error)
SetEACL(context.Context, client.SetEACLPrm) (*client.SetEACLRes, error)
AnnounceContainerUsedSpace(context.Context, client.AnnounceSpacePrm) (*client.AnnounceSpaceRes, error)
EndpointInfo(context.Context, client.EndpointInfoPrm) (*client.EndpointInfoRes, error)
NetworkInfo(context.Context, client.NetworkInfoPrm) (*client.NetworkInfoRes, error)
PutObject(context.Context, *client.PutObjectParams, ...client.CallOption) (*client.ObjectPutRes, error)
DeleteObject(context.Context, *client.DeleteObjectParams, ...client.CallOption) (*client.ObjectDeleteRes, error)
GetObject(context.Context, *client.GetObjectParams, ...client.CallOption) (*client.ObjectGetRes, error)
HeadObject(context.Context, *client.ObjectHeaderParams, ...client.CallOption) (*client.ObjectHeadRes, error)
ObjectPayloadRangeData(context.Context, *client.RangeDataParams, ...client.CallOption) (*client.ObjectRangeRes, error)
HashObjectPayloadRanges(context.Context, *client.RangeChecksumParams, ...client.CallOption) (*client.ObjectRangeHashRes, error)
SearchObjects(context.Context, *client.SearchObjectParams, ...client.CallOption) (*client.ObjectSearchRes, error)
AnnounceLocalTrust(context.Context, client.AnnounceLocalTrustPrm) (*client.AnnounceLocalTrustRes, error)
AnnounceIntermediateTrust(context.Context, client.AnnounceIntermediateTrustPrm) (*client.AnnounceIntermediateTrustRes, error)
CreateSession(context.Context, client.CreateSessionPrm) (*client.CreateSessionRes, error)
Raw() *apiclient.Client
Conn() io.Closer
}
// BuilderOptions contains options used to build connection pool.
type BuilderOptions struct {
Key *ecdsa.PrivateKey
Logger *zap.Logger
NodeConnectionTimeout time.Duration
NodeRequestTimeout time.Duration
ClientRebalanceInterval time.Duration
SessionExpirationDuration uint64
nodesParams []*NodesParam
clientBuilder func(opts ...client.Option) (Client, error)
}
type NodesParam struct {
priority int
addresses []string
weights []float64
}
type NodeParam struct {
priority int
address string
weight float64
}
// Builder is an interim structure used to collect node addresses/weights and
// build connection pool subsequently.
type Builder struct {
nodeParams []NodeParam
}
// ContainerPollingParams contains parameters used in polling is a container created or not.
type ContainerPollingParams struct {
CreationTimeout time.Duration
PollInterval time.Duration
}
// DefaultPollingParams creates ContainerPollingParams with default values.
func DefaultPollingParams() *ContainerPollingParams {
return &ContainerPollingParams{
CreationTimeout: 120 * time.Second,
PollInterval: 5 * time.Second,
}
}
// AddNode adds address/weight pair to node PoolBuilder list.
func (pb *Builder) AddNode(address string, priority int, weight float64) *Builder {
pb.nodeParams = append(pb.nodeParams, NodeParam{
address: address,
priority: priority,
weight: weight,
})
return pb
}
// Build creates new pool based on current PoolBuilder state and options.
func (pb *Builder) Build(ctx context.Context, options *BuilderOptions) (Pool, error) {
if len(pb.nodeParams) == 0 {
return nil, errors.New("no NeoFS peers configured")
}
nodesParams := make(map[int]*NodesParam)
for _, param := range pb.nodeParams {
nodes, ok := nodesParams[param.priority]
if !ok {
nodes = &NodesParam{priority: param.priority}
}
nodes.addresses = append(nodes.addresses, param.address)
nodes.weights = append(nodes.weights, param.weight)
nodesParams[param.priority] = nodes
}
for _, nodes := range nodesParams {
nodes.weights = adjustWeights(nodes.weights)
options.nodesParams = append(options.nodesParams, nodes)
}
sort.Slice(options.nodesParams, func(i, j int) bool {
return options.nodesParams[i].priority < options.nodesParams[j].priority
})
if options.clientBuilder == nil {
options.clientBuilder = func(opts ...client.Option) (Client, error) {
return client.New(opts...)
}
}
return newPool(ctx, options)
}
// Pool is an interface providing connection artifacts on request.
type Pool interface {
Object
Container
Accounting
Connection() (Client, *session.Token, error)
OwnerID() *owner.ID
WaitForContainerPresence(context.Context, *cid.ID, *ContainerPollingParams) error
Close()
}
type Object interface {
PutObject(ctx context.Context, params *client.PutObjectParams, opts ...CallOption) (*oid.ID, error)
DeleteObject(ctx context.Context, params *client.DeleteObjectParams, opts ...CallOption) error
GetObject(ctx context.Context, params *client.GetObjectParams, opts ...CallOption) (*object.Object, error)
GetObjectHeader(ctx context.Context, params *client.ObjectHeaderParams, opts ...CallOption) (*object.Object, error)
ObjectPayloadRangeData(ctx context.Context, params *client.RangeDataParams, opts ...CallOption) ([]byte, error)
ObjectPayloadRangeSHA256(ctx context.Context, params *client.RangeChecksumParams, opts ...CallOption) ([][32]byte, error)
ObjectPayloadRangeTZ(ctx context.Context, params *client.RangeChecksumParams, opts ...CallOption) ([][64]byte, error)
SearchObject(ctx context.Context, params *client.SearchObjectParams, opts ...CallOption) ([]*oid.ID, error)
}
type Container interface {
PutContainer(ctx context.Context, cnr *container.Container, opts ...CallOption) (*cid.ID, error)
GetContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) (*container.Container, error)
ListContainers(ctx context.Context, ownerID *owner.ID, opts ...CallOption) ([]*cid.ID, error)
DeleteContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) error
GetEACL(ctx context.Context, cid *cid.ID, opts ...CallOption) (*eacl.Table, error)
SetEACL(ctx context.Context, table *eacl.Table, opts ...CallOption) error
}
type Accounting interface {
Balance(ctx context.Context, owner *owner.ID, opts ...CallOption) (*accounting.Decimal, error)
}
type clientPack struct {
client Client
healthy bool
address string
}
type CallOption func(config *callConfig)
type callConfig struct {
isRetry bool
useDefaultSession bool
key *ecdsa.PrivateKey
btoken *token.BearerToken
stoken *session.Token
}
func WithKey(key *ecdsa.PrivateKey) CallOption {
return func(config *callConfig) {
config.key = key
}
}
func WithBearer(token *token.BearerToken) CallOption {
return func(config *callConfig) {
config.btoken = token
}
}
func WithSession(token *session.Token) CallOption {
return func(config *callConfig) {
config.stoken = token
}
}
func retry() CallOption {
return func(config *callConfig) {
config.isRetry = true
}
}
func useDefaultSession() CallOption {
return func(config *callConfig) {
config.useDefaultSession = true
}
}
func cfgFromOpts(opts ...CallOption) *callConfig {
var cfg = new(callConfig)
for _, opt := range opts {
opt(cfg)
}
return cfg
}
var _ Pool = (*pool)(nil)
type pool struct {
innerPools []*innerPool
key *ecdsa.PrivateKey
owner *owner.ID
cancel context.CancelFunc
closedCh chan struct{}
cache *SessionCache
stokenDuration uint64
}
type innerPool struct {
lock sync.RWMutex
sampler *Sampler
clientPacks []*clientPack
}
func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) {
cache, err := NewCache()
if err != nil {
return nil, fmt.Errorf("couldn't create cache: %w", err)
}
ownerID := owner.NewIDFromPublicKey(&options.Key.PublicKey)
inner := make([]*innerPool, len(options.nodesParams))
var atLeastOneHealthy bool
for i, params := range options.nodesParams {
clientPacks := make([]*clientPack, len(params.weights))
for j, address := range params.addresses {
c, err := options.clientBuilder(client.WithDefaultPrivateKey(options.Key),
client.WithURIAddress(address, nil),
client.WithDialTimeout(options.NodeConnectionTimeout),
client.WithNeoFSErrorParsing())
if err != nil {
return nil, err
}
var healthy bool
cliRes, err := createSessionTokenForDuration(ctx, c, options.SessionExpirationDuration)
if err != nil && options.Logger != nil {
options.Logger.Warn("failed to create neofs session token for client",
zap.String("address", address),
zap.Error(err))
} else if err == nil {
healthy, atLeastOneHealthy = true, true
st := sessionTokenForOwner(ownerID, cliRes)
_ = cache.Put(formCacheKey(address, options.Key), st)
}
clientPacks[j] = &clientPack{client: c, healthy: healthy, address: address}
}
source := rand.NewSource(time.Now().UnixNano())
sampler := NewSampler(params.weights, source)
inner[i] = &innerPool{
sampler: sampler,
clientPacks: clientPacks,
}
}
if !atLeastOneHealthy {
return nil, fmt.Errorf("at least one node must be healthy")
}
ctx, cancel := context.WithCancel(ctx)
pool := &pool{
innerPools: inner,
key: options.Key,
owner: ownerID,
cancel: cancel,
closedCh: make(chan struct{}),
cache: cache,
stokenDuration: options.SessionExpirationDuration,
}
go startRebalance(ctx, pool, options)
return pool, nil
}
func startRebalance(ctx context.Context, p *pool, options *BuilderOptions) {
ticker := time.NewTimer(options.ClientRebalanceInterval)
buffers := make([][]float64, len(options.nodesParams))
for i, params := range options.nodesParams {
buffers[i] = make([]float64, len(params.weights))
}
for {
select {
case <-ctx.Done():
close(p.closedCh)
return
case <-ticker.C:
updateNodesHealth(ctx, p, options, buffers)
ticker.Reset(options.ClientRebalanceInterval)
}
}
}
func updateNodesHealth(ctx context.Context, p *pool, options *BuilderOptions, buffers [][]float64) {
wg := sync.WaitGroup{}
for i, inner := range p.innerPools {
wg.Add(1)
bufferWeights := buffers[i]
go func(i int, innerPool *innerPool) {
defer wg.Done()
updateInnerNodesHealth(ctx, p, i, options, bufferWeights)
}(i, inner)
}
wg.Wait()
}
func updateInnerNodesHealth(ctx context.Context, pool *pool, i int, options *BuilderOptions, bufferWeights []float64) {
if i > len(pool.innerPools)-1 {
return
}
p := pool.innerPools[i]
healthyChanged := false
wg := sync.WaitGroup{}
var prmEndpoint client.EndpointInfoPrm
for j, cPack := range p.clientPacks {
wg.Add(1)
go func(j int, cli Client) {
defer wg.Done()
ok := true
tctx, c := context.WithTimeout(ctx, options.NodeRequestTimeout)
defer c()
if _, err := cli.EndpointInfo(tctx, prmEndpoint); err != nil {
ok = false
bufferWeights[j] = 0
}
p.lock.RLock()
cp := *p.clientPacks[j]
p.lock.RUnlock()
if ok {
bufferWeights[j] = options.nodesParams[i].weights[j]
if !cp.healthy {
cliRes, err := createSessionTokenForDuration(ctx, cli, options.SessionExpirationDuration)
if err != nil {
ok = false
bufferWeights[j] = 0
} else {
tkn := pool.newSessionToken(cliRes)
_ = pool.cache.Put(formCacheKey(cp.address, pool.key), tkn)
}
}
} else {
pool.cache.DeleteByPrefix(cp.address)
}
p.lock.Lock()
if p.clientPacks[j].healthy != ok {
p.clientPacks[j].healthy = ok
healthyChanged = true
}
p.lock.Unlock()
}(j, cPack.client)
}
wg.Wait()
if healthyChanged {
probabilities := adjustWeights(bufferWeights)
source := rand.NewSource(time.Now().UnixNano())
p.lock.Lock()
p.sampler = NewSampler(probabilities, source)
p.lock.Unlock()
}
}
func adjustWeights(weights []float64) []float64 {
adjusted := make([]float64, len(weights))
sum := 0.0
for _, weight := range weights {
sum += weight
}
if sum > 0 {
for i, weight := range weights {
adjusted[i] = weight / sum
}
}
return adjusted
}
func (p *pool) Connection() (Client, *session.Token, error) {
cp, err := p.connection()
if err != nil {
return nil, nil, err
}
token := p.cache.Get(formCacheKey(cp.address, p.key))
return cp.client, token, nil
}
func (p *pool) connection() (*clientPack, error) {
for _, inner := range p.innerPools {
cp, err := inner.connection()
if err == nil {
return cp, nil
}
}
return nil, errors.New("no healthy client")
}
func (p *innerPool) connection() (*clientPack, error) {
p.lock.RLock()
defer p.lock.RUnlock()
if len(p.clientPacks) == 1 {
cp := p.clientPacks[0]
if cp.healthy {
return cp, nil
}
return nil, errors.New("no healthy client")
}
attempts := 3 * len(p.clientPacks)
for k := 0; k < attempts; k++ {
i := p.sampler.Next()
if cp := p.clientPacks[i]; cp.healthy {
return cp, nil
}
}
return nil, errors.New("no healthy client")
}
func (p *pool) OwnerID() *owner.ID {
return p.owner
}
func formCacheKey(address string, key *ecdsa.PrivateKey) string {
k := keys.PrivateKey{PrivateKey: *key}
return address + k.String()
}
func (p *pool) conn(ctx context.Context, cfg *callConfig) (*clientPack, []client.CallOption, error) {
cp, err := p.connection()
if err != nil {
return nil, nil, err
}
clientCallOptions := make([]client.CallOption, 0, 3)
key := p.key
if cfg.key != nil {
key = cfg.key
}
clientCallOptions = append(clientCallOptions, client.WithKey(key))
sessionToken := cfg.stoken
if sessionToken == nil && cfg.useDefaultSession {
cacheKey := formCacheKey(cp.address, key)
sessionToken = p.cache.Get(cacheKey)
if sessionToken == nil {
cliRes, err := createSessionTokenForDuration(ctx, cp.client, p.stokenDuration)
if err != nil {
return nil, nil, err
}
ownerID := owner.NewIDFromPublicKey(&key.PublicKey)
sessionToken = sessionTokenForOwner(ownerID, cliRes)
cfg.stoken = sessionToken
_ = p.cache.Put(cacheKey, sessionToken)
}
}
clientCallOptions = append(clientCallOptions, client.WithSession(sessionToken))
if cfg.btoken != nil {
clientCallOptions = append(clientCallOptions, client.WithBearer(cfg.btoken))
}
return cp, clientCallOptions, nil
}
func (p *pool) checkSessionTokenErr(err error, address string) bool {
if err == nil {
return false
}
if strings.Contains(err.Error(), "session token does not exist") {
p.cache.DeleteByPrefix(address)
return true
}
return false
}
func createSessionTokenForDuration(ctx context.Context, c Client, dur uint64) (*client.CreateSessionRes, error) {
ni, err := c.NetworkInfo(ctx, client.NetworkInfoPrm{})
if err != nil {
return nil, err
}
epoch := ni.Info().CurrentEpoch()
var prm client.CreateSessionPrm
if math.MaxUint64-epoch < dur {
prm.SetExp(math.MaxUint64)
} else {
prm.SetExp(epoch + dur)
}
return c.CreateSession(ctx, prm)
}
func (p *pool) PutObject(ctx context.Context, params *client.PutObjectParams, opts ...CallOption) (*oid.ID, error) {
cfg := cfgFromOpts(append(opts, useDefaultSession())...)
cp, options, err := p.conn(ctx, cfg)
if err != nil {
return nil, err
}
res, err := cp.client.PutObject(ctx, params, options...)
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
opts = append(opts, retry())
return p.PutObject(ctx, params, opts...)
}
if err != nil { // here err already carries both status and client errors
return nil, err
}
return res.ID(), nil
}
func (p *pool) DeleteObject(ctx context.Context, params *client.DeleteObjectParams, opts ...CallOption) error {
cfg := cfgFromOpts(append(opts, useDefaultSession())...)
cp, options, err := p.conn(ctx, cfg)
if err != nil {
return err
}
_, err = cp.client.DeleteObject(ctx, params, options...)
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
opts = append(opts, retry())
return p.DeleteObject(ctx, params, opts...)
}
// here err already carries both status and client errors
return err
}
func (p *pool) GetObject(ctx context.Context, params *client.GetObjectParams, opts ...CallOption) (*object.Object, error) {
cfg := cfgFromOpts(append(opts, useDefaultSession())...)
cp, options, err := p.conn(ctx, cfg)
if err != nil {
return nil, err
}
res, err := cp.client.GetObject(ctx, params, options...)
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
opts = append(opts, retry())
return p.GetObject(ctx, params, opts...)
}
if err != nil { // here err already carries both status and client errors
return nil, err
}
return res.Object(), nil
}
func (p *pool) GetObjectHeader(ctx context.Context, params *client.ObjectHeaderParams, opts ...CallOption) (*object.Object, error) {
cfg := cfgFromOpts(append(opts, useDefaultSession())...)
cp, options, err := p.conn(ctx, cfg)
if err != nil {
return nil, err
}
res, err := cp.client.HeadObject(ctx, params, options...)
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
opts = append(opts, retry())
return p.GetObjectHeader(ctx, params, opts...)
}
if err != nil { // here err already carries both status and client errors
return nil, err
}
return res.Object(), nil
}
func (p *pool) ObjectPayloadRangeData(ctx context.Context, params *client.RangeDataParams, opts ...CallOption) ([]byte, error) {
cfg := cfgFromOpts(append(opts, useDefaultSession())...)
cp, options, err := p.conn(ctx, cfg)
if err != nil {
return nil, err
}
res, err := cp.client.ObjectPayloadRangeData(ctx, params, options...)
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
opts = append(opts, retry())
return p.ObjectPayloadRangeData(ctx, params, opts...)
}
if err != nil { // here err already carries both status and client errors
return nil, err
}
return res.Data(), nil
}
func copyRangeChecksumParams(prm *client.RangeChecksumParams) *client.RangeChecksumParams {
var prmCopy client.RangeChecksumParams
prmCopy.WithAddress(prm.Address())
prmCopy.WithSalt(prm.Salt())
prmCopy.WithRangeList(prm.RangeList()...)
return &prmCopy
}
func (p *pool) ObjectPayloadRangeSHA256(ctx context.Context, params *client.RangeChecksumParams, opts ...CallOption) ([][32]byte, error) {
cfg := cfgFromOpts(append(opts, useDefaultSession())...)
cp, options, err := p.conn(ctx, cfg)
if err != nil {
return nil, err
}
// FIXME: pretty bad approach but we should not mutate params through the pointer
// If non-SHA256 algo is set then we need to reset it.
params = copyRangeChecksumParams(params)
// SHA256 by default, no need to do smth
res, err := cp.client.HashObjectPayloadRanges(ctx, params, options...)
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
opts = append(opts, retry())
return p.ObjectPayloadRangeSHA256(ctx, params, opts...)
}
if err != nil { // here err already carries both status and client errors
return nil, err
}
cliHashes := res.Hashes()
hs := make([][sha256.Size]byte, len(cliHashes))
for i := range cliHashes {
if ln := len(cliHashes[i]); ln != sha256.Size {
return nil, fmt.Errorf("invalid SHA256 checksum size %d", ln)
}
copy(hs[i][:], cliHashes[i])
}
return hs, nil
}
func (p *pool) ObjectPayloadRangeTZ(ctx context.Context, params *client.RangeChecksumParams, opts ...CallOption) ([][64]byte, error) {
cfg := cfgFromOpts(append(opts, useDefaultSession())...)
cp, options, err := p.conn(ctx, cfg)
if err != nil {
return nil, err
}
// FIXME: pretty bad approach but we should not mutate params through the pointer
// We need to set Tillich-Zemor algo.
params = copyRangeChecksumParams(params)
params.TZ()
res, err := cp.client.HashObjectPayloadRanges(ctx, params, options...)
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
opts = append(opts, retry())
return p.ObjectPayloadRangeTZ(ctx, params, opts...)
}
if err != nil { // here err already carries both status and client errors
return nil, err
}
cliHashes := res.Hashes()
hs := make([][client.TZSize]byte, len(cliHashes))
for i := range cliHashes {
if ln := len(cliHashes[i]); ln != client.TZSize {
return nil, fmt.Errorf("invalid TZ checksum size %d", ln)
}
copy(hs[i][:], cliHashes[i])
}
return hs, nil
}
func (p *pool) SearchObject(ctx context.Context, params *client.SearchObjectParams, opts ...CallOption) ([]*oid.ID, error) {
cfg := cfgFromOpts(append(opts, useDefaultSession())...)
cp, options, err := p.conn(ctx, cfg)
if err != nil {
return nil, err
}
res, err := cp.client.SearchObjects(ctx, params, options...)
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
opts = append(opts, retry())
return p.SearchObject(ctx, params, opts...)
}
if err != nil { // here err already carries both status and client errors
return nil, err
}
return res.IDList(), nil
}
func (p *pool) PutContainer(ctx context.Context, cnr *container.Container, opts ...CallOption) (*cid.ID, error) {
cfg := cfgFromOpts(opts...)
cp, _, err := p.conn(ctx, cfg)
if err != nil {
return nil, err
}
var cliPrm client.ContainerPutPrm
if cnr != nil {
cliPrm.SetContainer(*cnr)
}
res, err := cp.client.PutContainer(ctx, cliPrm)
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
opts = append(opts, retry())
return p.PutContainer(ctx, cnr, opts...)
}
if err != nil { // here err already carries both status and client errors
return nil, err
}
return res.ID(), nil
}
func (p *pool) GetContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) (*container.Container, error) {
cfg := cfgFromOpts(opts...)
cp, _, err := p.conn(ctx, cfg)
if err != nil {
return nil, err
}
var cliPrm client.ContainerGetPrm
if cid != nil {
cliPrm.SetContainer(*cid)
}
res, err := cp.client.GetContainer(ctx, cliPrm)
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
opts = append(opts, retry())
return p.GetContainer(ctx, cid, opts...)
}
if err != nil { // here err already carries both status and client errors
return nil, err
}
return res.Container(), nil
}
func (p *pool) ListContainers(ctx context.Context, ownerID *owner.ID, opts ...CallOption) ([]*cid.ID, error) {
cfg := cfgFromOpts(opts...)
cp, _, err := p.conn(ctx, cfg)
if err != nil {
return nil, err
}
var cliPrm client.ContainerListPrm
if ownerID != nil {
cliPrm.SetAccount(*ownerID)
}
res, err := cp.client.ListContainers(ctx, cliPrm)
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
opts = append(opts, retry())
return p.ListContainers(ctx, ownerID, opts...)
}
if err != nil { // here err already carries both status and client errors
return nil, err
}
return res.Containers(), nil
}
func (p *pool) DeleteContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) error {
cfg := cfgFromOpts(opts...)
cp, _, err := p.conn(ctx, cfg)
if err != nil {
return err
}
var cliPrm client.ContainerDeletePrm
if cid != nil {
cliPrm.SetContainer(*cid)
}
if cfg.stoken != nil {
cliPrm.SetSessionToken(*cfg.stoken)
}
_, err = cp.client.DeleteContainer(ctx, cliPrm)
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
opts = append(opts, retry())
return p.DeleteContainer(ctx, cid, opts...)
}
// here err already carries both status and client errors
return err
}
func (p *pool) GetEACL(ctx context.Context, cid *cid.ID, opts ...CallOption) (*eacl.Table, error) {
cfg := cfgFromOpts(opts...)
cp, _, err := p.conn(ctx, cfg)
if err != nil {
return nil, err
}
var cliPrm client.EACLPrm
if cid != nil {
cliPrm.SetContainer(*cid)
}
res, err := cp.client.EACL(ctx, cliPrm)
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
opts = append(opts, retry())
return p.GetEACL(ctx, cid, opts...)
}
if err != nil { // here err already carries both status and client errors
return nil, err
}
return res.Table(), nil
}
func (p *pool) SetEACL(ctx context.Context, table *eacl.Table, opts ...CallOption) error {
cfg := cfgFromOpts(opts...)
cp, _, err := p.conn(ctx, cfg)
if err != nil {
return err
}
var cliPrm client.SetEACLPrm
if table != nil {
cliPrm.SetTable(*table)
}
if cfg.stoken != nil {
cliPrm.SetSessionToken(*cfg.stoken)
}
_, err = cp.client.SetEACL(ctx, cliPrm)
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
opts = append(opts, retry())
return p.SetEACL(ctx, table, opts...)
}
// here err already carries both status and client errors
return err
}
func (p *pool) Balance(ctx context.Context, o *owner.ID, opts ...CallOption) (*accounting.Decimal, error) {
cfg := cfgFromOpts(opts...)
cp, _, err := p.conn(ctx, cfg)
if err != nil {
return nil, err
}
var cliPrm client.GetBalancePrm
if o != nil {
cliPrm.SetAccount(*o)
}
res, err := cp.client.GetBalance(ctx, cliPrm)
if err != nil { // here err already carries both status and client errors
return nil, err
}
return res.Amount(), nil
}
func (p *pool) WaitForContainerPresence(ctx context.Context, cid *cid.ID, pollParams *ContainerPollingParams) error {
conn, _, err := p.Connection()
if err != nil {
return err
}
wctx, cancel := context.WithTimeout(ctx, pollParams.CreationTimeout)
defer cancel()
ticker := time.NewTimer(pollParams.PollInterval)
defer ticker.Stop()
wdone := wctx.Done()
done := ctx.Done()
var cliPrm client.ContainerGetPrm
if cid != nil {
cliPrm.SetContainer(*cid)
}
for {
select {
case <-done:
return ctx.Err()
case <-wdone:
return wctx.Err()
case <-ticker.C:
_, err = conn.GetContainer(ctx, cliPrm)
if err == nil {
return nil
}
ticker.Reset(pollParams.PollInterval)
}
}
}
// Close closes the pool and releases all the associated resources.
func (p *pool) Close() {
p.cancel()
<-p.closedCh
}
// creates new session token from CreateSession call result.
func (p *pool) newSessionToken(cliRes *client.CreateSessionRes) *session.Token {
return sessionTokenForOwner(p.owner, cliRes)
}
// creates new session token with specified owner from CreateSession call result.
func sessionTokenForOwner(id *owner.ID, cliRes *client.CreateSessionRes) *session.Token {
st := session.NewToken()
st.SetOwnerID(id)
st.SetID(cliRes.ID())
st.SetSessionKey(cliRes.PublicKey())
return st
}