From a200ae327a2cdca01af836f46b13219f1778263c Mon Sep 17 00:00:00 2001 From: Evgeniy Kulikov Date: Wed, 29 Jul 2020 14:29:03 +0300 Subject: [PATCH] NFSSVC-27 Add session token workaround to connection pool - Renew token for connections - Pool.SessionToken for connection and options Signed-off-by: Evgeniy Kulikov --- neofs/pool/pool.go | 69 ++++++++++++++++++------ neofs/pool/session.go | 123 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 176 insertions(+), 16 deletions(-) create mode 100644 neofs/pool/session.go diff --git a/neofs/pool/pool.go b/neofs/pool/pool.go index b6d2a0ff..cbd65910 100644 --- a/neofs/pool/pool.go +++ b/neofs/pool/pool.go @@ -41,6 +41,7 @@ type ( Close() Status() error ReBalance(ctx context.Context) + SessionToken(ctx context.Context, params *SessionParams) (*service.Token, error) } Peer struct { @@ -79,9 +80,11 @@ type ( reqHealth *state.HealthRequest *sync.Mutex - nodes []*node - keys []uint32 - conns map[uint32][]*node + nodes []*node + keys []uint32 + conns map[uint32][]*node + key *ecdsa.PrivateKey + tokens map[string]*service.Token unhealthy *atomic.Error } @@ -95,11 +98,13 @@ var ( func New(cfg *Config) (Pool, error) { p := &pool{ - log: cfg.Logger, - Mutex: new(sync.Mutex), - keys: make([]uint32, 0), - nodes: make([]*node, 0), - conns: make(map[uint32][]*node), + log: cfg.Logger, + key: cfg.PrivateKey, + Mutex: new(sync.Mutex), + keys: make([]uint32, 0), + nodes: make([]*node, 0), + conns: make(map[uint32][]*node), + tokens: make(map[string]*service.Token), currentIdx: atomic.NewInt32(-1), @@ -181,8 +186,7 @@ func (p *pool) ReBalance(ctx context.Context) { }() keys := make(map[uint32]struct{}) - - p.log.Debug("re-balancing connections") + tokens := make(map[string]*service.Token) for i := range p.nodes { var ( @@ -190,6 +194,7 @@ func (p *pool) ReBalance(ctx context.Context) { exists bool err error start = time.Now() + tkn *service.Token conn = p.nodes[i].conn weight = p.nodes[i].weight ) @@ -205,12 +210,14 @@ func (p *pool) ReBalance(ctx context.Context) { p.log.Debug("empty connection, try to connect", zap.String("address", p.nodes[i].address)) - ctx, cancel := context.WithTimeout(ctx, p.conTimeout) - conn, err = grpc.DialContext(ctx, p.nodes[i].address, - grpc.WithBlock(), - grpc.WithInsecure(), - grpc.WithKeepaliveParams(p.opts)) - cancel() + { // try to connect + ctx, cancel := context.WithTimeout(ctx, p.conTimeout) + conn, err = grpc.DialContext(ctx, p.nodes[i].address, + grpc.WithBlock(), + grpc.WithInsecure(), + grpc.WithKeepaliveParams(p.opts)) + cancel() + } if err != nil || conn == nil { p.log.Warn("skip, could not connect to node", @@ -220,9 +227,31 @@ func (p *pool) ReBalance(ctx context.Context) { continue } + { // try to prepare token + ctx, cancel := context.WithTimeout(ctx, p.reqTimeout) + tkn, err = generateToken(ctx, conn, p.key) + cancel() + } + + if err != nil { + p.log.Debug("could not prepare session token", + zap.String("address", p.nodes[i].address), + zap.Error(err)) + continue + } + + tokens[conn.Target()] = tkn + p.nodes[i].conn = conn p.nodes[i].usedAt = time.Now() p.log.Debug("connected to node", zap.String("address", p.nodes[i].address)) + } else if tkn, exists = p.tokens[conn.Target()]; exists { + // token exists, ignore + } else if tkn, err = generateToken(ctx, conn, p.key); err != nil { + p.log.Error("could not prepare session token", + zap.String("address", p.nodes[i].address), + zap.Error(err)) + continue } for j := range p.conns[weight] { @@ -247,6 +276,9 @@ func (p *pool) ReBalance(ctx context.Context) { p.conns[weight] = append(p.conns[weight][:idx], p.conns[weight][idx+1:]...) } + // remove token + delete(tokens, conn.Target()) + if err = conn.Close(); err != nil { p.log.Warn("could not close bad connection", zap.String("address", p.nodes[i].address), @@ -269,8 +301,13 @@ func (p *pool) ReBalance(ctx context.Context) { if !exists { p.conns[weight] = append(p.conns[weight], p.nodes[i]) } + + if tkn != nil { + tokens[conn.Target()] = tkn + } } + p.tokens = tokens p.keys = p.keys[:0] for w := range keys { p.keys = append(p.keys, w) diff --git a/neofs/pool/session.go b/neofs/pool/session.go new file mode 100644 index 00000000..a4fc3842 --- /dev/null +++ b/neofs/pool/session.go @@ -0,0 +1,123 @@ +package pool + +import ( + "context" + "crypto/ecdsa" + "math" + + "github.com/nspcc-dev/neofs-api-go/refs" + "github.com/nspcc-dev/neofs-api-go/service" + "github.com/nspcc-dev/neofs-api-go/session" + crypto "github.com/nspcc-dev/neofs-crypto" + "github.com/pkg/errors" + "google.golang.org/grpc" +) + +type ( + queryParams struct { + key *ecdsa.PrivateKey + addr refs.Address + verb service.Token_Info_Verb + } + + SessionParams struct { + Addr refs.Address + Conn *grpc.ClientConn + Verb service.Token_Info_Verb + } +) + +func (p *pool) fetchToken(ctx context.Context, con *grpc.ClientConn) (*session.Token, error) { + p.Lock() + defer p.Unlock() + + // if we had token for current connection - return it + if tkn, ok := p.tokens[con.Target()]; ok { + return tkn, nil + } + + // try to generate token for connection + tkn, err := generateToken(ctx, con, p.key) + if err != nil { + return nil, err + } + + p.tokens[con.Target()] = tkn + return tkn, nil +} + +// SessionToken returns session token for connection +func (p *pool) SessionToken(ctx context.Context, params *SessionParams) (*service.Token, error) { + var ( + err error + tkn *session.Token + ) + + if params.Conn == nil { + return nil, errors.New("empty connection") + } else if tkn, err = p.fetchToken(ctx, params.Conn); err != nil { + return nil, err + } + + return prepareToken(tkn, queryParams{ + key: p.key, + addr: params.Addr, + verb: params.Verb, + }) +} + +// creates token using +func generateToken(ctx context.Context, con *grpc.ClientConn, key *ecdsa.PrivateKey) (*service.Token, error) { + owner, err := refs.NewOwnerID(&key.PublicKey) + if err != nil { + return nil, err + } + + token := new(service.Token) + token.SetOwnerID(owner) + token.SetExpirationEpoch(math.MaxUint64) + token.SetOwnerKey(crypto.MarshalPublicKey(&key.PublicKey)) + + creator, err := session.NewGRPCCreator(con, key) + if err != nil { + return nil, err + } + + res, err := creator.Create(ctx, token) + if err != nil { + return nil, err + } + + token.SetID(res.GetID()) + token.SetSessionKey(res.GetSessionKey()) + + return token, nil +} + +func prepareToken(t *service.Token, p queryParams) (*service.Token, error) { + sig := make([]byte, len(t.Signature)) + copy(sig, t.Signature) + + token := &service.Token{ + Token_Info: service.Token_Info{ + ID: t.ID, + OwnerID: t.OwnerID, + Verb: t.Verb, + Address: t.Address, + TokenLifetime: t.TokenLifetime, + SessionKey: t.SessionKey, + OwnerKey: t.OwnerKey, + }, + Signature: sig, + } + + token.SetAddress(p.addr) + token.SetVerb(p.verb) + + err := service.AddSignatureWithKey(p.key, service.NewSignedSessionToken(token)) + if err != nil { + return nil, err + } + + return token, nil +}