forked from TrueCloudLab/frostfs-s3-gw
NFSSVC-27 Add session token workaround to connection pool
- Renew token for connections - Pool.SessionToken for connection and options Signed-off-by: Evgeniy Kulikov <kim@nspcc.ru>
This commit is contained in:
parent
55dacf20c1
commit
a200ae327a
2 changed files with 176 additions and 16 deletions
|
@ -41,6 +41,7 @@ type (
|
||||||
Close()
|
Close()
|
||||||
Status() error
|
Status() error
|
||||||
ReBalance(ctx context.Context)
|
ReBalance(ctx context.Context)
|
||||||
|
SessionToken(ctx context.Context, params *SessionParams) (*service.Token, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
Peer struct {
|
Peer struct {
|
||||||
|
@ -82,6 +83,8 @@ type (
|
||||||
nodes []*node
|
nodes []*node
|
||||||
keys []uint32
|
keys []uint32
|
||||||
conns map[uint32][]*node
|
conns map[uint32][]*node
|
||||||
|
key *ecdsa.PrivateKey
|
||||||
|
tokens map[string]*service.Token
|
||||||
|
|
||||||
unhealthy *atomic.Error
|
unhealthy *atomic.Error
|
||||||
}
|
}
|
||||||
|
@ -96,10 +99,12 @@ var (
|
||||||
func New(cfg *Config) (Pool, error) {
|
func New(cfg *Config) (Pool, error) {
|
||||||
p := &pool{
|
p := &pool{
|
||||||
log: cfg.Logger,
|
log: cfg.Logger,
|
||||||
|
key: cfg.PrivateKey,
|
||||||
Mutex: new(sync.Mutex),
|
Mutex: new(sync.Mutex),
|
||||||
keys: make([]uint32, 0),
|
keys: make([]uint32, 0),
|
||||||
nodes: make([]*node, 0),
|
nodes: make([]*node, 0),
|
||||||
conns: make(map[uint32][]*node),
|
conns: make(map[uint32][]*node),
|
||||||
|
tokens: make(map[string]*service.Token),
|
||||||
|
|
||||||
currentIdx: atomic.NewInt32(-1),
|
currentIdx: atomic.NewInt32(-1),
|
||||||
|
|
||||||
|
@ -181,8 +186,7 @@ func (p *pool) ReBalance(ctx context.Context) {
|
||||||
}()
|
}()
|
||||||
|
|
||||||
keys := make(map[uint32]struct{})
|
keys := make(map[uint32]struct{})
|
||||||
|
tokens := make(map[string]*service.Token)
|
||||||
p.log.Debug("re-balancing connections")
|
|
||||||
|
|
||||||
for i := range p.nodes {
|
for i := range p.nodes {
|
||||||
var (
|
var (
|
||||||
|
@ -190,6 +194,7 @@ func (p *pool) ReBalance(ctx context.Context) {
|
||||||
exists bool
|
exists bool
|
||||||
err error
|
err error
|
||||||
start = time.Now()
|
start = time.Now()
|
||||||
|
tkn *service.Token
|
||||||
conn = p.nodes[i].conn
|
conn = p.nodes[i].conn
|
||||||
weight = p.nodes[i].weight
|
weight = p.nodes[i].weight
|
||||||
)
|
)
|
||||||
|
@ -205,12 +210,14 @@ func (p *pool) ReBalance(ctx context.Context) {
|
||||||
p.log.Debug("empty connection, try to connect",
|
p.log.Debug("empty connection, try to connect",
|
||||||
zap.String("address", p.nodes[i].address))
|
zap.String("address", p.nodes[i].address))
|
||||||
|
|
||||||
|
{ // try to connect
|
||||||
ctx, cancel := context.WithTimeout(ctx, p.conTimeout)
|
ctx, cancel := context.WithTimeout(ctx, p.conTimeout)
|
||||||
conn, err = grpc.DialContext(ctx, p.nodes[i].address,
|
conn, err = grpc.DialContext(ctx, p.nodes[i].address,
|
||||||
grpc.WithBlock(),
|
grpc.WithBlock(),
|
||||||
grpc.WithInsecure(),
|
grpc.WithInsecure(),
|
||||||
grpc.WithKeepaliveParams(p.opts))
|
grpc.WithKeepaliveParams(p.opts))
|
||||||
cancel()
|
cancel()
|
||||||
|
}
|
||||||
|
|
||||||
if err != nil || conn == nil {
|
if err != nil || conn == nil {
|
||||||
p.log.Warn("skip, could not connect to node",
|
p.log.Warn("skip, could not connect to node",
|
||||||
|
@ -220,9 +227,31 @@ func (p *pool) ReBalance(ctx context.Context) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
{ // try to prepare token
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, p.reqTimeout)
|
||||||
|
tkn, err = generateToken(ctx, conn, p.key)
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
p.log.Debug("could not prepare session token",
|
||||||
|
zap.String("address", p.nodes[i].address),
|
||||||
|
zap.Error(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
tokens[conn.Target()] = tkn
|
||||||
|
|
||||||
p.nodes[i].conn = conn
|
p.nodes[i].conn = conn
|
||||||
p.nodes[i].usedAt = time.Now()
|
p.nodes[i].usedAt = time.Now()
|
||||||
p.log.Debug("connected to node", zap.String("address", p.nodes[i].address))
|
p.log.Debug("connected to node", zap.String("address", p.nodes[i].address))
|
||||||
|
} else if tkn, exists = p.tokens[conn.Target()]; exists {
|
||||||
|
// token exists, ignore
|
||||||
|
} else if tkn, err = generateToken(ctx, conn, p.key); err != nil {
|
||||||
|
p.log.Error("could not prepare session token",
|
||||||
|
zap.String("address", p.nodes[i].address),
|
||||||
|
zap.Error(err))
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
for j := range p.conns[weight] {
|
for j := range p.conns[weight] {
|
||||||
|
@ -247,6 +276,9 @@ func (p *pool) ReBalance(ctx context.Context) {
|
||||||
p.conns[weight] = append(p.conns[weight][:idx], p.conns[weight][idx+1:]...)
|
p.conns[weight] = append(p.conns[weight][:idx], p.conns[weight][idx+1:]...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// remove token
|
||||||
|
delete(tokens, conn.Target())
|
||||||
|
|
||||||
if err = conn.Close(); err != nil {
|
if err = conn.Close(); err != nil {
|
||||||
p.log.Warn("could not close bad connection",
|
p.log.Warn("could not close bad connection",
|
||||||
zap.String("address", p.nodes[i].address),
|
zap.String("address", p.nodes[i].address),
|
||||||
|
@ -269,8 +301,13 @@ func (p *pool) ReBalance(ctx context.Context) {
|
||||||
if !exists {
|
if !exists {
|
||||||
p.conns[weight] = append(p.conns[weight], p.nodes[i])
|
p.conns[weight] = append(p.conns[weight], p.nodes[i])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if tkn != nil {
|
||||||
|
tokens[conn.Target()] = tkn
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
p.tokens = tokens
|
||||||
p.keys = p.keys[:0]
|
p.keys = p.keys[:0]
|
||||||
for w := range keys {
|
for w := range keys {
|
||||||
p.keys = append(p.keys, w)
|
p.keys = append(p.keys, w)
|
||||||
|
|
123
neofs/pool/session.go
Normal file
123
neofs/pool/session.go
Normal file
|
@ -0,0 +1,123 @@
|
||||||
|
package pool
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/ecdsa"
|
||||||
|
"math"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/refs"
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/service"
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/session"
|
||||||
|
crypto "github.com/nspcc-dev/neofs-crypto"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
)
|
||||||
|
|
||||||
|
type (
|
||||||
|
queryParams struct {
|
||||||
|
key *ecdsa.PrivateKey
|
||||||
|
addr refs.Address
|
||||||
|
verb service.Token_Info_Verb
|
||||||
|
}
|
||||||
|
|
||||||
|
SessionParams struct {
|
||||||
|
Addr refs.Address
|
||||||
|
Conn *grpc.ClientConn
|
||||||
|
Verb service.Token_Info_Verb
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
func (p *pool) fetchToken(ctx context.Context, con *grpc.ClientConn) (*session.Token, error) {
|
||||||
|
p.Lock()
|
||||||
|
defer p.Unlock()
|
||||||
|
|
||||||
|
// if we had token for current connection - return it
|
||||||
|
if tkn, ok := p.tokens[con.Target()]; ok {
|
||||||
|
return tkn, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// try to generate token for connection
|
||||||
|
tkn, err := generateToken(ctx, con, p.key)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
p.tokens[con.Target()] = tkn
|
||||||
|
return tkn, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SessionToken returns session token for connection
|
||||||
|
func (p *pool) SessionToken(ctx context.Context, params *SessionParams) (*service.Token, error) {
|
||||||
|
var (
|
||||||
|
err error
|
||||||
|
tkn *session.Token
|
||||||
|
)
|
||||||
|
|
||||||
|
if params.Conn == nil {
|
||||||
|
return nil, errors.New("empty connection")
|
||||||
|
} else if tkn, err = p.fetchToken(ctx, params.Conn); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return prepareToken(tkn, queryParams{
|
||||||
|
key: p.key,
|
||||||
|
addr: params.Addr,
|
||||||
|
verb: params.Verb,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// creates token using
|
||||||
|
func generateToken(ctx context.Context, con *grpc.ClientConn, key *ecdsa.PrivateKey) (*service.Token, error) {
|
||||||
|
owner, err := refs.NewOwnerID(&key.PublicKey)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
token := new(service.Token)
|
||||||
|
token.SetOwnerID(owner)
|
||||||
|
token.SetExpirationEpoch(math.MaxUint64)
|
||||||
|
token.SetOwnerKey(crypto.MarshalPublicKey(&key.PublicKey))
|
||||||
|
|
||||||
|
creator, err := session.NewGRPCCreator(con, key)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err := creator.Create(ctx, token)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
token.SetID(res.GetID())
|
||||||
|
token.SetSessionKey(res.GetSessionKey())
|
||||||
|
|
||||||
|
return token, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func prepareToken(t *service.Token, p queryParams) (*service.Token, error) {
|
||||||
|
sig := make([]byte, len(t.Signature))
|
||||||
|
copy(sig, t.Signature)
|
||||||
|
|
||||||
|
token := &service.Token{
|
||||||
|
Token_Info: service.Token_Info{
|
||||||
|
ID: t.ID,
|
||||||
|
OwnerID: t.OwnerID,
|
||||||
|
Verb: t.Verb,
|
||||||
|
Address: t.Address,
|
||||||
|
TokenLifetime: t.TokenLifetime,
|
||||||
|
SessionKey: t.SessionKey,
|
||||||
|
OwnerKey: t.OwnerKey,
|
||||||
|
},
|
||||||
|
Signature: sig,
|
||||||
|
}
|
||||||
|
|
||||||
|
token.SetAddress(p.addr)
|
||||||
|
token.SetVerb(p.verb)
|
||||||
|
|
||||||
|
err := service.AddSignatureWithKey(p.key, service.NewSignedSessionToken(token))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return token, nil
|
||||||
|
}
|
Loading…
Reference in a new issue