Migrate cmd/gate to new components

- remove unused methods
- refactoring app.Worker
- migrate to new pool, client, auth and credentials

Signed-off-by: Evgeniy Kulikov <kim@nspcc.ru>
This commit is contained in:
Evgeniy Kulikov 2020-11-24 10:09:58 +03:00
parent 3bf6a847a2
commit c1c8d56de5
2 changed files with 61 additions and 148 deletions

View file

@ -1,25 +1,15 @@
package main package main
import ( import (
"context"
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"os" "os"
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
"time" "time"
"github.com/nspcc-dev/neofs-authmate/accessbox/hcs"
crypto "github.com/nspcc-dev/neofs-crypto"
"github.com/nspcc-dev/neofs-s3-gate/api/pool"
"github.com/nspcc-dev/neofs-s3-gate/auth"
"github.com/nspcc-dev/neofs-s3-gate/misc" "github.com/nspcc-dev/neofs-s3-gate/misc"
"github.com/pkg/errors"
"github.com/spf13/pflag" "github.com/spf13/pflag"
"github.com/spf13/viper" "github.com/spf13/viper"
"go.uber.org/zap" "go.uber.org/zap"
@ -113,51 +103,8 @@ var ignore = map[string]struct{}{
func (empty) Read([]byte) (int, error) { return 0, io.EOF } func (empty) Read([]byte) (int, error) { return 0, io.EOF }
func fetchGateAuthKeys(v *viper.Viper) (*hcs.X25519Keys, error) { func fetchPeers(l *zap.Logger, v *viper.Viper) map[string]float64 {
path := v.GetString(cfgGateAuthPrivateKey) peers := make(map[string]float64, 0)
data, err := ioutil.ReadFile(path)
if err != nil {
return nil, err
}
return hcs.NewKeys(data)
}
func fetchNeoFSKey(v *viper.Viper) (*ecdsa.PrivateKey, error) {
var (
err error
key *ecdsa.PrivateKey
)
switch val := v.GetString(cfgNeoFSPrivateKey); val {
case generated:
key, err = ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
if err != nil {
return nil, errors.Wrap(err, "could not generate NeoFS private key")
}
default:
key, err = crypto.LoadPrivateKey(val)
if err != nil {
return nil, errors.Wrap(err, "could not load NeoFS private key")
}
}
return key, nil
}
func fetchAuthCenter(ctx context.Context, p *authCenterParams) (*auth.Center, error) {
return auth.New(ctx, &auth.Params{
Con: p.Pool,
Log: p.Logger,
Timeout: p.Timeout,
GAKey: p.GateAuthKeys,
NFKey: p.NeoFSPrivateKey,
})
}
func fetchPeers(l *zap.Logger, v *viper.Viper) []pool.Peer {
peers := make([]pool.Peer, 0)
for i := 0; ; i++ { for i := 0; ; i++ {
@ -170,10 +117,10 @@ func fetchPeers(l *zap.Logger, v *viper.Viper) []pool.Peer {
break break
} }
peers = append(peers, pool.Peer{ peers[address] = weight
Address: address, l.Info("add connection peer",
Weight: weight, zap.String("address", address),
}) zap.Float64("weight", weight))
} }
return peers return peers

View file

@ -2,39 +2,35 @@ package main
import ( import (
"context" "context"
"crypto/ecdsa"
"errors" "errors"
"net" "net"
"net/http" "net/http"
"os" "os"
"time"
"github.com/nspcc-dev/neofs-authmate/accessbox/hcs" sdk "github.com/nspcc-dev/cdn-neofs-sdk"
"github.com/nspcc-dev/cdn-neofs-sdk/creds/hcs"
"github.com/nspcc-dev/cdn-neofs-sdk/creds/neofs"
"github.com/nspcc-dev/cdn-neofs-sdk/pool"
"github.com/nspcc-dev/neofs-s3-gate/api" "github.com/nspcc-dev/neofs-s3-gate/api"
"github.com/nspcc-dev/neofs-s3-gate/api/auth"
"github.com/nspcc-dev/neofs-s3-gate/api/handler" "github.com/nspcc-dev/neofs-s3-gate/api/handler"
"github.com/nspcc-dev/neofs-s3-gate/api/layer" "github.com/nspcc-dev/neofs-s3-gate/api/layer"
"github.com/nspcc-dev/neofs-s3-gate/api/pool"
"github.com/nspcc-dev/neofs-s3-gate/auth"
"github.com/spf13/viper" "github.com/spf13/viper"
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive" "google.golang.org/grpc/keepalive"
) )
type ( type (
App struct { App struct {
cli pool.Pool cli pool.Client
ctr *auth.Center ctr auth.Center
log *zap.Logger log *zap.Logger
cfg *viper.Viper cfg *viper.Viper
tls *tlsConfig tls *tlsConfig
obj layer.Client obj layer.Client
api api.Handler api api.Handler
conTimeout time.Duration
reqTimeout time.Duration
reBalance time.Duration
maxClients api.MaxClients maxClients api.MaxClients
webDone chan struct{} webDone chan struct{}
@ -50,14 +46,17 @@ type (
func newApp(ctx context.Context, l *zap.Logger, v *viper.Viper) *App { func newApp(ctx context.Context, l *zap.Logger, v *viper.Viper) *App {
var ( var (
err error err error
cli pool.Pool
tls *tlsConfig tls *tlsConfig
cli sdk.Client
con pool.Client
caller api.Handler caller api.Handler
ctr *auth.Center ctr auth.Center
obj layer.Client obj layer.Client
gaKey *hcs.X25519Keys hcsCred hcs.Credentials
nfKey *ecdsa.PrivateKey nfsCred neofs.Credentials
peers = fetchPeers(l, v)
reBalance = defaultRebalanceTimer reBalance = defaultRebalanceTimer
conTimeout = defaultConnectTimeout conTimeout = defaultConnectTimeout
@ -65,6 +64,9 @@ func newApp(ctx context.Context, l *zap.Logger, v *viper.Viper) *App {
maxClientsCount = defaultMaxClientsCount maxClientsCount = defaultMaxClientsCount
maxClientsDeadline = defaultMaxClientsDeadline maxClientsDeadline = defaultMaxClientsDeadline
hcsCredential = v.GetString(cfgGateAuthPrivateKey)
nfsCredential = v.GetString(cfgNeoFSPrivateKey)
) )
if v := v.GetDuration(cfgConnectTimeout); v > 0 { if v := v.GetDuration(cfgConnectTimeout); v > 0 {
@ -87,11 +89,11 @@ func newApp(ctx context.Context, l *zap.Logger, v *viper.Viper) *App {
reBalance = v reBalance = v
} }
if nfKey, err = fetchNeoFSKey(v); err != nil { if nfsCred, err = neofs.New(nfsCredential); err != nil {
l.Fatal("could not load NeoFS private key") l.Fatal("could not load NeoFS private key")
} }
if gaKey, err = fetchGateAuthKeys(v); err != nil { if hcsCred, err = hcs.NewCredentials(hcsCredential); err != nil {
l.Fatal("could not load gate auth key") l.Fatal("could not load gate auth key")
} }
@ -102,54 +104,36 @@ func newApp(ctx context.Context, l *zap.Logger, v *viper.Viper) *App {
} }
} }
peers := fetchPeers(l, v) l.Info("using credentials",
zap.String("HCS", hcsCredential),
zap.String("NeoFS", nfsCredential))
poolConfig := &pool.Config{ poolOptions := []pool.Option{
ConnectTimeout: conTimeout, pool.WithLogger(l),
RequestTimeout: reqTimeout, pool.WithWeightPool(peers),
ConnectionTTL: v.GetDuration(cfgConnectionTTL), pool.WithCredentials(nfsCred),
pool.WithTickerTimeout(reBalance),
pool.WithConnectTimeout(conTimeout),
pool.WithRequestTimeout(reqTimeout),
pool.WithAPIPreparer(sdk.APIPreparer),
pool.WithGRPCOptions(
grpc.WithBlock(),
grpc.WithInsecure(),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: v.GetDuration(cfgKeepaliveTime),
Timeout: v.GetDuration(cfgKeepaliveTimeout),
PermitWithoutStream: v.GetBool(cfgKeepalivePermitWithoutStream),
}))}
Peers: peers, if con, err = pool.New(ctx, poolOptions...); err != nil {
Logger: l,
PrivateKey: nfKey,
GRPCLogger: gRPCLogger(l),
GRPCVerbose: v.GetBool(cfgGRPCVerbose),
ClientParameters: keepalive.ClientParameters{},
}
if cli, err = pool.New(poolConfig); err != nil {
l.Fatal("could not prepare pool connections", zap.Error(err)) l.Fatal("could not prepare pool connections", zap.Error(err))
} }
{ // prepare auth center
ctx, cancel := context.WithTimeout(ctx, conTimeout)
defer cancel()
params := &authCenterParams{
Logger: l,
Pool: cli,
Timeout: conTimeout,
GateAuthKeys: gaKey,
NeoFSPrivateKey: nfKey,
}
if ctr, err = fetchAuthCenter(ctx, params); err != nil {
l.Fatal("failed to initialize auth center", zap.Error(err))
}
}
{ // should establish connection with NeoFS Storage Nodes { // should establish connection with NeoFS Storage Nodes
ctx, cancel := context.WithTimeout(ctx, conTimeout) ctx, cancel := context.WithTimeout(ctx, conTimeout)
defer cancel() defer cancel()
cli.ReBalance(ctx) if _, err = con.Connection(ctx); err != nil {
if _, err = cli.Connection(ctx); err != nil {
if errors.Is(err, context.Canceled) { if errors.Is(err, context.Canceled) {
l.Info("connection canceled") l.Info("connection canceled")
os.Exit(0) os.Exit(0)
@ -160,16 +144,20 @@ func newApp(ctx context.Context, l *zap.Logger, v *viper.Viper) *App {
} }
} }
layerParams := &layer.Params{ if cli, err = sdk.New(ctx,
Pool: cli, sdk.WithLogger(l),
Logger: l, sdk.WithConnectionPool(con),
Timeout: reqTimeout, sdk.WithCredentials(nfsCred),
NFKey: nfKey, sdk.WithAPIPreparer(sdk.APIPreparer)); err != nil {
l.Fatal("could not prepare sdk client",
zap.Error(err))
} }
if obj, err = layer.NewLayer(layerParams); err != nil { // prepare object layer
l.Fatal("could not prepare ObjectLayer", zap.Error(err)) obj = layer.NewLayer(l, cli)
}
// prepare auth center
ctr = auth.New(cli, hcsCred.PrivateKey())
if caller, err = handler.New(l, obj); err != nil { if caller, err = handler.New(l, obj); err != nil {
l.Fatal("could not initialize API handler", zap.Error(err)) l.Fatal("could not initialize API handler", zap.Error(err))
@ -177,7 +165,7 @@ func newApp(ctx context.Context, l *zap.Logger, v *viper.Viper) *App {
return &App{ return &App{
ctr: ctr, ctr: ctr,
cli: cli, cli: con,
log: l, log: l,
cfg: v, cfg: v,
obj: obj, obj: obj,
@ -187,12 +175,7 @@ func newApp(ctx context.Context, l *zap.Logger, v *viper.Viper) *App {
webDone: make(chan struct{}, 1), webDone: make(chan struct{}, 1),
wrkDone: make(chan struct{}, 1), wrkDone: make(chan struct{}, 1),
reBalance: reBalance,
maxClients: api.NewMaxClientsMiddleware(maxClientsCount, maxClientsDeadline), maxClients: api.NewMaxClientsMiddleware(maxClientsCount, maxClientsDeadline),
conTimeout: conTimeout,
reqTimeout: reqTimeout,
} }
} }
@ -271,24 +254,7 @@ func (a *App) Server(ctx context.Context) {
} }
func (a *App) Worker(ctx context.Context) { func (a *App) Worker(ctx context.Context) {
tick := time.NewTimer(a.reBalance) a.cli.Worker(ctx)
loop:
for {
select {
case <-ctx.Done():
break loop
case <-tick.C:
ctx, cancel := context.WithTimeout(ctx, a.conTimeout)
a.cli.ReBalance(ctx)
cancel()
tick.Reset(a.reBalance)
}
}
tick.Stop()
a.cli.Close()
a.log.Info("stopping worker") a.log.Info("stopping worker")
close(a.wrkDone) close(a.wrkDone)
} }