[#74] Add priority to nodes

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
Denis Kirillov 2021-11-16 16:50:33 +03:00, committed by Alex Vanin
parent 279a5a1e0b
commit 618eeb8172
3 changed files with 244 additions and 98 deletions
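
The change in a nutshell: `AddNode` gains a priority argument, nodes are grouped into per-priority inner pools, and connections are always served from the healthy group with the lowest priority value. A minimal usage sketch of the new builder API (the endpoints, `key`, `ctx`, and interval below are illustrative, not taken from this diff):

	pb := new(Builder)
	// Priority 1 nodes serve all traffic while at least one of them is healthy;
	// within a group, the weights (here 9 vs 1) balance the load.
	pb.AddNode("s01.example.com:8080", 1, 9)
	pb.AddNode("s02.example.com:8080", 1, 1)
	// Priority 2 is a fallback, used only when every priority 1 node is down.
	pb.AddNode("s03.example.com:8080", 2, 1)

	opts := &BuilderOptions{
		Key:                     key, // the pool owner's *ecdsa.PrivateKey
		ClientRebalanceInterval: 15 * time.Second,
	}
	p, err := pb.Build(ctx, opts)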

File 1 of 3:

@@ -9,6 +9,7 @@ import (
 	"fmt"
 	"math"
 	"math/rand"
+	"sort"
 	"strings"
 	"sync"
 	"time"
@@ -38,16 +39,26 @@ type BuilderOptions struct {
 	NodeRequestTimeout      time.Duration
 	ClientRebalanceInterval time.Duration
 	SessionExpirationEpoch  uint64
-	weights                 []float64
-	addresses               []string
+	nodesParams             []*NodesParam
 	clientBuilder           func(opts ...client.Option) (client.Client, error)
 }

+type NodesParam struct {
+	priority  int
+	addresses []string
+	weights   []float64
+}
+
+type NodeParam struct {
+	priority int
+	address  string
+	weight   float64
+}
+
 // Builder is an interim structure used to collect node addresses/weights and
 // build connection pool subsequently.
 type Builder struct {
-	addresses []string
-	weights   []float64
+	nodeParams []NodeParam
 }

 // ContainerPollingParams contains parameters used in polling is a container created or not.
@@ -65,20 +76,40 @@ func DefaultPollingParams() *ContainerPollingParams {
 }

 // AddNode adds address/weight pair to node PoolBuilder list.
-func (pb *Builder) AddNode(address string, weight float64) *Builder {
-	pb.addresses = append(pb.addresses, address)
-	pb.weights = append(pb.weights, weight)
+func (pb *Builder) AddNode(address string, priority int, weight float64) *Builder {
+	pb.nodeParams = append(pb.nodeParams, NodeParam{
+		address:  address,
+		priority: priority,
+		weight:   weight,
+	})
 	return pb
 }

 // Build creates new pool based on current PoolBuilder state and options.
 func (pb *Builder) Build(ctx context.Context, options *BuilderOptions) (Pool, error) {
-	if len(pb.addresses) == 0 {
+	if len(pb.nodeParams) == 0 {
 		return nil, errors.New("no NeoFS peers configured")
 	}

-	options.weights = adjustWeights(pb.weights)
-	options.addresses = pb.addresses
+	nodesParams := make(map[int]*NodesParam)
+	for _, param := range pb.nodeParams {
+		nodes, ok := nodesParams[param.priority]
+		if !ok {
+			nodes = &NodesParam{priority: param.priority}
+		}
+		nodes.addresses = append(nodes.addresses, param.address)
+		nodes.weights = append(nodes.weights, param.weight)
+		nodesParams[param.priority] = nodes
+	}
+
+	for _, nodes := range nodesParams {
+		nodes.weights = adjustWeights(nodes.weights)
+		options.nodesParams = append(options.nodesParams, nodes)
+	}
+
+	sort.Slice(options.nodesParams, func(i, j int) bool {
+		return options.nodesParams[i].priority < options.nodesParams[j].priority
+	})

 	if options.clientBuilder == nil {
 		options.clientBuilder = client.New
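
To make the grouping concrete, here is a hypothetical trace of the Build logic above (the addresses and numbers are invented for illustration):

	// AddNode calls: ("b", 1, 9), ("c", 1, 1), ("a", 2, 1)
	// After the map pass and the sort, options.nodesParams holds:
	//   {priority: 1, addresses: ["b", "c"], weights: adjustWeights([9, 1])}
	//   {priority: 2, addresses: ["a"], weights: adjustWeights([1])}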
@@ -169,16 +200,20 @@ func cfgFromOpts(opts ...CallOption) *callConfig {
 var _ Pool = (*pool)(nil)

 type pool struct {
-	lock        sync.RWMutex
-	sampler     *Sampler
+	innerPools  []*innerPool
 	key         *ecdsa.PrivateKey
 	owner       *owner.ID
-	clientPacks []*clientPack
 	cancel      context.CancelFunc
 	closedCh    chan struct{}
 	cache       *SessionCache
 }

+type innerPool struct {
+	lock        sync.RWMutex
+	sampler     *Sampler
+	clientPacks []*clientPack
+}
+
 func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) {
 	wallet, err := owner.NEO3WalletFromPublicKey(&options.Key.PublicKey)
 	if err != nil {
@@ -192,9 +227,11 @@ func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) {
 	ownerID := owner.NewIDFromNeo3Wallet(wallet)

-	clientPacks := make([]*clientPack, len(options.weights))
+	inner := make([]*innerPool, len(options.nodesParams))
 	var atLeastOneHealthy bool
-	for i, address := range options.addresses {
+	for i, params := range options.nodesParams {
+		clientPacks := make([]*clientPack, len(params.weights))
+		for j, address := range params.addresses {
 			c, err := options.clientBuilder(client.WithDefaultPrivateKey(options.Key),
 				client.WithURIAddress(address, nil),
 				client.WithDialTimeout(options.NodeConnectionTimeout))
@@ -209,27 +246,29 @@ func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) {
 				zap.Error(err))
 		} else if err == nil {
 			healthy, atLeastOneHealthy = true, true
 			st := sessionTokenForOwner(ownerID, cliRes)
 			_ = cache.Put(formCacheKey(address, options.Key), st)
 		}
-		clientPacks[i] = &clientPack{client: c, healthy: healthy, address: address}
+			clientPacks[j] = &clientPack{client: c, healthy: healthy, address: address}
+		}
+		source := rand.NewSource(time.Now().UnixNano())
+		sampler := NewSampler(params.weights, source)
+
+		inner[i] = &innerPool{
+			sampler:     sampler,
+			clientPacks: clientPacks,
+		}
 	}

 	if !atLeastOneHealthy {
 		return nil, fmt.Errorf("at least one node must be healthy")
 	}

-	source := rand.NewSource(time.Now().UnixNano())
-	sampler := NewSampler(options.weights, source)
 	ctx, cancel := context.WithCancel(ctx)
 	pool := &pool{
-		sampler:     sampler,
+		innerPools:  inner,
 		key:         options.Key,
 		owner:       ownerID,
-		clientPacks: clientPacks,
 		cancel:      cancel,
 		closedCh:    make(chan struct{}),
 		cache:       cache,
@@ -240,7 +279,10 @@ func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) {

 func startRebalance(ctx context.Context, p *pool, options *BuilderOptions) {
 	ticker := time.NewTimer(options.ClientRebalanceInterval)
-	buffer := make([]float64, len(options.weights))
+	buffers := make([][]float64, len(options.nodesParams))
+	for i, params := range options.nodesParams {
+		buffers[i] = make([]float64, len(params.weights))
+	}

 	for {
 		select {
@@ -248,57 +290,72 @@ func startRebalance(ctx context.Context, p *pool, options *BuilderOptions) {
 			close(p.closedCh)
 			return
 		case <-ticker.C:
-			updateNodesHealth(ctx, p, options, buffer)
+			updateNodesHealth(ctx, p, options, buffers)
 			ticker.Reset(options.ClientRebalanceInterval)
 		}
 	}
 }

-func updateNodesHealth(ctx context.Context, p *pool, options *BuilderOptions, bufferWeights []float64) {
-	if len(bufferWeights) != len(p.clientPacks) {
-		bufferWeights = make([]float64, len(p.clientPacks))
-	}
-	healthyChanged := false
+func updateNodesHealth(ctx context.Context, p *pool, options *BuilderOptions, buffers [][]float64) {
 	wg := sync.WaitGroup{}
-	for i, cPack := range p.clientPacks {
+	for i, inner := range p.innerPools {
 		wg.Add(1)
-		go func(i int, client client.Client) {
+		bufferWeights := buffers[i]
+		go func(i int, innerPool *innerPool) {
+			defer wg.Done()
+			updateInnerNodesHealth(ctx, p, i, options, bufferWeights)
+		}(i, inner)
+	}
+	wg.Wait()
+}
+
+func updateInnerNodesHealth(ctx context.Context, pool *pool, i int, options *BuilderOptions, bufferWeights []float64) {
+	if i > len(pool.innerPools)-1 {
+		return
+	}
+	p := pool.innerPools[i]
+
+	healthyChanged := false
+	wg := sync.WaitGroup{}
+	for j, cPack := range p.clientPacks {
+		wg.Add(1)
+		go func(j int, client client.Client) {
 			defer wg.Done()
 			ok := true
 			tctx, c := context.WithTimeout(ctx, options.NodeRequestTimeout)
 			defer c()
 			if _, err := client.EndpointInfo(tctx); err != nil {
 				ok = false
-				bufferWeights[i] = 0
+				bufferWeights[j] = 0
 			}
 			p.lock.RLock()
-			cp := *p.clientPacks[i]
+			cp := *p.clientPacks[j]
 			p.lock.RUnlock()
 			if ok {
-				bufferWeights[i] = options.weights[i]
+				bufferWeights[j] = options.nodesParams[i].weights[j]
 				if !cp.healthy {
 					if cliRes, err := client.CreateSession(ctx, options.SessionExpirationEpoch); err != nil {
 						ok = false
-						bufferWeights[i] = 0
+						bufferWeights[j] = 0
 					} else {
-						tkn := p.newSessionToken(cliRes)
-						_ = p.cache.Put(formCacheKey(cp.address, p.key), tkn)
+						tkn := pool.newSessionToken(cliRes)
+						_ = pool.cache.Put(formCacheKey(cp.address, pool.key), tkn)
 					}
 				}
 			} else {
-				p.cache.DeleteByPrefix(cp.address)
+				pool.cache.DeleteByPrefix(cp.address)
 			}
 			p.lock.Lock()
-			if p.clientPacks[i].healthy != ok {
-				p.clientPacks[i].healthy = ok
+			if p.clientPacks[j].healthy != ok {
+				p.clientPacks[j].healthy = ok
 				healthyChanged = true
 			}
 			p.lock.Unlock()
-		}(i, cPack.client)
+		}(j, cPack.client)
 	}
 	wg.Wait()
@@ -337,6 +394,17 @@ func (p *pool) Connection() (client.Client, *session.Token, error) {
 }

 func (p *pool) connection() (*clientPack, error) {
+	for _, inner := range p.innerPools {
+		cp, err := inner.connection()
+		if err == nil {
+			return cp, nil
+		}
+	}
+
+	return nil, errors.New("no healthy client")
+}
+
+func (p *innerPool) connection() (*clientPack, error) {
 	p.lock.RLock()
 	defer p.lock.RUnlock()
 	if len(p.clientPacks) == 1 {
@@ -353,6 +421,7 @@ func (p *pool) connection() (*clientPack, error) {
 			return cp, nil
 		}
 	}
+
 	return nil, errors.New("no healthy client")
 }
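
In short, the new two-level connection() tries the priority groups in ascending order and returns the first healthy client it finds, so lower-priority nodes receive traffic only while every higher-priority node is marked unhealthy. The TestPriority case added below exercises exactly this fallback.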
@@ -843,7 +912,7 @@ func (p *pool) WaitForContainerPresence(ctx context.Context, cid *cid.ID, pollPa
 		}
 	}
 }

-// Cloce closes the pool and releases all the associated resources.
+// Close closes the pool and releases all the associated resources.
 func (p *pool) Close() {
 	p.cancel()
 	<-p.closedCh

File 2 of 3:

@@ -27,7 +27,7 @@ func TestBuildPoolClientFailed(t *testing.T) {
 	}

 	pb := new(Builder)
-	pb.AddNode("peer0", 1)
+	pb.AddNode("peer0", 1, 1)

 	opts := &BuilderOptions{
 		Key: newPrivateKey(t),
@@ -54,7 +54,7 @@ func TestBuildPoolCreateSessionFailed(t *testing.T) {
 	}

 	pb := new(Builder)
-	pb.AddNode("peer0", 1)
+	pb.AddNode("peer0", 1, 1)

 	opts := &BuilderOptions{
 		Key: newPrivateKey(t),
@@ -108,8 +108,8 @@ func TestBuildPoolOneNodeFailed(t *testing.T) {
 	}

 	pb := new(Builder)
-	pb.AddNode("peer0", 9)
-	pb.AddNode("peer1", 1)
+	pb.AddNode("peer0", 9, 1)
+	pb.AddNode("peer1", 1, 1)

 	log, err := zap.NewProduction()
 	require.NoError(t, err)
@@ -159,7 +159,7 @@ func TestOneNode(t *testing.T) {
 	}

 	pb := new(Builder)
-	pb.AddNode("peer0", 1)
+	pb.AddNode("peer0", 1, 1)

 	opts := &BuilderOptions{
 		Key: newPrivateKey(t),
@@ -196,8 +196,8 @@ func TestTwoNodes(t *testing.T) {
 	}

 	pb := new(Builder)
-	pb.AddNode("peer0", 1)
-	pb.AddNode("peer1", 1)
+	pb.AddNode("peer0", 1, 1)
+	pb.AddNode("peer1", 1, 1)

 	opts := &BuilderOptions{
 		Key: newPrivateKey(t),
@@ -248,8 +248,8 @@ func TestOneOfTwoFailed(t *testing.T) {
 	}

 	pb := new(Builder)
-	pb.AddNode("peer0", 1)
-	pb.AddNode("peer1", 9)
+	pb.AddNode("peer0", 1, 1)
+	pb.AddNode("peer1", 9, 1)

 	opts := &BuilderOptions{
 		Key: newPrivateKey(t),
@@ -283,8 +283,8 @@ func TestTwoFailed(t *testing.T) {
 	}

 	pb := new(Builder)
-	pb.AddNode("peer0", 1)
-	pb.AddNode("peer1", 1)
+	pb.AddNode("peer0", 1, 1)
+	pb.AddNode("peer1", 1, 1)

 	opts := &BuilderOptions{
 		Key: newPrivateKey(t),
@@ -327,7 +327,7 @@ func TestSessionCache(t *testing.T) {
 	}

 	pb := new(Builder)
-	pb.AddNode("peer0", 1)
+	pb.AddNode("peer0", 1, 1)

 	opts := &BuilderOptions{
 		Key: newPrivateKey(t),
@@ -340,6 +340,7 @@ func TestSessionCache(t *testing.T) {

 	pool, err := pb.Build(ctx, opts)
 	require.NoError(t, err)
+	t.Cleanup(pool.Close)

 	// cache must contain session token
 	_, st, err := pool.Connection()
@@ -363,6 +364,71 @@ func TestSessionCache(t *testing.T) {
 	require.Contains(t, tokens, st)
 }

+func TestPriority(t *testing.T) {
+	t.Skip("NeoFS API client can't be mocked") // neofs-sdk-go#85
+
+	ctrl := gomock.NewController(t)
+	ctrl2 := gomock.NewController(t)
+
+	var tokens []*session.Token
+	clientCount := -1
+	clientBuilder := func(opts ...client.Option) (client.Client, error) {
+		clientCount++
+		mockClient := NewMockClient(ctrl)
+		mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(_, _ interface{}, _ ...interface{}) (*session.Token, error) {
+			tok := newToken(t)
+			tokens = append(tokens, tok)
+			return tok, nil
+		}).AnyTimes()
+		mockClient.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("error")).AnyTimes()
+
+		mockClient2 := NewMockClient(ctrl2)
+		mockClient2.EXPECT().CreateSession(gomock.Any(), gomock.Any()).DoAndReturn(func(_, _ interface{}, _ ...interface{}) (*session.Token, error) {
+			tok := newToken(t)
+			tokens = append(tokens, tok)
+			return tok, nil
+		}).AnyTimes()
+		mockClient2.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
+
+		if clientCount == 0 {
+			return mockClient, nil
+		}
+		return mockClient2, nil
+	}
+
+	pb := new(Builder)
+	pb.AddNode("peer0", 1, 1)
+	pb.AddNode("peer1", 2, 100)
+
+	opts := &BuilderOptions{
+		Key:                     newPrivateKey(t),
+		clientBuilder:           clientBuilder,
+		ClientRebalanceInterval: 1500 * time.Millisecond,
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	pool, err := pb.Build(ctx, opts)
+	require.NoError(t, err)
+	t.Cleanup(pool.Close)
+
+	firstNode := func() bool {
+		_, st, err := pool.Connection()
+		require.NoError(t, err)
+		return st == tokens[0]
+	}
+	secondNode := func() bool {
+		_, st, err := pool.Connection()
+		require.NoError(t, err)
+		return st == tokens[1]
+	}
+	require.Never(t, secondNode, time.Second, 200*time.Millisecond)
+
+	require.Eventually(t, secondNode, time.Second, 200*time.Millisecond)
+	require.Never(t, firstNode, time.Second, 200*time.Millisecond)
+}
+
 func TestSessionCacheWithKey(t *testing.T) {
 	t.Skip("NeoFS API client can't be mocked") // neofs-sdk-go#85
@@ -386,11 +452,12 @@ func TestSessionCacheWithKey(t *testing.T) {
 	}

 	pb := new(Builder)
-	pb.AddNode("peer0", 1)
+	pb.AddNode("peer0", 1, 1)

 	opts := &BuilderOptions{
 		Key:           newPrivateKey(t),
 		clientBuilder: clientBuilder,
+		ClientRebalanceInterval: 30 * time.Second,
 	}

 	ctx, cancel := context.WithCancel(context.Background())
@@ -431,12 +498,16 @@ func TestWaitPresence(t *testing.T) {
 	cache, err := NewCache()
 	require.NoError(t, err)

-	p := &pool{
+	inner := &innerPool{
 		sampler: NewSampler([]float64{1}, rand.NewSource(0)),
 		clientPacks: []*clientPack{{
 			client:  mockClient,
 			healthy: true,
 		}},
+	}
+	p := &pool{
+		innerPools: []*innerPool{inner},
 		key:   newPrivateKey(t),
 		cache: cache,
 	}

File 3 of 3:

@@ -62,18 +62,21 @@ func TestHealthyReweight(t *testing.T) {
 	var (
 		weights = []float64{0.9, 0.1}
 		names   = []string{"node0", "node1"}
-		options = &BuilderOptions{weights: weights}
+		options = &BuilderOptions{nodesParams: []*NodesParam{{weights: weights}}}
 		buffer  = make([]float64, len(weights))
 	)

 	cache, err := NewCache()
 	require.NoError(t, err)

-	p := &pool{
+	inner := &innerPool{
 		sampler: NewSampler(weights, rand.NewSource(0)),
 		clientPacks: []*clientPack{
 			{client: newNetmapMock(names[0], true), healthy: true, address: "address0"},
 			{client: newNetmapMock(names[1], false), healthy: true, address: "address1"}},
+	}
+	p := &pool{
+		innerPools: []*innerPool{inner},
 		cache: cache,
 		key:   newPrivateKey(t),
 	}
@@ -84,7 +87,7 @@ func TestHealthyReweight(t *testing.T) {
 	mock0 := connection0.(clientMock)
 	require.Equal(t, names[0], mock0.name)

-	updateNodesHealth(context.TODO(), p, options, buffer)
+	updateInnerNodesHealth(context.TODO(), p, 0, options, buffer)

 	connection1, _, err := p.Connection()
 	require.NoError(t, err)
@@ -92,12 +95,12 @@ func TestHealthyReweight(t *testing.T) {
 	require.Equal(t, names[1], mock1.name)

 	// enabled first node again
-	p.lock.Lock()
-	p.clientPacks[0].client = newNetmapMock(names[0], false)
-	p.lock.Unlock()
+	inner.lock.Lock()
+	inner.clientPacks[0].client = newNetmapMock(names[0], false)
+	inner.lock.Unlock()

-	updateNodesHealth(context.TODO(), p, options, buffer)
-	p.sampler = NewSampler(weights, rand.NewSource(0))
+	updateInnerNodesHealth(context.TODO(), p, 0, options, buffer)
+	inner.sampler = NewSampler(weights, rand.NewSource(0))

 	connection0, _, err = p.Connection()
 	require.NoError(t, err)
@@ -111,21 +114,24 @@ func TestHealthyNoReweight(t *testing.T) {
 	var (
 		weights = []float64{0.9, 0.1}
 		names   = []string{"node0", "node1"}
-		options = &BuilderOptions{weights: weights}
+		options = &BuilderOptions{nodesParams: []*NodesParam{{weights: weights}}}
 		buffer  = make([]float64, len(weights))
 	)

 	sampler := NewSampler(weights, rand.NewSource(0))
-	p := &pool{
+	inner := &innerPool{
 		sampler: sampler,
 		clientPacks: []*clientPack{
 			{client: newNetmapMock(names[0], false), healthy: true},
 			{client: newNetmapMock(names[1], false), healthy: true}},
 	}
+	p := &pool{
+		innerPools: []*innerPool{inner},
+	}

-	updateNodesHealth(context.TODO(), p, options, buffer)
+	updateInnerNodesHealth(context.TODO(), p, 0, options, buffer)

-	p.lock.RLock()
-	defer p.lock.RUnlock()
-	require.Truef(t, sampler == p.sampler, "Sampler must not be changed. Expected: %p, actual: %p", sampler, p.sampler)
+	inner.lock.RLock()
+	defer inner.lock.RUnlock()
+	require.Equal(t, inner.sampler, sampler)
 }