From 746946290c428ba1159ca99c43c325ea36839102 Mon Sep 17 00:00:00 2001 From: Evgeniy Kulikov Date: Sat, 21 Dec 2019 13:26:14 +0300 Subject: [PATCH] NSPCC-762 Connection pool - implement connection pool - wait until container creates - refactoring gw service - add config option to enable prometheus and pprof - update neofs-proto to v0.2.8 --- Dockerfile | 2 +- README.md | 10 ++- go.mod | 3 +- go.sum | 10 +++ logger.go | 8 +- main.go | 99 +++++++++------------ pool.go | 244 ++++++++++++++++++++++++++++++++++++++++++++++++++++ receive.go | 46 ++++------ settings.go | 15 +++- 9 files changed, 337 insertions(+), 100 deletions(-) create mode 100644 pool.go diff --git a/Dockerfile b/Dockerfile index fe791e2..99cad69 100644 --- a/Dockerfile +++ b/Dockerfile @@ -12,7 +12,7 @@ WORKDIR /src COPY . /src -RUN go build -v -mod=vendor -ldflags "${LDFLAGS} -X main.Build=$(date -u +%s%N)" -o /go/bin/neofs-gw ./ +RUN go build -v -mod=vendor -trimpath -ldflags "${LDFLAGS} -X main.Build=$(date -u +%s%N)" -o /go/bin/neofs-gw ./ # Executable image FROM scratch diff --git a/README.md b/README.md index f080fd0..c79c586 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,8 @@ You can download files from NeoFS Network using NeoFS Gateway. ``` # Flags - + --pprof enable pprof + --metrics enable prometheus -h, --help show help -v, --version show version --key string "generated" to generate key, path to private key file, hex string or wif (default "generated") @@ -19,7 +20,7 @@ You can download files from NeoFS Network using NeoFS Gateway. --request_timeout duration gRPC request timeout (default 5s) --connect_timeout duration gRPC connect timeout (default 30s) --listen_address string HTTP Gateway listen address (default "0.0.0.0:8082") - --neofs_address string NeoFS Node address for proxying requests (default "0.0.0.0:8080") + -p, --peers stringArray NeoFS nodes # Environments: @@ -27,7 +28,10 @@ GW_KEY=stirng - "generated" to generate key, path to p GW_REQUEST_TIMEOUT=Duration - timeout for request GW_CONNECT_TIMEOUT=Duration - timeout for connection GW_LISTEN_ADDRESS=host:port - address to listen connections -GW_NEOFS_ADDRESS=host:port - address of NeoFS Node +GW_PEERS__ADDRESS=host:port - address of NeoFS Node +GW_PEERS__WEIGHT=float - weight of NeoFS Node +GW_PPROF=bool - enable/disable pprof (/debug/pprof) +GW_METRICS=bool - enable/disable prometheus metrics endpoint (/metrics) GW_KEEPALIVE_TIME=Duration - аfter a duration of this time if the client doesn't see any activity it pings the server to see if the transport is still alive. GW_KEEPALIVE_TIMEOUT=Duration - after having pinged for keepalive check, the client waits for a duration diff --git a/go.mod b/go.mod index 83a0e81..112f041 100644 --- a/go.mod +++ b/go.mod @@ -5,8 +5,9 @@ go 1.13 require ( github.com/labstack/echo/v4 v4.1.11 github.com/nspcc-dev/neofs-crypto v0.2.2 - github.com/nspcc-dev/neofs-proto v0.2.5 + github.com/nspcc-dev/neofs-proto v0.2.8 github.com/pkg/errors v0.8.1 + github.com/prometheus/client_golang v1.2.1 github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.6.1 go.uber.org/zap v1.13.0 diff --git a/go.sum b/go.sum index e52bed7..c38f10d 100644 --- a/go.sum +++ b/go.sum @@ -13,9 +13,12 @@ github.com/awalterschulze/gographviz v0.0.0-20181013152038-b2885df04310 h1:t+qxR github.com/awalterschulze/gographviz v0.0.0-20181013152038-b2885df04310/go.mod h1:GEV5wmg4YquNw7v1kkyoX9etIk8yVmXj+AkDHuuETHs= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= +github.com/cespare/xxhash/v2 v2.1.0 h1:yTUvW7Vhb89inJ+8irsUqiWjh8iT6sQPZiQzI6ReGkA= github.com/cespare/xxhash/v2 v2.1.0/go.mod h1:dgIUBU3pDso/gPgZ1osOZ0iQf77oPR28Tjxl5dIMyVM= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= @@ -99,6 +102,7 @@ github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNx github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.9 h1:d5US/mDsogSGW37IV293h//ZFaeajb69h+EHFsv2xGg= github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ= +github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= @@ -116,6 +120,8 @@ github.com/nspcc-dev/neofs-crypto v0.2.2 h1:jLc5O+Wdpaq7L4lNYFX7li+OP4I1FsvvcPW1 github.com/nspcc-dev/neofs-crypto v0.2.2/go.mod h1:F/96fUzPM3wR+UGsPi3faVNmFlA9KAEAUQR7dMxZmNA= github.com/nspcc-dev/neofs-proto v0.2.5 h1:/zPystudzakARWvk618th4+4bYuipkFGSVdGAvAwRVo= github.com/nspcc-dev/neofs-proto v0.2.5/go.mod h1:UDr4USXDHqn97StDvL7U/DcWRmRloIefaFo8qk4GbOo= +github.com/nspcc-dev/neofs-proto v0.2.8 h1:VWEIuqR2lQEuarkhLXhK62HLebSDmE6e4JqC7l2hc60= +github.com/nspcc-dev/neofs-proto v0.2.8/go.mod h1:uNc8tEAIcY828/kH94DpwiiSODvPHiesIM20hHckjAM= github.com/nspcc-dev/netmap v1.6.1 h1:Pigqpqi6QSdRiusbq5XlO20A18k6Eyu7j9MzOfAE3CM= github.com/nspcc-dev/netmap v1.6.1/go.mod h1:mhV3UOg9ljQmu0teQShD6+JYX09XY5gu2I4hIByCH9M= github.com/nspcc-dev/rfc6979 v0.1.0 h1:Lwg7esRRoyK1Up/IN1vAef1EmvrBeMHeeEkek2fAJ6c= @@ -133,17 +139,21 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= +github.com/prometheus/client_golang v1.2.1 h1:JnMpQc6ppsNgw9QPAGF6Dod479itz7lvlsMzzNayLOI= github.com/prometheus/client_golang v1.2.1/go.mod h1:XMU6Z2MjaRKVu/dC1qupJI9SiNkDYzz3xecMgSW/F+U= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 h1:gQz4mCbXsO+nc9n1hCxHcGA3Zx3Eo+UHZoInFGUIXNM= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= +github.com/prometheus/common v0.7.0 h1:L+1lyG48J1zAQXA3RBX/nG/B3gjlHq0zTt2tlbJLyCY= github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt26CguLLsqA= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= +github.com/prometheus/procfs v0.0.5 h1:3+auTFlqw+ZaQYJARz6ArODtkaIwtvBTx3N2NehQlL8= github.com/prometheus/procfs v0.0.5/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= diff --git a/logger.go b/logger.go index 8807ab7..13aefe0 100644 --- a/logger.go +++ b/logger.go @@ -47,7 +47,7 @@ func safeLevel(lvl string) zap.AtomicLevel { } } -func newLogger(v *viper.Viper) (*zap.Logger, error) { +func newLogger(v *viper.Viper) *zap.Logger { c := zap.NewProductionConfig() c.OutputPaths = []string{"stdout"} @@ -87,11 +87,11 @@ func newLogger(v *viper.Viper) (*zap.Logger, error) { // enable trace only for current log-level zap.AddStacktrace(traceLvl)) if err != nil { - return nil, err + panic(err) } if v.GetBool("logger.no_disclaimer") { - return l, nil + return l } name := v.GetString("app.name") @@ -99,7 +99,7 @@ func newLogger(v *viper.Viper) (*zap.Logger, error) { return l.With( zap.String("app_name", name), - zap.String("app_version", version)), nil + zap.String("app_version", version)) } func (z *zapLogger) Info(args ...interface{}) { diff --git a/main.go b/main.go index 7a54993..4f59a47 100644 --- a/main.go +++ b/main.go @@ -3,114 +3,93 @@ package main import ( "context" "crypto/ecdsa" + "net/http" + _ "net/http/pprof" "time" "github.com/labstack/echo/v4" - "github.com/nspcc-dev/neofs-proto/object" + "github.com/prometheus/client_golang/prometheus/promhttp" "go.uber.org/zap" - "google.golang.org/grpc" - "google.golang.org/grpc/connectivity" "google.golang.org/grpc/grpclog" - "google.golang.org/grpc/keepalive" ) -type config struct { +type router struct { + pool *Pool log *zap.Logger timeout time.Duration key *ecdsa.PrivateKey - cli object.ServiceClient } func main() { - v := settings() - - log, err := newLogger(v) - if err != nil { - panic(err) - } - - log.Info("running application", zap.String("version", v.GetString("app.version"))) - var ( - cfg = new(config) - grace = newGracefulContext(log) + v = settings() + l = newLogger(v) + g = newGracefulContext(l) ) if v.GetBool("verbose") { - grpclog.SetLoggerV2( - gRPCLogger(log)) + grpclog.SetLoggerV2(gRPCLogger(l)) } - cfg.log = log - cfg.key = fetchKey(log, v) - cfg.timeout = v.GetDuration("request_timeout") - - ctx, cancel := context.WithTimeout(grace, v.GetDuration("connect_timeout")) - defer cancel() - - conn, err := grpc.DialContext(ctx, v.GetString("neofs_address"), - grpc.WithBlock(), - grpc.WithInsecure(), - grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: v.GetDuration("keepalive.time"), - Timeout: v.GetDuration("keepalive.timeout"), - PermitWithoutStream: v.GetBool("keepalive.permit_without_stream"), - }), - ) - if err != nil { - log.Panic("could not connect to neofs-node", - zap.String("neofs-node", v.GetString("neofs_node_addr")), - zap.Error(err)) + r := &router{ + log: l, + key: fetchKey(l, v), + pool: newPool(g, l, v), + timeout: v.GetDuration("request_timeout"), } - ctx, cancel = context.WithCancel(grace) - defer cancel() - - go checkConnection(ctx, conn, log) - cfg.cli = object.NewServiceClient(conn) + go checkConnection(g, r.pool) e := echo.New() e.Debug = false e.HidePort = true e.HideBanner = true - e.GET("/:cid/:oid", cfg.receiveFile) + e.GET("/:cid/:oid", r.receiveFile) + + // enable metrics + if v.GetBool("metrics") { + l.Info("enabled /metrics") + e.GET("/metrics", echo.WrapHandler(promhttp.Handler())) + } + + // enable pprof + if v.GetBool("pprof") { + l.Info("enabled /debug/pprof") + e.Any("/debug/pprof*", echo.WrapHandler(http.DefaultServeMux)) + } go func() { - log.Info("run gateway server", + l.Info("run gateway server", zap.String("address", v.GetString("listen_address"))) if err := e.Start(v.GetString("listen_address")); err != nil { - log.Panic("could not start server", zap.Error(err)) + l.Panic("could not start server", zap.Error(err)) } }() - <-ctx.Done() + <-g.Done() - ctx, cancel = context.WithTimeout(context.TODO(), time.Second*30) + ctx, cancel := context.WithTimeout(context.TODO(), time.Second*30) defer cancel() - log.Info("stopping server", zap.Error(e.Shutdown(ctx))) + l.Info("stopping server", zap.Error(e.Shutdown(ctx))) } -func checkConnection(ctx context.Context, conn *grpc.ClientConn, log *zap.Logger) { - tick := time.NewTicker(time.Second) +func checkConnection(ctx context.Context, p *Pool) { + tick := time.NewTicker(time.Second * 15) + loop: for { select { case <-ctx.Done(): break loop case <-tick.C: - switch state := conn.GetState(); state { - case connectivity.Idle, connectivity.Connecting, connectivity.Ready: - // It's ok.. - default: - log.Error("could not establish connection", - zap.Stringer("state", state), - zap.Any("connection", conn.Target())) - } + p.reBalance(ctx) } } tick.Stop() + + p.log.Info("stop connection worker") } diff --git a/pool.go b/pool.go new file mode 100644 index 0000000..fcc57a7 --- /dev/null +++ b/pool.go @@ -0,0 +1,244 @@ +package main + +import ( + "context" + crand "crypto/rand" + "encoding/binary" + "errors" + "math/rand" + "sort" + "strconv" + "sync" + "time" + + "github.com/spf13/viper" + "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/keepalive" +) + +type ( + node struct { + address string + weight uint32 + conn *grpc.ClientConn + } + + Pool struct { + log *zap.Logger + + ctl time.Duration + opts keepalive.ClientParameters + + cur *grpc.ClientConn + + *sync.RWMutex + nodes []*node + keys []uint32 + conns map[uint32][]*grpc.ClientConn + } +) + +var errNoHealthyConnections = errors.New("no active connections") + +func newPool(ctx context.Context, l *zap.Logger, v *viper.Viper) *Pool { + p := &Pool{ + log: l, + RWMutex: new(sync.RWMutex), + keys: make([]uint32, 0), + nodes: make([]*node, 0), + conns: make(map[uint32][]*grpc.ClientConn), + + // fill with defaults: + ctl: time.Second * 15, + opts: keepalive.ClientParameters{ + Time: time.Second * 10, + Timeout: time.Minute * 5, + PermitWithoutStream: true, + }, + } + buf := make([]byte, 8) + if _, err := crand.Read(buf); err != nil { + l.Panic("could not read seed", zap.Error(err)) + } + + seed := binary.BigEndian.Uint64(buf) + rand.Seed(int64(seed)) + l.Info("used random seed", zap.Uint64("seed", seed)) + + if val := v.GetDuration("connect_timeout"); val > 0 { + p.ctl = val + } + + if val := v.GetDuration("keepalive.time"); val > 0 { + p.opts.Time = val + } + + if val := v.GetDuration("keepalive.timeout"); val > 0 { + p.opts.Timeout = val + } + + if v.IsSet("keepalive.permit_without_stream") { + p.opts.PermitWithoutStream = v.GetBool("keepalive.permit_without_stream") + } + + for i := 0; ; i++ { + key := "peers." + strconv.Itoa(i) + "." + address := v.GetString(key + "address") + weight := v.GetFloat64(key + "weight") + + if address == "" { + l.Warn("skip, empty address") + break + } + + p.nodes = append(p.nodes, &node{ + address: address, + weight: uint32(weight * 100), + }) + + l.Info("add new peer", + zap.String("address", p.nodes[i].address), + zap.Uint32("weight", p.nodes[i].weight)) + } + + p.reBalance(ctx) + + cur, err := p.getConnection() + if err != nil { + l.Panic("could get connection", zap.Error(err)) + } + + p.cur = cur + + return p +} + +func (p *Pool) close() { + p.Lock() + defer p.Unlock() + + for i := range p.nodes { + if p.nodes[i] == nil || p.nodes[i].conn == nil { + continue + } + + p.log.Warn("close connection", + zap.String("address", p.nodes[i].address), + zap.Error(p.nodes[i].conn.Close())) + } +} + +func (p *Pool) reBalance(ctx context.Context) { + p.Lock() + defer p.Unlock() + + keys := make(map[uint32]struct{}) + + for i := range p.nodes { + var ( + idx = -1 + exists bool + err error + conn = p.nodes[i].conn + weight = p.nodes[i].weight + ) + + if conn == nil { + p.log.Warn("empty connection, try to connect", + zap.String("address", p.nodes[i].address)) + + ctx, cancel := context.WithTimeout(ctx, p.ctl) + conn, err = grpc.DialContext(ctx, p.nodes[i].address, + grpc.WithBlock(), + grpc.WithInsecure(), + grpc.WithKeepaliveParams(p.opts)) + cancel() + + if err != nil || conn == nil { + p.log.Warn("skip, could not connect to node", + zap.String("address", p.nodes[i].address), + zap.Error(err)) + continue + } + + p.nodes[i].conn = conn + p.log.Info("connected to node", zap.String("address", p.nodes[i].address)) + } + + switch st := conn.GetState(); st { + case connectivity.Idle, connectivity.Ready, connectivity.Connecting: + keys[weight] = struct{}{} + + for j := range p.conns[weight] { + if p.conns[weight][j] == conn { + exists = true + break + } + } + + if !exists { + p.conns[weight] = append(p.conns[weight], conn) + } + + // Problems with connection, try to remove from : + default: + for j := range p.conns[weight] { + if p.conns[weight][j] == conn { + idx = j + exists = true + break + } + } + + if exists { + // remove from connections + p.conns[weight] = append(p.conns[weight][:idx], p.conns[weight][idx+1:]...) + } + } + } + + p.keys = p.keys[:0] + for w := range keys { + p.keys = append(p.keys, w) + } + + sort.Slice(p.keys, func(i, j int) bool { + return p.keys[i] > p.keys[j] + }) +} + +func (p *Pool) getConnection() (*grpc.ClientConn, error) { + p.RLock() + defer p.RUnlock() + + if p.cur != nil && isAlive(p.cur) { + return p.cur, nil + } + + for _, w := range p.keys { + switch ln := len(p.conns[w]); ln { + case 0: + continue + case 1: + p.cur = p.conns[w][0] + return p.cur, nil + default: // > 1 + i := rand.Intn(ln) + p.cur = p.conns[w][i] + return p.cur, nil + } + } + + return nil, errNoHealthyConnections +} + +func isAlive(cur *grpc.ClientConn) bool { + switch st := cur.GetState(); st { + case connectivity.Idle, connectivity.Ready, connectivity.Connecting: + return true + default: + return false + } +} diff --git a/receive.go b/receive.go index 57cb666..d59372a 100644 --- a/receive.go +++ b/receive.go @@ -16,33 +16,33 @@ import ( "go.uber.org/zap" ) -func (cfg *config) receiveFile(c echo.Context) error { +func (r *router) receiveFile(c echo.Context) error { var ( cid refs.CID oid refs.ObjectID obj *object.Object download = c.QueryParam("download") != "" + con, err = r.pool.getConnection() ) - cfg.log.Debug("try to fetch object from network", + if err != nil { + return echo.NewHTTPError(http.StatusBadRequest, err.Error()) + } + + log := r.log.With( + zap.String("node", con.Target()), zap.String("cid", c.Param("cid")), zap.String("oid", c.Param("oid"))) if err := cid.Parse(c.Param("cid")); err != nil { - cfg.log.Error("wrong container id", - zap.String("cid", c.Param("cid")), - zap.String("oid", c.Param("oid")), - zap.Error(err)) + log.Error("wrong container id", zap.Error(err)) return echo.NewHTTPError( http.StatusBadRequest, errors.Wrap(err, "wrong container id").Error(), ) } else if err := oid.Parse(c.Param("oid")); err != nil { - cfg.log.Error("wrong object id", - zap.Stringer("cid", cid), - zap.String("oid", c.Param("oid")), - zap.Error(err)) + log.Error("wrong object id", zap.Error(err)) return echo.NewHTTPError( http.StatusBadRequest, @@ -50,39 +50,29 @@ func (cfg *config) receiveFile(c echo.Context) error { ) } - ctx, cancel := context.WithTimeout(context.Background(), cfg.timeout) + ctx, cancel := context.WithTimeout(context.Background(), r.timeout) defer cancel() req := &object.GetRequest{Address: refs.Address{ObjectID: oid, CID: cid}} req.SetTTL(service.SingleForwardingTTL) - if err := service.SignRequestHeader(cfg.key, req); err != nil { - cfg.log.Error("could not sign request", - zap.Stringer("cid", cid), - zap.Stringer("oid", oid), - zap.Error(err)) - + if err := service.SignRequestHeader(r.key, req); err != nil { + log.Error("could not sign request", zap.Error(err)) return echo.NewHTTPError( http.StatusBadRequest, errors.Wrap(err, "could not sign request").Error()) } - cli, err := cfg.cli.Get(ctx, req) + cli, err := object.NewServiceClient(con).Get(ctx, req) if err != nil { - cfg.log.Error("could not prepare connection", - zap.Stringer("cid", cid), - zap.Stringer("oid", oid), - zap.Error(err)) + log.Error("could not prepare connection", zap.Error(err)) return echo.NewHTTPError( http.StatusBadRequest, errors.Wrap(err, "could not prepare connection").Error(), ) } else if obj, err = receiveObject(cli); err != nil { - cfg.log.Error("could not receive object", - zap.Stringer("cid", cid), - zap.Stringer("oid", oid), - zap.Error(err)) + log.Error("could not receive object", zap.Error(err)) switch { case strings.Contains(err.Error(), object.ErrNotFound.Error()), @@ -96,9 +86,7 @@ func (cfg *config) receiveFile(c echo.Context) error { } } - cfg.log.Info("object fetched successfully", - zap.Stringer("cid", cid), - zap.Stringer("oid", oid)) + log.Info("object fetched successfully") c.Response().Header().Set("Content-Length", strconv.FormatUint(obj.SystemHeader.PayloadLength, 10)) c.Response().Header().Set("x-object-id", obj.SystemHeader.ID.String()) diff --git a/settings.go b/settings.go index 064013b..472850e 100644 --- a/settings.go +++ b/settings.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "os" + "strconv" "strings" "time" @@ -61,9 +62,12 @@ func settings() *viper.Viper { v.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) // flags setup: - flags := pflag.NewFlagSet("comandline", pflag.ExitOnError) + flags := pflag.NewFlagSet("commandline", pflag.ExitOnError) flags.SortFlags = false + flags.Bool("pprof", false, "enable pprof") + flags.Bool("metrics", false, "enable prometheus") + help := flags.BoolP("help", "h", false, "show help") version := flags.BoolP("version", "v", false, "show version") @@ -74,7 +78,7 @@ func settings() *viper.Viper { flags.Duration("connect_timeout", time.Second*30, "gRPC connect timeout") flags.String("listen_address", "0.0.0.0:8082", "HTTP Gateway listen address") - flags.String("neofs_address", "0.0.0.0:8080", "NeoFS Node address for proxying requests") + peers := flags.StringArrayP("peers", "p", nil, "NeoFS nodes") // set prefers: v.Set("app.name", "neofs-gw") @@ -117,5 +121,12 @@ func settings() *viper.Viper { os.Exit(0) } + if peers != nil && len(*peers) > 0 { + for i := range *peers { + v.SetDefault("peers."+strconv.Itoa(i)+".address", (*peers)[i]) + v.SetDefault("peers."+strconv.Itoa(i)+".weight", 1) + } + } + return v }