Add connection pool implementation (part 1)

Signed-off-by: Pavel Korotkov <pavel@nspcc.ru>
This commit is contained in:
Pavel Korotkov 2021-04-05 17:48:01 +03:00 committed by Pavel Korotkov
parent c909c99f72
commit 62a03251ce
9 changed files with 205 additions and 112 deletions

View file

@ -9,5 +9,7 @@ HTTP_GW_KEEPALIVE_TIMEOUT=300s
HTTP_GW_KEEPALIVE_TIME=120s HTTP_GW_KEEPALIVE_TIME=120s
HTTP_GW_KEEPALIVE_PERMIT_WITHOUT_STREAM=True HTTP_GW_KEEPALIVE_PERMIT_WITHOUT_STREAM=True
HTTP_GW_CONN_TTL=1h HTTP_GW_CONN_TTL=1h
HTTP_GW_PEERS_0_WEIGHT=1.0 HTTP_GW_PEERS_0_WEIGHT=0.6
HTTP_GW_PEERS_0_ADDRESS=s01.neofs.devenv:8080 HTTP_GW_PEERS_0_ADDRESS=s01.neofs.devenv:8080
HTTP_GW_PEERS_1_WEIGHT=0.4
HTTP_GW_PEERS_1_ADDRESS=s02.neofs.devenv:8080

35
app.go
View file

@ -5,6 +5,7 @@ import (
"strconv" "strconv"
"github.com/fasthttp/router" "github.com/fasthttp/router"
"github.com/nspcc-dev/neofs-http-gate/connections"
"github.com/nspcc-dev/neofs-http-gate/downloader" "github.com/nspcc-dev/neofs-http-gate/downloader"
"github.com/nspcc-dev/neofs-http-gate/logger" "github.com/nspcc-dev/neofs-http-gate/logger"
"github.com/nspcc-dev/neofs-http-gate/neofs" "github.com/nspcc-dev/neofs-http-gate/neofs"
@ -68,9 +69,6 @@ func newApp(ctx context.Context, opt ...Option) App {
if a.cfg.GetBool(cmdVerbose) { if a.cfg.GetBool(cmdVerbose) {
grpclog.SetLoggerV2(a.auxiliaryLog) grpclog.SetLoggerV2(a.auxiliaryLog)
} }
// conTimeout := a.cfg.GetDuration(cfgConTimeout)
// reqTimeout := a.cfg.GetDuration(cfgReqTimeout)
// tckTimeout := a.cfg.GetDuration(cfgRebalance)
// -- setup FastHTTP server -- // -- setup FastHTTP server --
a.webServer.Name = "neofs-http-gate" a.webServer.Name = "neofs-http-gate"
a.webServer.ReadBufferSize = a.cfg.GetInt(cfgWebReadBufferSize) a.webServer.ReadBufferSize = a.cfg.GetInt(cfgWebReadBufferSize)
@ -82,29 +80,38 @@ func newApp(ctx context.Context, opt ...Option) App {
a.webServer.NoDefaultContentType = true a.webServer.NoDefaultContentType = true
a.webServer.MaxRequestBodySize = a.cfg.GetInt(cfgWebMaxRequestBodySize) a.webServer.MaxRequestBodySize = a.cfg.GetInt(cfgWebMaxRequestBodySize)
// -- -- -- -- -- -- FIXME -- -- -- -- -- -- // -- -- -- -- -- -- FIXME -- -- -- -- -- --
// Does not work with StreamRequestBody, // Does not work with StreamRequestBody due to bugs with
// some bugs with readMultipartForm // readMultipartForm, see https://github.com/valyala/fasthttp/issues/968
// https://github.com/valyala/fasthttp/issues/968
a.webServer.DisablePreParseMultipartForm = true a.webServer.DisablePreParseMultipartForm = true
a.webServer.StreamRequestBody = a.cfg.GetBool(cfgWebStreamRequestBody) a.webServer.StreamRequestBody = a.cfg.GetBool(cfgWebStreamRequestBody)
// -- -- -- -- -- -- -- -- -- -- -- -- -- -- // -- -- -- -- -- -- -- -- -- -- -- -- -- --
var cl neofs.ConnectionList creds, err := neofs.NewCredentials(a.cfg.GetString(cmdNeoFSKey))
if err != nil {
a.log.Fatal("failed to get neofs credentials", zap.Error(err))
}
pb := new(connections.PoolBuilder)
for i := 0; ; i++ { for i := 0; ; i++ {
address := a.cfg.GetString(cfgPeers + "." + strconv.Itoa(i) + ".address") address := a.cfg.GetString(cfgPeers + "." + strconv.Itoa(i) + ".address")
weight := a.cfg.GetFloat64(cfgPeers + "." + strconv.Itoa(i) + ".weight") weight := a.cfg.GetFloat64(cfgPeers + "." + strconv.Itoa(i) + ".weight")
if address == "" { if address == "" {
break break
} }
cl.Add(address, weight) pb.AddNode(address, weight)
a.log.Info("add connection", zap.String("address", address), zap.Float64("weight", weight)) a.log.Info("add connection", zap.String("address", address), zap.Float64("weight", weight))
} }
creds, err := neofs.NewCredentials(a.cfg.GetString(cmdNeoFSKey)) opts := &connections.PoolBuilderOptions{
if err != nil { Key: creds.PrivateKey(),
a.log.Fatal("could not get neofs credentials", zap.Error(err)) NodeConnectionTimeout: a.cfg.GetDuration(cfgConTimeout),
NodeRequestTimeout: a.cfg.GetDuration(cfgReqTimeout),
ClientRebalanceInterval: a.cfg.GetDuration(cfgRebalance),
} }
a.plant, err = neofs.NewClientPlant(ctx, cl, creds) pool, err := pb.Build(ctx, opts)
if err != nil { if err != nil {
a.log.Fatal("failed to create neofs client") a.log.Fatal("failed to create connection pool", zap.Error(err))
}
a.plant, err = neofs.NewClientPlant(ctx, pool, creds)
if err != nil {
a.log.Fatal("failed to create neofs client plant")
} }
return a return a
} }
@ -144,8 +151,6 @@ func (a *app) Serve(ctx context.Context) {
a.log.Info("added path /get/{cid}/{oid}") a.log.Info("added path /get/{cid}/{oid}")
r.GET("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", downloader.DownloadByAttribute) r.GET("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", downloader.DownloadByAttribute)
a.log.Info("added path /get_by_attribute/{cid}/{attr_key}/{attr_val:*}") a.log.Info("added path /get_by_attribute/{cid}/{attr_key}/{attr_val:*}")
// attaching /-/(ready,healthy)
// attachHealthy(r, a.pool.Status)
// enable metrics // enable metrics
if a.cfg.GetBool(cmdMetrics) { if a.cfg.GetBool(cmdMetrics) {
a.log.Info("added path /metrics/") a.log.Info("added path /metrics/")

76
connections/generator.go Normal file
View file

@ -0,0 +1,76 @@
package connections
import "math/rand"
// https://www.keithschwarz.com/darts-dice-coins/
type Generator struct {
randomGenerator *rand.Rand
probabilities []float64
alias []int
}
type workList []int
func (wl *workList) push(e int) {
*wl = append(*wl, e)
}
func (wl *workList) pop() int {
l := len(*wl) - 1
n := (*wl)[l]
*wl = (*wl)[:l]
return n
}
func NewGenerator(probabilities []float64, source rand.Source) *Generator {
generator := &Generator{}
var (
small workList
large workList
)
n := len(probabilities)
generator.randomGenerator = rand.New(source)
generator.probabilities = make([]float64, n)
generator.alias = make([]int, n)
// Compute scaled probabilities.
p := make([]float64, n)
for i := 0; i < n; i++ {
p[i] = probabilities[i] * float64(n)
}
for i, pi := range p {
if pi < 1 {
small = append(small, i)
} else {
large = append(large, i)
}
}
for len(large) > 0 && len(small) > 0 {
l, g := small.pop(), large.pop()
generator.probabilities[l] = p[l]
generator.alias[l] = g
p[g] = (p[g] + p[l]) - 1
if p[g] < 1 {
small.push(g)
} else {
large.push(g)
}
}
for len(large) > 0 {
g := large.pop()
generator.probabilities[g] = 1
}
for len(small) > 0 {
l := small.pop()
generator.probabilities[l] = 1
}
return generator
}
func (g *Generator) Next() int {
n := len(g.alias)
i := g.randomGenerator.Intn(n)
if g.randomGenerator.Float64() < g.probabilities[i] {
return i
}
return g.alias[i]
}

86
connections/pool.go Normal file
View file

@ -0,0 +1,86 @@
package connections
import (
"context"
"crypto/ecdsa"
"errors"
"math"
"math/rand"
"time"
"github.com/nspcc-dev/neofs-api-go/pkg/client"
"google.golang.org/grpc"
)
type PoolBuilderOptions struct {
Key *ecdsa.PrivateKey
NodeConnectionTimeout time.Duration
NodeRequestTimeout time.Duration
ClientRebalanceInterval time.Duration
}
type PoolBuilder struct {
addresses []string
weights []float64
}
func (pb *PoolBuilder) AddNode(address string, weight float64) *PoolBuilder {
pb.addresses = append(pb.addresses, address)
pb.weights = append(pb.weights, weight)
return pb
}
func (pb *PoolBuilder) Build(ctx context.Context, options *PoolBuilderOptions) (Pool, error) {
totalWeight := 0.0
for _, w := range pb.weights {
totalWeight += w
}
if math.Abs(totalWeight-1.0) >= 1e-4 {
return nil, errors.New("total weight must be equal to unity")
}
var cons = make([]*grpc.ClientConn, len(pb.addresses))
for i, address := range pb.addresses {
con, err := func() (*grpc.ClientConn, error) {
toctx, c := context.WithTimeout(ctx, options.NodeConnectionTimeout)
defer c()
return grpc.DialContext(toctx, address, grpc.WithInsecure(), grpc.WithBlock())
}()
if err != nil {
return nil, err
}
cons[i] = con
}
return new(pb.weights, options.Key, cons)
}
type Pool interface {
Client() client.Client
}
type pool struct {
generator *Generator
clients []client.Client
}
func new(weights []float64, key *ecdsa.PrivateKey, connections []*grpc.ClientConn) (Pool, error) {
clients := make([]client.Client, len(weights))
for i, con := range connections {
c, err := client.New(client.WithDefaultPrivateKey(key), client.WithGRPCConnection(con))
if err != nil {
return nil, err
}
clients[i] = c
}
source := rand.NewSource(time.Now().UnixNano())
return &pool{
generator: NewGenerator(weights, source),
clients: clients,
}, nil
}
func (p *pool) Client() client.Client {
if len(p.clients) == 1 {
return p.clients[0]
}
return p.clients[p.generator.Next()]
}

View file

@ -6,24 +6,19 @@ import (
"crypto/ecdsa" "crypto/ecdsa"
"io" "io"
"math" "math"
"sort"
"time"
"github.com/nspcc-dev/neofs-api-go/pkg/client" "github.com/nspcc-dev/neofs-api-go/pkg/client"
"github.com/nspcc-dev/neofs-api-go/pkg/container" "github.com/nspcc-dev/neofs-api-go/pkg/container"
"github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-api-go/pkg/owner" "github.com/nspcc-dev/neofs-api-go/pkg/owner"
"github.com/nspcc-dev/neofs-api-go/pkg/token" "github.com/nspcc-dev/neofs-api-go/pkg/token"
"github.com/nspcc-dev/neofs-http-gate/connections"
objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object" objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer"
"github.com/pkg/errors" "github.com/pkg/errors"
"google.golang.org/grpc"
) )
const ( const maxObjectSize = uint64(1 << 28) // Limit objects to 256 MiB.
nodeConnectionTimeout = 10 * time.Second
maxObjectSize = uint64(1 << 26) // 64MiB
)
type BaseOptions struct { type BaseOptions struct {
Client client.Client Client client.Client
@ -74,20 +69,17 @@ type ClientPlant interface {
type neofsObjectClient struct { type neofsObjectClient struct {
key *ecdsa.PrivateKey key *ecdsa.PrivateKey
conn *grpc.ClientConn pool connections.Pool
} }
type neofsClientPlant struct { type neofsClientPlant struct {
key *ecdsa.PrivateKey key *ecdsa.PrivateKey
ownerID *owner.ID ownerID *owner.ID
conn *grpc.ClientConn pool connections.Pool
} }
func (cc *neofsClientPlant) GetReusableArtifacts(ctx context.Context) (client.Client, *token.SessionToken, error) { func (cp *neofsClientPlant) GetReusableArtifacts(ctx context.Context) (client.Client, *token.SessionToken, error) {
c, err := client.New(client.WithDefaultPrivateKey(cc.key), client.WithGRPCConnection(cc.conn)) c := cp.pool.Client()
if err != nil {
return nil, nil, errors.Wrap(err, "failed to create reusable neofs client")
}
st, err := c.CreateSession(ctx, math.MaxUint64) st, err := c.CreateSession(ctx, math.MaxUint64)
if err != nil { if err != nil {
return nil, nil, errors.Wrap(err, "failed to create reusable neofs session token") return nil, nil, errors.Wrap(err, "failed to create reusable neofs session token")
@ -96,47 +88,18 @@ func (cc *neofsClientPlant) GetReusableArtifacts(ctx context.Context) (client.Cl
} }
func (cc *neofsClientPlant) Object() ObjectClient { func (cc *neofsClientPlant) Object() ObjectClient {
return &neofsObjectClient{key: cc.key, conn: cc.conn} return &neofsObjectClient{
key: cc.key,
pool: cc.pool,
}
} }
func (cc *neofsClientPlant) OwnerID() *owner.ID { func (cc *neofsClientPlant) OwnerID() *owner.ID {
return cc.ownerID return cc.ownerID
} }
type Connection struct { func NewClientPlant(ctx context.Context, pool connections.Pool, creds Credentials) (ClientPlant, error) {
address string return &neofsClientPlant{key: creds.PrivateKey(), ownerID: creds.Owner(), pool: pool}, nil
weight float64
}
type ConnectionList []Connection
func (p ConnectionList) Len() int { return len(p) }
func (p ConnectionList) Less(i, j int) bool { return p[i].weight < p[j].weight }
func (p ConnectionList) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (cl *ConnectionList) Add(address string, weight float64) ConnectionList {
*cl = append(*cl, Connection{address, weight})
return *cl
}
func NewClientPlant(ctx context.Context, connectionList ConnectionList, creds Credentials) (ClientPlant, error) {
toctx, c := context.WithTimeout(ctx, nodeConnectionTimeout)
defer c()
sort.Sort(sort.Reverse(connectionList))
// TODO: Use connection pool here.
address := connectionList[0].address
conn, err := grpc.DialContext(toctx, address, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
if err == context.DeadlineExceeded {
err = errors.New("failed to connect to neofs node")
}
return nil, err
}
return &neofsClientPlant{
key: creds.PrivateKey(),
ownerID: creds.Owner(),
conn: conn,
}, nil
} }
func (oc *neofsObjectClient) Put(ctx context.Context, options *PutOptions) (*object.Address, error) { func (oc *neofsObjectClient) Put(ctx context.Context, options *PutOptions) (*object.Address, error) {

View file

@ -1,39 +0,0 @@
package pool
import (
"context"
"github.com/nspcc-dev/neofs-api-go/pkg/token"
"google.golang.org/grpc"
)
type Client interface {
// receive status of connection pool
Status() error
// worker should be run in goroutine to re-balancing
Worker(context.Context)
Connection(context.Context) (*grpc.ClientConn, error)
Session(context.Context, *grpc.ClientConn) (*token.SessionToken, error)
}
type pool struct{}
func (p *pool) Status() error {
return nil
}
func (p *pool) Worker(ctx context.Context) {
panic("not implemented")
}
func (p *pool) Connection(ctx context.Context) (*grpc.ClientConn, error) {
panic("not implemented")
}
func (p *pool) Session(ctx context.Context, conn *grpc.ClientConn) (*token.SessionToken, error) {
panic("not implemented")
}
func New() Client {
return &pool{}
}

View file

@ -115,7 +115,7 @@ func settings() *viper.Viper {
peers := flags.StringArrayP(cfgPeers, "p", nil, "NeoFS nodes") peers := flags.StringArrayP(cfgPeers, "p", nil, "NeoFS nodes")
// set prefers: // set prefers:
v.Set(cfgApplicationName, "neofs-http-gw") v.Set(cfgApplicationName, "neofs-http-gate")
v.Set(cfgApplicationVersion, Version) v.Set(cfgApplicationVersion, Version)
// set defaults: // set defaults:

View file

@ -18,6 +18,8 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
const jsonHeader = "application/json; charset=UTF-8"
var putOptionsPool = sync.Pool{ var putOptionsPool = sync.Pool{
New: func() interface{} { New: func() interface{} {
return new(neofs.PutOptions) return new(neofs.PutOptions)
@ -140,16 +142,14 @@ func (u *Uploader) fetchOwnerAndBearerToken(ctx context.Context) (*owner.ID, *to
} }
type putResponse struct { type putResponse struct {
OID string `json:"object_id"` ObjectID string `json:"object_id"`
CID string `json:"container_id"` ContainerID string `json:"container_id"`
} }
const jsonHeader = "application/json; charset=UTF-8"
func newPutResponse(addr *object.Address) *putResponse { func newPutResponse(addr *object.Address) *putResponse {
return &putResponse{ return &putResponse{
OID: addr.ObjectID().String(), ObjectID: addr.ObjectID().String(),
CID: addr.ContainerID().String(), ContainerID: addr.ContainerID().String(),
} }
} }