From c1c8d56de5a489b7f3e878d16a5b907e560bbb00 Mon Sep 17 00:00:00 2001 From: Evgeniy Kulikov Date: Tue, 24 Nov 2020 10:09:58 +0300 Subject: [PATCH] Migrate cmd/gate to new components - remove unused methods - refactoring app.Worker - migrate to new pool, client, auth and credentials Signed-off-by: Evgeniy Kulikov --- cmd/gate/app-settings.go | 65 ++---------------- cmd/gate/app.go | 144 +++++++++++++++------------------------ 2 files changed, 61 insertions(+), 148 deletions(-) diff --git a/cmd/gate/app-settings.go b/cmd/gate/app-settings.go index 5880ad2e..72c90c65 100644 --- a/cmd/gate/app-settings.go +++ b/cmd/gate/app-settings.go @@ -1,25 +1,15 @@ package main import ( - "context" - "crypto/ecdsa" - "crypto/elliptic" - "crypto/rand" "fmt" "io" - "io/ioutil" "os" "sort" "strconv" "strings" "time" - "github.com/nspcc-dev/neofs-authmate/accessbox/hcs" - crypto "github.com/nspcc-dev/neofs-crypto" - "github.com/nspcc-dev/neofs-s3-gate/api/pool" - "github.com/nspcc-dev/neofs-s3-gate/auth" "github.com/nspcc-dev/neofs-s3-gate/misc" - "github.com/pkg/errors" "github.com/spf13/pflag" "github.com/spf13/viper" "go.uber.org/zap" @@ -113,51 +103,8 @@ var ignore = map[string]struct{}{ func (empty) Read([]byte) (int, error) { return 0, io.EOF } -func fetchGateAuthKeys(v *viper.Viper) (*hcs.X25519Keys, error) { - path := v.GetString(cfgGateAuthPrivateKey) - - data, err := ioutil.ReadFile(path) - if err != nil { - return nil, err - } - - return hcs.NewKeys(data) -} - -func fetchNeoFSKey(v *viper.Viper) (*ecdsa.PrivateKey, error) { - var ( - err error - key *ecdsa.PrivateKey - ) - - switch val := v.GetString(cfgNeoFSPrivateKey); val { - case generated: - key, err = ecdsa.GenerateKey(elliptic.P256(), rand.Reader) - if err != nil { - return nil, errors.Wrap(err, "could not generate NeoFS private key") - } - default: - key, err = crypto.LoadPrivateKey(val) - if err != nil { - return nil, errors.Wrap(err, "could not load NeoFS private key") - } - } - - return key, nil -} - -func fetchAuthCenter(ctx context.Context, p *authCenterParams) (*auth.Center, error) { - return auth.New(ctx, &auth.Params{ - Con: p.Pool, - Log: p.Logger, - Timeout: p.Timeout, - GAKey: p.GateAuthKeys, - NFKey: p.NeoFSPrivateKey, - }) -} - -func fetchPeers(l *zap.Logger, v *viper.Viper) []pool.Peer { - peers := make([]pool.Peer, 0) +func fetchPeers(l *zap.Logger, v *viper.Viper) map[string]float64 { + peers := make(map[string]float64, 0) for i := 0; ; i++ { @@ -170,10 +117,10 @@ func fetchPeers(l *zap.Logger, v *viper.Viper) []pool.Peer { break } - peers = append(peers, pool.Peer{ - Address: address, - Weight: weight, - }) + peers[address] = weight + l.Info("add connection peer", + zap.String("address", address), + zap.Float64("weight", weight)) } return peers diff --git a/cmd/gate/app.go b/cmd/gate/app.go index 27d31cc5..4dcc4485 100644 --- a/cmd/gate/app.go +++ b/cmd/gate/app.go @@ -2,39 +2,35 @@ package main import ( "context" - "crypto/ecdsa" "errors" "net" "net/http" "os" - "time" - "github.com/nspcc-dev/neofs-authmate/accessbox/hcs" + sdk "github.com/nspcc-dev/cdn-neofs-sdk" + "github.com/nspcc-dev/cdn-neofs-sdk/creds/hcs" + "github.com/nspcc-dev/cdn-neofs-sdk/creds/neofs" + "github.com/nspcc-dev/cdn-neofs-sdk/pool" "github.com/nspcc-dev/neofs-s3-gate/api" + "github.com/nspcc-dev/neofs-s3-gate/api/auth" "github.com/nspcc-dev/neofs-s3-gate/api/handler" "github.com/nspcc-dev/neofs-s3-gate/api/layer" - "github.com/nspcc-dev/neofs-s3-gate/api/pool" - "github.com/nspcc-dev/neofs-s3-gate/auth" "github.com/spf13/viper" "go.uber.org/zap" + "google.golang.org/grpc" "google.golang.org/grpc/keepalive" ) type ( App struct { - cli pool.Pool - ctr *auth.Center + cli pool.Client + ctr auth.Center log *zap.Logger cfg *viper.Viper tls *tlsConfig obj layer.Client api api.Handler - conTimeout time.Duration - reqTimeout time.Duration - - reBalance time.Duration - maxClients api.MaxClients webDone chan struct{} @@ -50,14 +46,17 @@ type ( func newApp(ctx context.Context, l *zap.Logger, v *viper.Viper) *App { var ( err error - cli pool.Pool tls *tlsConfig + cli sdk.Client + con pool.Client caller api.Handler - ctr *auth.Center + ctr auth.Center obj layer.Client - gaKey *hcs.X25519Keys - nfKey *ecdsa.PrivateKey + hcsCred hcs.Credentials + nfsCred neofs.Credentials + + peers = fetchPeers(l, v) reBalance = defaultRebalanceTimer conTimeout = defaultConnectTimeout @@ -65,6 +64,9 @@ func newApp(ctx context.Context, l *zap.Logger, v *viper.Viper) *App { maxClientsCount = defaultMaxClientsCount maxClientsDeadline = defaultMaxClientsDeadline + + hcsCredential = v.GetString(cfgGateAuthPrivateKey) + nfsCredential = v.GetString(cfgNeoFSPrivateKey) ) if v := v.GetDuration(cfgConnectTimeout); v > 0 { @@ -87,11 +89,11 @@ func newApp(ctx context.Context, l *zap.Logger, v *viper.Viper) *App { reBalance = v } - if nfKey, err = fetchNeoFSKey(v); err != nil { + if nfsCred, err = neofs.New(nfsCredential); err != nil { l.Fatal("could not load NeoFS private key") } - if gaKey, err = fetchGateAuthKeys(v); err != nil { + if hcsCred, err = hcs.NewCredentials(hcsCredential); err != nil { l.Fatal("could not load gate auth key") } @@ -102,54 +104,36 @@ func newApp(ctx context.Context, l *zap.Logger, v *viper.Viper) *App { } } - peers := fetchPeers(l, v) + l.Info("using credentials", + zap.String("HCS", hcsCredential), + zap.String("NeoFS", nfsCredential)) - poolConfig := &pool.Config{ - ConnectTimeout: conTimeout, - RequestTimeout: reqTimeout, - ConnectionTTL: v.GetDuration(cfgConnectionTTL), + poolOptions := []pool.Option{ + pool.WithLogger(l), + pool.WithWeightPool(peers), + pool.WithCredentials(nfsCred), + pool.WithTickerTimeout(reBalance), + pool.WithConnectTimeout(conTimeout), + pool.WithRequestTimeout(reqTimeout), + pool.WithAPIPreparer(sdk.APIPreparer), + pool.WithGRPCOptions( + grpc.WithBlock(), + grpc.WithInsecure(), + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: v.GetDuration(cfgKeepaliveTime), + Timeout: v.GetDuration(cfgKeepaliveTimeout), + PermitWithoutStream: v.GetBool(cfgKeepalivePermitWithoutStream), + }))} - Peers: peers, - - Logger: l, - PrivateKey: nfKey, - - GRPCLogger: gRPCLogger(l), - GRPCVerbose: v.GetBool(cfgGRPCVerbose), - - ClientParameters: keepalive.ClientParameters{}, - } - - if cli, err = pool.New(poolConfig); err != nil { + if con, err = pool.New(ctx, poolOptions...); err != nil { l.Fatal("could not prepare pool connections", zap.Error(err)) } - { // prepare auth center - ctx, cancel := context.WithTimeout(ctx, conTimeout) - defer cancel() - - params := &authCenterParams{ - Logger: l, - Pool: cli, - - Timeout: conTimeout, - - GateAuthKeys: gaKey, - NeoFSPrivateKey: nfKey, - } - - if ctr, err = fetchAuthCenter(ctx, params); err != nil { - l.Fatal("failed to initialize auth center", zap.Error(err)) - } - } - { // should establish connection with NeoFS Storage Nodes ctx, cancel := context.WithTimeout(ctx, conTimeout) defer cancel() - cli.ReBalance(ctx) - - if _, err = cli.Connection(ctx); err != nil { + if _, err = con.Connection(ctx); err != nil { if errors.Is(err, context.Canceled) { l.Info("connection canceled") os.Exit(0) @@ -160,16 +144,20 @@ func newApp(ctx context.Context, l *zap.Logger, v *viper.Viper) *App { } } - layerParams := &layer.Params{ - Pool: cli, - Logger: l, - Timeout: reqTimeout, - NFKey: nfKey, + if cli, err = sdk.New(ctx, + sdk.WithLogger(l), + sdk.WithConnectionPool(con), + sdk.WithCredentials(nfsCred), + sdk.WithAPIPreparer(sdk.APIPreparer)); err != nil { + l.Fatal("could not prepare sdk client", + zap.Error(err)) } - if obj, err = layer.NewLayer(layerParams); err != nil { - l.Fatal("could not prepare ObjectLayer", zap.Error(err)) - } + // prepare object layer + obj = layer.NewLayer(l, cli) + + // prepare auth center + ctr = auth.New(cli, hcsCred.PrivateKey()) if caller, err = handler.New(l, obj); err != nil { l.Fatal("could not initialize API handler", zap.Error(err)) @@ -177,7 +165,7 @@ func newApp(ctx context.Context, l *zap.Logger, v *viper.Viper) *App { return &App{ ctr: ctr, - cli: cli, + cli: con, log: l, cfg: v, obj: obj, @@ -187,12 +175,7 @@ func newApp(ctx context.Context, l *zap.Logger, v *viper.Viper) *App { webDone: make(chan struct{}, 1), wrkDone: make(chan struct{}, 1), - reBalance: reBalance, - maxClients: api.NewMaxClientsMiddleware(maxClientsCount, maxClientsDeadline), - - conTimeout: conTimeout, - reqTimeout: reqTimeout, } } @@ -271,24 +254,7 @@ func (a *App) Server(ctx context.Context) { } func (a *App) Worker(ctx context.Context) { - tick := time.NewTimer(a.reBalance) - -loop: - for { - select { - case <-ctx.Done(): - break loop - case <-tick.C: - ctx, cancel := context.WithTimeout(ctx, a.conTimeout) - a.cli.ReBalance(ctx) - cancel() - - tick.Reset(a.reBalance) - } - } - - tick.Stop() - a.cli.Close() + a.cli.Worker(ctx) a.log.Info("stopping worker") close(a.wrkDone) }