forked from TrueCloudLab/frostfs-sdk-go
[#165] pool: make private inner structs
Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
parent
f5cabe26cb
commit
ec5c223f29
7 changed files with 28 additions and 55 deletions
|
@ -8,7 +8,7 @@ import (
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/session"
|
"github.com/nspcc-dev/neofs-sdk-go/session"
|
||||||
)
|
)
|
||||||
|
|
||||||
type SessionCache struct {
|
type sessionCache struct {
|
||||||
cache *lru.Cache
|
cache *lru.Cache
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -17,16 +17,16 @@ type cacheValue struct {
|
||||||
token *session.Token
|
token *session.Token
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewCache() (*SessionCache, error) {
|
func newCache() (*sessionCache, error) {
|
||||||
cache, err := lru.New(100)
|
cache, err := lru.New(100)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return &SessionCache{cache: cache}, nil
|
return &sessionCache{cache: cache}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *SessionCache) Get(key string) *session.Token {
|
func (c *sessionCache) Get(key string) *session.Token {
|
||||||
valueRaw, ok := c.cache.Get(key)
|
valueRaw, ok := c.cache.Get(key)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil
|
return nil
|
||||||
|
@ -38,7 +38,7 @@ func (c *SessionCache) Get(key string) *session.Token {
|
||||||
return value.token
|
return value.token
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *SessionCache) GetAccessTime(key string) (time.Time, bool) {
|
func (c *sessionCache) GetAccessTime(key string) (time.Time, bool) {
|
||||||
valueRaw, ok := c.cache.Peek(key)
|
valueRaw, ok := c.cache.Peek(key)
|
||||||
if !ok {
|
if !ok {
|
||||||
return time.Time{}, false
|
return time.Time{}, false
|
||||||
|
@ -47,14 +47,14 @@ func (c *SessionCache) GetAccessTime(key string) (time.Time, bool) {
|
||||||
return valueRaw.(*cacheValue).atime, true
|
return valueRaw.(*cacheValue).atime, true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *SessionCache) Put(key string, token *session.Token) bool {
|
func (c *sessionCache) Put(key string, token *session.Token) bool {
|
||||||
return c.cache.Add(key, &cacheValue{
|
return c.cache.Add(key, &cacheValue{
|
||||||
atime: time.Now(),
|
atime: time.Now(),
|
||||||
token: token,
|
token: token,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *SessionCache) DeleteByPrefix(prefix string) {
|
func (c *sessionCache) DeleteByPrefix(prefix string) {
|
||||||
for _, key := range c.cache.Keys() {
|
for _, key := range c.cache.Keys() {
|
||||||
if strings.HasPrefix(key.(string), prefix) {
|
if strings.HasPrefix(key.(string), prefix) {
|
||||||
c.cache.Remove(key)
|
c.cache.Remove(key)
|
||||||
|
|
|
@ -10,7 +10,7 @@ import (
|
||||||
func TestSessionCache_GetAccessTime(t *testing.T) {
|
func TestSessionCache_GetAccessTime(t *testing.T) {
|
||||||
const key = "Foo"
|
const key = "Foo"
|
||||||
|
|
||||||
cache, err := NewCache()
|
cache, err := newCache()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
cache.Put(key, nil)
|
cache.Put(key, nil)
|
||||||
|
|
27
pool/key.go
27
pool/key.go
|
@ -1,27 +0,0 @@
|
||||||
package pool
|
|
||||||
|
|
||||||
import (
|
|
||||||
"crypto/ecdsa"
|
|
||||||
"crypto/elliptic"
|
|
||||||
"crypto/rand"
|
|
||||||
"math/big"
|
|
||||||
)
|
|
||||||
|
|
||||||
// NewEphemeralKey creates new private key used for NeoFS session. It'll be
|
|
||||||
// removed after refactoring to use neo-go crypto library.
|
|
||||||
func NewEphemeralKey() (*ecdsa.PrivateKey, error) {
|
|
||||||
c := elliptic.P256()
|
|
||||||
priv, x, y, err := elliptic.GenerateKey(c, rand.Reader)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
key := &ecdsa.PrivateKey{
|
|
||||||
PublicKey: ecdsa.PublicKey{
|
|
||||||
Curve: c,
|
|
||||||
X: x,
|
|
||||||
Y: y,
|
|
||||||
},
|
|
||||||
D: new(big.Int).SetBytes(priv),
|
|
||||||
}
|
|
||||||
return key, nil
|
|
||||||
}
|
|
10
pool/pool.go
10
pool/pool.go
|
@ -165,14 +165,14 @@ type Pool struct {
|
||||||
owner *owner.ID
|
owner *owner.ID
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
closedCh chan struct{}
|
closedCh chan struct{}
|
||||||
cache *SessionCache
|
cache *sessionCache
|
||||||
stokenDuration uint64
|
stokenDuration uint64
|
||||||
stokenThreshold time.Duration
|
stokenThreshold time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
type innerPool struct {
|
type innerPool struct {
|
||||||
lock sync.RWMutex
|
lock sync.RWMutex
|
||||||
sampler *Sampler
|
sampler *sampler
|
||||||
clientPacks []*clientPack
|
clientPacks []*clientPack
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -197,7 +197,7 @@ func NewPool(ctx context.Context, options InitParameters) (*Pool, error) {
|
||||||
|
|
||||||
fillDefaultInitParams(&options)
|
fillDefaultInitParams(&options)
|
||||||
|
|
||||||
cache, err := NewCache()
|
cache, err := newCache()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("couldn't create cache: %w", err)
|
return nil, fmt.Errorf("couldn't create cache: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -228,7 +228,7 @@ func NewPool(ctx context.Context, options InitParameters) (*Pool, error) {
|
||||||
clientPacks[j] = &clientPack{client: c, healthy: healthy, address: addr}
|
clientPacks[j] = &clientPack{client: c, healthy: healthy, address: addr}
|
||||||
}
|
}
|
||||||
source := rand.NewSource(time.Now().UnixNano())
|
source := rand.NewSource(time.Now().UnixNano())
|
||||||
sampler := NewSampler(params.weights, source)
|
sampler := newSampler(params.weights, source)
|
||||||
|
|
||||||
inner[i] = &innerPool{
|
inner[i] = &innerPool{
|
||||||
sampler: sampler,
|
sampler: sampler,
|
||||||
|
@ -419,7 +419,7 @@ func updateInnerNodesHealth(ctx context.Context, pool *Pool, i int, options reba
|
||||||
probabilities := adjustWeights(bufferWeights)
|
probabilities := adjustWeights(bufferWeights)
|
||||||
source := rand.NewSource(time.Now().UnixNano())
|
source := rand.NewSource(time.Now().UnixNano())
|
||||||
p.lock.Lock()
|
p.lock.Lock()
|
||||||
p.sampler = NewSampler(probabilities, source)
|
p.sampler = newSampler(probabilities, source)
|
||||||
p.lock.Unlock()
|
p.lock.Unlock()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -536,11 +536,11 @@ func TestWaitPresence(t *testing.T) {
|
||||||
mockClient.EXPECT().NetworkInfo(gomock.Any(), gomock.Any()).Return(&client.ResNetworkInfo{}, nil).AnyTimes()
|
mockClient.EXPECT().NetworkInfo(gomock.Any(), gomock.Any()).Return(&client.ResNetworkInfo{}, nil).AnyTimes()
|
||||||
mockClient.EXPECT().GetContainer(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
|
mockClient.EXPECT().GetContainer(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
|
||||||
|
|
||||||
cache, err := NewCache()
|
cache, err := newCache()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
inner := &innerPool{
|
inner := &innerPool{
|
||||||
sampler: NewSampler([]float64{1}, rand.NewSource(0)),
|
sampler: newSampler([]float64{1}, rand.NewSource(0)),
|
||||||
clientPacks: []*clientPack{{
|
clientPacks: []*clientPack{{
|
||||||
client: mockClient,
|
client: mockClient,
|
||||||
healthy: true,
|
healthy: true,
|
||||||
|
|
|
@ -2,19 +2,19 @@ package pool
|
||||||
|
|
||||||
import "math/rand"
|
import "math/rand"
|
||||||
|
|
||||||
// Sampler implements weighted random number generation using Vose's Alias
|
// sampler implements weighted random number generation using Vose's Alias
|
||||||
// Method (https://www.keithschwarz.com/darts-dice-coins/).
|
// Method (https://www.keithschwarz.com/darts-dice-coins/).
|
||||||
type Sampler struct {
|
type sampler struct {
|
||||||
randomGenerator *rand.Rand
|
randomGenerator *rand.Rand
|
||||||
probabilities []float64
|
probabilities []float64
|
||||||
alias []int
|
alias []int
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewSampler creates new Sampler with a given set of probabilities using
|
// newSampler creates new sampler with a given set of probabilities using
|
||||||
// given source of randomness. Created Sampler will produce numbers from
|
// given source of randomness. Created sampler will produce numbers from
|
||||||
// 0 to len(probabilities).
|
// 0 to len(probabilities).
|
||||||
func NewSampler(probabilities []float64, source rand.Source) *Sampler {
|
func newSampler(probabilities []float64, source rand.Source) *sampler {
|
||||||
sampler := &Sampler{}
|
sampler := &sampler{}
|
||||||
var (
|
var (
|
||||||
small workList
|
small workList
|
||||||
large workList
|
large workList
|
||||||
|
@ -57,8 +57,8 @@ func NewSampler(probabilities []float64, source rand.Source) *Sampler {
|
||||||
return sampler
|
return sampler
|
||||||
}
|
}
|
||||||
|
|
||||||
// Next returns the next (not so) random number from Sampler.
|
// Next returns the next (not so) random number from sampler.
|
||||||
func (g *Sampler) Next() int {
|
func (g *sampler) Next() int {
|
||||||
n := len(g.alias)
|
n := len(g.alias)
|
||||||
i := g.randomGenerator.Intn(n)
|
i := g.randomGenerator.Intn(n)
|
||||||
if g.randomGenerator.Float64() < g.probabilities[i] {
|
if g.randomGenerator.Float64() < g.probabilities[i] {
|
||||||
|
|
|
@ -32,7 +32,7 @@ func TestSamplerStability(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range cases {
|
for _, tc := range cases {
|
||||||
sampler := NewSampler(tc.probabilities, rand.NewSource(0))
|
sampler := newSampler(tc.probabilities, rand.NewSource(0))
|
||||||
res := make([]int, len(tc.probabilities))
|
res := make([]int, len(tc.probabilities))
|
||||||
for i := 0; i < COUNT; i++ {
|
for i := 0; i < COUNT; i++ {
|
||||||
res[sampler.Next()]++
|
res[sampler.Next()]++
|
||||||
|
@ -72,11 +72,11 @@ func TestHealthyReweight(t *testing.T) {
|
||||||
buffer = make([]float64, len(weights))
|
buffer = make([]float64, len(weights))
|
||||||
)
|
)
|
||||||
|
|
||||||
cache, err := NewCache()
|
cache, err := newCache()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
inner := &innerPool{
|
inner := &innerPool{
|
||||||
sampler: NewSampler(weights, rand.NewSource(0)),
|
sampler: newSampler(weights, rand.NewSource(0)),
|
||||||
clientPacks: []*clientPack{
|
clientPacks: []*clientPack{
|
||||||
{client: newNetmapMock(names[0], true), healthy: true, address: "address0"},
|
{client: newNetmapMock(names[0], true), healthy: true, address: "address0"},
|
||||||
{client: newNetmapMock(names[1], false), healthy: true, address: "address1"}},
|
{client: newNetmapMock(names[1], false), healthy: true, address: "address1"}},
|
||||||
|
@ -106,7 +106,7 @@ func TestHealthyReweight(t *testing.T) {
|
||||||
inner.lock.Unlock()
|
inner.lock.Unlock()
|
||||||
|
|
||||||
updateInnerNodesHealth(context.TODO(), p, 0, options, buffer)
|
updateInnerNodesHealth(context.TODO(), p, 0, options, buffer)
|
||||||
inner.sampler = NewSampler(weights, rand.NewSource(0))
|
inner.sampler = newSampler(weights, rand.NewSource(0))
|
||||||
|
|
||||||
connection0, _, err = p.Connection()
|
connection0, _, err = p.Connection()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -122,7 +122,7 @@ func TestHealthyNoReweight(t *testing.T) {
|
||||||
buffer = make([]float64, len(weights))
|
buffer = make([]float64, len(weights))
|
||||||
)
|
)
|
||||||
|
|
||||||
sampler := NewSampler(weights, rand.NewSource(0))
|
sampler := newSampler(weights, rand.NewSource(0))
|
||||||
inner := &innerPool{
|
inner := &innerPool{
|
||||||
sampler: sampler,
|
sampler: sampler,
|
||||||
clientPacks: []*clientPack{
|
clientPacks: []*clientPack{
|
||||||
|
|
Loading…
Reference in a new issue