forked from TrueCloudLab/frostfs-sdk-go
[#38] Add session token cache
Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
parent
10b78e74af
commit
c651a714fd
6 changed files with 325 additions and 23 deletions
1
go.mod
1
go.mod
|
@ -4,6 +4,7 @@ go 1.16
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210521073959-f0d4d129b7f1
|
github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210521073959-f0d4d129b7f1
|
||||||
|
github.com/bluele/gcache v0.0.2
|
||||||
github.com/golang/mock v1.6.0
|
github.com/golang/mock v1.6.0
|
||||||
github.com/google/uuid v1.2.0
|
github.com/google/uuid v1.2.0
|
||||||
github.com/mr-tron/base58 v1.2.0
|
github.com/mr-tron/base58 v1.2.0
|
||||||
|
|
2
go.sum
2
go.sum
|
@ -31,6 +31,8 @@ github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZx
|
||||||
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
|
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
|
||||||
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
|
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
|
||||||
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
|
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
|
||||||
|
github.com/bluele/gcache v0.0.2 h1:WcbfdXICg7G/DGBh1PFfcirkWOQV+v077yF1pSy3DGw=
|
||||||
|
github.com/bluele/gcache v0.0.2/go.mod h1:m15KV+ECjptwSPxKhOhQoAFQVtUFjTVkc3H8o0t/fp0=
|
||||||
github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ=
|
github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ=
|
||||||
github.com/btcsuite/btcd v0.22.0-beta h1:LTDpDKUM5EeOFBPM8IXpinEcmZ6FWfNZbE3lfrfdnWo=
|
github.com/btcsuite/btcd v0.22.0-beta h1:LTDpDKUM5EeOFBPM8IXpinEcmZ6FWfNZbE3lfrfdnWo=
|
||||||
github.com/btcsuite/btcd v0.22.0-beta/go.mod h1:9n5ntfhhHQBIhUvlhDvD3Qg6fRUj4jkN0VB8L8svzOA=
|
github.com/btcsuite/btcd v0.22.0-beta/go.mod h1:9n5ntfhhHQBIhUvlhDvD3Qg6fRUj4jkN0VB8L8svzOA=
|
||||||
|
|
47
pool/cache.go
Normal file
47
pool/cache.go
Normal file
|
@ -0,0 +1,47 @@
|
||||||
|
package pool
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/bluele/gcache"
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/pkg/session"
|
||||||
|
)
|
||||||
|
|
||||||
|
type SessionCache struct {
|
||||||
|
cache gcache.Cache
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewCache() *SessionCache {
|
||||||
|
return &SessionCache{
|
||||||
|
cache: gcache.New(100).Build(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SessionCache) Get(key string) *session.Token {
|
||||||
|
tokenRaw, err := c.cache.Get(key)
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
token, ok := tokenRaw.(*session.Token)
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return token
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SessionCache) Put(key string, token *session.Token) error {
|
||||||
|
return c.cache.Set(key, token)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SessionCache) DeleteByPrefix(prefix string) {
|
||||||
|
for _, key := range c.cache.Keys(false) {
|
||||||
|
keyStr, ok := key.(string)
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if strings.HasPrefix(keyStr, prefix) {
|
||||||
|
c.cache.Remove(key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
277
pool/pool.go
277
pool/pool.go
|
@ -5,10 +5,13 @@ import (
|
||||||
"crypto/ecdsa"
|
"crypto/ecdsa"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/client"
|
"github.com/nspcc-dev/neofs-sdk-go/client"
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/container"
|
"github.com/nspcc-dev/neofs-sdk-go/container"
|
||||||
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||||
|
@ -89,12 +92,34 @@ type Pool interface {
|
||||||
OwnerID() *owner.ID
|
OwnerID() *owner.ID
|
||||||
WaitForContainerPresence(context.Context, *cid.ID, *ContainerPollingParams) error
|
WaitForContainerPresence(context.Context, *cid.ID, *ContainerPollingParams) error
|
||||||
Close()
|
Close()
|
||||||
|
|
||||||
|
PutObjectParam(ctx context.Context, params *client.PutObjectParams, callParam CallParam) (*object.ID, error)
|
||||||
|
DeleteObjectParam(ctx context.Context, params *client.DeleteObjectParams, callParam CallParam) error
|
||||||
|
GetObjectParam(ctx context.Context, params *client.GetObjectParams, callParam CallParam) (*object.Object, error)
|
||||||
|
GetObjectHeaderParam(ctx context.Context, params *client.ObjectHeaderParams, callParam CallParam) (*object.Object, error)
|
||||||
|
ObjectPayloadRangeDataParam(ctx context.Context, params *client.RangeDataParams, callParam CallParam) ([]byte, error)
|
||||||
|
ObjectPayloadRangeSHA256Param(ctx context.Context, params *client.RangeChecksumParams, callParam CallParam) ([][32]byte, error)
|
||||||
|
ObjectPayloadRangeTZParam(ctx context.Context, params *client.RangeChecksumParams, callParam CallParam) ([][64]byte, error)
|
||||||
|
SearchObjectParam(ctx context.Context, params *client.SearchObjectParams, callParam CallParam) ([]*object.ID, error)
|
||||||
|
PutContainerParam(ctx context.Context, cnr *container.Container, callParam CallParam) (*cid.ID, error)
|
||||||
|
GetContainerParam(ctx context.Context, cid *cid.ID, callParam CallParam) (*container.Container, error)
|
||||||
|
ListContainersParam(ctx context.Context, ownerID *owner.ID, callParam CallParam) ([]*cid.ID, error)
|
||||||
|
DeleteContainerParam(ctx context.Context, cid *cid.ID, callParam CallParam) error
|
||||||
|
GetEACLParam(ctx context.Context, cid *cid.ID, callParam CallParam) (*client.EACLWithSignature, error)
|
||||||
|
SetEACLParam(ctx context.Context, table *eacl.Table, callParam CallParam) error
|
||||||
|
AnnounceContainerUsedSpaceParam(ctx context.Context, announce []container.UsedSpaceAnnouncement, callParam CallParam) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type clientPack struct {
|
type clientPack struct {
|
||||||
client client.Client
|
client client.Client
|
||||||
sessionToken *session.Token
|
healthy bool
|
||||||
healthy bool
|
address string
|
||||||
|
}
|
||||||
|
|
||||||
|
type CallParam struct {
|
||||||
|
Key *ecdsa.PrivateKey
|
||||||
|
|
||||||
|
Options []client.CallOption
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ Pool = (*pool)(nil)
|
var _ Pool = (*pool)(nil)
|
||||||
|
@ -102,13 +127,17 @@ var _ Pool = (*pool)(nil)
|
||||||
type pool struct {
|
type pool struct {
|
||||||
lock sync.RWMutex
|
lock sync.RWMutex
|
||||||
sampler *Sampler
|
sampler *Sampler
|
||||||
|
key *ecdsa.PrivateKey
|
||||||
owner *owner.ID
|
owner *owner.ID
|
||||||
clientPacks []*clientPack
|
clientPacks []*clientPack
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
closedCh chan struct{}
|
closedCh chan struct{}
|
||||||
|
cache *SessionCache
|
||||||
}
|
}
|
||||||
|
|
||||||
func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) {
|
func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) {
|
||||||
|
cache := NewCache()
|
||||||
|
|
||||||
clientPacks := make([]*clientPack, len(options.weights))
|
clientPacks := make([]*clientPack, len(options.weights))
|
||||||
var atLeastOneHealthy bool
|
var atLeastOneHealthy bool
|
||||||
for i, address := range options.addresses {
|
for i, address := range options.addresses {
|
||||||
|
@ -126,8 +155,9 @@ func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) {
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
} else if err == nil {
|
} else if err == nil {
|
||||||
healthy, atLeastOneHealthy = true, true
|
healthy, atLeastOneHealthy = true, true
|
||||||
|
_ = cache.Put(formCacheKey(address, options.Key), st)
|
||||||
}
|
}
|
||||||
clientPacks[i] = &clientPack{client: c, sessionToken: st, healthy: healthy}
|
clientPacks[i] = &clientPack{client: c, healthy: healthy, address: address}
|
||||||
}
|
}
|
||||||
|
|
||||||
if !atLeastOneHealthy {
|
if !atLeastOneHealthy {
|
||||||
|
@ -143,7 +173,15 @@ func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) {
|
||||||
ownerID := owner.NewIDFromNeo3Wallet(wallet)
|
ownerID := owner.NewIDFromNeo3Wallet(wallet)
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
pool := &pool{sampler: sampler, owner: ownerID, clientPacks: clientPacks, cancel: cancel, closedCh: make(chan struct{})}
|
pool := &pool{
|
||||||
|
sampler: sampler,
|
||||||
|
key: options.Key,
|
||||||
|
owner: ownerID,
|
||||||
|
clientPacks: clientPacks,
|
||||||
|
cancel: cancel,
|
||||||
|
closedCh: make(chan struct{}),
|
||||||
|
cache: cache,
|
||||||
|
}
|
||||||
go startRebalance(ctx, pool, options)
|
go startRebalance(ctx, pool, options)
|
||||||
return pool, nil
|
return pool, nil
|
||||||
}
|
}
|
||||||
|
@ -175,33 +213,34 @@ func updateNodesHealth(ctx context.Context, p *pool, options *BuilderOptions, bu
|
||||||
|
|
||||||
go func(i int, client client.Client) {
|
go func(i int, client client.Client) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
var (
|
|
||||||
tkn *session.Token
|
|
||||||
err error
|
|
||||||
)
|
|
||||||
ok := true
|
ok := true
|
||||||
tctx, c := context.WithTimeout(ctx, options.NodeRequestTimeout)
|
tctx, c := context.WithTimeout(ctx, options.NodeRequestTimeout)
|
||||||
defer c()
|
defer c()
|
||||||
if _, err = client.EndpointInfo(tctx); err != nil {
|
if _, err := client.EndpointInfo(tctx); err != nil {
|
||||||
ok = false
|
ok = false
|
||||||
bufferWeights[i] = 0
|
bufferWeights[i] = 0
|
||||||
}
|
}
|
||||||
|
p.lock.RLock()
|
||||||
|
cp := *p.clientPacks[i]
|
||||||
|
p.lock.RUnlock()
|
||||||
|
|
||||||
if ok {
|
if ok {
|
||||||
bufferWeights[i] = options.weights[i]
|
bufferWeights[i] = options.weights[i]
|
||||||
p.lock.RLock()
|
if !cp.healthy {
|
||||||
if !p.clientPacks[i].healthy {
|
if tkn, err := client.CreateSession(ctx, options.SessionExpirationEpoch); err != nil {
|
||||||
if tkn, err = client.CreateSession(ctx, options.SessionExpirationEpoch); err != nil {
|
|
||||||
ok = false
|
ok = false
|
||||||
bufferWeights[i] = 0
|
bufferWeights[i] = 0
|
||||||
|
} else {
|
||||||
|
_ = p.cache.Put(formCacheKey(cp.address, p.key), tkn)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
p.lock.RUnlock()
|
} else {
|
||||||
|
p.cache.DeleteByPrefix(cp.address)
|
||||||
}
|
}
|
||||||
|
|
||||||
p.lock.Lock()
|
p.lock.Lock()
|
||||||
if p.clientPacks[i].healthy != ok {
|
if p.clientPacks[i].healthy != ok {
|
||||||
p.clientPacks[i].healthy = ok
|
p.clientPacks[i].healthy = ok
|
||||||
p.clientPacks[i].sessionToken = tkn
|
|
||||||
healthyChanged = true
|
healthyChanged = true
|
||||||
}
|
}
|
||||||
p.lock.Unlock()
|
p.lock.Unlock()
|
||||||
|
@ -234,23 +273,33 @@ func adjustWeights(weights []float64) []float64 {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) Connection() (client.Client, *session.Token, error) {
|
func (p *pool) Connection() (client.Client, *session.Token, error) {
|
||||||
|
cp, err := p.connection()
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
token := p.cache.Get(formCacheKey(cp.address, p.key))
|
||||||
|
return cp.client, token, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pool) connection() (*clientPack, error) {
|
||||||
p.lock.RLock()
|
p.lock.RLock()
|
||||||
defer p.lock.RUnlock()
|
defer p.lock.RUnlock()
|
||||||
if len(p.clientPacks) == 1 {
|
if len(p.clientPacks) == 1 {
|
||||||
cp := p.clientPacks[0]
|
cp := p.clientPacks[0]
|
||||||
if cp.healthy {
|
if cp.healthy {
|
||||||
return cp.client, cp.sessionToken, nil
|
return cp, nil
|
||||||
}
|
}
|
||||||
return nil, nil, errors.New("no healthy client")
|
return nil, errors.New("no healthy client")
|
||||||
}
|
}
|
||||||
attempts := 3 * len(p.clientPacks)
|
attempts := 3 * len(p.clientPacks)
|
||||||
for k := 0; k < attempts; k++ {
|
for k := 0; k < attempts; k++ {
|
||||||
i := p.sampler.Next()
|
i := p.sampler.Next()
|
||||||
if cp := p.clientPacks[i]; cp.healthy {
|
if cp := p.clientPacks[i]; cp.healthy {
|
||||||
return cp.client, cp.sessionToken, nil
|
return cp, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil, nil, errors.New("no healthy client")
|
return nil, errors.New("no healthy client")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) OwnerID() *owner.ID {
|
func (p *pool) OwnerID() *owner.ID {
|
||||||
|
@ -265,6 +314,36 @@ func (p *pool) conn(option []client.CallOption) (client.Client, []client.CallOpt
|
||||||
return conn, append([]client.CallOption{client.WithSession(token)}, option...), nil
|
return conn, append([]client.CallOption{client.WithSession(token)}, option...), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func formCacheKey(address string, key *ecdsa.PrivateKey) string {
|
||||||
|
k := keys.PrivateKey{PrivateKey: *key}
|
||||||
|
return address + k.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pool) connParam(ctx context.Context, param CallParam) (*clientPack, []client.CallOption, error) {
|
||||||
|
cp, err := p.connection()
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
key := p.key
|
||||||
|
if param.Key != nil {
|
||||||
|
key = param.Key
|
||||||
|
}
|
||||||
|
|
||||||
|
param.Options = append(param.Options, client.WithKey(key))
|
||||||
|
cacheKey := formCacheKey(cp.address, key)
|
||||||
|
token := p.cache.Get(cacheKey)
|
||||||
|
if token == nil {
|
||||||
|
token, err = cp.client.CreateSession(ctx, math.MaxUint32, param.Options...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
_ = p.cache.Put(cacheKey, token)
|
||||||
|
}
|
||||||
|
|
||||||
|
return cp, append([]client.CallOption{client.WithSession(token)}, param.Options...), nil
|
||||||
|
}
|
||||||
|
|
||||||
func (p *pool) PutObject(ctx context.Context, params *client.PutObjectParams, option ...client.CallOption) (*object.ID, error) {
|
func (p *pool) PutObject(ctx context.Context, params *client.PutObjectParams, option ...client.CallOption) (*object.ID, error) {
|
||||||
conn, options, err := p.conn(option)
|
conn, options, err := p.conn(option)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -385,6 +464,166 @@ func (p *pool) AnnounceContainerUsedSpace(ctx context.Context, announce []contai
|
||||||
return conn.AnnounceContainerUsedSpace(ctx, announce, options...)
|
return conn.AnnounceContainerUsedSpace(ctx, announce, options...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *pool) checkSessionTokenErr(err error, address string) {
|
||||||
|
if err == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if strings.Contains(err.Error(), "session token does not exist") {
|
||||||
|
p.cache.DeleteByPrefix(address)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pool) PutObjectParam(ctx context.Context, params *client.PutObjectParams, callParam CallParam) (*object.ID, error) {
|
||||||
|
cp, options, err := p.connParam(ctx, callParam)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
res, err := cp.client.PutObject(ctx, params, options...)
|
||||||
|
p.checkSessionTokenErr(err, cp.address)
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pool) DeleteObjectParam(ctx context.Context, params *client.DeleteObjectParams, callParam CallParam) error {
|
||||||
|
cp, options, err := p.connParam(ctx, callParam)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = cp.client.DeleteObject(ctx, params, options...)
|
||||||
|
p.checkSessionTokenErr(err, cp.address)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pool) GetObjectParam(ctx context.Context, params *client.GetObjectParams, callParam CallParam) (*object.Object, error) {
|
||||||
|
cp, options, err := p.connParam(ctx, callParam)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
res, err := cp.client.GetObject(ctx, params, options...)
|
||||||
|
p.checkSessionTokenErr(err, cp.address)
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pool) GetObjectHeaderParam(ctx context.Context, params *client.ObjectHeaderParams, callParam CallParam) (*object.Object, error) {
|
||||||
|
cp, options, err := p.connParam(ctx, callParam)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
res, err := cp.client.GetObjectHeader(ctx, params, options...)
|
||||||
|
p.checkSessionTokenErr(err, cp.address)
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pool) ObjectPayloadRangeDataParam(ctx context.Context, params *client.RangeDataParams, callParam CallParam) ([]byte, error) {
|
||||||
|
cp, options, err := p.connParam(ctx, callParam)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
res, err := cp.client.ObjectPayloadRangeData(ctx, params, options...)
|
||||||
|
p.checkSessionTokenErr(err, cp.address)
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pool) ObjectPayloadRangeSHA256Param(ctx context.Context, params *client.RangeChecksumParams, callParam CallParam) ([][32]byte, error) {
|
||||||
|
cp, options, err := p.connParam(ctx, callParam)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
res, err := cp.client.ObjectPayloadRangeSHA256(ctx, params, options...)
|
||||||
|
p.checkSessionTokenErr(err, cp.address)
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pool) ObjectPayloadRangeTZParam(ctx context.Context, params *client.RangeChecksumParams, callParam CallParam) ([][64]byte, error) {
|
||||||
|
cp, options, err := p.connParam(ctx, callParam)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
res, err := cp.client.ObjectPayloadRangeTZ(ctx, params, options...)
|
||||||
|
p.checkSessionTokenErr(err, cp.address)
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pool) SearchObjectParam(ctx context.Context, params *client.SearchObjectParams, callParam CallParam) ([]*object.ID, error) {
|
||||||
|
cp, options, err := p.connParam(ctx, callParam)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
res, err := cp.client.SearchObject(ctx, params, options...)
|
||||||
|
p.checkSessionTokenErr(err, cp.address)
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pool) PutContainerParam(ctx context.Context, cnr *container.Container, callParam CallParam) (*cid.ID, error) {
|
||||||
|
cp, options, err := p.connParam(ctx, callParam)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
res, err := cp.client.PutContainer(ctx, cnr, options...)
|
||||||
|
p.checkSessionTokenErr(err, cp.address)
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pool) GetContainerParam(ctx context.Context, cid *cid.ID, callParam CallParam) (*container.Container, error) {
|
||||||
|
cp, options, err := p.connParam(ctx, callParam)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
res, err := cp.client.GetContainer(ctx, cid, options...)
|
||||||
|
p.checkSessionTokenErr(err, cp.address)
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pool) ListContainersParam(ctx context.Context, ownerID *owner.ID, callParam CallParam) ([]*cid.ID, error) {
|
||||||
|
cp, options, err := p.connParam(ctx, callParam)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
res, err := cp.client.ListContainers(ctx, ownerID, options...)
|
||||||
|
p.checkSessionTokenErr(err, cp.address)
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pool) DeleteContainerParam(ctx context.Context, cid *cid.ID, callParam CallParam) error {
|
||||||
|
cp, options, err := p.connParam(ctx, callParam)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = cp.client.DeleteContainer(ctx, cid, options...)
|
||||||
|
p.checkSessionTokenErr(err, cp.address)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pool) GetEACLParam(ctx context.Context, cid *cid.ID, callParam CallParam) (*client.EACLWithSignature, error) {
|
||||||
|
cp, options, err := p.connParam(ctx, callParam)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
res, err := cp.client.GetEACL(ctx, cid, options...)
|
||||||
|
p.checkSessionTokenErr(err, cp.address)
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pool) SetEACLParam(ctx context.Context, table *eacl.Table, callParam CallParam) error {
|
||||||
|
cp, options, err := p.connParam(ctx, callParam)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = cp.client.SetEACL(ctx, table, options...)
|
||||||
|
p.checkSessionTokenErr(err, cp.address)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pool) AnnounceContainerUsedSpaceParam(ctx context.Context, announce []container.UsedSpaceAnnouncement, callParam CallParam) error {
|
||||||
|
cp, options, err := p.connParam(ctx, callParam)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = cp.client.AnnounceContainerUsedSpace(ctx, announce, options...)
|
||||||
|
p.checkSessionTokenErr(err, cp.address)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (p *pool) WaitForContainerPresence(ctx context.Context, cid *cid.ID, pollParams *ContainerPollingParams) error {
|
func (p *pool) WaitForContainerPresence(ctx context.Context, cid *cid.ID, pollParams *ContainerPollingParams) error {
|
||||||
conn, _, err := p.Connection()
|
conn, _, err := p.Connection()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -307,6 +307,10 @@ func TestTwoFailed(t *testing.T) {
|
||||||
require.Contains(t, err.Error(), "no healthy")
|
require.Contains(t, err.Error(), "no healthy")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSessionCache(t *testing.T) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
func newToken(t *testing.T) *session.Token {
|
func newToken(t *testing.T) *session.Token {
|
||||||
tok := session.NewToken()
|
tok := session.NewToken()
|
||||||
uid, err := uuid.New().MarshalBinary()
|
uid, err := uuid.New().MarshalBinary()
|
||||||
|
@ -324,12 +328,17 @@ func TestWaitPresence(t *testing.T) {
|
||||||
mockClient.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
|
mockClient.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
|
||||||
mockClient.EXPECT().GetContainer(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
|
mockClient.EXPECT().GetContainer(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
|
||||||
|
|
||||||
|
key, err := keys.NewPrivateKey()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
p := &pool{
|
p := &pool{
|
||||||
sampler: NewSampler([]float64{1}, rand.NewSource(0)),
|
sampler: NewSampler([]float64{1}, rand.NewSource(0)),
|
||||||
clientPacks: []*clientPack{{
|
clientPacks: []*clientPack{{
|
||||||
client: mockClient,
|
client: mockClient,
|
||||||
healthy: true,
|
healthy: true,
|
||||||
}},
|
}},
|
||||||
|
key: &key.PrivateKey,
|
||||||
|
cache: NewCache(),
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Run("context canceled", func(t *testing.T) {
|
t.Run("context canceled", func(t *testing.T) {
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/client"
|
"github.com/nspcc-dev/neofs-sdk-go/client"
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/session"
|
"github.com/nspcc-dev/neofs-sdk-go/session"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
@ -73,11 +74,16 @@ func TestHealthyReweight(t *testing.T) {
|
||||||
buffer = make([]float64, len(weights))
|
buffer = make([]float64, len(weights))
|
||||||
)
|
)
|
||||||
|
|
||||||
|
key, err := keys.NewPrivateKey()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
p := &pool{
|
p := &pool{
|
||||||
sampler: NewSampler(weights, rand.NewSource(0)),
|
sampler: NewSampler(weights, rand.NewSource(0)),
|
||||||
clientPacks: []*clientPack{
|
clientPacks: []*clientPack{
|
||||||
{client: newNetmapMock(names[0], true), healthy: true},
|
{client: newNetmapMock(names[0], true), healthy: true, address: "address0"},
|
||||||
{client: newNetmapMock(names[1], false), healthy: true}},
|
{client: newNetmapMock(names[1], false), healthy: true, address: "address1"}},
|
||||||
|
cache: NewCache(),
|
||||||
|
key: &key.PrivateKey,
|
||||||
}
|
}
|
||||||
|
|
||||||
// check getting first node connection before rebalance happened
|
// check getting first node connection before rebalance happened
|
||||||
|
@ -105,8 +111,6 @@ func TestHealthyReweight(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
mock0 = connection0.(clientMock)
|
mock0 = connection0.(clientMock)
|
||||||
require.Equal(t, names[0], mock0.name)
|
require.Equal(t, names[0], mock0.name)
|
||||||
|
|
||||||
require.NotNil(t, p.clientPacks[0].sessionToken)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHealthyNoReweight(t *testing.T) {
|
func TestHealthyNoReweight(t *testing.T) {
|
||||||
|
|
Loading…
Reference in a new issue