forked from TrueCloudLab/frostfs-s3-gw
[#25] Migrate connection pool to NeoFS API v2
closes #25 Signed-off-by: Evgeniy Kulikov <kim@nspcc.ru>
This commit is contained in:
parent
65b5d6e3d2
commit
b346a77933
3 changed files with 86 additions and 123 deletions
|
@ -8,8 +8,9 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/service"
|
"github.com/nspcc-dev/neofs-api-go/pkg/token"
|
||||||
"github.com/nspcc-dev/neofs-api-go/state"
|
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"go.uber.org/atomic"
|
"go.uber.org/atomic"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -30,8 +31,8 @@ type (
|
||||||
|
|
||||||
Client interface {
|
Client interface {
|
||||||
Status() error
|
Status() error
|
||||||
GetConnection(context.Context) (*grpc.ClientConn, error)
|
Connection(context.Context) (*grpc.ClientConn, error)
|
||||||
SessionToken(ctx context.Context, params *SessionParams) (*service.Token, error)
|
Token(context.Context, *grpc.ClientConn) (*token.SessionToken, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
Pool interface {
|
Pool interface {
|
||||||
|
@ -74,20 +75,27 @@ type (
|
||||||
currentIdx *atomic.Int32
|
currentIdx *atomic.Int32
|
||||||
currentConn *grpc.ClientConn
|
currentConn *grpc.ClientConn
|
||||||
|
|
||||||
reqHealth *state.HealthRequest
|
ping Pinger
|
||||||
|
|
||||||
*sync.Mutex
|
*sync.Mutex
|
||||||
nodes []*node
|
nodes []*node
|
||||||
keys []uint32
|
keys []uint32
|
||||||
conns map[uint32][]*node
|
conns map[uint32][]*node
|
||||||
key *ecdsa.PrivateKey
|
tokens map[string]*token.SessionToken
|
||||||
tokens map[string]*service.Token
|
|
||||||
|
oid *owner.ID
|
||||||
|
key *ecdsa.PrivateKey
|
||||||
|
|
||||||
unhealthy *atomic.Error
|
unhealthy *atomic.Error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Pinger interface {
|
||||||
|
Call(context.Context, *grpc.ClientConn) error
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
errNoConnections = errors.New("no connections")
|
||||||
errBootstrapping = errors.New("bootstrapping")
|
errBootstrapping = errors.New("bootstrapping")
|
||||||
errEmptyConnection = errors.New("empty connection")
|
errEmptyConnection = errors.New("empty connection")
|
||||||
errNoHealthyConnections = errors.New("no active connections")
|
errNoHealthyConnections = errors.New("no active connections")
|
||||||
|
@ -101,7 +109,7 @@ func New(cfg *Config) (Pool, error) {
|
||||||
keys: make([]uint32, 0),
|
keys: make([]uint32, 0),
|
||||||
nodes: make([]*node, 0),
|
nodes: make([]*node, 0),
|
||||||
conns: make(map[uint32][]*node),
|
conns: make(map[uint32][]*node),
|
||||||
tokens: make(map[string]*service.Token),
|
tokens: make(map[string]*token.SessionToken),
|
||||||
|
|
||||||
currentIdx: atomic.NewInt32(-1),
|
currentIdx: atomic.NewInt32(-1),
|
||||||
|
|
||||||
|
@ -114,6 +122,14 @@ func New(cfg *Config) (Pool, error) {
|
||||||
unhealthy: atomic.NewError(errBootstrapping),
|
unhealthy: atomic.NewError(errBootstrapping),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
p.oid = owner.NewID()
|
||||||
|
wallet, err := owner.NEO3WalletFromPublicKey(&p.key.PublicKey)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
p.oid.SetNeo3Wallet(wallet)
|
||||||
|
|
||||||
if cfg.GRPCVerbose {
|
if cfg.GRPCVerbose {
|
||||||
grpclog.SetLoggerV2(cfg.GRPCLogger)
|
grpclog.SetLoggerV2(cfg.GRPCLogger)
|
||||||
}
|
}
|
||||||
|
@ -123,11 +139,8 @@ func New(cfg *Config) (Pool, error) {
|
||||||
rand.Seed(seed)
|
rand.Seed(seed)
|
||||||
cfg.Logger.Info("used random seed", zap.Int64("seed", seed))
|
cfg.Logger.Info("used random seed", zap.Int64("seed", seed))
|
||||||
|
|
||||||
p.reqHealth = new(state.HealthRequest)
|
if p.ping, err = newV2Ping(cfg.PrivateKey); err != nil {
|
||||||
p.reqHealth.SetTTL(service.NonForwardingTTL)
|
return nil, err
|
||||||
|
|
||||||
if err := service.SignRequestData(cfg.PrivateKey, p.reqHealth); err != nil {
|
|
||||||
return nil, errors.Wrap(err, "could not sign `HealthRequest`")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range cfg.Peers {
|
for i := range cfg.Peers {
|
||||||
|
@ -147,6 +160,10 @@ func New(cfg *Config) (Pool, error) {
|
||||||
zap.Uint32("weight", p.nodes[i].weight))
|
zap.Uint32("weight", p.nodes[i].weight))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(p.nodes) == 0 {
|
||||||
|
return nil, errNoConnections
|
||||||
|
}
|
||||||
|
|
||||||
return p, nil
|
return p, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -174,12 +191,12 @@ func (p *pool) ReBalance(ctx context.Context) {
|
||||||
defer func() {
|
defer func() {
|
||||||
p.Unlock()
|
p.Unlock()
|
||||||
|
|
||||||
_, err := p.GetConnection(ctx)
|
_, err := p.Connection(ctx)
|
||||||
p.unhealthy.Store(err)
|
p.unhealthy.Store(err)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
keys := make(map[uint32]struct{})
|
keys := make(map[uint32]struct{})
|
||||||
tokens := make(map[string]*service.Token)
|
tokens := make(map[string]*token.SessionToken)
|
||||||
|
|
||||||
for i := range p.nodes {
|
for i := range p.nodes {
|
||||||
var (
|
var (
|
||||||
|
@ -187,7 +204,7 @@ func (p *pool) ReBalance(ctx context.Context) {
|
||||||
exists bool
|
exists bool
|
||||||
err error
|
err error
|
||||||
start = time.Now()
|
start = time.Now()
|
||||||
tkn *service.Token
|
tkn *token.SessionToken
|
||||||
conn = p.nodes[i].conn
|
conn = p.nodes[i].conn
|
||||||
weight = p.nodes[i].weight
|
weight = p.nodes[i].weight
|
||||||
)
|
)
|
||||||
|
@ -222,7 +239,7 @@ func (p *pool) ReBalance(ctx context.Context) {
|
||||||
|
|
||||||
{ // try to prepare token
|
{ // try to prepare token
|
||||||
ctx, cancel := context.WithTimeout(ctx, p.reqTimeout)
|
ctx, cancel := context.WithTimeout(ctx, p.reqTimeout)
|
||||||
tkn, err = generateToken(ctx, conn, p.key)
|
tkn, err = prepareToken(ctx, conn, p.key)
|
||||||
cancel()
|
cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -240,7 +257,7 @@ func (p *pool) ReBalance(ctx context.Context) {
|
||||||
p.log.Debug("connected to node", zap.String("address", p.nodes[i].address))
|
p.log.Debug("connected to node", zap.String("address", p.nodes[i].address))
|
||||||
} else if tkn, exists = p.tokens[conn.Target()]; exists {
|
} else if tkn, exists = p.tokens[conn.Target()]; exists {
|
||||||
// token exists, ignore
|
// token exists, ignore
|
||||||
} else if tkn, err = generateToken(ctx, conn, p.key); err != nil {
|
} else if tkn, err = prepareToken(ctx, conn, p.key); err != nil {
|
||||||
p.log.Error("could not prepare session token",
|
p.log.Error("could not prepare session token",
|
||||||
zap.String("address", p.nodes[i].address),
|
zap.String("address", p.nodes[i].address),
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
|
@ -311,7 +328,7 @@ func (p *pool) ReBalance(ctx context.Context) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) GetConnection(ctx context.Context) (*grpc.ClientConn, error) {
|
func (p *pool) Connection(ctx context.Context) (*grpc.ClientConn, error) {
|
||||||
p.Lock()
|
p.Lock()
|
||||||
defer p.Unlock()
|
defer p.Unlock()
|
||||||
|
|
||||||
|
@ -361,16 +378,7 @@ func (p *pool) isAlive(ctx context.Context, cur *grpc.ClientConn) error {
|
||||||
ctx, cancel := context.WithTimeout(ctx, p.reqTimeout)
|
ctx, cancel := context.WithTimeout(ctx, p.reqTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
res, err := state.NewStatusClient(cur).HealthCheck(ctx, p.reqHealth)
|
return p.ping.Call(ctx, cur)
|
||||||
if err != nil {
|
|
||||||
p.log.Warn("could not fetch health-check", zap.Error(err))
|
|
||||||
|
|
||||||
return err
|
|
||||||
} else if !res.Healthy {
|
|
||||||
return errors.New(res.Status)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
default:
|
default:
|
||||||
return errors.New(st.String())
|
return errors.New(st.String())
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,119 +5,38 @@ import (
|
||||||
"crypto/ecdsa"
|
"crypto/ecdsa"
|
||||||
"math"
|
"math"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/refs"
|
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
||||||
"github.com/nspcc-dev/neofs-api-go/service"
|
"github.com/nspcc-dev/neofs-api-go/pkg/token"
|
||||||
"github.com/nspcc-dev/neofs-api-go/session"
|
|
||||||
crypto "github.com/nspcc-dev/neofs-crypto"
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
// SessionToken returns session token for connection
|
||||||
queryParams struct {
|
func (p *pool) Token(ctx context.Context, conn *grpc.ClientConn) (*token.SessionToken, error) {
|
||||||
key *ecdsa.PrivateKey
|
|
||||||
addr refs.Address
|
|
||||||
verb service.Token_Info_Verb
|
|
||||||
}
|
|
||||||
|
|
||||||
SessionParams struct {
|
|
||||||
Addr refs.Address
|
|
||||||
Conn *grpc.ClientConn
|
|
||||||
Verb service.Token_Info_Verb
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
func (p *pool) fetchToken(ctx context.Context, con *grpc.ClientConn) (*session.Token, error) {
|
|
||||||
p.Lock()
|
p.Lock()
|
||||||
defer p.Unlock()
|
defer p.Unlock()
|
||||||
|
|
||||||
// if we had token for current connection - return it
|
if tkn, ok := p.tokens[conn.Target()]; ok && tkn != nil {
|
||||||
if tkn, ok := p.tokens[con.Target()]; ok {
|
|
||||||
return tkn, nil
|
return tkn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// try to generate token for connection
|
// prepare session token
|
||||||
tkn, err := generateToken(ctx, con, p.key)
|
tkn, err := prepareToken(ctx, conn, p.key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
p.tokens[con.Target()] = tkn
|
// save token for current connection
|
||||||
|
p.tokens[conn.Target()] = tkn
|
||||||
|
|
||||||
return tkn, nil
|
return tkn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SessionToken returns session token for connection
|
|
||||||
func (p *pool) SessionToken(ctx context.Context, params *SessionParams) (*service.Token, error) {
|
|
||||||
var (
|
|
||||||
err error
|
|
||||||
tkn *session.Token
|
|
||||||
)
|
|
||||||
|
|
||||||
if params.Conn == nil {
|
|
||||||
return nil, errors.New("empty connection")
|
|
||||||
} else if tkn, err = p.fetchToken(ctx, params.Conn); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return prepareToken(tkn, queryParams{
|
|
||||||
key: p.key,
|
|
||||||
addr: params.Addr,
|
|
||||||
verb: params.Verb,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// creates token using
|
// creates token using
|
||||||
func generateToken(ctx context.Context, con *grpc.ClientConn, key *ecdsa.PrivateKey) (*service.Token, error) {
|
func prepareToken(ctx context.Context, con *grpc.ClientConn, key *ecdsa.PrivateKey) (*token.SessionToken, error) {
|
||||||
owner, err := refs.NewOwnerID(&key.PublicKey)
|
cli, err := client.New(key, client.WithGRPCConnection(con))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
token := new(service.Token)
|
return cli.CreateSession(ctx, math.MaxUint64)
|
||||||
token.SetOwnerID(owner)
|
|
||||||
token.SetExpirationEpoch(math.MaxUint64)
|
|
||||||
token.SetOwnerKey(crypto.MarshalPublicKey(&key.PublicKey))
|
|
||||||
|
|
||||||
creator, err := session.NewGRPCCreator(con, key)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
res, err := creator.Create(ctx, token)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
token.SetID(res.GetID())
|
|
||||||
token.SetSessionKey(res.GetSessionKey())
|
|
||||||
|
|
||||||
return token, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func prepareToken(t *service.Token, p queryParams) (*service.Token, error) {
|
|
||||||
sig := make([]byte, len(t.Signature))
|
|
||||||
copy(sig, t.Signature)
|
|
||||||
|
|
||||||
token := &service.Token{
|
|
||||||
Token_Info: service.Token_Info{
|
|
||||||
ID: t.ID,
|
|
||||||
OwnerID: t.OwnerID,
|
|
||||||
Verb: t.Verb,
|
|
||||||
Address: t.Address,
|
|
||||||
TokenLifetime: t.TokenLifetime,
|
|
||||||
SessionKey: t.SessionKey,
|
|
||||||
OwnerKey: t.OwnerKey,
|
|
||||||
},
|
|
||||||
Signature: sig,
|
|
||||||
}
|
|
||||||
|
|
||||||
token.SetAddress(p.addr)
|
|
||||||
token.SetVerb(p.verb)
|
|
||||||
|
|
||||||
err := service.AddSignatureWithKey(p.key, service.NewSignedSessionToken(token))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return token, nil
|
|
||||||
}
|
}
|
||||||
|
|
36
api/pool/v2.go
Normal file
36
api/pool/v2.go
Normal file
|
@ -0,0 +1,36 @@
|
||||||
|
package pool
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/ecdsa"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/v2/client"
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/v2/netmap"
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/v2/signature"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
)
|
||||||
|
|
||||||
|
type v2Ping struct {
|
||||||
|
req *netmap.LocalNodeInfoRequest
|
||||||
|
}
|
||||||
|
|
||||||
|
func newV2Ping(key *ecdsa.PrivateKey) (Pinger, error) {
|
||||||
|
req := new(netmap.LocalNodeInfoRequest)
|
||||||
|
|
||||||
|
if err := signature.SignServiceMessage(key, req); err != nil {
|
||||||
|
return nil, errors.Wrap(err, "could not sign `PingRequest`")
|
||||||
|
}
|
||||||
|
|
||||||
|
return &v2Ping{req: req}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *v2Ping) Call(ctx context.Context, conn *grpc.ClientConn) error {
|
||||||
|
if cli, err := netmap.NewClient(netmap.WithGlobalOpts(client.WithGRPCConn(conn))); err != nil {
|
||||||
|
return err
|
||||||
|
} else if _, err := cli.LocalNodeInfo(ctx, v.req); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
Loading…
Reference in a new issue