213d20e3fb
Define `XPrm` type for each `X` client operation which structures parameters. Export setters of each parameterized value. Emphasize that some parameters are required. Make the client panic when the parameters are incorrectly set. Get rid of vadiadic call options and `CallOption` type. Improve documentation of client behavior. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
997 lines
26 KiB
Go
997 lines
26 KiB
Go
package pool
|
|
|
|
import (
|
|
"context"
|
|
"crypto/ecdsa"
|
|
"crypto/sha256"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"math"
|
|
"math/rand"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
|
apiclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
|
|
"github.com/nspcc-dev/neofs-sdk-go/accounting"
|
|
"github.com/nspcc-dev/neofs-sdk-go/client"
|
|
"github.com/nspcc-dev/neofs-sdk-go/container"
|
|
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
|
"github.com/nspcc-dev/neofs-sdk-go/eacl"
|
|
"github.com/nspcc-dev/neofs-sdk-go/object"
|
|
"github.com/nspcc-dev/neofs-sdk-go/owner"
|
|
"github.com/nspcc-dev/neofs-sdk-go/session"
|
|
"github.com/nspcc-dev/neofs-sdk-go/token"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// Client is a wrapper for client.Client to generate mock.
|
|
type Client interface {
|
|
GetBalance(context.Context, client.GetBalancePrm) (*client.GetBalanceRes, error)
|
|
PutContainer(context.Context, client.ContainerPutPrm) (*client.ContainerPutRes, error)
|
|
GetContainer(context.Context, client.ContainerGetPrm) (*client.ContainerGetRes, error)
|
|
ListContainers(context.Context, client.ContainerListPrm) (*client.ContainerListRes, error)
|
|
DeleteContainer(context.Context, client.ContainerDeletePrm) (*client.ContainerDeleteRes, error)
|
|
EACL(context.Context, client.EACLPrm) (*client.EACLRes, error)
|
|
SetEACL(context.Context, client.SetEACLPrm) (*client.SetEACLRes, error)
|
|
AnnounceContainerUsedSpace(context.Context, client.AnnounceSpacePrm) (*client.AnnounceSpaceRes, error)
|
|
EndpointInfo(context.Context, client.EndpointInfoPrm) (*client.EndpointInfoRes, error)
|
|
NetworkInfo(context.Context, client.NetworkInfoPrm) (*client.NetworkInfoRes, error)
|
|
PutObject(context.Context, *client.PutObjectParams, ...client.CallOption) (*client.ObjectPutRes, error)
|
|
DeleteObject(context.Context, *client.DeleteObjectParams, ...client.CallOption) (*client.ObjectDeleteRes, error)
|
|
GetObject(context.Context, *client.GetObjectParams, ...client.CallOption) (*client.ObjectGetRes, error)
|
|
HeadObject(context.Context, *client.ObjectHeaderParams, ...client.CallOption) (*client.ObjectHeadRes, error)
|
|
ObjectPayloadRangeData(context.Context, *client.RangeDataParams, ...client.CallOption) (*client.ObjectRangeRes, error)
|
|
HashObjectPayloadRanges(context.Context, *client.RangeChecksumParams, ...client.CallOption) (*client.ObjectRangeHashRes, error)
|
|
SearchObjects(context.Context, *client.SearchObjectParams, ...client.CallOption) (*client.ObjectSearchRes, error)
|
|
AnnounceLocalTrust(context.Context, client.AnnounceLocalTrustPrm) (*client.AnnounceLocalTrustRes, error)
|
|
AnnounceIntermediateTrust(context.Context, client.AnnounceIntermediateTrustPrm) (*client.AnnounceIntermediateTrustRes, error)
|
|
CreateSession(context.Context, client.CreateSessionPrm) (*client.CreateSessionRes, error)
|
|
|
|
Raw() *apiclient.Client
|
|
|
|
Conn() io.Closer
|
|
}
|
|
|
|
// BuilderOptions contains options used to build connection pool.
|
|
type BuilderOptions struct {
|
|
Key *ecdsa.PrivateKey
|
|
Logger *zap.Logger
|
|
NodeConnectionTimeout time.Duration
|
|
NodeRequestTimeout time.Duration
|
|
ClientRebalanceInterval time.Duration
|
|
SessionExpirationEpoch uint64
|
|
nodesParams []*NodesParam
|
|
clientBuilder func(opts ...client.Option) (Client, error)
|
|
}
|
|
|
|
type NodesParam struct {
|
|
priority int
|
|
addresses []string
|
|
weights []float64
|
|
}
|
|
|
|
type NodeParam struct {
|
|
priority int
|
|
address string
|
|
weight float64
|
|
}
|
|
|
|
// Builder is an interim structure used to collect node addresses/weights and
|
|
// build connection pool subsequently.
|
|
type Builder struct {
|
|
nodeParams []NodeParam
|
|
}
|
|
|
|
// ContainerPollingParams contains parameters used in polling is a container created or not.
|
|
type ContainerPollingParams struct {
|
|
CreationTimeout time.Duration
|
|
PollInterval time.Duration
|
|
}
|
|
|
|
// DefaultPollingParams creates ContainerPollingParams with default values.
|
|
func DefaultPollingParams() *ContainerPollingParams {
|
|
return &ContainerPollingParams{
|
|
CreationTimeout: 120 * time.Second,
|
|
PollInterval: 5 * time.Second,
|
|
}
|
|
}
|
|
|
|
// AddNode adds address/weight pair to node PoolBuilder list.
|
|
func (pb *Builder) AddNode(address string, priority int, weight float64) *Builder {
|
|
pb.nodeParams = append(pb.nodeParams, NodeParam{
|
|
address: address,
|
|
priority: priority,
|
|
weight: weight,
|
|
})
|
|
return pb
|
|
}
|
|
|
|
// Build creates new pool based on current PoolBuilder state and options.
|
|
func (pb *Builder) Build(ctx context.Context, options *BuilderOptions) (Pool, error) {
|
|
if len(pb.nodeParams) == 0 {
|
|
return nil, errors.New("no NeoFS peers configured")
|
|
}
|
|
|
|
nodesParams := make(map[int]*NodesParam)
|
|
for _, param := range pb.nodeParams {
|
|
nodes, ok := nodesParams[param.priority]
|
|
if !ok {
|
|
nodes = &NodesParam{priority: param.priority}
|
|
}
|
|
nodes.addresses = append(nodes.addresses, param.address)
|
|
nodes.weights = append(nodes.weights, param.weight)
|
|
nodesParams[param.priority] = nodes
|
|
}
|
|
|
|
for _, nodes := range nodesParams {
|
|
nodes.weights = adjustWeights(nodes.weights)
|
|
options.nodesParams = append(options.nodesParams, nodes)
|
|
}
|
|
|
|
sort.Slice(options.nodesParams, func(i, j int) bool {
|
|
return options.nodesParams[i].priority < options.nodesParams[j].priority
|
|
})
|
|
|
|
if options.clientBuilder == nil {
|
|
options.clientBuilder = func(opts ...client.Option) (Client, error) {
|
|
return client.New(opts...)
|
|
}
|
|
}
|
|
|
|
return newPool(ctx, options)
|
|
}
|
|
|
|
// Pool is an interface providing connection artifacts on request.
|
|
type Pool interface {
|
|
Object
|
|
Container
|
|
Accounting
|
|
Connection() (Client, *session.Token, error)
|
|
OwnerID() *owner.ID
|
|
WaitForContainerPresence(context.Context, *cid.ID, *ContainerPollingParams) error
|
|
Close()
|
|
}
|
|
|
|
type Object interface {
|
|
PutObject(ctx context.Context, params *client.PutObjectParams, opts ...CallOption) (*object.ID, error)
|
|
DeleteObject(ctx context.Context, params *client.DeleteObjectParams, opts ...CallOption) error
|
|
GetObject(ctx context.Context, params *client.GetObjectParams, opts ...CallOption) (*object.Object, error)
|
|
GetObjectHeader(ctx context.Context, params *client.ObjectHeaderParams, opts ...CallOption) (*object.Object, error)
|
|
ObjectPayloadRangeData(ctx context.Context, params *client.RangeDataParams, opts ...CallOption) ([]byte, error)
|
|
ObjectPayloadRangeSHA256(ctx context.Context, params *client.RangeChecksumParams, opts ...CallOption) ([][32]byte, error)
|
|
ObjectPayloadRangeTZ(ctx context.Context, params *client.RangeChecksumParams, opts ...CallOption) ([][64]byte, error)
|
|
SearchObject(ctx context.Context, params *client.SearchObjectParams, opts ...CallOption) ([]*object.ID, error)
|
|
}
|
|
|
|
type Container interface {
|
|
PutContainer(ctx context.Context, cnr *container.Container, opts ...CallOption) (*cid.ID, error)
|
|
GetContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) (*container.Container, error)
|
|
ListContainers(ctx context.Context, ownerID *owner.ID, opts ...CallOption) ([]*cid.ID, error)
|
|
DeleteContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) error
|
|
GetEACL(ctx context.Context, cid *cid.ID, opts ...CallOption) (*eacl.Table, error)
|
|
SetEACL(ctx context.Context, table *eacl.Table, opts ...CallOption) error
|
|
}
|
|
|
|
type Accounting interface {
|
|
Balance(ctx context.Context, owner *owner.ID, opts ...CallOption) (*accounting.Decimal, error)
|
|
}
|
|
|
|
type clientPack struct {
|
|
client Client
|
|
healthy bool
|
|
address string
|
|
}
|
|
|
|
type CallOption func(config *callConfig)
|
|
|
|
type callConfig struct {
|
|
isRetry bool
|
|
useDefaultSession bool
|
|
|
|
key *ecdsa.PrivateKey
|
|
btoken *token.BearerToken
|
|
stoken *session.Token
|
|
}
|
|
|
|
func WithKey(key *ecdsa.PrivateKey) CallOption {
|
|
return func(config *callConfig) {
|
|
config.key = key
|
|
}
|
|
}
|
|
|
|
func WithBearer(token *token.BearerToken) CallOption {
|
|
return func(config *callConfig) {
|
|
config.btoken = token
|
|
}
|
|
}
|
|
|
|
func WithSession(token *session.Token) CallOption {
|
|
return func(config *callConfig) {
|
|
config.stoken = token
|
|
}
|
|
}
|
|
|
|
func retry() CallOption {
|
|
return func(config *callConfig) {
|
|
config.isRetry = true
|
|
}
|
|
}
|
|
|
|
func useDefaultSession() CallOption {
|
|
return func(config *callConfig) {
|
|
config.useDefaultSession = true
|
|
}
|
|
}
|
|
|
|
func cfgFromOpts(opts ...CallOption) *callConfig {
|
|
var cfg = new(callConfig)
|
|
for _, opt := range opts {
|
|
opt(cfg)
|
|
}
|
|
return cfg
|
|
}
|
|
|
|
var _ Pool = (*pool)(nil)
|
|
|
|
type pool struct {
|
|
innerPools []*innerPool
|
|
key *ecdsa.PrivateKey
|
|
owner *owner.ID
|
|
cancel context.CancelFunc
|
|
closedCh chan struct{}
|
|
cache *SessionCache
|
|
}
|
|
|
|
type innerPool struct {
|
|
lock sync.RWMutex
|
|
sampler *Sampler
|
|
clientPacks []*clientPack
|
|
}
|
|
|
|
func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) {
|
|
cache, err := NewCache()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("couldn't create cache: %w", err)
|
|
}
|
|
|
|
ownerID := owner.NewIDFromPublicKey(&options.Key.PublicKey)
|
|
|
|
inner := make([]*innerPool, len(options.nodesParams))
|
|
var atLeastOneHealthy bool
|
|
|
|
var cliPrm client.CreateSessionPrm
|
|
|
|
cliPrm.SetExp(options.SessionExpirationEpoch)
|
|
|
|
for i, params := range options.nodesParams {
|
|
clientPacks := make([]*clientPack, len(params.weights))
|
|
for j, address := range params.addresses {
|
|
c, err := options.clientBuilder(client.WithDefaultPrivateKey(options.Key),
|
|
client.WithURIAddress(address, nil),
|
|
client.WithDialTimeout(options.NodeConnectionTimeout),
|
|
client.WithNeoFSErrorParsing())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var healthy bool
|
|
cliRes, err := c.CreateSession(ctx, cliPrm)
|
|
if err != nil && options.Logger != nil {
|
|
options.Logger.Warn("failed to create neofs session token for client",
|
|
zap.String("address", address),
|
|
zap.Error(err))
|
|
} else if err == nil {
|
|
healthy, atLeastOneHealthy = true, true
|
|
st := sessionTokenForOwner(ownerID, cliRes)
|
|
_ = cache.Put(formCacheKey(address, options.Key), st)
|
|
}
|
|
clientPacks[j] = &clientPack{client: c, healthy: healthy, address: address}
|
|
}
|
|
source := rand.NewSource(time.Now().UnixNano())
|
|
sampler := NewSampler(params.weights, source)
|
|
|
|
inner[i] = &innerPool{
|
|
sampler: sampler,
|
|
clientPacks: clientPacks,
|
|
}
|
|
}
|
|
|
|
if !atLeastOneHealthy {
|
|
return nil, fmt.Errorf("at least one node must be healthy")
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
pool := &pool{
|
|
innerPools: inner,
|
|
key: options.Key,
|
|
owner: ownerID,
|
|
cancel: cancel,
|
|
closedCh: make(chan struct{}),
|
|
cache: cache,
|
|
}
|
|
go startRebalance(ctx, pool, options)
|
|
return pool, nil
|
|
}
|
|
|
|
func startRebalance(ctx context.Context, p *pool, options *BuilderOptions) {
|
|
ticker := time.NewTimer(options.ClientRebalanceInterval)
|
|
buffers := make([][]float64, len(options.nodesParams))
|
|
for i, params := range options.nodesParams {
|
|
buffers[i] = make([]float64, len(params.weights))
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
close(p.closedCh)
|
|
return
|
|
case <-ticker.C:
|
|
updateNodesHealth(ctx, p, options, buffers)
|
|
ticker.Reset(options.ClientRebalanceInterval)
|
|
}
|
|
}
|
|
}
|
|
|
|
func updateNodesHealth(ctx context.Context, p *pool, options *BuilderOptions, buffers [][]float64) {
|
|
wg := sync.WaitGroup{}
|
|
for i, inner := range p.innerPools {
|
|
wg.Add(1)
|
|
|
|
bufferWeights := buffers[i]
|
|
go func(i int, innerPool *innerPool) {
|
|
defer wg.Done()
|
|
updateInnerNodesHealth(ctx, p, i, options, bufferWeights)
|
|
}(i, inner)
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
func updateInnerNodesHealth(ctx context.Context, pool *pool, i int, options *BuilderOptions, bufferWeights []float64) {
|
|
if i > len(pool.innerPools)-1 {
|
|
return
|
|
}
|
|
p := pool.innerPools[i]
|
|
|
|
healthyChanged := false
|
|
wg := sync.WaitGroup{}
|
|
|
|
var (
|
|
prmEndpoint client.EndpointInfoPrm
|
|
prmSession client.CreateSessionPrm
|
|
)
|
|
|
|
prmSession.SetExp(options.SessionExpirationEpoch)
|
|
|
|
for j, cPack := range p.clientPacks {
|
|
wg.Add(1)
|
|
go func(j int, cli Client) {
|
|
defer wg.Done()
|
|
ok := true
|
|
tctx, c := context.WithTimeout(ctx, options.NodeRequestTimeout)
|
|
defer c()
|
|
|
|
if _, err := cli.EndpointInfo(tctx, prmEndpoint); err != nil {
|
|
ok = false
|
|
bufferWeights[j] = 0
|
|
}
|
|
p.lock.RLock()
|
|
cp := *p.clientPacks[j]
|
|
p.lock.RUnlock()
|
|
|
|
if ok {
|
|
bufferWeights[j] = options.nodesParams[i].weights[j]
|
|
if !cp.healthy {
|
|
if cliRes, err := cli.CreateSession(ctx, prmSession); err != nil {
|
|
ok = false
|
|
bufferWeights[j] = 0
|
|
} else {
|
|
tkn := pool.newSessionToken(cliRes)
|
|
|
|
_ = pool.cache.Put(formCacheKey(cp.address, pool.key), tkn)
|
|
}
|
|
}
|
|
} else {
|
|
pool.cache.DeleteByPrefix(cp.address)
|
|
}
|
|
|
|
p.lock.Lock()
|
|
if p.clientPacks[j].healthy != ok {
|
|
p.clientPacks[j].healthy = ok
|
|
healthyChanged = true
|
|
}
|
|
p.lock.Unlock()
|
|
}(j, cPack.client)
|
|
}
|
|
wg.Wait()
|
|
|
|
if healthyChanged {
|
|
probabilities := adjustWeights(bufferWeights)
|
|
source := rand.NewSource(time.Now().UnixNano())
|
|
p.lock.Lock()
|
|
p.sampler = NewSampler(probabilities, source)
|
|
p.lock.Unlock()
|
|
}
|
|
}
|
|
|
|
func adjustWeights(weights []float64) []float64 {
|
|
adjusted := make([]float64, len(weights))
|
|
sum := 0.0
|
|
for _, weight := range weights {
|
|
sum += weight
|
|
}
|
|
if sum > 0 {
|
|
for i, weight := range weights {
|
|
adjusted[i] = weight / sum
|
|
}
|
|
}
|
|
|
|
return adjusted
|
|
}
|
|
|
|
func (p *pool) Connection() (Client, *session.Token, error) {
|
|
cp, err := p.connection()
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
token := p.cache.Get(formCacheKey(cp.address, p.key))
|
|
return cp.client, token, nil
|
|
}
|
|
|
|
func (p *pool) connection() (*clientPack, error) {
|
|
for _, inner := range p.innerPools {
|
|
cp, err := inner.connection()
|
|
if err == nil {
|
|
return cp, nil
|
|
}
|
|
}
|
|
|
|
return nil, errors.New("no healthy client")
|
|
}
|
|
|
|
func (p *innerPool) connection() (*clientPack, error) {
|
|
p.lock.RLock()
|
|
defer p.lock.RUnlock()
|
|
if len(p.clientPacks) == 1 {
|
|
cp := p.clientPacks[0]
|
|
if cp.healthy {
|
|
return cp, nil
|
|
}
|
|
return nil, errors.New("no healthy client")
|
|
}
|
|
attempts := 3 * len(p.clientPacks)
|
|
for k := 0; k < attempts; k++ {
|
|
i := p.sampler.Next()
|
|
if cp := p.clientPacks[i]; cp.healthy {
|
|
return cp, nil
|
|
}
|
|
}
|
|
|
|
return nil, errors.New("no healthy client")
|
|
}
|
|
|
|
func (p *pool) OwnerID() *owner.ID {
|
|
return p.owner
|
|
}
|
|
|
|
func formCacheKey(address string, key *ecdsa.PrivateKey) string {
|
|
k := keys.PrivateKey{PrivateKey: *key}
|
|
return address + k.String()
|
|
}
|
|
|
|
func (p *pool) conn(ctx context.Context, cfg *callConfig) (*clientPack, []client.CallOption, error) {
|
|
cp, err := p.connection()
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
clientCallOptions := make([]client.CallOption, 0, 3)
|
|
|
|
key := p.key
|
|
if cfg.key != nil {
|
|
key = cfg.key
|
|
}
|
|
clientCallOptions = append(clientCallOptions, client.WithKey(key))
|
|
|
|
sessionToken := cfg.stoken
|
|
if sessionToken == nil && cfg.useDefaultSession {
|
|
cacheKey := formCacheKey(cp.address, key)
|
|
sessionToken = p.cache.Get(cacheKey)
|
|
if sessionToken == nil {
|
|
var cliPrm client.CreateSessionPrm
|
|
|
|
cliPrm.SetExp(math.MaxUint32)
|
|
|
|
cliRes, err := cp.client.CreateSession(ctx, cliPrm)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
ownerID := owner.NewIDFromPublicKey(&key.PublicKey)
|
|
sessionToken = sessionTokenForOwner(ownerID, cliRes)
|
|
|
|
cfg.stoken = sessionToken
|
|
|
|
_ = p.cache.Put(cacheKey, sessionToken)
|
|
}
|
|
}
|
|
clientCallOptions = append(clientCallOptions, client.WithSession(sessionToken))
|
|
|
|
if cfg.btoken != nil {
|
|
clientCallOptions = append(clientCallOptions, client.WithBearer(cfg.btoken))
|
|
}
|
|
|
|
return cp, clientCallOptions, nil
|
|
}
|
|
|
|
func (p *pool) checkSessionTokenErr(err error, address string) bool {
|
|
if err == nil {
|
|
return false
|
|
}
|
|
|
|
if strings.Contains(err.Error(), "session token does not exist") {
|
|
p.cache.DeleteByPrefix(address)
|
|
return true
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
func (p *pool) PutObject(ctx context.Context, params *client.PutObjectParams, opts ...CallOption) (*object.ID, error) {
|
|
cfg := cfgFromOpts(append(opts, useDefaultSession())...)
|
|
cp, options, err := p.conn(ctx, cfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
res, err := cp.client.PutObject(ctx, params, options...)
|
|
|
|
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
|
opts = append(opts, retry())
|
|
return p.PutObject(ctx, params, opts...)
|
|
}
|
|
|
|
if err != nil { // here err already carries both status and client errors
|
|
return nil, err
|
|
}
|
|
|
|
return res.ID(), nil
|
|
}
|
|
|
|
func (p *pool) DeleteObject(ctx context.Context, params *client.DeleteObjectParams, opts ...CallOption) error {
|
|
cfg := cfgFromOpts(append(opts, useDefaultSession())...)
|
|
cp, options, err := p.conn(ctx, cfg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = cp.client.DeleteObject(ctx, params, options...)
|
|
|
|
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
|
opts = append(opts, retry())
|
|
return p.DeleteObject(ctx, params, opts...)
|
|
}
|
|
|
|
// here err already carries both status and client errors
|
|
|
|
return err
|
|
}
|
|
|
|
func (p *pool) GetObject(ctx context.Context, params *client.GetObjectParams, opts ...CallOption) (*object.Object, error) {
|
|
cfg := cfgFromOpts(append(opts, useDefaultSession())...)
|
|
cp, options, err := p.conn(ctx, cfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
res, err := cp.client.GetObject(ctx, params, options...)
|
|
|
|
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
|
opts = append(opts, retry())
|
|
return p.GetObject(ctx, params, opts...)
|
|
}
|
|
|
|
if err != nil { // here err already carries both status and client errors
|
|
return nil, err
|
|
}
|
|
|
|
return res.Object(), nil
|
|
}
|
|
|
|
func (p *pool) GetObjectHeader(ctx context.Context, params *client.ObjectHeaderParams, opts ...CallOption) (*object.Object, error) {
|
|
cfg := cfgFromOpts(append(opts, useDefaultSession())...)
|
|
cp, options, err := p.conn(ctx, cfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
res, err := cp.client.HeadObject(ctx, params, options...)
|
|
|
|
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
|
opts = append(opts, retry())
|
|
return p.GetObjectHeader(ctx, params, opts...)
|
|
}
|
|
|
|
if err != nil { // here err already carries both status and client errors
|
|
return nil, err
|
|
}
|
|
|
|
return res.Object(), nil
|
|
}
|
|
|
|
func (p *pool) ObjectPayloadRangeData(ctx context.Context, params *client.RangeDataParams, opts ...CallOption) ([]byte, error) {
|
|
cfg := cfgFromOpts(append(opts, useDefaultSession())...)
|
|
cp, options, err := p.conn(ctx, cfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
res, err := cp.client.ObjectPayloadRangeData(ctx, params, options...)
|
|
|
|
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
|
opts = append(opts, retry())
|
|
return p.ObjectPayloadRangeData(ctx, params, opts...)
|
|
}
|
|
|
|
if err != nil { // here err already carries both status and client errors
|
|
return nil, err
|
|
}
|
|
|
|
return res.Data(), nil
|
|
}
|
|
|
|
func copyRangeChecksumParams(prm *client.RangeChecksumParams) *client.RangeChecksumParams {
|
|
var prmCopy client.RangeChecksumParams
|
|
|
|
prmCopy.WithAddress(prm.Address())
|
|
prmCopy.WithSalt(prm.Salt())
|
|
prmCopy.WithRangeList(prm.RangeList()...)
|
|
|
|
return &prmCopy
|
|
}
|
|
|
|
func (p *pool) ObjectPayloadRangeSHA256(ctx context.Context, params *client.RangeChecksumParams, opts ...CallOption) ([][32]byte, error) {
|
|
cfg := cfgFromOpts(append(opts, useDefaultSession())...)
|
|
cp, options, err := p.conn(ctx, cfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// FIXME: pretty bad approach but we should not mutate params through the pointer
|
|
// If non-SHA256 algo is set then we need to reset it.
|
|
params = copyRangeChecksumParams(params)
|
|
// SHA256 by default, no need to do smth
|
|
|
|
res, err := cp.client.HashObjectPayloadRanges(ctx, params, options...)
|
|
|
|
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
|
opts = append(opts, retry())
|
|
return p.ObjectPayloadRangeSHA256(ctx, params, opts...)
|
|
}
|
|
|
|
if err != nil { // here err already carries both status and client errors
|
|
return nil, err
|
|
}
|
|
|
|
cliHashes := res.Hashes()
|
|
|
|
hs := make([][sha256.Size]byte, len(cliHashes))
|
|
|
|
for i := range cliHashes {
|
|
if ln := len(cliHashes[i]); ln != sha256.Size {
|
|
return nil, fmt.Errorf("invalid SHA256 checksum size %d", ln)
|
|
}
|
|
|
|
copy(hs[i][:], cliHashes[i])
|
|
}
|
|
|
|
return hs, nil
|
|
}
|
|
|
|
func (p *pool) ObjectPayloadRangeTZ(ctx context.Context, params *client.RangeChecksumParams, opts ...CallOption) ([][64]byte, error) {
|
|
cfg := cfgFromOpts(append(opts, useDefaultSession())...)
|
|
cp, options, err := p.conn(ctx, cfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// FIXME: pretty bad approach but we should not mutate params through the pointer
|
|
// We need to set Tillich-Zemor algo.
|
|
params = copyRangeChecksumParams(params)
|
|
params.TZ()
|
|
|
|
res, err := cp.client.HashObjectPayloadRanges(ctx, params, options...)
|
|
|
|
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
|
opts = append(opts, retry())
|
|
return p.ObjectPayloadRangeTZ(ctx, params, opts...)
|
|
}
|
|
|
|
if err != nil { // here err already carries both status and client errors
|
|
return nil, err
|
|
}
|
|
|
|
cliHashes := res.Hashes()
|
|
|
|
hs := make([][client.TZSize]byte, len(cliHashes))
|
|
|
|
for i := range cliHashes {
|
|
if ln := len(cliHashes[i]); ln != client.TZSize {
|
|
return nil, fmt.Errorf("invalid TZ checksum size %d", ln)
|
|
}
|
|
|
|
copy(hs[i][:], cliHashes[i])
|
|
}
|
|
|
|
return hs, nil
|
|
}
|
|
|
|
func (p *pool) SearchObject(ctx context.Context, params *client.SearchObjectParams, opts ...CallOption) ([]*object.ID, error) {
|
|
cfg := cfgFromOpts(append(opts, useDefaultSession())...)
|
|
cp, options, err := p.conn(ctx, cfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
res, err := cp.client.SearchObjects(ctx, params, options...)
|
|
|
|
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
|
opts = append(opts, retry())
|
|
return p.SearchObject(ctx, params, opts...)
|
|
}
|
|
|
|
if err != nil { // here err already carries both status and client errors
|
|
return nil, err
|
|
}
|
|
|
|
return res.IDList(), nil
|
|
}
|
|
|
|
func (p *pool) PutContainer(ctx context.Context, cnr *container.Container, opts ...CallOption) (*cid.ID, error) {
|
|
cfg := cfgFromOpts(opts...)
|
|
cp, _, err := p.conn(ctx, cfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var cliPrm client.ContainerPutPrm
|
|
|
|
if cnr != nil {
|
|
cliPrm.SetContainer(*cnr)
|
|
}
|
|
|
|
if cfg.stoken != nil {
|
|
cliPrm.SetSessionToken(*cfg.stoken)
|
|
}
|
|
|
|
res, err := cp.client.PutContainer(ctx, cliPrm)
|
|
|
|
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
|
opts = append(opts, retry())
|
|
return p.PutContainer(ctx, cnr, opts...)
|
|
}
|
|
|
|
if err != nil { // here err already carries both status and client errors
|
|
return nil, err
|
|
}
|
|
|
|
return res.ID(), nil
|
|
}
|
|
|
|
func (p *pool) GetContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) (*container.Container, error) {
|
|
cfg := cfgFromOpts(opts...)
|
|
cp, _, err := p.conn(ctx, cfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var cliPrm client.ContainerGetPrm
|
|
|
|
if cid != nil {
|
|
cliPrm.SetContainer(*cid)
|
|
}
|
|
|
|
res, err := cp.client.GetContainer(ctx, cliPrm)
|
|
|
|
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
|
opts = append(opts, retry())
|
|
return p.GetContainer(ctx, cid, opts...)
|
|
}
|
|
|
|
if err != nil { // here err already carries both status and client errors
|
|
return nil, err
|
|
}
|
|
|
|
return res.Container(), nil
|
|
}
|
|
|
|
func (p *pool) ListContainers(ctx context.Context, ownerID *owner.ID, opts ...CallOption) ([]*cid.ID, error) {
|
|
cfg := cfgFromOpts(opts...)
|
|
cp, _, err := p.conn(ctx, cfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var cliPrm client.ContainerListPrm
|
|
|
|
if ownerID != nil {
|
|
cliPrm.SetAccount(*ownerID)
|
|
}
|
|
|
|
res, err := cp.client.ListContainers(ctx, cliPrm)
|
|
|
|
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
|
opts = append(opts, retry())
|
|
return p.ListContainers(ctx, ownerID, opts...)
|
|
}
|
|
|
|
if err != nil { // here err already carries both status and client errors
|
|
return nil, err
|
|
}
|
|
|
|
return res.Containers(), nil
|
|
}
|
|
|
|
func (p *pool) DeleteContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) error {
|
|
cfg := cfgFromOpts(opts...)
|
|
cp, _, err := p.conn(ctx, cfg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var cliPrm client.ContainerDeletePrm
|
|
|
|
if cid != nil {
|
|
cliPrm.SetContainer(*cid)
|
|
}
|
|
|
|
if cfg.stoken != nil {
|
|
cliPrm.SetSessionToken(*cfg.stoken)
|
|
}
|
|
|
|
_, err = cp.client.DeleteContainer(ctx, cliPrm)
|
|
|
|
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
|
opts = append(opts, retry())
|
|
return p.DeleteContainer(ctx, cid, opts...)
|
|
}
|
|
|
|
// here err already carries both status and client errors
|
|
|
|
return err
|
|
}
|
|
|
|
func (p *pool) GetEACL(ctx context.Context, cid *cid.ID, opts ...CallOption) (*eacl.Table, error) {
|
|
cfg := cfgFromOpts(opts...)
|
|
cp, _, err := p.conn(ctx, cfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var cliPrm client.EACLPrm
|
|
|
|
if cid != nil {
|
|
cliPrm.SetContainer(*cid)
|
|
}
|
|
|
|
res, err := cp.client.EACL(ctx, cliPrm)
|
|
|
|
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
|
opts = append(opts, retry())
|
|
return p.GetEACL(ctx, cid, opts...)
|
|
}
|
|
|
|
if err != nil { // here err already carries both status and client errors
|
|
return nil, err
|
|
}
|
|
|
|
return res.Table(), nil
|
|
}
|
|
|
|
func (p *pool) SetEACL(ctx context.Context, table *eacl.Table, opts ...CallOption) error {
|
|
cfg := cfgFromOpts(opts...)
|
|
cp, _, err := p.conn(ctx, cfg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var cliPrm client.SetEACLPrm
|
|
|
|
if table != nil {
|
|
cliPrm.SetTable(*table)
|
|
}
|
|
|
|
if cfg.stoken != nil {
|
|
cliPrm.SetSessionToken(*cfg.stoken)
|
|
}
|
|
|
|
_, err = cp.client.SetEACL(ctx, cliPrm)
|
|
|
|
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
|
opts = append(opts, retry())
|
|
return p.SetEACL(ctx, table, opts...)
|
|
}
|
|
|
|
// here err already carries both status and client errors
|
|
|
|
return err
|
|
}
|
|
|
|
func (p *pool) Balance(ctx context.Context, o *owner.ID, opts ...CallOption) (*accounting.Decimal, error) {
|
|
cfg := cfgFromOpts(opts...)
|
|
cp, _, err := p.conn(ctx, cfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var cliPrm client.GetBalancePrm
|
|
|
|
if o != nil {
|
|
cliPrm.SetAccount(*o)
|
|
}
|
|
|
|
res, err := cp.client.GetBalance(ctx, cliPrm)
|
|
if err != nil { // here err already carries both status and client errors
|
|
return nil, err
|
|
}
|
|
|
|
return res.Amount(), nil
|
|
}
|
|
|
|
func (p *pool) WaitForContainerPresence(ctx context.Context, cid *cid.ID, pollParams *ContainerPollingParams) error {
|
|
conn, _, err := p.Connection()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
wctx, cancel := context.WithTimeout(ctx, pollParams.CreationTimeout)
|
|
defer cancel()
|
|
ticker := time.NewTimer(pollParams.PollInterval)
|
|
defer ticker.Stop()
|
|
wdone := wctx.Done()
|
|
done := ctx.Done()
|
|
|
|
var cliPrm client.ContainerGetPrm
|
|
|
|
if cid != nil {
|
|
cliPrm.SetContainer(*cid)
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-done:
|
|
return ctx.Err()
|
|
case <-wdone:
|
|
return wctx.Err()
|
|
case <-ticker.C:
|
|
_, err = conn.GetContainer(ctx, cliPrm)
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
ticker.Reset(pollParams.PollInterval)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Close closes the pool and releases all the associated resources.
|
|
func (p *pool) Close() {
|
|
p.cancel()
|
|
<-p.closedCh
|
|
}
|
|
|
|
// creates new session token from CreateSession call result.
|
|
func (p *pool) newSessionToken(cliRes *client.CreateSessionRes) *session.Token {
|
|
return sessionTokenForOwner(p.owner, cliRes)
|
|
}
|
|
|
|
// creates new session token with specified owner from CreateSession call result.
|
|
func sessionTokenForOwner(id *owner.ID, cliRes *client.CreateSessionRes) *session.Token {
|
|
st := session.NewToken()
|
|
st.SetOwnerID(id)
|
|
st.SetID(cliRes.ID())
|
|
st.SetSessionKey(cliRes.PublicKey())
|
|
|
|
return st
|
|
}
|