From 62a03251ced5eb70c239f9db02cd2db1b74e390c Mon Sep 17 00:00:00 2001 From: Pavel Korotkov Date: Mon, 5 Apr 2021 17:48:01 +0300 Subject: [PATCH] Add connection pool implementation (part 1) Signed-off-by: Pavel Korotkov --- .test.env | 6 ++- app.go | 35 +++++++++------- connections/generator.go | 76 +++++++++++++++++++++++++++++++++++ connections/pool.go | 86 ++++++++++++++++++++++++++++++++++++++++ health.go => health._go_ | 0 neofs/client-plant.go | 61 ++++++---------------------- pool/pool.go | 39 ------------------ settings.go | 2 +- uploader/upload.go | 12 +++--- 9 files changed, 205 insertions(+), 112 deletions(-) create mode 100644 connections/generator.go create mode 100644 connections/pool.go rename health.go => health._go_ (100%) delete mode 100644 pool/pool.go diff --git a/.test.env b/.test.env index 995d4a3..840ef5b 100644 --- a/.test.env +++ b/.test.env @@ -9,5 +9,7 @@ HTTP_GW_KEEPALIVE_TIMEOUT=300s HTTP_GW_KEEPALIVE_TIME=120s HTTP_GW_KEEPALIVE_PERMIT_WITHOUT_STREAM=True HTTP_GW_CONN_TTL=1h -HTTP_GW_PEERS_0_WEIGHT=1.0 -HTTP_GW_PEERS_0_ADDRESS=s01.neofs.devenv:8080 \ No newline at end of file +HTTP_GW_PEERS_0_WEIGHT=0.6 +HTTP_GW_PEERS_0_ADDRESS=s01.neofs.devenv:8080 +HTTP_GW_PEERS_1_WEIGHT=0.4 +HTTP_GW_PEERS_1_ADDRESS=s02.neofs.devenv:8080 \ No newline at end of file diff --git a/app.go b/app.go index 1b0f981..bbfead4 100644 --- a/app.go +++ b/app.go @@ -5,6 +5,7 @@ import ( "strconv" "github.com/fasthttp/router" + "github.com/nspcc-dev/neofs-http-gate/connections" "github.com/nspcc-dev/neofs-http-gate/downloader" "github.com/nspcc-dev/neofs-http-gate/logger" "github.com/nspcc-dev/neofs-http-gate/neofs" @@ -68,9 +69,6 @@ func newApp(ctx context.Context, opt ...Option) App { if a.cfg.GetBool(cmdVerbose) { grpclog.SetLoggerV2(a.auxiliaryLog) } - // conTimeout := a.cfg.GetDuration(cfgConTimeout) - // reqTimeout := a.cfg.GetDuration(cfgReqTimeout) - // tckTimeout := a.cfg.GetDuration(cfgRebalance) // -- setup FastHTTP server -- a.webServer.Name = "neofs-http-gate" a.webServer.ReadBufferSize = a.cfg.GetInt(cfgWebReadBufferSize) @@ -82,29 +80,38 @@ func newApp(ctx context.Context, opt ...Option) App { a.webServer.NoDefaultContentType = true a.webServer.MaxRequestBodySize = a.cfg.GetInt(cfgWebMaxRequestBodySize) // -- -- -- -- -- -- FIXME -- -- -- -- -- -- - // Does not work with StreamRequestBody, - // some bugs with readMultipartForm - // https://github.com/valyala/fasthttp/issues/968 + // Does not work with StreamRequestBody due to bugs with + // readMultipartForm, see https://github.com/valyala/fasthttp/issues/968 a.webServer.DisablePreParseMultipartForm = true a.webServer.StreamRequestBody = a.cfg.GetBool(cfgWebStreamRequestBody) // -- -- -- -- -- -- -- -- -- -- -- -- -- -- - var cl neofs.ConnectionList + creds, err := neofs.NewCredentials(a.cfg.GetString(cmdNeoFSKey)) + if err != nil { + a.log.Fatal("failed to get neofs credentials", zap.Error(err)) + } + pb := new(connections.PoolBuilder) for i := 0; ; i++ { address := a.cfg.GetString(cfgPeers + "." + strconv.Itoa(i) + ".address") weight := a.cfg.GetFloat64(cfgPeers + "." + strconv.Itoa(i) + ".weight") if address == "" { break } - cl.Add(address, weight) + pb.AddNode(address, weight) a.log.Info("add connection", zap.String("address", address), zap.Float64("weight", weight)) } - creds, err := neofs.NewCredentials(a.cfg.GetString(cmdNeoFSKey)) - if err != nil { - a.log.Fatal("could not get neofs credentials", zap.Error(err)) + opts := &connections.PoolBuilderOptions{ + Key: creds.PrivateKey(), + NodeConnectionTimeout: a.cfg.GetDuration(cfgConTimeout), + NodeRequestTimeout: a.cfg.GetDuration(cfgReqTimeout), + ClientRebalanceInterval: a.cfg.GetDuration(cfgRebalance), } - a.plant, err = neofs.NewClientPlant(ctx, cl, creds) + pool, err := pb.Build(ctx, opts) if err != nil { - a.log.Fatal("failed to create neofs client") + a.log.Fatal("failed to create connection pool", zap.Error(err)) + } + a.plant, err = neofs.NewClientPlant(ctx, pool, creds) + if err != nil { + a.log.Fatal("failed to create neofs client plant") } return a } @@ -144,8 +151,6 @@ func (a *app) Serve(ctx context.Context) { a.log.Info("added path /get/{cid}/{oid}") r.GET("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", downloader.DownloadByAttribute) a.log.Info("added path /get_by_attribute/{cid}/{attr_key}/{attr_val:*}") - // attaching /-/(ready,healthy) - // attachHealthy(r, a.pool.Status) // enable metrics if a.cfg.GetBool(cmdMetrics) { a.log.Info("added path /metrics/") diff --git a/connections/generator.go b/connections/generator.go new file mode 100644 index 0000000..a03bd26 --- /dev/null +++ b/connections/generator.go @@ -0,0 +1,76 @@ +package connections + +import "math/rand" + +// https://www.keithschwarz.com/darts-dice-coins/ +type Generator struct { + randomGenerator *rand.Rand + probabilities []float64 + alias []int +} + +type workList []int + +func (wl *workList) push(e int) { + *wl = append(*wl, e) +} + +func (wl *workList) pop() int { + l := len(*wl) - 1 + n := (*wl)[l] + *wl = (*wl)[:l] + return n +} + +func NewGenerator(probabilities []float64, source rand.Source) *Generator { + generator := &Generator{} + var ( + small workList + large workList + ) + n := len(probabilities) + generator.randomGenerator = rand.New(source) + generator.probabilities = make([]float64, n) + generator.alias = make([]int, n) + // Compute scaled probabilities. + p := make([]float64, n) + for i := 0; i < n; i++ { + p[i] = probabilities[i] * float64(n) + } + for i, pi := range p { + if pi < 1 { + small = append(small, i) + } else { + large = append(large, i) + } + } + for len(large) > 0 && len(small) > 0 { + l, g := small.pop(), large.pop() + generator.probabilities[l] = p[l] + generator.alias[l] = g + p[g] = (p[g] + p[l]) - 1 + if p[g] < 1 { + small.push(g) + } else { + large.push(g) + } + } + for len(large) > 0 { + g := large.pop() + generator.probabilities[g] = 1 + } + for len(small) > 0 { + l := small.pop() + generator.probabilities[l] = 1 + } + return generator +} + +func (g *Generator) Next() int { + n := len(g.alias) + i := g.randomGenerator.Intn(n) + if g.randomGenerator.Float64() < g.probabilities[i] { + return i + } + return g.alias[i] +} diff --git a/connections/pool.go b/connections/pool.go new file mode 100644 index 0000000..5fefbf0 --- /dev/null +++ b/connections/pool.go @@ -0,0 +1,86 @@ +package connections + +import ( + "context" + "crypto/ecdsa" + "errors" + "math" + "math/rand" + "time" + + "github.com/nspcc-dev/neofs-api-go/pkg/client" + "google.golang.org/grpc" +) + +type PoolBuilderOptions struct { + Key *ecdsa.PrivateKey + NodeConnectionTimeout time.Duration + NodeRequestTimeout time.Duration + ClientRebalanceInterval time.Duration +} + +type PoolBuilder struct { + addresses []string + weights []float64 +} + +func (pb *PoolBuilder) AddNode(address string, weight float64) *PoolBuilder { + pb.addresses = append(pb.addresses, address) + pb.weights = append(pb.weights, weight) + return pb +} + +func (pb *PoolBuilder) Build(ctx context.Context, options *PoolBuilderOptions) (Pool, error) { + totalWeight := 0.0 + for _, w := range pb.weights { + totalWeight += w + } + if math.Abs(totalWeight-1.0) >= 1e-4 { + return nil, errors.New("total weight must be equal to unity") + } + var cons = make([]*grpc.ClientConn, len(pb.addresses)) + for i, address := range pb.addresses { + con, err := func() (*grpc.ClientConn, error) { + toctx, c := context.WithTimeout(ctx, options.NodeConnectionTimeout) + defer c() + return grpc.DialContext(toctx, address, grpc.WithInsecure(), grpc.WithBlock()) + }() + if err != nil { + return nil, err + } + cons[i] = con + } + return new(pb.weights, options.Key, cons) +} + +type Pool interface { + Client() client.Client +} + +type pool struct { + generator *Generator + clients []client.Client +} + +func new(weights []float64, key *ecdsa.PrivateKey, connections []*grpc.ClientConn) (Pool, error) { + clients := make([]client.Client, len(weights)) + for i, con := range connections { + c, err := client.New(client.WithDefaultPrivateKey(key), client.WithGRPCConnection(con)) + if err != nil { + return nil, err + } + clients[i] = c + } + source := rand.NewSource(time.Now().UnixNano()) + return &pool{ + generator: NewGenerator(weights, source), + clients: clients, + }, nil +} + +func (p *pool) Client() client.Client { + if len(p.clients) == 1 { + return p.clients[0] + } + return p.clients[p.generator.Next()] +} diff --git a/health.go b/health._go_ similarity index 100% rename from health.go rename to health._go_ diff --git a/neofs/client-plant.go b/neofs/client-plant.go index 39c7629..0078008 100644 --- a/neofs/client-plant.go +++ b/neofs/client-plant.go @@ -6,24 +6,19 @@ import ( "crypto/ecdsa" "io" "math" - "sort" - "time" "github.com/nspcc-dev/neofs-api-go/pkg/client" "github.com/nspcc-dev/neofs-api-go/pkg/container" "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-api-go/pkg/owner" "github.com/nspcc-dev/neofs-api-go/pkg/token" + "github.com/nspcc-dev/neofs-http-gate/connections" objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer" "github.com/pkg/errors" - "google.golang.org/grpc" ) -const ( - nodeConnectionTimeout = 10 * time.Second - maxObjectSize = uint64(1 << 26) // 64MiB -) +const maxObjectSize = uint64(1 << 28) // Limit objects to 256 MiB. type BaseOptions struct { Client client.Client @@ -74,20 +69,17 @@ type ClientPlant interface { type neofsObjectClient struct { key *ecdsa.PrivateKey - conn *grpc.ClientConn + pool connections.Pool } type neofsClientPlant struct { key *ecdsa.PrivateKey ownerID *owner.ID - conn *grpc.ClientConn + pool connections.Pool } -func (cc *neofsClientPlant) GetReusableArtifacts(ctx context.Context) (client.Client, *token.SessionToken, error) { - c, err := client.New(client.WithDefaultPrivateKey(cc.key), client.WithGRPCConnection(cc.conn)) - if err != nil { - return nil, nil, errors.Wrap(err, "failed to create reusable neofs client") - } +func (cp *neofsClientPlant) GetReusableArtifacts(ctx context.Context) (client.Client, *token.SessionToken, error) { + c := cp.pool.Client() st, err := c.CreateSession(ctx, math.MaxUint64) if err != nil { return nil, nil, errors.Wrap(err, "failed to create reusable neofs session token") @@ -96,47 +88,18 @@ func (cc *neofsClientPlant) GetReusableArtifacts(ctx context.Context) (client.Cl } func (cc *neofsClientPlant) Object() ObjectClient { - return &neofsObjectClient{key: cc.key, conn: cc.conn} + return &neofsObjectClient{ + key: cc.key, + pool: cc.pool, + } } func (cc *neofsClientPlant) OwnerID() *owner.ID { return cc.ownerID } -type Connection struct { - address string - weight float64 -} - -type ConnectionList []Connection - -func (p ConnectionList) Len() int { return len(p) } -func (p ConnectionList) Less(i, j int) bool { return p[i].weight < p[j].weight } -func (p ConnectionList) Swap(i, j int) { p[i], p[j] = p[j], p[i] } - -func (cl *ConnectionList) Add(address string, weight float64) ConnectionList { - *cl = append(*cl, Connection{address, weight}) - return *cl -} - -func NewClientPlant(ctx context.Context, connectionList ConnectionList, creds Credentials) (ClientPlant, error) { - toctx, c := context.WithTimeout(ctx, nodeConnectionTimeout) - defer c() - sort.Sort(sort.Reverse(connectionList)) - // TODO: Use connection pool here. - address := connectionList[0].address - conn, err := grpc.DialContext(toctx, address, grpc.WithInsecure(), grpc.WithBlock()) - if err != nil { - if err == context.DeadlineExceeded { - err = errors.New("failed to connect to neofs node") - } - return nil, err - } - return &neofsClientPlant{ - key: creds.PrivateKey(), - ownerID: creds.Owner(), - conn: conn, - }, nil +func NewClientPlant(ctx context.Context, pool connections.Pool, creds Credentials) (ClientPlant, error) { + return &neofsClientPlant{key: creds.PrivateKey(), ownerID: creds.Owner(), pool: pool}, nil } func (oc *neofsObjectClient) Put(ctx context.Context, options *PutOptions) (*object.Address, error) { diff --git a/pool/pool.go b/pool/pool.go deleted file mode 100644 index 94874ec..0000000 --- a/pool/pool.go +++ /dev/null @@ -1,39 +0,0 @@ -package pool - -import ( - "context" - - "github.com/nspcc-dev/neofs-api-go/pkg/token" - "google.golang.org/grpc" -) - -type Client interface { - // receive status of connection pool - Status() error - // worker should be run in goroutine to re-balancing - Worker(context.Context) - Connection(context.Context) (*grpc.ClientConn, error) - Session(context.Context, *grpc.ClientConn) (*token.SessionToken, error) -} - -type pool struct{} - -func (p *pool) Status() error { - return nil -} - -func (p *pool) Worker(ctx context.Context) { - panic("not implemented") -} - -func (p *pool) Connection(ctx context.Context) (*grpc.ClientConn, error) { - panic("not implemented") -} - -func (p *pool) Session(ctx context.Context, conn *grpc.ClientConn) (*token.SessionToken, error) { - panic("not implemented") -} - -func New() Client { - return &pool{} -} diff --git a/settings.go b/settings.go index 0e5f79f..7844293 100644 --- a/settings.go +++ b/settings.go @@ -115,7 +115,7 @@ func settings() *viper.Viper { peers := flags.StringArrayP(cfgPeers, "p", nil, "NeoFS nodes") // set prefers: - v.Set(cfgApplicationName, "neofs-http-gw") + v.Set(cfgApplicationName, "neofs-http-gate") v.Set(cfgApplicationVersion, Version) // set defaults: diff --git a/uploader/upload.go b/uploader/upload.go index a77272d..158939c 100644 --- a/uploader/upload.go +++ b/uploader/upload.go @@ -18,6 +18,8 @@ import ( "go.uber.org/zap" ) +const jsonHeader = "application/json; charset=UTF-8" + var putOptionsPool = sync.Pool{ New: func() interface{} { return new(neofs.PutOptions) @@ -140,16 +142,14 @@ func (u *Uploader) fetchOwnerAndBearerToken(ctx context.Context) (*owner.ID, *to } type putResponse struct { - OID string `json:"object_id"` - CID string `json:"container_id"` + ObjectID string `json:"object_id"` + ContainerID string `json:"container_id"` } -const jsonHeader = "application/json; charset=UTF-8" - func newPutResponse(addr *object.Address) *putResponse { return &putResponse{ - OID: addr.ObjectID().String(), - CID: addr.ContainerID().String(), + ObjectID: addr.ObjectID().String(), + ContainerID: addr.ContainerID().String(), } }