From b346a779338af8b4add79aca265eac38d6923b47 Mon Sep 17 00:00:00 2001 From: Evgeniy Kulikov Date: Tue, 13 Oct 2020 12:32:23 +0300 Subject: [PATCH] [#25] Migrate connection pool to NeoFS API v2 closes #25 Signed-off-by: Evgeniy Kulikov --- api/pool/pool.go | 66 +++++++++++++++------------ api/pool/session.go | 107 ++++++-------------------------------------- api/pool/v2.go | 36 +++++++++++++++ 3 files changed, 86 insertions(+), 123 deletions(-) create mode 100644 api/pool/v2.go diff --git a/api/pool/pool.go b/api/pool/pool.go index b9374268f..091604cdf 100644 --- a/api/pool/pool.go +++ b/api/pool/pool.go @@ -8,8 +8,9 @@ import ( "sync" "time" - "github.com/nspcc-dev/neofs-api-go/service" - "github.com/nspcc-dev/neofs-api-go/state" + "github.com/nspcc-dev/neofs-api-go/pkg/token" + + "github.com/nspcc-dev/neofs-api-go/pkg/owner" "github.com/pkg/errors" "go.uber.org/atomic" "go.uber.org/zap" @@ -30,8 +31,8 @@ type ( Client interface { Status() error - GetConnection(context.Context) (*grpc.ClientConn, error) - SessionToken(ctx context.Context, params *SessionParams) (*service.Token, error) + Connection(context.Context) (*grpc.ClientConn, error) + Token(context.Context, *grpc.ClientConn) (*token.SessionToken, error) } Pool interface { @@ -74,20 +75,27 @@ type ( currentIdx *atomic.Int32 currentConn *grpc.ClientConn - reqHealth *state.HealthRequest + ping Pinger *sync.Mutex nodes []*node keys []uint32 conns map[uint32][]*node - key *ecdsa.PrivateKey - tokens map[string]*service.Token + tokens map[string]*token.SessionToken + + oid *owner.ID + key *ecdsa.PrivateKey unhealthy *atomic.Error } + + Pinger interface { + Call(context.Context, *grpc.ClientConn) error + } ) var ( + errNoConnections = errors.New("no connections") errBootstrapping = errors.New("bootstrapping") errEmptyConnection = errors.New("empty connection") errNoHealthyConnections = errors.New("no active connections") @@ -101,7 +109,7 @@ func New(cfg *Config) (Pool, error) { keys: make([]uint32, 0), nodes: make([]*node, 0), conns: make(map[uint32][]*node), - tokens: make(map[string]*service.Token), + tokens: make(map[string]*token.SessionToken), currentIdx: atomic.NewInt32(-1), @@ -114,6 +122,14 @@ func New(cfg *Config) (Pool, error) { unhealthy: atomic.NewError(errBootstrapping), } + p.oid = owner.NewID() + wallet, err := owner.NEO3WalletFromPublicKey(&p.key.PublicKey) + if err != nil { + return nil, err + } + + p.oid.SetNeo3Wallet(wallet) + if cfg.GRPCVerbose { grpclog.SetLoggerV2(cfg.GRPCLogger) } @@ -123,11 +139,8 @@ func New(cfg *Config) (Pool, error) { rand.Seed(seed) cfg.Logger.Info("used random seed", zap.Int64("seed", seed)) - p.reqHealth = new(state.HealthRequest) - p.reqHealth.SetTTL(service.NonForwardingTTL) - - if err := service.SignRequestData(cfg.PrivateKey, p.reqHealth); err != nil { - return nil, errors.Wrap(err, "could not sign `HealthRequest`") + if p.ping, err = newV2Ping(cfg.PrivateKey); err != nil { + return nil, err } for i := range cfg.Peers { @@ -147,6 +160,10 @@ func New(cfg *Config) (Pool, error) { zap.Uint32("weight", p.nodes[i].weight)) } + if len(p.nodes) == 0 { + return nil, errNoConnections + } + return p, nil } @@ -174,12 +191,12 @@ func (p *pool) ReBalance(ctx context.Context) { defer func() { p.Unlock() - _, err := p.GetConnection(ctx) + _, err := p.Connection(ctx) p.unhealthy.Store(err) }() keys := make(map[uint32]struct{}) - tokens := make(map[string]*service.Token) + tokens := make(map[string]*token.SessionToken) for i := range p.nodes { var ( @@ -187,7 +204,7 @@ func (p *pool) ReBalance(ctx context.Context) { exists bool err error start = time.Now() - tkn *service.Token + tkn *token.SessionToken conn = p.nodes[i].conn weight = p.nodes[i].weight ) @@ -222,7 +239,7 @@ func (p *pool) ReBalance(ctx context.Context) { { // try to prepare token ctx, cancel := context.WithTimeout(ctx, p.reqTimeout) - tkn, err = generateToken(ctx, conn, p.key) + tkn, err = prepareToken(ctx, conn, p.key) cancel() } @@ -240,7 +257,7 @@ func (p *pool) ReBalance(ctx context.Context) { p.log.Debug("connected to node", zap.String("address", p.nodes[i].address)) } else if tkn, exists = p.tokens[conn.Target()]; exists { // token exists, ignore - } else if tkn, err = generateToken(ctx, conn, p.key); err != nil { + } else if tkn, err = prepareToken(ctx, conn, p.key); err != nil { p.log.Error("could not prepare session token", zap.String("address", p.nodes[i].address), zap.Error(err)) @@ -311,7 +328,7 @@ func (p *pool) ReBalance(ctx context.Context) { }) } -func (p *pool) GetConnection(ctx context.Context) (*grpc.ClientConn, error) { +func (p *pool) Connection(ctx context.Context) (*grpc.ClientConn, error) { p.Lock() defer p.Unlock() @@ -361,16 +378,7 @@ func (p *pool) isAlive(ctx context.Context, cur *grpc.ClientConn) error { ctx, cancel := context.WithTimeout(ctx, p.reqTimeout) defer cancel() - res, err := state.NewStatusClient(cur).HealthCheck(ctx, p.reqHealth) - if err != nil { - p.log.Warn("could not fetch health-check", zap.Error(err)) - - return err - } else if !res.Healthy { - return errors.New(res.Status) - } - - return nil + return p.ping.Call(ctx, cur) default: return errors.New(st.String()) } diff --git a/api/pool/session.go b/api/pool/session.go index a4fc38423..ba6594ef6 100644 --- a/api/pool/session.go +++ b/api/pool/session.go @@ -5,119 +5,38 @@ import ( "crypto/ecdsa" "math" - "github.com/nspcc-dev/neofs-api-go/refs" - "github.com/nspcc-dev/neofs-api-go/service" - "github.com/nspcc-dev/neofs-api-go/session" - crypto "github.com/nspcc-dev/neofs-crypto" - "github.com/pkg/errors" + "github.com/nspcc-dev/neofs-api-go/pkg/client" + "github.com/nspcc-dev/neofs-api-go/pkg/token" "google.golang.org/grpc" ) -type ( - queryParams struct { - key *ecdsa.PrivateKey - addr refs.Address - verb service.Token_Info_Verb - } - - SessionParams struct { - Addr refs.Address - Conn *grpc.ClientConn - Verb service.Token_Info_Verb - } -) - -func (p *pool) fetchToken(ctx context.Context, con *grpc.ClientConn) (*session.Token, error) { +// SessionToken returns session token for connection +func (p *pool) Token(ctx context.Context, conn *grpc.ClientConn) (*token.SessionToken, error) { p.Lock() defer p.Unlock() - // if we had token for current connection - return it - if tkn, ok := p.tokens[con.Target()]; ok { + if tkn, ok := p.tokens[conn.Target()]; ok && tkn != nil { return tkn, nil } - // try to generate token for connection - tkn, err := generateToken(ctx, con, p.key) + // prepare session token + tkn, err := prepareToken(ctx, conn, p.key) if err != nil { return nil, err } - p.tokens[con.Target()] = tkn + // save token for current connection + p.tokens[conn.Target()] = tkn + return tkn, nil } -// SessionToken returns session token for connection -func (p *pool) SessionToken(ctx context.Context, params *SessionParams) (*service.Token, error) { - var ( - err error - tkn *session.Token - ) - - if params.Conn == nil { - return nil, errors.New("empty connection") - } else if tkn, err = p.fetchToken(ctx, params.Conn); err != nil { - return nil, err - } - - return prepareToken(tkn, queryParams{ - key: p.key, - addr: params.Addr, - verb: params.Verb, - }) -} - // creates token using -func generateToken(ctx context.Context, con *grpc.ClientConn, key *ecdsa.PrivateKey) (*service.Token, error) { - owner, err := refs.NewOwnerID(&key.PublicKey) +func prepareToken(ctx context.Context, con *grpc.ClientConn, key *ecdsa.PrivateKey) (*token.SessionToken, error) { + cli, err := client.New(key, client.WithGRPCConnection(con)) if err != nil { return nil, err } - token := new(service.Token) - token.SetOwnerID(owner) - token.SetExpirationEpoch(math.MaxUint64) - token.SetOwnerKey(crypto.MarshalPublicKey(&key.PublicKey)) - - creator, err := session.NewGRPCCreator(con, key) - if err != nil { - return nil, err - } - - res, err := creator.Create(ctx, token) - if err != nil { - return nil, err - } - - token.SetID(res.GetID()) - token.SetSessionKey(res.GetSessionKey()) - - return token, nil -} - -func prepareToken(t *service.Token, p queryParams) (*service.Token, error) { - sig := make([]byte, len(t.Signature)) - copy(sig, t.Signature) - - token := &service.Token{ - Token_Info: service.Token_Info{ - ID: t.ID, - OwnerID: t.OwnerID, - Verb: t.Verb, - Address: t.Address, - TokenLifetime: t.TokenLifetime, - SessionKey: t.SessionKey, - OwnerKey: t.OwnerKey, - }, - Signature: sig, - } - - token.SetAddress(p.addr) - token.SetVerb(p.verb) - - err := service.AddSignatureWithKey(p.key, service.NewSignedSessionToken(token)) - if err != nil { - return nil, err - } - - return token, nil + return cli.CreateSession(ctx, math.MaxUint64) } diff --git a/api/pool/v2.go b/api/pool/v2.go new file mode 100644 index 000000000..8610bd46b --- /dev/null +++ b/api/pool/v2.go @@ -0,0 +1,36 @@ +package pool + +import ( + "context" + "crypto/ecdsa" + + "github.com/nspcc-dev/neofs-api-go/v2/client" + "github.com/nspcc-dev/neofs-api-go/v2/netmap" + "github.com/nspcc-dev/neofs-api-go/v2/signature" + "github.com/pkg/errors" + "google.golang.org/grpc" +) + +type v2Ping struct { + req *netmap.LocalNodeInfoRequest +} + +func newV2Ping(key *ecdsa.PrivateKey) (Pinger, error) { + req := new(netmap.LocalNodeInfoRequest) + + if err := signature.SignServiceMessage(key, req); err != nil { + return nil, errors.Wrap(err, "could not sign `PingRequest`") + } + + return &v2Ping{req: req}, nil +} + +func (v *v2Ping) Call(ctx context.Context, conn *grpc.ClientConn) error { + if cli, err := netmap.NewClient(netmap.WithGlobalOpts(client.WithGRPCConn(conn))); err != nil { + return err + } else if _, err := cli.LocalNodeInfo(ctx, v.req); err != nil { + return err + } + + return nil +}