From a44551d42b657949fa0dec58f44db26e423966d6 Mon Sep 17 00:00:00 2001 From: Pavel Korotkov Date: Mon, 5 Apr 2021 20:10:03 +0300 Subject: [PATCH] Add connection pool implementation (part 2) Signed-off-by: Pavel Korotkov --- .test.env | 4 +-- connections/generator.go | 8 ++--- connections/pool.go | 72 ++++++++++++++++++++++++++++++++-------- global/context.go | 7 ++-- health._go_ | 33 ------------------ neofs/client-plant.go | 3 ++ 6 files changed, 70 insertions(+), 57 deletions(-) delete mode 100644 health._go_ diff --git a/.test.env b/.test.env index 840ef5b..6eaea9e 100644 --- a/.test.env +++ b/.test.env @@ -9,7 +9,7 @@ HTTP_GW_KEEPALIVE_TIMEOUT=300s HTTP_GW_KEEPALIVE_TIME=120s HTTP_GW_KEEPALIVE_PERMIT_WITHOUT_STREAM=True HTTP_GW_CONN_TTL=1h -HTTP_GW_PEERS_0_WEIGHT=0.6 +HTTP_GW_PEERS_0_WEIGHT=0.4 HTTP_GW_PEERS_0_ADDRESS=s01.neofs.devenv:8080 -HTTP_GW_PEERS_1_WEIGHT=0.4 +HTTP_GW_PEERS_1_WEIGHT=0.6 HTTP_GW_PEERS_1_ADDRESS=s02.neofs.devenv:8080 \ No newline at end of file diff --git a/connections/generator.go b/connections/generator.go index a03bd26..435386a 100644 --- a/connections/generator.go +++ b/connections/generator.go @@ -3,7 +3,7 @@ package connections import "math/rand" // https://www.keithschwarz.com/darts-dice-coins/ -type Generator struct { +type Sampler struct { randomGenerator *rand.Rand probabilities []float64 alias []int @@ -22,8 +22,8 @@ func (wl *workList) pop() int { return n } -func NewGenerator(probabilities []float64, source rand.Source) *Generator { - generator := &Generator{} +func NewSampler(probabilities []float64, source rand.Source) *Sampler { + generator := &Sampler{} var ( small workList large workList @@ -66,7 +66,7 @@ func NewGenerator(probabilities []float64, source rand.Source) *Generator { return generator } -func (g *Generator) Next() int { +func (g *Sampler) Next() int { n := len(g.alias) i := g.randomGenerator.Intn(n) if g.randomGenerator.Float64() < g.probabilities[i] { diff --git a/connections/pool.go b/connections/pool.go index 5fefbf0..48c859b 100644 --- a/connections/pool.go +++ b/connections/pool.go @@ -6,6 +6,7 @@ import ( "errors" "math" "math/rand" + "sync" "time" "github.com/nspcc-dev/neofs-api-go/pkg/client" @@ -17,6 +18,8 @@ type PoolBuilderOptions struct { NodeConnectionTimeout time.Duration NodeRequestTimeout time.Duration ClientRebalanceInterval time.Duration + weights []float64 + connections []*grpc.ClientConn } type PoolBuilder struct { @@ -50,7 +53,9 @@ func (pb *PoolBuilder) Build(ctx context.Context, options *PoolBuilderOptions) ( } cons[i] = con } - return new(pb.weights, options.Key, cons) + options.weights = pb.weights + options.connections = cons + return new(ctx, options) } type Pool interface { @@ -58,29 +63,70 @@ type Pool interface { } type pool struct { - generator *Generator - clients []client.Client + lock sync.RWMutex + sampler *Sampler + clients []client.Client + healthy []bool } -func new(weights []float64, key *ecdsa.PrivateKey, connections []*grpc.ClientConn) (Pool, error) { - clients := make([]client.Client, len(weights)) - for i, con := range connections { - c, err := client.New(client.WithDefaultPrivateKey(key), client.WithGRPCConnection(con)) +func new(ctx context.Context, options *PoolBuilderOptions) (Pool, error) { + n := len(options.weights) + clients := make([]client.Client, n) + healthy := make([]bool, n) + for i, con := range options.connections { + c, err := client.New(client.WithDefaultPrivateKey(options.Key), client.WithGRPCConnection(con)) if err != nil { return nil, err } clients[i] = c + healthy[i] = true } source := rand.NewSource(time.Now().UnixNano()) - return &pool{ - generator: NewGenerator(weights, source), - clients: clients, - }, nil + pool := &pool{ + sampler: NewSampler(options.weights, source), + clients: clients, + healthy: healthy, + } + go func() { + ticker := time.NewTimer(options.ClientRebalanceInterval) + for range ticker.C { + ok := true + for i, client := range pool.clients { + func() { + tctx, c := context.WithTimeout(ctx, options.NodeRequestTimeout) + defer c() + if _, err := client.EndpointInfo(tctx); err != nil { + ok = false + } + pool.lock.Lock() + pool.healthy[i] = ok + pool.lock.Unlock() + }() + } + ticker.Reset(options.ClientRebalanceInterval) + } + }() + return pool, nil } func (p *pool) Client() client.Client { + p.lock.RLock() + defer p.lock.RUnlock() if len(p.clients) == 1 { - return p.clients[0] + if p.healthy[0] { + return p.clients[0] + } + return nil } - return p.clients[p.generator.Next()] + var i *int = nil + for k := 0; k < 10; k++ { + i_ := p.sampler.Next() + if p.healthy[i_] { + i = &i_ + } + } + if i != nil { + return p.clients[*i] + } + return nil } diff --git a/global/context.go b/global/context.go index 19b3c53..0fd29cf 100644 --- a/global/context.go +++ b/global/context.go @@ -8,16 +8,13 @@ import ( ) var ( - globalContext context.Context - globalContextOnce sync.Once - globalContextBarrier = make(chan struct{}) + globalContext context.Context + globalContextOnce sync.Once ) func Context() context.Context { globalContextOnce.Do(func() { globalContext, _ = signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP) - close(globalContextBarrier) }) - <-globalContextBarrier return globalContext } diff --git a/health._go_ b/health._go_ deleted file mode 100644 index 719667b..0000000 --- a/health._go_ +++ /dev/null @@ -1,33 +0,0 @@ -package main - -import ( - "github.com/fasthttp/router" - "github.com/valyala/fasthttp" -) - -type stater func() error - -const ( - healthyState = "NeoFS HTTP Gateway is " - defaultContentType = "text/plain; charset=utf-8" -) - -func attachHealthy(r *router.Router, e stater) { - r.GET("/-/ready/", func(ctx *fasthttp.RequestCtx) { - ctx.SetStatusCode(fasthttp.StatusOK) - ctx.SetBodyString(healthyState + "ready") - }) - r.GET("/-/healthy/", func(c *fasthttp.RequestCtx) { - code := fasthttp.StatusOK - msg := "healthy" - - if err := e(); err != nil { - msg = "unhealthy: " + err.Error() - code = fasthttp.StatusBadRequest - } - c.Response.Reset() - c.SetStatusCode(code) - c.SetContentType(defaultContentType) - c.SetBodyString(healthyState + msg) - }) -} diff --git a/neofs/client-plant.go b/neofs/client-plant.go index 0078008..1430619 100644 --- a/neofs/client-plant.go +++ b/neofs/client-plant.go @@ -80,6 +80,9 @@ type neofsClientPlant struct { func (cp *neofsClientPlant) GetReusableArtifacts(ctx context.Context) (client.Client, *token.SessionToken, error) { c := cp.pool.Client() + if c == nil { + return nil, nil, errors.New("failed to peek a healthy node to connect to") + } st, err := c.CreateSession(ctx, math.MaxUint64) if err != nil { return nil, nil, errors.Wrap(err, "failed to create reusable neofs session token")