frostfs-sdk-go/pool/pool.go
Denis Kirillov 618eeb8172 [#74] Add priority to nodes
Signed-off-by: Denis Kirillov <denis@nspcc.ru>
2021-11-26 16:17:45 +03:00

934 lines
24 KiB
Go

package pool
import (
"context"
"crypto/ecdsa"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"math"
"math/rand"
"sort"
"strings"
"sync"
"time"
"github.com/nspcc-dev/neofs-sdk-go/client"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
"github.com/nspcc-dev/neofs-sdk-go/container"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/eacl"
"github.com/nspcc-dev/neofs-sdk-go/object"
"github.com/nspcc-dev/neofs-sdk-go/owner"
"github.com/nspcc-dev/neofs-sdk-go/session"
"github.com/nspcc-dev/neofs-sdk-go/token"
"go.uber.org/zap"
)
// Client is a wrapper for client.Client to generate mock.
type Client interface {
client.Client
}
// BuilderOptions contains options used to build connection pool.
type BuilderOptions struct {
Key *ecdsa.PrivateKey
Logger *zap.Logger
NodeConnectionTimeout time.Duration
NodeRequestTimeout time.Duration
ClientRebalanceInterval time.Duration
SessionExpirationEpoch uint64
nodesParams []*NodesParam
clientBuilder func(opts ...client.Option) (client.Client, error)
}
type NodesParam struct {
priority int
addresses []string
weights []float64
}
type NodeParam struct {
priority int
address string
weight float64
}
// Builder is an interim structure used to collect node addresses/weights and
// build connection pool subsequently.
type Builder struct {
nodeParams []NodeParam
}
// ContainerPollingParams contains parameters used in polling is a container created or not.
type ContainerPollingParams struct {
CreationTimeout time.Duration
PollInterval time.Duration
}
// DefaultPollingParams creates ContainerPollingParams with default values.
func DefaultPollingParams() *ContainerPollingParams {
return &ContainerPollingParams{
CreationTimeout: 120 * time.Second,
PollInterval: 5 * time.Second,
}
}
// AddNode adds address/weight pair to node PoolBuilder list.
func (pb *Builder) AddNode(address string, priority int, weight float64) *Builder {
pb.nodeParams = append(pb.nodeParams, NodeParam{
address: address,
priority: priority,
weight: weight,
})
return pb
}
// Build creates new pool based on current PoolBuilder state and options.
func (pb *Builder) Build(ctx context.Context, options *BuilderOptions) (Pool, error) {
if len(pb.nodeParams) == 0 {
return nil, errors.New("no NeoFS peers configured")
}
nodesParams := make(map[int]*NodesParam)
for _, param := range pb.nodeParams {
nodes, ok := nodesParams[param.priority]
if !ok {
nodes = &NodesParam{priority: param.priority}
}
nodes.addresses = append(nodes.addresses, param.address)
nodes.weights = append(nodes.weights, param.weight)
nodesParams[param.priority] = nodes
}
for _, nodes := range nodesParams {
nodes.weights = adjustWeights(nodes.weights)
options.nodesParams = append(options.nodesParams, nodes)
}
sort.Slice(options.nodesParams, func(i, j int) bool {
return options.nodesParams[i].priority < options.nodesParams[j].priority
})
if options.clientBuilder == nil {
options.clientBuilder = client.New
}
return newPool(ctx, options)
}
// Pool is an interface providing connection artifacts on request.
type Pool interface {
Object
Container
Connection() (client.Client, *session.Token, error)
OwnerID() *owner.ID
WaitForContainerPresence(context.Context, *cid.ID, *ContainerPollingParams) error
Close()
}
type Object interface {
PutObject(ctx context.Context, params *client.PutObjectParams, opts ...CallOption) (*object.ID, error)
DeleteObject(ctx context.Context, params *client.DeleteObjectParams, opts ...CallOption) error
GetObject(ctx context.Context, params *client.GetObjectParams, opts ...CallOption) (*object.Object, error)
GetObjectHeader(ctx context.Context, params *client.ObjectHeaderParams, opts ...CallOption) (*object.Object, error)
ObjectPayloadRangeData(ctx context.Context, params *client.RangeDataParams, opts ...CallOption) ([]byte, error)
ObjectPayloadRangeSHA256(ctx context.Context, params *client.RangeChecksumParams, opts ...CallOption) ([][32]byte, error)
ObjectPayloadRangeTZ(ctx context.Context, params *client.RangeChecksumParams, opts ...CallOption) ([][64]byte, error)
SearchObject(ctx context.Context, params *client.SearchObjectParams, opts ...CallOption) ([]*object.ID, error)
}
type Container interface {
PutContainer(ctx context.Context, cnr *container.Container, opts ...CallOption) (*cid.ID, error)
GetContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) (*container.Container, error)
ListContainers(ctx context.Context, ownerID *owner.ID, opts ...CallOption) ([]*cid.ID, error)
DeleteContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) error
GetEACL(ctx context.Context, cid *cid.ID, opts ...CallOption) (*eacl.Table, error)
SetEACL(ctx context.Context, table *eacl.Table, opts ...CallOption) error
AnnounceContainerUsedSpace(ctx context.Context, announce []container.UsedSpaceAnnouncement, opts ...CallOption) error
}
type clientPack struct {
client client.Client
healthy bool
address string
}
type CallOption func(config *callConfig)
type callConfig struct {
isRetry bool
key *ecdsa.PrivateKey
btoken *token.BearerToken
stoken *session.Token
}
func WithKey(key *ecdsa.PrivateKey) CallOption {
return func(config *callConfig) {
config.key = key
}
}
func WithBearer(token *token.BearerToken) CallOption {
return func(config *callConfig) {
config.btoken = token
}
}
func WithSession(token *session.Token) CallOption {
return func(config *callConfig) {
config.stoken = token
}
}
func retry() CallOption {
return func(config *callConfig) {
config.isRetry = true
}
}
func cfgFromOpts(opts ...CallOption) *callConfig {
var cfg = new(callConfig)
for _, opt := range opts {
opt(cfg)
}
return cfg
}
var _ Pool = (*pool)(nil)
type pool struct {
innerPools []*innerPool
key *ecdsa.PrivateKey
owner *owner.ID
cancel context.CancelFunc
closedCh chan struct{}
cache *SessionCache
}
type innerPool struct {
lock sync.RWMutex
sampler *Sampler
clientPacks []*clientPack
}
func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) {
wallet, err := owner.NEO3WalletFromPublicKey(&options.Key.PublicKey)
if err != nil {
return nil, err
}
cache, err := NewCache()
if err != nil {
return nil, fmt.Errorf("couldn't create cache: %w", err)
}
ownerID := owner.NewIDFromNeo3Wallet(wallet)
inner := make([]*innerPool, len(options.nodesParams))
var atLeastOneHealthy bool
for i, params := range options.nodesParams {
clientPacks := make([]*clientPack, len(params.weights))
for j, address := range params.addresses {
c, err := options.clientBuilder(client.WithDefaultPrivateKey(options.Key),
client.WithURIAddress(address, nil),
client.WithDialTimeout(options.NodeConnectionTimeout))
if err != nil {
return nil, err
}
var healthy bool
cliRes, err := c.CreateSession(ctx, options.SessionExpirationEpoch)
if err != nil && options.Logger != nil {
options.Logger.Warn("failed to create neofs session token for client",
zap.String("address", address),
zap.Error(err))
} else if err == nil {
healthy, atLeastOneHealthy = true, true
st := sessionTokenForOwner(ownerID, cliRes)
_ = cache.Put(formCacheKey(address, options.Key), st)
}
clientPacks[j] = &clientPack{client: c, healthy: healthy, address: address}
}
source := rand.NewSource(time.Now().UnixNano())
sampler := NewSampler(params.weights, source)
inner[i] = &innerPool{
sampler: sampler,
clientPacks: clientPacks,
}
}
if !atLeastOneHealthy {
return nil, fmt.Errorf("at least one node must be healthy")
}
ctx, cancel := context.WithCancel(ctx)
pool := &pool{
innerPools: inner,
key: options.Key,
owner: ownerID,
cancel: cancel,
closedCh: make(chan struct{}),
cache: cache,
}
go startRebalance(ctx, pool, options)
return pool, nil
}
func startRebalance(ctx context.Context, p *pool, options *BuilderOptions) {
ticker := time.NewTimer(options.ClientRebalanceInterval)
buffers := make([][]float64, len(options.nodesParams))
for i, params := range options.nodesParams {
buffers[i] = make([]float64, len(params.weights))
}
for {
select {
case <-ctx.Done():
close(p.closedCh)
return
case <-ticker.C:
updateNodesHealth(ctx, p, options, buffers)
ticker.Reset(options.ClientRebalanceInterval)
}
}
}
func updateNodesHealth(ctx context.Context, p *pool, options *BuilderOptions, buffers [][]float64) {
wg := sync.WaitGroup{}
for i, inner := range p.innerPools {
wg.Add(1)
bufferWeights := buffers[i]
go func(i int, innerPool *innerPool) {
defer wg.Done()
updateInnerNodesHealth(ctx, p, i, options, bufferWeights)
}(i, inner)
}
wg.Wait()
}
func updateInnerNodesHealth(ctx context.Context, pool *pool, i int, options *BuilderOptions, bufferWeights []float64) {
if i > len(pool.innerPools)-1 {
return
}
p := pool.innerPools[i]
healthyChanged := false
wg := sync.WaitGroup{}
for j, cPack := range p.clientPacks {
wg.Add(1)
go func(j int, client client.Client) {
defer wg.Done()
ok := true
tctx, c := context.WithTimeout(ctx, options.NodeRequestTimeout)
defer c()
if _, err := client.EndpointInfo(tctx); err != nil {
ok = false
bufferWeights[j] = 0
}
p.lock.RLock()
cp := *p.clientPacks[j]
p.lock.RUnlock()
if ok {
bufferWeights[j] = options.nodesParams[i].weights[j]
if !cp.healthy {
if cliRes, err := client.CreateSession(ctx, options.SessionExpirationEpoch); err != nil {
ok = false
bufferWeights[j] = 0
} else {
tkn := pool.newSessionToken(cliRes)
_ = pool.cache.Put(formCacheKey(cp.address, pool.key), tkn)
}
}
} else {
pool.cache.DeleteByPrefix(cp.address)
}
p.lock.Lock()
if p.clientPacks[j].healthy != ok {
p.clientPacks[j].healthy = ok
healthyChanged = true
}
p.lock.Unlock()
}(j, cPack.client)
}
wg.Wait()
if healthyChanged {
probabilities := adjustWeights(bufferWeights)
source := rand.NewSource(time.Now().UnixNano())
p.lock.Lock()
p.sampler = NewSampler(probabilities, source)
p.lock.Unlock()
}
}
func adjustWeights(weights []float64) []float64 {
adjusted := make([]float64, len(weights))
sum := 0.0
for _, weight := range weights {
sum += weight
}
if sum > 0 {
for i, weight := range weights {
adjusted[i] = weight / sum
}
}
return adjusted
}
func (p *pool) Connection() (client.Client, *session.Token, error) {
cp, err := p.connection()
if err != nil {
return nil, nil, err
}
token := p.cache.Get(formCacheKey(cp.address, p.key))
return cp.client, token, nil
}
func (p *pool) connection() (*clientPack, error) {
for _, inner := range p.innerPools {
cp, err := inner.connection()
if err == nil {
return cp, nil
}
}
return nil, errors.New("no healthy client")
}
func (p *innerPool) connection() (*clientPack, error) {
p.lock.RLock()
defer p.lock.RUnlock()
if len(p.clientPacks) == 1 {
cp := p.clientPacks[0]
if cp.healthy {
return cp, nil
}
return nil, errors.New("no healthy client")
}
attempts := 3 * len(p.clientPacks)
for k := 0; k < attempts; k++ {
i := p.sampler.Next()
if cp := p.clientPacks[i]; cp.healthy {
return cp, nil
}
}
return nil, errors.New("no healthy client")
}
func (p *pool) OwnerID() *owner.ID {
return p.owner
}
func formCacheKey(address string, key *ecdsa.PrivateKey) string {
buf := make([]byte, 32)
key.D.FillBytes(buf)
return address + hex.EncodeToString(buf)
}
func (p *pool) conn(ctx context.Context, cfg *callConfig) (*clientPack, []client.CallOption, error) {
cp, err := p.connection()
if err != nil {
return nil, nil, err
}
clientCallOptions := make([]client.CallOption, 0, 3)
key := p.key
if cfg.key != nil {
key = cfg.key
}
clientCallOptions = append(clientCallOptions, client.WithKey(key))
sessionToken := cfg.stoken
if sessionToken == nil {
cacheKey := formCacheKey(cp.address, key)
sessionToken = p.cache.Get(cacheKey)
if sessionToken == nil {
cliRes, err := cp.client.CreateSession(ctx, math.MaxUint32, clientCallOptions...)
if err != nil {
return nil, nil, err
}
sessionToken = p.newSessionToken(cliRes)
_ = p.cache.Put(cacheKey, sessionToken)
}
}
clientCallOptions = append(clientCallOptions, client.WithSession(sessionToken))
if cfg.btoken != nil {
clientCallOptions = append(clientCallOptions, client.WithBearer(cfg.btoken))
}
return cp, clientCallOptions, nil
}
func (p *pool) checkSessionTokenErr(err error, address string) bool {
if err == nil {
return false
}
if strings.Contains(err.Error(), "session token does not exist") {
p.cache.DeleteByPrefix(address)
return true
}
return false
}
func (p *pool) PutObject(ctx context.Context, params *client.PutObjectParams, opts ...CallOption) (*object.ID, error) {
cfg := cfgFromOpts(opts...)
cp, options, err := p.conn(ctx, cfg)
if err != nil {
return nil, err
}
res, err := cp.client.PutObject(ctx, params, options...)
if err == nil {
// reflect status failures in err
err = apistatus.ErrFromStatus(res.Status())
}
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
opts = append(opts, retry())
return p.PutObject(ctx, params, opts...)
}
if err != nil { // here err already carries both status and client errors
return nil, err
}
return res.ID(), nil
}
func (p *pool) DeleteObject(ctx context.Context, params *client.DeleteObjectParams, opts ...CallOption) error {
cfg := cfgFromOpts(opts...)
cp, options, err := p.conn(ctx, cfg)
if err != nil {
return err
}
res, err := cp.client.DeleteObject(ctx, params, options...)
if err == nil {
// reflect status failures in err
err = apistatus.ErrFromStatus(res.Status())
}
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
opts = append(opts, retry())
return p.DeleteObject(ctx, params, opts...)
}
// here err already carries both status and client errors
return err
}
func (p *pool) GetObject(ctx context.Context, params *client.GetObjectParams, opts ...CallOption) (*object.Object, error) {
cfg := cfgFromOpts(opts...)
cp, options, err := p.conn(ctx, cfg)
if err != nil {
return nil, err
}
res, err := cp.client.GetObject(ctx, params, options...)
if err == nil {
// reflect status failures in err
err = apistatus.ErrFromStatus(res.Status())
}
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
opts = append(opts, retry())
return p.GetObject(ctx, params, opts...)
}
if err != nil { // here err already carries both status and client errors
return nil, err
}
return res.Object(), nil
}
func (p *pool) GetObjectHeader(ctx context.Context, params *client.ObjectHeaderParams, opts ...CallOption) (*object.Object, error) {
cfg := cfgFromOpts(opts...)
cp, options, err := p.conn(ctx, cfg)
if err != nil {
return nil, err
}
res, err := cp.client.HeadObject(ctx, params, options...)
if err == nil {
// reflect status failures in err
err = apistatus.ErrFromStatus(res.Status())
}
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
opts = append(opts, retry())
return p.GetObjectHeader(ctx, params, opts...)
}
if err != nil { // here err already carries both status and client errors
return nil, err
}
return res.Object(), nil
}
func (p *pool) ObjectPayloadRangeData(ctx context.Context, params *client.RangeDataParams, opts ...CallOption) ([]byte, error) {
cfg := cfgFromOpts(opts...)
cp, options, err := p.conn(ctx, cfg)
if err != nil {
return nil, err
}
res, err := cp.client.ObjectPayloadRangeData(ctx, params, options...)
if err == nil {
// reflect status failures in err
err = apistatus.ErrFromStatus(res.Status())
}
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
opts = append(opts, retry())
return p.ObjectPayloadRangeData(ctx, params, opts...)
}
if err != nil { // here err already carries both status and client errors
return nil, err
}
return res.Data(), nil
}
func copyRangeChecksumParams(prm *client.RangeChecksumParams) *client.RangeChecksumParams {
var prmCopy client.RangeChecksumParams
prmCopy.WithAddress(prm.Address())
prmCopy.WithSalt(prm.Salt())
prmCopy.WithRangeList(prm.RangeList()...)
return &prmCopy
}
func (p *pool) ObjectPayloadRangeSHA256(ctx context.Context, params *client.RangeChecksumParams, opts ...CallOption) ([][32]byte, error) {
cfg := cfgFromOpts(opts...)
cp, options, err := p.conn(ctx, cfg)
if err != nil {
return nil, err
}
// FIXME: pretty bad approach but we should not mutate params through the pointer
// If non-SHA256 algo is set then we need to reset it.
params = copyRangeChecksumParams(params)
// SHA256 by default, no need to do smth
res, err := cp.client.HashObjectPayloadRanges(ctx, params, options...)
if err == nil {
// reflect status failures in err
err = apistatus.ErrFromStatus(res.Status())
}
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
opts = append(opts, retry())
return p.ObjectPayloadRangeSHA256(ctx, params, opts...)
}
if err != nil { // here err already carries both status and client errors
return nil, err
}
cliHashes := res.Hashes()
hs := make([][sha256.Size]byte, len(cliHashes))
for i := range cliHashes {
if ln := len(cliHashes[i]); ln != sha256.Size {
return nil, fmt.Errorf("invalid SHA256 checksum size %d", ln)
}
copy(hs[i][:], cliHashes[i])
}
return hs, nil
}
func (p *pool) ObjectPayloadRangeTZ(ctx context.Context, params *client.RangeChecksumParams, opts ...CallOption) ([][64]byte, error) {
cfg := cfgFromOpts(opts...)
cp, options, err := p.conn(ctx, cfg)
if err != nil {
return nil, err
}
// FIXME: pretty bad approach but we should not mutate params through the pointer
// We need to set Tillich-Zemor algo.
params = copyRangeChecksumParams(params)
params.TZ()
res, err := cp.client.HashObjectPayloadRanges(ctx, params, options...)
if err == nil {
// reflect status failures in err
err = apistatus.ErrFromStatus(res.Status())
}
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
opts = append(opts, retry())
return p.ObjectPayloadRangeTZ(ctx, params, opts...)
}
if err != nil { // here err already carries both status and client errors
return nil, err
}
cliHashes := res.Hashes()
hs := make([][client.TZSize]byte, len(cliHashes))
for i := range cliHashes {
if ln := len(cliHashes[i]); ln != client.TZSize {
return nil, fmt.Errorf("invalid TZ checksum size %d", ln)
}
copy(hs[i][:], cliHashes[i])
}
return hs, nil
}
func (p *pool) SearchObject(ctx context.Context, params *client.SearchObjectParams, opts ...CallOption) ([]*object.ID, error) {
cfg := cfgFromOpts(opts...)
cp, options, err := p.conn(ctx, cfg)
if err != nil {
return nil, err
}
res, err := cp.client.SearchObjects(ctx, params, options...)
if err == nil {
// reflect status failures in err
err = apistatus.ErrFromStatus(res.Status())
}
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
opts = append(opts, retry())
return p.SearchObject(ctx, params, opts...)
}
if err != nil { // here err already carries both status and client errors
return nil, err
}
return res.IDList(), nil
}
func (p *pool) PutContainer(ctx context.Context, cnr *container.Container, opts ...CallOption) (*cid.ID, error) {
cfg := cfgFromOpts(opts...)
cp, options, err := p.conn(ctx, cfg)
if err != nil {
return nil, err
}
res, err := cp.client.PutContainer(ctx, cnr, options...)
if err == nil {
// reflect status failures in err
err = apistatus.ErrFromStatus(res.Status())
}
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
opts = append(opts, retry())
return p.PutContainer(ctx, cnr, opts...)
}
if err != nil { // here err already carries both status and client errors
return nil, err
}
return res.ID(), nil
}
func (p *pool) GetContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) (*container.Container, error) {
cfg := cfgFromOpts(opts...)
cp, options, err := p.conn(ctx, cfg)
if err != nil {
return nil, err
}
res, err := cp.client.GetContainer(ctx, cid, options...)
if err == nil {
// reflect status failures in err
err = apistatus.ErrFromStatus(res.Status())
}
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
opts = append(opts, retry())
return p.GetContainer(ctx, cid, opts...)
}
if err != nil { // here err already carries both status and client errors
return nil, err
}
return res.Container(), nil
}
func (p *pool) ListContainers(ctx context.Context, ownerID *owner.ID, opts ...CallOption) ([]*cid.ID, error) {
cfg := cfgFromOpts(opts...)
cp, options, err := p.conn(ctx, cfg)
if err != nil {
return nil, err
}
res, err := cp.client.ListContainers(ctx, ownerID, options...)
if err == nil {
// reflect status failures in err
err = apistatus.ErrFromStatus(res.Status())
}
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
opts = append(opts, retry())
return p.ListContainers(ctx, ownerID, opts...)
}
if err != nil { // here err already carries both status and client errors
return nil, err
}
return res.IDList(), nil
}
func (p *pool) DeleteContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) error {
cfg := cfgFromOpts(opts...)
cp, options, err := p.conn(ctx, cfg)
if err != nil {
return err
}
res, err := cp.client.DeleteContainer(ctx, cid, options...)
if err == nil {
// reflect status failures in err
err = apistatus.ErrFromStatus(res.Status())
}
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
opts = append(opts, retry())
return p.DeleteContainer(ctx, cid, opts...)
}
// here err already carries both status and client errors
return err
}
func (p *pool) GetEACL(ctx context.Context, cid *cid.ID, opts ...CallOption) (*eacl.Table, error) {
cfg := cfgFromOpts(opts...)
cp, options, err := p.conn(ctx, cfg)
if err != nil {
return nil, err
}
res, err := cp.client.EACL(ctx, cid, options...)
if err == nil {
// reflect status failures in err
err = apistatus.ErrFromStatus(res.Status())
}
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
opts = append(opts, retry())
return p.GetEACL(ctx, cid, opts...)
}
if err != nil { // here err already carries both status and client errors
return nil, err
}
return res.Table(), nil
}
func (p *pool) SetEACL(ctx context.Context, table *eacl.Table, opts ...CallOption) error {
cfg := cfgFromOpts(opts...)
cp, options, err := p.conn(ctx, cfg)
if err != nil {
return err
}
res, err := cp.client.SetEACL(ctx, table, options...)
if err == nil {
// reflect status failures in err
err = apistatus.ErrFromStatus(res.Status())
}
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
opts = append(opts, retry())
return p.SetEACL(ctx, table, opts...)
}
// here err already carries both status and client errors
return err
}
func (p *pool) AnnounceContainerUsedSpace(ctx context.Context, announce []container.UsedSpaceAnnouncement, opts ...CallOption) error {
cfg := cfgFromOpts(opts...)
cp, options, err := p.conn(ctx, cfg)
if err != nil {
return err
}
res, err := cp.client.AnnounceContainerUsedSpace(ctx, announce, options...)
if err == nil {
// reflect status failures in err
err = apistatus.ErrFromStatus(res.Status())
}
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
opts = append(opts, retry())
return p.AnnounceContainerUsedSpace(ctx, announce, opts...)
}
// here err already carries both status and client errors
return err
}
func (p *pool) WaitForContainerPresence(ctx context.Context, cid *cid.ID, pollParams *ContainerPollingParams) error {
conn, _, err := p.Connection()
if err != nil {
return err
}
wctx, cancel := context.WithTimeout(ctx, pollParams.CreationTimeout)
defer cancel()
ticker := time.NewTimer(pollParams.PollInterval)
defer ticker.Stop()
wdone := wctx.Done()
done := ctx.Done()
for {
select {
case <-done:
return ctx.Err()
case <-wdone:
return wctx.Err()
case <-ticker.C:
_, err = conn.GetContainer(ctx, cid)
if err == nil {
return nil
}
ticker.Reset(pollParams.PollInterval)
}
}
}
// Close closes the pool and releases all the associated resources.
func (p *pool) Close() {
p.cancel()
<-p.closedCh
}
// creates new session token from CreateSession call result.
func (p *pool) newSessionToken(cliRes *client.CreateSessionRes) *session.Token {
return sessionTokenForOwner(p.owner, cliRes)
}
// creates new session token with specified owner from CreateSession call result.
func sessionTokenForOwner(id *owner.ID, cliRes *client.CreateSessionRes) *session.Token {
st := session.NewToken()
st.SetOwnerID(id)
st.SetID(cliRes.ID())
st.SetSessionKey(cliRes.SessionKey())
return st
}