diff --git a/pool/pool.go b/pool/pool.go index 24d7a123..d2dab671 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -9,6 +9,7 @@ import ( "fmt" "math" "math/rand" + "sort" "strings" "sync" "time" @@ -38,16 +39,26 @@ type BuilderOptions struct { NodeRequestTimeout time.Duration ClientRebalanceInterval time.Duration SessionExpirationEpoch uint64 - weights []float64 - addresses []string + nodesParams []*NodesParam clientBuilder func(opts ...client.Option) (client.Client, error) } +type NodesParam struct { + priority int + addresses []string + weights []float64 +} + +type NodeParam struct { + priority int + address string + weight float64 +} + // Builder is an interim structure used to collect node addresses/weights and // build connection pool subsequently. type Builder struct { - addresses []string - weights []float64 + nodeParams []NodeParam } // ContainerPollingParams contains parameters used in polling is a container created or not. @@ -65,20 +76,40 @@ func DefaultPollingParams() *ContainerPollingParams { } // AddNode adds address/weight pair to node PoolBuilder list. -func (pb *Builder) AddNode(address string, weight float64) *Builder { - pb.addresses = append(pb.addresses, address) - pb.weights = append(pb.weights, weight) +func (pb *Builder) AddNode(address string, priority int, weight float64) *Builder { + pb.nodeParams = append(pb.nodeParams, NodeParam{ + address: address, + priority: priority, + weight: weight, + }) return pb } // Build creates new pool based on current PoolBuilder state and options. func (pb *Builder) Build(ctx context.Context, options *BuilderOptions) (Pool, error) { - if len(pb.addresses) == 0 { + if len(pb.nodeParams) == 0 { return nil, errors.New("no NeoFS peers configured") } - options.weights = adjustWeights(pb.weights) - options.addresses = pb.addresses + nodesParams := make(map[int]*NodesParam) + for _, param := range pb.nodeParams { + nodes, ok := nodesParams[param.priority] + if !ok { + nodes = &NodesParam{priority: param.priority} + } + nodes.addresses = append(nodes.addresses, param.address) + nodes.weights = append(nodes.weights, param.weight) + nodesParams[param.priority] = nodes + } + + for _, nodes := range nodesParams { + nodes.weights = adjustWeights(nodes.weights) + options.nodesParams = append(options.nodesParams, nodes) + } + + sort.Slice(options.nodesParams, func(i, j int) bool { + return options.nodesParams[i].priority < options.nodesParams[j].priority + }) if options.clientBuilder == nil { options.clientBuilder = client.New @@ -169,14 +200,18 @@ func cfgFromOpts(opts ...CallOption) *callConfig { var _ Pool = (*pool)(nil) type pool struct { + innerPools []*innerPool + key *ecdsa.PrivateKey + owner *owner.ID + cancel context.CancelFunc + closedCh chan struct{} + cache *SessionCache +} + +type innerPool struct { lock sync.RWMutex sampler *Sampler - key *ecdsa.PrivateKey - owner *owner.ID clientPacks []*clientPack - cancel context.CancelFunc - closedCh chan struct{} - cache *SessionCache } func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) { @@ -192,47 +227,51 @@ func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) { ownerID := owner.NewIDFromNeo3Wallet(wallet) - clientPacks := make([]*clientPack, len(options.weights)) + inner := make([]*innerPool, len(options.nodesParams)) var atLeastOneHealthy bool - for i, address := range options.addresses { - c, err := options.clientBuilder(client.WithDefaultPrivateKey(options.Key), - client.WithURIAddress(address, nil), - client.WithDialTimeout(options.NodeConnectionTimeout)) - if err != nil { - return nil, err + for i, params := range options.nodesParams { + clientPacks := make([]*clientPack, len(params.weights)) + for j, address := range params.addresses { + c, err := options.clientBuilder(client.WithDefaultPrivateKey(options.Key), + client.WithURIAddress(address, nil), + client.WithDialTimeout(options.NodeConnectionTimeout)) + if err != nil { + return nil, err + } + var healthy bool + cliRes, err := c.CreateSession(ctx, options.SessionExpirationEpoch) + if err != nil && options.Logger != nil { + options.Logger.Warn("failed to create neofs session token for client", + zap.String("address", address), + zap.Error(err)) + } else if err == nil { + healthy, atLeastOneHealthy = true, true + st := sessionTokenForOwner(ownerID, cliRes) + _ = cache.Put(formCacheKey(address, options.Key), st) + } + clientPacks[j] = &clientPack{client: c, healthy: healthy, address: address} } - var healthy bool - cliRes, err := c.CreateSession(ctx, options.SessionExpirationEpoch) - if err != nil && options.Logger != nil { - options.Logger.Warn("failed to create neofs session token for client", - zap.String("address", address), - zap.Error(err)) - } else if err == nil { - healthy, atLeastOneHealthy = true, true + source := rand.NewSource(time.Now().UnixNano()) + sampler := NewSampler(params.weights, source) - st := sessionTokenForOwner(ownerID, cliRes) - - _ = cache.Put(formCacheKey(address, options.Key), st) + inner[i] = &innerPool{ + sampler: sampler, + clientPacks: clientPacks, } - clientPacks[i] = &clientPack{client: c, healthy: healthy, address: address} } if !atLeastOneHealthy { return nil, fmt.Errorf("at least one node must be healthy") } - source := rand.NewSource(time.Now().UnixNano()) - sampler := NewSampler(options.weights, source) - ctx, cancel := context.WithCancel(ctx) pool := &pool{ - sampler: sampler, - key: options.Key, - owner: ownerID, - clientPacks: clientPacks, - cancel: cancel, - closedCh: make(chan struct{}), - cache: cache, + innerPools: inner, + key: options.Key, + owner: ownerID, + cancel: cancel, + closedCh: make(chan struct{}), + cache: cache, } go startRebalance(ctx, pool, options) return pool, nil @@ -240,7 +279,10 @@ func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) { func startRebalance(ctx context.Context, p *pool, options *BuilderOptions) { ticker := time.NewTimer(options.ClientRebalanceInterval) - buffer := make([]float64, len(options.weights)) + buffers := make([][]float64, len(options.nodesParams)) + for i, params := range options.nodesParams { + buffers[i] = make([]float64, len(params.weights)) + } for { select { @@ -248,57 +290,72 @@ func startRebalance(ctx context.Context, p *pool, options *BuilderOptions) { close(p.closedCh) return case <-ticker.C: - updateNodesHealth(ctx, p, options, buffer) + updateNodesHealth(ctx, p, options, buffers) ticker.Reset(options.ClientRebalanceInterval) } } } -func updateNodesHealth(ctx context.Context, p *pool, options *BuilderOptions, bufferWeights []float64) { - if len(bufferWeights) != len(p.clientPacks) { - bufferWeights = make([]float64, len(p.clientPacks)) - } - healthyChanged := false +func updateNodesHealth(ctx context.Context, p *pool, options *BuilderOptions, buffers [][]float64) { wg := sync.WaitGroup{} - for i, cPack := range p.clientPacks { + for i, inner := range p.innerPools { wg.Add(1) - go func(i int, client client.Client) { + bufferWeights := buffers[i] + go func(i int, innerPool *innerPool) { + defer wg.Done() + updateInnerNodesHealth(ctx, p, i, options, bufferWeights) + }(i, inner) + } + wg.Wait() +} + +func updateInnerNodesHealth(ctx context.Context, pool *pool, i int, options *BuilderOptions, bufferWeights []float64) { + if i > len(pool.innerPools)-1 { + return + } + p := pool.innerPools[i] + + healthyChanged := false + wg := sync.WaitGroup{} + for j, cPack := range p.clientPacks { + wg.Add(1) + go func(j int, client client.Client) { defer wg.Done() ok := true tctx, c := context.WithTimeout(ctx, options.NodeRequestTimeout) defer c() if _, err := client.EndpointInfo(tctx); err != nil { ok = false - bufferWeights[i] = 0 + bufferWeights[j] = 0 } p.lock.RLock() - cp := *p.clientPacks[i] + cp := *p.clientPacks[j] p.lock.RUnlock() if ok { - bufferWeights[i] = options.weights[i] + bufferWeights[j] = options.nodesParams[i].weights[j] if !cp.healthy { if cliRes, err := client.CreateSession(ctx, options.SessionExpirationEpoch); err != nil { ok = false - bufferWeights[i] = 0 + bufferWeights[j] = 0 } else { - tkn := p.newSessionToken(cliRes) + tkn := pool.newSessionToken(cliRes) - _ = p.cache.Put(formCacheKey(cp.address, p.key), tkn) + _ = pool.cache.Put(formCacheKey(cp.address, pool.key), tkn) } } } else { - p.cache.DeleteByPrefix(cp.address) + pool.cache.DeleteByPrefix(cp.address) } p.lock.Lock() - if p.clientPacks[i].healthy != ok { - p.clientPacks[i].healthy = ok + if p.clientPacks[j].healthy != ok { + p.clientPacks[j].healthy = ok healthyChanged = true } p.lock.Unlock() - }(i, cPack.client) + }(j, cPack.client) } wg.Wait() @@ -337,6 +394,17 @@ func (p *pool) Connection() (client.Client, *session.Token, error) { } func (p *pool) connection() (*clientPack, error) { + for _, inner := range p.innerPools { + cp, err := inner.connection() + if err == nil { + return cp, nil + } + } + + return nil, errors.New("no healthy client") +} + +func (p *innerPool) connection() (*clientPack, error) { p.lock.RLock() defer p.lock.RUnlock() if len(p.clientPacks) == 1 { @@ -353,6 +421,7 @@ func (p *pool) connection() (*clientPack, error) { return cp, nil } } + return nil, errors.New("no healthy client") } @@ -843,7 +912,7 @@ func (p *pool) WaitForContainerPresence(ctx context.Context, cid *cid.ID, pollPa } } -// Cloce closes the pool and releases all the associated resources. +// Close closes the pool and releases all the associated resources. func (p *pool) Close() { p.cancel() <-p.closedCh diff --git a/pool/pool_test.go b/pool/pool_test.go index 23d826c1..bd8bc375 100644 --- a/pool/pool_test.go +++ b/pool/pool_test.go @@ -27,7 +27,7 @@ func TestBuildPoolClientFailed(t *testing.T) { } pb := new(Builder) - pb.AddNode("peer0", 1) + pb.AddNode("peer0", 1, 1) opts := &BuilderOptions{ Key: newPrivateKey(t), @@ -54,7 +54,7 @@ func TestBuildPoolCreateSessionFailed(t *testing.T) { } pb := new(Builder) - pb.AddNode("peer0", 1) + pb.AddNode("peer0", 1, 1) opts := &BuilderOptions{ Key: newPrivateKey(t), @@ -108,8 +108,8 @@ func TestBuildPoolOneNodeFailed(t *testing.T) { } pb := new(Builder) - pb.AddNode("peer0", 9) - pb.AddNode("peer1", 1) + pb.AddNode("peer0", 9, 1) + pb.AddNode("peer1", 1, 1) log, err := zap.NewProduction() require.NoError(t, err) @@ -159,7 +159,7 @@ func TestOneNode(t *testing.T) { } pb := new(Builder) - pb.AddNode("peer0", 1) + pb.AddNode("peer0", 1, 1) opts := &BuilderOptions{ Key: newPrivateKey(t), @@ -196,8 +196,8 @@ func TestTwoNodes(t *testing.T) { } pb := new(Builder) - pb.AddNode("peer0", 1) - pb.AddNode("peer1", 1) + pb.AddNode("peer0", 1, 1) + pb.AddNode("peer1", 1, 1) opts := &BuilderOptions{ Key: newPrivateKey(t), @@ -248,8 +248,8 @@ func TestOneOfTwoFailed(t *testing.T) { } pb := new(Builder) - pb.AddNode("peer0", 1) - pb.AddNode("peer1", 9) + pb.AddNode("peer0", 1, 1) + pb.AddNode("peer1", 9, 1) opts := &BuilderOptions{ Key: newPrivateKey(t), @@ -283,8 +283,8 @@ func TestTwoFailed(t *testing.T) { } pb := new(Builder) - pb.AddNode("peer0", 1) - pb.AddNode("peer1", 1) + pb.AddNode("peer0", 1, 1) + pb.AddNode("peer1", 1, 1) opts := &BuilderOptions{ Key: newPrivateKey(t), @@ -327,7 +327,7 @@ func TestSessionCache(t *testing.T) { } pb := new(Builder) - pb.AddNode("peer0", 1) + pb.AddNode("peer0", 1, 1) opts := &BuilderOptions{ Key: newPrivateKey(t), @@ -340,6 +340,7 @@ func TestSessionCache(t *testing.T) { pool, err := pb.Build(ctx, opts) require.NoError(t, err) + t.Cleanup(pool.Close) // cache must contain session token _, st, err := pool.Connection() @@ -363,6 +364,71 @@ func TestSessionCache(t *testing.T) { require.Contains(t, tokens, st) } +func TestPriority(t *testing.T) { + t.Skip("NeoFS API client can't be mocked") // neofs-sdk-go#85 + + ctrl := gomock.NewController(t) + ctrl2 := gomock.NewController(t) + + var tokens []*session.Token + clientCount := -1 + clientBuilder := func(opts ...client.Option) (client.Client, error) { + clientCount++ + mockClient := NewMockClient(ctrl) + mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(_, _ interface{}, _ ...interface{}) (*session.Token, error) { + tok := newToken(t) + tokens = append(tokens, tok) + return tok, nil + }).AnyTimes() + mockClient.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("error")).AnyTimes() + + mockClient2 := NewMockClient(ctrl2) + mockClient2.EXPECT().CreateSession(gomock.Any(), gomock.Any()).DoAndReturn(func(_, _ interface{}, _ ...interface{}) (*session.Token, error) { + tok := newToken(t) + tokens = append(tokens, tok) + return tok, nil + }).AnyTimes() + mockClient2.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() + + if clientCount == 0 { + return mockClient, nil + } + return mockClient2, nil + } + + pb := new(Builder) + pb.AddNode("peer0", 1, 1) + pb.AddNode("peer1", 2, 100) + + opts := &BuilderOptions{ + Key: newPrivateKey(t), + clientBuilder: clientBuilder, + ClientRebalanceInterval: 1500 * time.Millisecond, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + pool, err := pb.Build(ctx, opts) + require.NoError(t, err) + t.Cleanup(pool.Close) + + firstNode := func() bool { + _, st, err := pool.Connection() + require.NoError(t, err) + return st == tokens[0] + } + secondNode := func() bool { + _, st, err := pool.Connection() + require.NoError(t, err) + return st == tokens[1] + } + require.Never(t, secondNode, time.Second, 200*time.Millisecond) + + require.Eventually(t, secondNode, time.Second, 200*time.Millisecond) + require.Never(t, firstNode, time.Second, 200*time.Millisecond) +} + func TestSessionCacheWithKey(t *testing.T) { t.Skip("NeoFS API client can't be mocked") // neofs-sdk-go#85 @@ -386,11 +452,12 @@ func TestSessionCacheWithKey(t *testing.T) { } pb := new(Builder) - pb.AddNode("peer0", 1) + pb.AddNode("peer0", 1, 1) opts := &BuilderOptions{ - Key: newPrivateKey(t), - clientBuilder: clientBuilder, + Key: newPrivateKey(t), + clientBuilder: clientBuilder, + ClientRebalanceInterval: 30 * time.Second, } ctx, cancel := context.WithCancel(context.Background()) @@ -431,14 +498,18 @@ func TestWaitPresence(t *testing.T) { cache, err := NewCache() require.NoError(t, err) - p := &pool{ + inner := &innerPool{ sampler: NewSampler([]float64{1}, rand.NewSource(0)), clientPacks: []*clientPack{{ client: mockClient, healthy: true, }}, - key: newPrivateKey(t), - cache: cache, + } + + p := &pool{ + innerPools: []*innerPool{inner}, + key: newPrivateKey(t), + cache: cache, } t.Run("context canceled", func(t *testing.T) { diff --git a/pool/sampler_test.go b/pool/sampler_test.go index eaca11ed..dce8dd38 100644 --- a/pool/sampler_test.go +++ b/pool/sampler_test.go @@ -62,20 +62,23 @@ func TestHealthyReweight(t *testing.T) { var ( weights = []float64{0.9, 0.1} names = []string{"node0", "node1"} - options = &BuilderOptions{weights: weights} + options = &BuilderOptions{nodesParams: []*NodesParam{{weights: weights}}} buffer = make([]float64, len(weights)) ) cache, err := NewCache() require.NoError(t, err) - p := &pool{ + inner := &innerPool{ sampler: NewSampler(weights, rand.NewSource(0)), clientPacks: []*clientPack{ {client: newNetmapMock(names[0], true), healthy: true, address: "address0"}, {client: newNetmapMock(names[1], false), healthy: true, address: "address1"}}, - cache: cache, - key: newPrivateKey(t), + } + p := &pool{ + innerPools: []*innerPool{inner}, + cache: cache, + key: newPrivateKey(t), } // check getting first node connection before rebalance happened @@ -84,7 +87,7 @@ func TestHealthyReweight(t *testing.T) { mock0 := connection0.(clientMock) require.Equal(t, names[0], mock0.name) - updateNodesHealth(context.TODO(), p, options, buffer) + updateInnerNodesHealth(context.TODO(), p, 0, options, buffer) connection1, _, err := p.Connection() require.NoError(t, err) @@ -92,12 +95,12 @@ func TestHealthyReweight(t *testing.T) { require.Equal(t, names[1], mock1.name) // enabled first node again - p.lock.Lock() - p.clientPacks[0].client = newNetmapMock(names[0], false) - p.lock.Unlock() + inner.lock.Lock() + inner.clientPacks[0].client = newNetmapMock(names[0], false) + inner.lock.Unlock() - updateNodesHealth(context.TODO(), p, options, buffer) - p.sampler = NewSampler(weights, rand.NewSource(0)) + updateInnerNodesHealth(context.TODO(), p, 0, options, buffer) + inner.sampler = NewSampler(weights, rand.NewSource(0)) connection0, _, err = p.Connection() require.NoError(t, err) @@ -111,21 +114,24 @@ func TestHealthyNoReweight(t *testing.T) { var ( weights = []float64{0.9, 0.1} names = []string{"node0", "node1"} - options = &BuilderOptions{weights: weights} + options = &BuilderOptions{nodesParams: []*NodesParam{{weights: weights}}} buffer = make([]float64, len(weights)) ) sampler := NewSampler(weights, rand.NewSource(0)) - p := &pool{ + inner := &innerPool{ sampler: sampler, clientPacks: []*clientPack{ {client: newNetmapMock(names[0], false), healthy: true}, {client: newNetmapMock(names[1], false), healthy: true}}, } + p := &pool{ + innerPools: []*innerPool{inner}, + } - updateNodesHealth(context.TODO(), p, options, buffer) + updateInnerNodesHealth(context.TODO(), p, 0, options, buffer) - p.lock.RLock() - defer p.lock.RUnlock() - require.Truef(t, sampler == p.sampler, "Sampler must not be changed. Expected: %p, actual: %p", sampler, p.sampler) + inner.lock.RLock() + defer inner.lock.RUnlock() + require.Equal(t, inner.sampler, sampler) }