Merge pull request #4 from nspcc-dev/NFSSVC-27

NFSSVC-27 Add session token workaround to connection pool
remotes/KirillovDenis/bugfix/681-fix_acl_parsing
Evgeniy Kulikov 2020-07-29 14:37:37 +03:00 committed by GitHub
commit b64c986b1c
2 changed files with 176 additions and 16 deletions

View File

@ -41,6 +41,7 @@ type (
Close() Close()
Status() error Status() error
ReBalance(ctx context.Context) ReBalance(ctx context.Context)
SessionToken(ctx context.Context, params *SessionParams) (*service.Token, error)
} }
Peer struct { Peer struct {
@ -79,9 +80,11 @@ type (
reqHealth *state.HealthRequest reqHealth *state.HealthRequest
*sync.Mutex *sync.Mutex
nodes []*node nodes []*node
keys []uint32 keys []uint32
conns map[uint32][]*node conns map[uint32][]*node
key *ecdsa.PrivateKey
tokens map[string]*service.Token
unhealthy *atomic.Error unhealthy *atomic.Error
} }
@ -95,11 +98,13 @@ var (
func New(cfg *Config) (Pool, error) { func New(cfg *Config) (Pool, error) {
p := &pool{ p := &pool{
log: cfg.Logger, log: cfg.Logger,
Mutex: new(sync.Mutex), key: cfg.PrivateKey,
keys: make([]uint32, 0), Mutex: new(sync.Mutex),
nodes: make([]*node, 0), keys: make([]uint32, 0),
conns: make(map[uint32][]*node), nodes: make([]*node, 0),
conns: make(map[uint32][]*node),
tokens: make(map[string]*service.Token),
currentIdx: atomic.NewInt32(-1), currentIdx: atomic.NewInt32(-1),
@ -181,8 +186,7 @@ func (p *pool) ReBalance(ctx context.Context) {
}() }()
keys := make(map[uint32]struct{}) keys := make(map[uint32]struct{})
tokens := make(map[string]*service.Token)
p.log.Debug("re-balancing connections")
for i := range p.nodes { for i := range p.nodes {
var ( var (
@ -190,6 +194,7 @@ func (p *pool) ReBalance(ctx context.Context) {
exists bool exists bool
err error err error
start = time.Now() start = time.Now()
tkn *service.Token
conn = p.nodes[i].conn conn = p.nodes[i].conn
weight = p.nodes[i].weight weight = p.nodes[i].weight
) )
@ -205,12 +210,14 @@ func (p *pool) ReBalance(ctx context.Context) {
p.log.Debug("empty connection, try to connect", p.log.Debug("empty connection, try to connect",
zap.String("address", p.nodes[i].address)) zap.String("address", p.nodes[i].address))
ctx, cancel := context.WithTimeout(ctx, p.conTimeout) { // try to connect
conn, err = grpc.DialContext(ctx, p.nodes[i].address, ctx, cancel := context.WithTimeout(ctx, p.conTimeout)
grpc.WithBlock(), conn, err = grpc.DialContext(ctx, p.nodes[i].address,
grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithKeepaliveParams(p.opts)) grpc.WithInsecure(),
cancel() grpc.WithKeepaliveParams(p.opts))
cancel()
}
if err != nil || conn == nil { if err != nil || conn == nil {
p.log.Warn("skip, could not connect to node", p.log.Warn("skip, could not connect to node",
@ -220,9 +227,31 @@ func (p *pool) ReBalance(ctx context.Context) {
continue continue
} }
{ // try to prepare token
ctx, cancel := context.WithTimeout(ctx, p.reqTimeout)
tkn, err = generateToken(ctx, conn, p.key)
cancel()
}
if err != nil {
p.log.Debug("could not prepare session token",
zap.String("address", p.nodes[i].address),
zap.Error(err))
continue
}
tokens[conn.Target()] = tkn
p.nodes[i].conn = conn p.nodes[i].conn = conn
p.nodes[i].usedAt = time.Now() p.nodes[i].usedAt = time.Now()
p.log.Debug("connected to node", zap.String("address", p.nodes[i].address)) p.log.Debug("connected to node", zap.String("address", p.nodes[i].address))
} else if tkn, exists = p.tokens[conn.Target()]; exists {
// token exists, ignore
} else if tkn, err = generateToken(ctx, conn, p.key); err != nil {
p.log.Error("could not prepare session token",
zap.String("address", p.nodes[i].address),
zap.Error(err))
continue
} }
for j := range p.conns[weight] { for j := range p.conns[weight] {
@ -247,6 +276,9 @@ func (p *pool) ReBalance(ctx context.Context) {
p.conns[weight] = append(p.conns[weight][:idx], p.conns[weight][idx+1:]...) p.conns[weight] = append(p.conns[weight][:idx], p.conns[weight][idx+1:]...)
} }
// remove token
delete(tokens, conn.Target())
if err = conn.Close(); err != nil { if err = conn.Close(); err != nil {
p.log.Warn("could not close bad connection", p.log.Warn("could not close bad connection",
zap.String("address", p.nodes[i].address), zap.String("address", p.nodes[i].address),
@ -269,8 +301,13 @@ func (p *pool) ReBalance(ctx context.Context) {
if !exists { if !exists {
p.conns[weight] = append(p.conns[weight], p.nodes[i]) p.conns[weight] = append(p.conns[weight], p.nodes[i])
} }
if tkn != nil {
tokens[conn.Target()] = tkn
}
} }
p.tokens = tokens
p.keys = p.keys[:0] p.keys = p.keys[:0]
for w := range keys { for w := range keys {
p.keys = append(p.keys, w) p.keys = append(p.keys, w)

View File

@ -0,0 +1,123 @@
package pool
import (
"context"
"crypto/ecdsa"
"math"
"github.com/nspcc-dev/neofs-api-go/refs"
"github.com/nspcc-dev/neofs-api-go/service"
"github.com/nspcc-dev/neofs-api-go/session"
crypto "github.com/nspcc-dev/neofs-crypto"
"github.com/pkg/errors"
"google.golang.org/grpc"
)
type (
queryParams struct {
key *ecdsa.PrivateKey
addr refs.Address
verb service.Token_Info_Verb
}
SessionParams struct {
Addr refs.Address
Conn *grpc.ClientConn
Verb service.Token_Info_Verb
}
)
func (p *pool) fetchToken(ctx context.Context, con *grpc.ClientConn) (*session.Token, error) {
p.Lock()
defer p.Unlock()
// if we had token for current connection - return it
if tkn, ok := p.tokens[con.Target()]; ok {
return tkn, nil
}
// try to generate token for connection
tkn, err := generateToken(ctx, con, p.key)
if err != nil {
return nil, err
}
p.tokens[con.Target()] = tkn
return tkn, nil
}
// SessionToken returns session token for connection
func (p *pool) SessionToken(ctx context.Context, params *SessionParams) (*service.Token, error) {
var (
err error
tkn *session.Token
)
if params.Conn == nil {
return nil, errors.New("empty connection")
} else if tkn, err = p.fetchToken(ctx, params.Conn); err != nil {
return nil, err
}
return prepareToken(tkn, queryParams{
key: p.key,
addr: params.Addr,
verb: params.Verb,
})
}
// creates token using
func generateToken(ctx context.Context, con *grpc.ClientConn, key *ecdsa.PrivateKey) (*service.Token, error) {
owner, err := refs.NewOwnerID(&key.PublicKey)
if err != nil {
return nil, err
}
token := new(service.Token)
token.SetOwnerID(owner)
token.SetExpirationEpoch(math.MaxUint64)
token.SetOwnerKey(crypto.MarshalPublicKey(&key.PublicKey))
creator, err := session.NewGRPCCreator(con, key)
if err != nil {
return nil, err
}
res, err := creator.Create(ctx, token)
if err != nil {
return nil, err
}
token.SetID(res.GetID())
token.SetSessionKey(res.GetSessionKey())
return token, nil
}
func prepareToken(t *service.Token, p queryParams) (*service.Token, error) {
sig := make([]byte, len(t.Signature))
copy(sig, t.Signature)
token := &service.Token{
Token_Info: service.Token_Info{
ID: t.ID,
OwnerID: t.OwnerID,
Verb: t.Verb,
Address: t.Address,
TokenLifetime: t.TokenLifetime,
SessionKey: t.SessionKey,
OwnerKey: t.OwnerKey,
},
Signature: sig,
}
token.SetAddress(p.addr)
token.SetVerb(p.verb)
err := service.AddSignatureWithKey(p.key, service.NewSignedSessionToken(token))
if err != nil {
return nil, err
}
return token, nil
}