forked from TrueCloudLab/frostfs-http-gw
NSPCC-762 Connection pool
- implement connection pool - wait until container creates - refactoring gw service - add config option to enable prometheus and pprof - update neofs-proto to v0.2.8
This commit is contained in:
parent
39279e4c5f
commit
746946290c
9 changed files with 337 additions and 100 deletions
|
@ -12,7 +12,7 @@ WORKDIR /src
|
|||
|
||||
COPY . /src
|
||||
|
||||
RUN go build -v -mod=vendor -ldflags "${LDFLAGS} -X main.Build=$(date -u +%s%N)" -o /go/bin/neofs-gw ./
|
||||
RUN go build -v -mod=vendor -trimpath -ldflags "${LDFLAGS} -X main.Build=$(date -u +%s%N)" -o /go/bin/neofs-gw ./
|
||||
|
||||
# Executable image
|
||||
FROM scratch
|
||||
|
|
10
README.md
10
README.md
|
@ -11,7 +11,8 @@ You can download files from NeoFS Network using NeoFS Gateway.
|
|||
|
||||
```
|
||||
# Flags
|
||||
|
||||
--pprof enable pprof
|
||||
--metrics enable prometheus
|
||||
-h, --help show help
|
||||
-v, --version show version
|
||||
--key string "generated" to generate key, path to private key file, hex string or wif (default "generated")
|
||||
|
@ -19,7 +20,7 @@ You can download files from NeoFS Network using NeoFS Gateway.
|
|||
--request_timeout duration gRPC request timeout (default 5s)
|
||||
--connect_timeout duration gRPC connect timeout (default 30s)
|
||||
--listen_address string HTTP Gateway listen address (default "0.0.0.0:8082")
|
||||
--neofs_address string NeoFS Node address for proxying requests (default "0.0.0.0:8080")
|
||||
-p, --peers stringArray NeoFS nodes
|
||||
|
||||
# Environments:
|
||||
|
||||
|
@ -27,7 +28,10 @@ GW_KEY=stirng - "generated" to generate key, path to p
|
|||
GW_REQUEST_TIMEOUT=Duration - timeout for request
|
||||
GW_CONNECT_TIMEOUT=Duration - timeout for connection
|
||||
GW_LISTEN_ADDRESS=host:port - address to listen connections
|
||||
GW_NEOFS_ADDRESS=host:port - address of NeoFS Node
|
||||
GW_PEERS_<X>_ADDRESS=host:port - address of NeoFS Node
|
||||
GW_PEERS_<X>_WEIGHT=float - weight of NeoFS Node
|
||||
GW_PPROF=bool - enable/disable pprof (/debug/pprof)
|
||||
GW_METRICS=bool - enable/disable prometheus metrics endpoint (/metrics)
|
||||
GW_KEEPALIVE_TIME=Duration - аfter a duration of this time if the client doesn't see any activity
|
||||
it pings the server to see if the transport is still alive.
|
||||
GW_KEEPALIVE_TIMEOUT=Duration - after having pinged for keepalive check, the client waits for a duration
|
||||
|
|
3
go.mod
3
go.mod
|
@ -5,8 +5,9 @@ go 1.13
|
|||
require (
|
||||
github.com/labstack/echo/v4 v4.1.11
|
||||
github.com/nspcc-dev/neofs-crypto v0.2.2
|
||||
github.com/nspcc-dev/neofs-proto v0.2.5
|
||||
github.com/nspcc-dev/neofs-proto v0.2.8
|
||||
github.com/pkg/errors v0.8.1
|
||||
github.com/prometheus/client_golang v1.2.1
|
||||
github.com/spf13/pflag v1.0.5
|
||||
github.com/spf13/viper v1.6.1
|
||||
go.uber.org/zap v1.13.0
|
||||
|
|
10
go.sum
10
go.sum
|
@ -13,9 +13,12 @@ github.com/awalterschulze/gographviz v0.0.0-20181013152038-b2885df04310 h1:t+qxR
|
|||
github.com/awalterschulze/gographviz v0.0.0-20181013152038-b2885df04310/go.mod h1:GEV5wmg4YquNw7v1kkyoX9etIk8yVmXj+AkDHuuETHs=
|
||||
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
|
||||
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
|
||||
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
|
||||
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
|
||||
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
|
||||
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
|
||||
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
|
||||
github.com/cespare/xxhash/v2 v2.1.0 h1:yTUvW7Vhb89inJ+8irsUqiWjh8iT6sQPZiQzI6ReGkA=
|
||||
github.com/cespare/xxhash/v2 v2.1.0/go.mod h1:dgIUBU3pDso/gPgZ1osOZ0iQf77oPR28Tjxl5dIMyVM=
|
||||
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
|
||||
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
|
||||
|
@ -99,6 +102,7 @@ github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNx
|
|||
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
|
||||
github.com/mattn/go-isatty v0.0.9 h1:d5US/mDsogSGW37IV293h//ZFaeajb69h+EHFsv2xGg=
|
||||
github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ=
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
|
||||
github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
|
||||
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
|
||||
|
@ -116,6 +120,8 @@ github.com/nspcc-dev/neofs-crypto v0.2.2 h1:jLc5O+Wdpaq7L4lNYFX7li+OP4I1FsvvcPW1
|
|||
github.com/nspcc-dev/neofs-crypto v0.2.2/go.mod h1:F/96fUzPM3wR+UGsPi3faVNmFlA9KAEAUQR7dMxZmNA=
|
||||
github.com/nspcc-dev/neofs-proto v0.2.5 h1:/zPystudzakARWvk618th4+4bYuipkFGSVdGAvAwRVo=
|
||||
github.com/nspcc-dev/neofs-proto v0.2.5/go.mod h1:UDr4USXDHqn97StDvL7U/DcWRmRloIefaFo8qk4GbOo=
|
||||
github.com/nspcc-dev/neofs-proto v0.2.8 h1:VWEIuqR2lQEuarkhLXhK62HLebSDmE6e4JqC7l2hc60=
|
||||
github.com/nspcc-dev/neofs-proto v0.2.8/go.mod h1:uNc8tEAIcY828/kH94DpwiiSODvPHiesIM20hHckjAM=
|
||||
github.com/nspcc-dev/netmap v1.6.1 h1:Pigqpqi6QSdRiusbq5XlO20A18k6Eyu7j9MzOfAE3CM=
|
||||
github.com/nspcc-dev/netmap v1.6.1/go.mod h1:mhV3UOg9ljQmu0teQShD6+JYX09XY5gu2I4hIByCH9M=
|
||||
github.com/nspcc-dev/rfc6979 v0.1.0 h1:Lwg7esRRoyK1Up/IN1vAef1EmvrBeMHeeEkek2fAJ6c=
|
||||
|
@ -133,17 +139,21 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
|
|||
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
|
||||
github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
|
||||
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
|
||||
github.com/prometheus/client_golang v1.2.1 h1:JnMpQc6ppsNgw9QPAGF6Dod479itz7lvlsMzzNayLOI=
|
||||
github.com/prometheus/client_golang v1.2.1/go.mod h1:XMU6Z2MjaRKVu/dC1qupJI9SiNkDYzz3xecMgSW/F+U=
|
||||
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
|
||||
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
|
||||
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 h1:gQz4mCbXsO+nc9n1hCxHcGA3Zx3Eo+UHZoInFGUIXNM=
|
||||
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
|
||||
github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
|
||||
github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
|
||||
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
|
||||
github.com/prometheus/common v0.7.0 h1:L+1lyG48J1zAQXA3RBX/nG/B3gjlHq0zTt2tlbJLyCY=
|
||||
github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt26CguLLsqA=
|
||||
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
|
||||
github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
|
||||
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
|
||||
github.com/prometheus/procfs v0.0.5 h1:3+auTFlqw+ZaQYJARz6ArODtkaIwtvBTx3N2NehQlL8=
|
||||
github.com/prometheus/procfs v0.0.5/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ=
|
||||
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
|
||||
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
|
||||
|
|
|
@ -47,7 +47,7 @@ func safeLevel(lvl string) zap.AtomicLevel {
|
|||
}
|
||||
}
|
||||
|
||||
func newLogger(v *viper.Viper) (*zap.Logger, error) {
|
||||
func newLogger(v *viper.Viper) *zap.Logger {
|
||||
c := zap.NewProductionConfig()
|
||||
|
||||
c.OutputPaths = []string{"stdout"}
|
||||
|
@ -87,11 +87,11 @@ func newLogger(v *viper.Viper) (*zap.Logger, error) {
|
|||
// enable trace only for current log-level
|
||||
zap.AddStacktrace(traceLvl))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if v.GetBool("logger.no_disclaimer") {
|
||||
return l, nil
|
||||
return l
|
||||
}
|
||||
|
||||
name := v.GetString("app.name")
|
||||
|
@ -99,7 +99,7 @@ func newLogger(v *viper.Viper) (*zap.Logger, error) {
|
|||
|
||||
return l.With(
|
||||
zap.String("app_name", name),
|
||||
zap.String("app_version", version)), nil
|
||||
zap.String("app_version", version))
|
||||
}
|
||||
|
||||
func (z *zapLogger) Info(args ...interface{}) {
|
||||
|
|
99
main.go
99
main.go
|
@ -3,114 +3,93 @@ package main
|
|||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"net/http"
|
||||
_ "net/http/pprof"
|
||||
"time"
|
||||
|
||||
"github.com/labstack/echo/v4"
|
||||
"github.com/nspcc-dev/neofs-proto/object"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/connectivity"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
)
|
||||
|
||||
type config struct {
|
||||
type router struct {
|
||||
pool *Pool
|
||||
log *zap.Logger
|
||||
timeout time.Duration
|
||||
key *ecdsa.PrivateKey
|
||||
cli object.ServiceClient
|
||||
}
|
||||
|
||||
func main() {
|
||||
v := settings()
|
||||
|
||||
log, err := newLogger(v)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
log.Info("running application", zap.String("version", v.GetString("app.version")))
|
||||
|
||||
var (
|
||||
cfg = new(config)
|
||||
grace = newGracefulContext(log)
|
||||
v = settings()
|
||||
l = newLogger(v)
|
||||
g = newGracefulContext(l)
|
||||
)
|
||||
|
||||
if v.GetBool("verbose") {
|
||||
grpclog.SetLoggerV2(
|
||||
gRPCLogger(log))
|
||||
grpclog.SetLoggerV2(gRPCLogger(l))
|
||||
}
|
||||
|
||||
cfg.log = log
|
||||
cfg.key = fetchKey(log, v)
|
||||
cfg.timeout = v.GetDuration("request_timeout")
|
||||
|
||||
ctx, cancel := context.WithTimeout(grace, v.GetDuration("connect_timeout"))
|
||||
defer cancel()
|
||||
|
||||
conn, err := grpc.DialContext(ctx, v.GetString("neofs_address"),
|
||||
grpc.WithBlock(),
|
||||
grpc.WithInsecure(),
|
||||
grpc.WithKeepaliveParams(keepalive.ClientParameters{
|
||||
Time: v.GetDuration("keepalive.time"),
|
||||
Timeout: v.GetDuration("keepalive.timeout"),
|
||||
PermitWithoutStream: v.GetBool("keepalive.permit_without_stream"),
|
||||
}),
|
||||
)
|
||||
if err != nil {
|
||||
log.Panic("could not connect to neofs-node",
|
||||
zap.String("neofs-node", v.GetString("neofs_node_addr")),
|
||||
zap.Error(err))
|
||||
r := &router{
|
||||
log: l,
|
||||
key: fetchKey(l, v),
|
||||
pool: newPool(g, l, v),
|
||||
timeout: v.GetDuration("request_timeout"),
|
||||
}
|
||||
|
||||
ctx, cancel = context.WithCancel(grace)
|
||||
defer cancel()
|
||||
|
||||
go checkConnection(ctx, conn, log)
|
||||
cfg.cli = object.NewServiceClient(conn)
|
||||
go checkConnection(g, r.pool)
|
||||
|
||||
e := echo.New()
|
||||
e.Debug = false
|
||||
e.HidePort = true
|
||||
e.HideBanner = true
|
||||
|
||||
e.GET("/:cid/:oid", cfg.receiveFile)
|
||||
e.GET("/:cid/:oid", r.receiveFile)
|
||||
|
||||
// enable metrics
|
||||
if v.GetBool("metrics") {
|
||||
l.Info("enabled /metrics")
|
||||
e.GET("/metrics", echo.WrapHandler(promhttp.Handler()))
|
||||
}
|
||||
|
||||
// enable pprof
|
||||
if v.GetBool("pprof") {
|
||||
l.Info("enabled /debug/pprof")
|
||||
e.Any("/debug/pprof*", echo.WrapHandler(http.DefaultServeMux))
|
||||
}
|
||||
|
||||
go func() {
|
||||
log.Info("run gateway server",
|
||||
l.Info("run gateway server",
|
||||
zap.String("address", v.GetString("listen_address")))
|
||||
|
||||
if err := e.Start(v.GetString("listen_address")); err != nil {
|
||||
log.Panic("could not start server", zap.Error(err))
|
||||
l.Panic("could not start server", zap.Error(err))
|
||||
}
|
||||
}()
|
||||
|
||||
<-ctx.Done()
|
||||
<-g.Done()
|
||||
|
||||
ctx, cancel = context.WithTimeout(context.TODO(), time.Second*30)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), time.Second*30)
|
||||
defer cancel()
|
||||
|
||||
log.Info("stopping server", zap.Error(e.Shutdown(ctx)))
|
||||
l.Info("stopping server", zap.Error(e.Shutdown(ctx)))
|
||||
}
|
||||
|
||||
func checkConnection(ctx context.Context, conn *grpc.ClientConn, log *zap.Logger) {
|
||||
tick := time.NewTicker(time.Second)
|
||||
func checkConnection(ctx context.Context, p *Pool) {
|
||||
tick := time.NewTicker(time.Second * 15)
|
||||
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
break loop
|
||||
case <-tick.C:
|
||||
switch state := conn.GetState(); state {
|
||||
case connectivity.Idle, connectivity.Connecting, connectivity.Ready:
|
||||
// It's ok..
|
||||
default:
|
||||
log.Error("could not establish connection",
|
||||
zap.Stringer("state", state),
|
||||
zap.Any("connection", conn.Target()))
|
||||
}
|
||||
p.reBalance(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
tick.Stop()
|
||||
|
||||
p.log.Info("stop connection worker")
|
||||
}
|
||||
|
|
244
pool.go
Normal file
244
pool.go
Normal file
|
@ -0,0 +1,244 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
crand "crypto/rand"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"math/rand"
|
||||
"sort"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/spf13/viper"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/connectivity"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
)
|
||||
|
||||
type (
|
||||
node struct {
|
||||
address string
|
||||
weight uint32
|
||||
conn *grpc.ClientConn
|
||||
}
|
||||
|
||||
Pool struct {
|
||||
log *zap.Logger
|
||||
|
||||
ctl time.Duration
|
||||
opts keepalive.ClientParameters
|
||||
|
||||
cur *grpc.ClientConn
|
||||
|
||||
*sync.RWMutex
|
||||
nodes []*node
|
||||
keys []uint32
|
||||
conns map[uint32][]*grpc.ClientConn
|
||||
}
|
||||
)
|
||||
|
||||
var errNoHealthyConnections = errors.New("no active connections")
|
||||
|
||||
func newPool(ctx context.Context, l *zap.Logger, v *viper.Viper) *Pool {
|
||||
p := &Pool{
|
||||
log: l,
|
||||
RWMutex: new(sync.RWMutex),
|
||||
keys: make([]uint32, 0),
|
||||
nodes: make([]*node, 0),
|
||||
conns: make(map[uint32][]*grpc.ClientConn),
|
||||
|
||||
// fill with defaults:
|
||||
ctl: time.Second * 15,
|
||||
opts: keepalive.ClientParameters{
|
||||
Time: time.Second * 10,
|
||||
Timeout: time.Minute * 5,
|
||||
PermitWithoutStream: true,
|
||||
},
|
||||
}
|
||||
buf := make([]byte, 8)
|
||||
if _, err := crand.Read(buf); err != nil {
|
||||
l.Panic("could not read seed", zap.Error(err))
|
||||
}
|
||||
|
||||
seed := binary.BigEndian.Uint64(buf)
|
||||
rand.Seed(int64(seed))
|
||||
l.Info("used random seed", zap.Uint64("seed", seed))
|
||||
|
||||
if val := v.GetDuration("connect_timeout"); val > 0 {
|
||||
p.ctl = val
|
||||
}
|
||||
|
||||
if val := v.GetDuration("keepalive.time"); val > 0 {
|
||||
p.opts.Time = val
|
||||
}
|
||||
|
||||
if val := v.GetDuration("keepalive.timeout"); val > 0 {
|
||||
p.opts.Timeout = val
|
||||
}
|
||||
|
||||
if v.IsSet("keepalive.permit_without_stream") {
|
||||
p.opts.PermitWithoutStream = v.GetBool("keepalive.permit_without_stream")
|
||||
}
|
||||
|
||||
for i := 0; ; i++ {
|
||||
key := "peers." + strconv.Itoa(i) + "."
|
||||
address := v.GetString(key + "address")
|
||||
weight := v.GetFloat64(key + "weight")
|
||||
|
||||
if address == "" {
|
||||
l.Warn("skip, empty address")
|
||||
break
|
||||
}
|
||||
|
||||
p.nodes = append(p.nodes, &node{
|
||||
address: address,
|
||||
weight: uint32(weight * 100),
|
||||
})
|
||||
|
||||
l.Info("add new peer",
|
||||
zap.String("address", p.nodes[i].address),
|
||||
zap.Uint32("weight", p.nodes[i].weight))
|
||||
}
|
||||
|
||||
p.reBalance(ctx)
|
||||
|
||||
cur, err := p.getConnection()
|
||||
if err != nil {
|
||||
l.Panic("could get connection", zap.Error(err))
|
||||
}
|
||||
|
||||
p.cur = cur
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *Pool) close() {
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
|
||||
for i := range p.nodes {
|
||||
if p.nodes[i] == nil || p.nodes[i].conn == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
p.log.Warn("close connection",
|
||||
zap.String("address", p.nodes[i].address),
|
||||
zap.Error(p.nodes[i].conn.Close()))
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Pool) reBalance(ctx context.Context) {
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
|
||||
keys := make(map[uint32]struct{})
|
||||
|
||||
for i := range p.nodes {
|
||||
var (
|
||||
idx = -1
|
||||
exists bool
|
||||
err error
|
||||
conn = p.nodes[i].conn
|
||||
weight = p.nodes[i].weight
|
||||
)
|
||||
|
||||
if conn == nil {
|
||||
p.log.Warn("empty connection, try to connect",
|
||||
zap.String("address", p.nodes[i].address))
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, p.ctl)
|
||||
conn, err = grpc.DialContext(ctx, p.nodes[i].address,
|
||||
grpc.WithBlock(),
|
||||
grpc.WithInsecure(),
|
||||
grpc.WithKeepaliveParams(p.opts))
|
||||
cancel()
|
||||
|
||||
if err != nil || conn == nil {
|
||||
p.log.Warn("skip, could not connect to node",
|
||||
zap.String("address", p.nodes[i].address),
|
||||
zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
p.nodes[i].conn = conn
|
||||
p.log.Info("connected to node", zap.String("address", p.nodes[i].address))
|
||||
}
|
||||
|
||||
switch st := conn.GetState(); st {
|
||||
case connectivity.Idle, connectivity.Ready, connectivity.Connecting:
|
||||
keys[weight] = struct{}{}
|
||||
|
||||
for j := range p.conns[weight] {
|
||||
if p.conns[weight][j] == conn {
|
||||
exists = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !exists {
|
||||
p.conns[weight] = append(p.conns[weight], conn)
|
||||
}
|
||||
|
||||
// Problems with connection, try to remove from :
|
||||
default:
|
||||
for j := range p.conns[weight] {
|
||||
if p.conns[weight][j] == conn {
|
||||
idx = j
|
||||
exists = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if exists {
|
||||
// remove from connections
|
||||
p.conns[weight] = append(p.conns[weight][:idx], p.conns[weight][idx+1:]...)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
p.keys = p.keys[:0]
|
||||
for w := range keys {
|
||||
p.keys = append(p.keys, w)
|
||||
}
|
||||
|
||||
sort.Slice(p.keys, func(i, j int) bool {
|
||||
return p.keys[i] > p.keys[j]
|
||||
})
|
||||
}
|
||||
|
||||
func (p *Pool) getConnection() (*grpc.ClientConn, error) {
|
||||
p.RLock()
|
||||
defer p.RUnlock()
|
||||
|
||||
if p.cur != nil && isAlive(p.cur) {
|
||||
return p.cur, nil
|
||||
}
|
||||
|
||||
for _, w := range p.keys {
|
||||
switch ln := len(p.conns[w]); ln {
|
||||
case 0:
|
||||
continue
|
||||
case 1:
|
||||
p.cur = p.conns[w][0]
|
||||
return p.cur, nil
|
||||
default: // > 1
|
||||
i := rand.Intn(ln)
|
||||
p.cur = p.conns[w][i]
|
||||
return p.cur, nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, errNoHealthyConnections
|
||||
}
|
||||
|
||||
func isAlive(cur *grpc.ClientConn) bool {
|
||||
switch st := cur.GetState(); st {
|
||||
case connectivity.Idle, connectivity.Ready, connectivity.Connecting:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
46
receive.go
46
receive.go
|
@ -16,33 +16,33 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (cfg *config) receiveFile(c echo.Context) error {
|
||||
func (r *router) receiveFile(c echo.Context) error {
|
||||
var (
|
||||
cid refs.CID
|
||||
oid refs.ObjectID
|
||||
obj *object.Object
|
||||
download = c.QueryParam("download") != ""
|
||||
con, err = r.pool.getConnection()
|
||||
)
|
||||
|
||||
cfg.log.Debug("try to fetch object from network",
|
||||
if err != nil {
|
||||
return echo.NewHTTPError(http.StatusBadRequest, err.Error())
|
||||
}
|
||||
|
||||
log := r.log.With(
|
||||
zap.String("node", con.Target()),
|
||||
zap.String("cid", c.Param("cid")),
|
||||
zap.String("oid", c.Param("oid")))
|
||||
|
||||
if err := cid.Parse(c.Param("cid")); err != nil {
|
||||
cfg.log.Error("wrong container id",
|
||||
zap.String("cid", c.Param("cid")),
|
||||
zap.String("oid", c.Param("oid")),
|
||||
zap.Error(err))
|
||||
log.Error("wrong container id", zap.Error(err))
|
||||
|
||||
return echo.NewHTTPError(
|
||||
http.StatusBadRequest,
|
||||
errors.Wrap(err, "wrong container id").Error(),
|
||||
)
|
||||
} else if err := oid.Parse(c.Param("oid")); err != nil {
|
||||
cfg.log.Error("wrong object id",
|
||||
zap.Stringer("cid", cid),
|
||||
zap.String("oid", c.Param("oid")),
|
||||
zap.Error(err))
|
||||
log.Error("wrong object id", zap.Error(err))
|
||||
|
||||
return echo.NewHTTPError(
|
||||
http.StatusBadRequest,
|
||||
|
@ -50,39 +50,29 @@ func (cfg *config) receiveFile(c echo.Context) error {
|
|||
)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), cfg.timeout)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), r.timeout)
|
||||
defer cancel()
|
||||
|
||||
req := &object.GetRequest{Address: refs.Address{ObjectID: oid, CID: cid}}
|
||||
req.SetTTL(service.SingleForwardingTTL)
|
||||
|
||||
if err := service.SignRequestHeader(cfg.key, req); err != nil {
|
||||
cfg.log.Error("could not sign request",
|
||||
zap.Stringer("cid", cid),
|
||||
zap.Stringer("oid", oid),
|
||||
zap.Error(err))
|
||||
|
||||
if err := service.SignRequestHeader(r.key, req); err != nil {
|
||||
log.Error("could not sign request", zap.Error(err))
|
||||
return echo.NewHTTPError(
|
||||
http.StatusBadRequest,
|
||||
errors.Wrap(err, "could not sign request").Error())
|
||||
}
|
||||
|
||||
cli, err := cfg.cli.Get(ctx, req)
|
||||
cli, err := object.NewServiceClient(con).Get(ctx, req)
|
||||
if err != nil {
|
||||
cfg.log.Error("could not prepare connection",
|
||||
zap.Stringer("cid", cid),
|
||||
zap.Stringer("oid", oid),
|
||||
zap.Error(err))
|
||||
log.Error("could not prepare connection", zap.Error(err))
|
||||
|
||||
return echo.NewHTTPError(
|
||||
http.StatusBadRequest,
|
||||
errors.Wrap(err, "could not prepare connection").Error(),
|
||||
)
|
||||
} else if obj, err = receiveObject(cli); err != nil {
|
||||
cfg.log.Error("could not receive object",
|
||||
zap.Stringer("cid", cid),
|
||||
zap.Stringer("oid", oid),
|
||||
zap.Error(err))
|
||||
log.Error("could not receive object", zap.Error(err))
|
||||
|
||||
switch {
|
||||
case strings.Contains(err.Error(), object.ErrNotFound.Error()),
|
||||
|
@ -96,9 +86,7 @@ func (cfg *config) receiveFile(c echo.Context) error {
|
|||
}
|
||||
}
|
||||
|
||||
cfg.log.Info("object fetched successfully",
|
||||
zap.Stringer("cid", cid),
|
||||
zap.Stringer("oid", oid))
|
||||
log.Info("object fetched successfully")
|
||||
|
||||
c.Response().Header().Set("Content-Length", strconv.FormatUint(obj.SystemHeader.PayloadLength, 10))
|
||||
c.Response().Header().Set("x-object-id", obj.SystemHeader.ID.String())
|
||||
|
|
15
settings.go
15
settings.go
|
@ -7,6 +7,7 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
|
@ -61,9 +62,12 @@ func settings() *viper.Viper {
|
|||
v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
|
||||
|
||||
// flags setup:
|
||||
flags := pflag.NewFlagSet("comandline", pflag.ExitOnError)
|
||||
flags := pflag.NewFlagSet("commandline", pflag.ExitOnError)
|
||||
flags.SortFlags = false
|
||||
|
||||
flags.Bool("pprof", false, "enable pprof")
|
||||
flags.Bool("metrics", false, "enable prometheus")
|
||||
|
||||
help := flags.BoolP("help", "h", false, "show help")
|
||||
version := flags.BoolP("version", "v", false, "show version")
|
||||
|
||||
|
@ -74,7 +78,7 @@ func settings() *viper.Viper {
|
|||
flags.Duration("connect_timeout", time.Second*30, "gRPC connect timeout")
|
||||
|
||||
flags.String("listen_address", "0.0.0.0:8082", "HTTP Gateway listen address")
|
||||
flags.String("neofs_address", "0.0.0.0:8080", "NeoFS Node address for proxying requests")
|
||||
peers := flags.StringArrayP("peers", "p", nil, "NeoFS nodes")
|
||||
|
||||
// set prefers:
|
||||
v.Set("app.name", "neofs-gw")
|
||||
|
@ -117,5 +121,12 @@ func settings() *viper.Viper {
|
|||
os.Exit(0)
|
||||
}
|
||||
|
||||
if peers != nil && len(*peers) > 0 {
|
||||
for i := range *peers {
|
||||
v.SetDefault("peers."+strconv.Itoa(i)+".address", (*peers)[i])
|
||||
v.SetDefault("peers."+strconv.Itoa(i)+".weight", 1)
|
||||
}
|
||||
}
|
||||
|
||||
return v
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue