forked from TrueCloudLab/frostfs-s3-gw
Remove pool package
It will be replaced with sdk/pool Signed-off-by: Evgeniy Kulikov <kim@nspcc.ru>
This commit is contained in:
parent
b9972042fc
commit
1fecf6a7ac
3 changed files with 0 additions and 472 deletions
384
api/pool/pool.go
384
api/pool/pool.go
|
@ -1,384 +0,0 @@
|
||||||
package pool
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"crypto/ecdsa"
|
|
||||||
"math/rand"
|
|
||||||
"sort"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/token"
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
"go.uber.org/atomic"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
"google.golang.org/grpc"
|
|
||||||
"google.golang.org/grpc/connectivity"
|
|
||||||
"google.golang.org/grpc/grpclog"
|
|
||||||
"google.golang.org/grpc/keepalive"
|
|
||||||
)
|
|
||||||
|
|
||||||
type (
|
|
||||||
node struct {
|
|
||||||
index int32
|
|
||||||
address string
|
|
||||||
weight uint32
|
|
||||||
usedAt time.Time
|
|
||||||
conn *grpc.ClientConn
|
|
||||||
}
|
|
||||||
|
|
||||||
Client interface {
|
|
||||||
Status() error
|
|
||||||
Connection(context.Context) (*grpc.ClientConn, error)
|
|
||||||
Token(context.Context, *grpc.ClientConn) (*token.SessionToken, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
Pool interface {
|
|
||||||
Client
|
|
||||||
|
|
||||||
Close()
|
|
||||||
ReBalance(ctx context.Context)
|
|
||||||
}
|
|
||||||
|
|
||||||
Peer struct {
|
|
||||||
Address string
|
|
||||||
Weight float64
|
|
||||||
}
|
|
||||||
|
|
||||||
Config struct {
|
|
||||||
keepalive.ClientParameters
|
|
||||||
|
|
||||||
ConnectionTTL time.Duration
|
|
||||||
ConnectTimeout time.Duration
|
|
||||||
RequestTimeout time.Duration
|
|
||||||
|
|
||||||
Peers []Peer
|
|
||||||
|
|
||||||
GRPCVerbose bool
|
|
||||||
GRPCLogger grpclog.LoggerV2
|
|
||||||
|
|
||||||
Logger *zap.Logger
|
|
||||||
PrivateKey *ecdsa.PrivateKey
|
|
||||||
}
|
|
||||||
|
|
||||||
pool struct {
|
|
||||||
log *zap.Logger
|
|
||||||
|
|
||||||
ttl time.Duration
|
|
||||||
|
|
||||||
conTimeout time.Duration
|
|
||||||
reqTimeout time.Duration
|
|
||||||
opts keepalive.ClientParameters
|
|
||||||
|
|
||||||
currentIdx *atomic.Int32
|
|
||||||
currentConn *grpc.ClientConn
|
|
||||||
|
|
||||||
ping Pinger
|
|
||||||
|
|
||||||
*sync.Mutex
|
|
||||||
nodes []*node
|
|
||||||
keys []uint32
|
|
||||||
conns map[uint32][]*node
|
|
||||||
tokens map[string]*token.SessionToken
|
|
||||||
|
|
||||||
oid *owner.ID
|
|
||||||
key *ecdsa.PrivateKey
|
|
||||||
|
|
||||||
unhealthy *atomic.Error
|
|
||||||
}
|
|
||||||
|
|
||||||
Pinger interface {
|
|
||||||
Call(context.Context, *grpc.ClientConn) error
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
errNoConnections = errors.New("no connections")
|
|
||||||
errBootstrapping = errors.New("bootstrapping")
|
|
||||||
errEmptyConnection = errors.New("empty connection")
|
|
||||||
errNoHealthyConnections = errors.New("no active connections")
|
|
||||||
)
|
|
||||||
|
|
||||||
func New(cfg *Config) (Pool, error) {
|
|
||||||
p := &pool{
|
|
||||||
log: cfg.Logger,
|
|
||||||
key: cfg.PrivateKey,
|
|
||||||
Mutex: new(sync.Mutex),
|
|
||||||
keys: make([]uint32, 0),
|
|
||||||
nodes: make([]*node, 0),
|
|
||||||
conns: make(map[uint32][]*node),
|
|
||||||
tokens: make(map[string]*token.SessionToken),
|
|
||||||
|
|
||||||
currentIdx: atomic.NewInt32(-1),
|
|
||||||
|
|
||||||
ttl: cfg.ConnectionTTL,
|
|
||||||
|
|
||||||
conTimeout: cfg.ConnectTimeout,
|
|
||||||
reqTimeout: cfg.RequestTimeout,
|
|
||||||
opts: cfg.ClientParameters,
|
|
||||||
|
|
||||||
unhealthy: atomic.NewError(errBootstrapping),
|
|
||||||
}
|
|
||||||
|
|
||||||
p.oid = owner.NewID()
|
|
||||||
wallet, err := owner.NEO3WalletFromPublicKey(&p.key.PublicKey)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
p.oid.SetNeo3Wallet(wallet)
|
|
||||||
|
|
||||||
if cfg.GRPCVerbose {
|
|
||||||
grpclog.SetLoggerV2(cfg.GRPCLogger)
|
|
||||||
}
|
|
||||||
|
|
||||||
seed := time.Now().UnixNano()
|
|
||||||
|
|
||||||
rand.Seed(seed)
|
|
||||||
cfg.Logger.Info("used random seed", zap.Int64("seed", seed))
|
|
||||||
|
|
||||||
if p.ping, err = newV2Ping(cfg.PrivateKey); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := range cfg.Peers {
|
|
||||||
if cfg.Peers[i].Address == "" {
|
|
||||||
cfg.Logger.Warn("skip, empty address")
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
p.nodes = append(p.nodes, &node{
|
|
||||||
index: int32(i),
|
|
||||||
address: cfg.Peers[i].Address,
|
|
||||||
weight: uint32(cfg.Peers[i].Weight * 100),
|
|
||||||
})
|
|
||||||
|
|
||||||
cfg.Logger.Info("add new peer",
|
|
||||||
zap.String("address", p.nodes[i].address),
|
|
||||||
zap.Uint32("weight", p.nodes[i].weight))
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(p.nodes) == 0 {
|
|
||||||
return nil, errNoConnections
|
|
||||||
}
|
|
||||||
|
|
||||||
return p, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *pool) Status() error {
|
|
||||||
return p.unhealthy.Load()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *pool) Close() {
|
|
||||||
p.Lock()
|
|
||||||
defer p.Unlock()
|
|
||||||
|
|
||||||
for i := range p.nodes {
|
|
||||||
if p.nodes[i] == nil || p.nodes[i].conn == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
p.log.Warn("close connection",
|
|
||||||
zap.String("address", p.nodes[i].address),
|
|
||||||
zap.Error(p.nodes[i].conn.Close()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *pool) ReBalance(ctx context.Context) {
|
|
||||||
p.Lock()
|
|
||||||
defer func() {
|
|
||||||
p.Unlock()
|
|
||||||
|
|
||||||
_, err := p.Connection(ctx)
|
|
||||||
p.unhealthy.Store(err)
|
|
||||||
}()
|
|
||||||
|
|
||||||
keys := make(map[uint32]struct{})
|
|
||||||
tokens := make(map[string]*token.SessionToken)
|
|
||||||
|
|
||||||
for i := range p.nodes {
|
|
||||||
var (
|
|
||||||
idx = -1
|
|
||||||
exists bool
|
|
||||||
err error
|
|
||||||
start = time.Now()
|
|
||||||
tkn *token.SessionToken
|
|
||||||
conn = p.nodes[i].conn
|
|
||||||
weight = p.nodes[i].weight
|
|
||||||
)
|
|
||||||
|
|
||||||
if err = ctx.Err(); err != nil {
|
|
||||||
p.log.Warn("something went wrong", zap.Error(err))
|
|
||||||
p.unhealthy.Store(err)
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if conn == nil {
|
|
||||||
p.log.Debug("empty connection, try to connect",
|
|
||||||
zap.String("address", p.nodes[i].address))
|
|
||||||
|
|
||||||
{ // try to connect
|
|
||||||
ctx, cancel := context.WithTimeout(ctx, p.conTimeout)
|
|
||||||
conn, err = grpc.DialContext(ctx, p.nodes[i].address,
|
|
||||||
grpc.WithBlock(),
|
|
||||||
grpc.WithInsecure(),
|
|
||||||
grpc.WithKeepaliveParams(p.opts))
|
|
||||||
cancel()
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil || conn == nil {
|
|
||||||
p.log.Warn("skip, could not connect to node",
|
|
||||||
zap.String("address", p.nodes[i].address),
|
|
||||||
zap.Stringer("elapsed", time.Since(start)),
|
|
||||||
zap.Error(err))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
{ // try to prepare token
|
|
||||||
ctx, cancel := context.WithTimeout(ctx, p.reqTimeout)
|
|
||||||
tkn, err = p.prepareToken(ctx, conn)
|
|
||||||
cancel()
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
p.log.Debug("could not prepare session token",
|
|
||||||
zap.String("address", p.nodes[i].address),
|
|
||||||
zap.Error(err))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
tokens[conn.Target()] = tkn
|
|
||||||
|
|
||||||
p.nodes[i].conn = conn
|
|
||||||
p.nodes[i].usedAt = time.Now()
|
|
||||||
p.log.Debug("connected to node", zap.String("address", p.nodes[i].address))
|
|
||||||
} else if tkn, exists = p.tokens[conn.Target()]; exists {
|
|
||||||
// token exists, ignore
|
|
||||||
} else if tkn, err = p.prepareToken(ctx, conn); err != nil {
|
|
||||||
p.log.Error("could not prepare session token",
|
|
||||||
zap.String("address", p.nodes[i].address),
|
|
||||||
zap.Error(err))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
for j := range p.conns[weight] {
|
|
||||||
if p.conns[weight][j] != nil && p.conns[weight][j].conn == conn {
|
|
||||||
idx = j
|
|
||||||
exists = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
usedAt := time.Since(p.nodes[i].usedAt)
|
|
||||||
|
|
||||||
// if something wrong with connection (bad state, unhealthy or not used a long time), try to close it and remove
|
|
||||||
if err = p.isAlive(ctx, conn); err != nil || usedAt > p.ttl {
|
|
||||||
p.log.Warn("connection not alive",
|
|
||||||
zap.String("address", p.nodes[i].address),
|
|
||||||
zap.Stringer("since", usedAt),
|
|
||||||
zap.Error(err))
|
|
||||||
|
|
||||||
if exists {
|
|
||||||
// remove from connections
|
|
||||||
p.conns[weight] = append(p.conns[weight][:idx], p.conns[weight][idx+1:]...)
|
|
||||||
}
|
|
||||||
|
|
||||||
// remove token
|
|
||||||
delete(tokens, conn.Target())
|
|
||||||
|
|
||||||
if err = conn.Close(); err != nil {
|
|
||||||
p.log.Warn("could not close bad connection",
|
|
||||||
zap.String("address", p.nodes[i].address),
|
|
||||||
zap.Stringer("since", usedAt),
|
|
||||||
zap.Error(err))
|
|
||||||
}
|
|
||||||
|
|
||||||
if p.nodes[i].conn != nil {
|
|
||||||
p.nodes[i].conn = nil
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
keys[weight] = struct{}{}
|
|
||||||
|
|
||||||
p.log.Debug("connection alive",
|
|
||||||
zap.String("address", p.nodes[i].address),
|
|
||||||
zap.Stringer("since", usedAt))
|
|
||||||
|
|
||||||
if !exists {
|
|
||||||
p.conns[weight] = append(p.conns[weight], p.nodes[i])
|
|
||||||
}
|
|
||||||
|
|
||||||
if tkn != nil {
|
|
||||||
tokens[conn.Target()] = tkn
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
p.tokens = tokens
|
|
||||||
p.keys = p.keys[:0]
|
|
||||||
for w := range keys {
|
|
||||||
p.keys = append(p.keys, w)
|
|
||||||
}
|
|
||||||
|
|
||||||
sort.Slice(p.keys, func(i, j int) bool {
|
|
||||||
return p.keys[i] > p.keys[j]
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *pool) Connection(ctx context.Context) (*grpc.ClientConn, error) {
|
|
||||||
p.Lock()
|
|
||||||
defer p.Unlock()
|
|
||||||
|
|
||||||
if err := p.isAlive(ctx, p.currentConn); err == nil {
|
|
||||||
if id := p.currentIdx.Load(); id != -1 && p.nodes[id] != nil {
|
|
||||||
p.nodes[id].usedAt = time.Now()
|
|
||||||
}
|
|
||||||
|
|
||||||
return p.currentConn, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, w := range p.keys {
|
|
||||||
switch ln := len(p.conns[w]); ln {
|
|
||||||
case 0:
|
|
||||||
continue
|
|
||||||
case 1:
|
|
||||||
p.currentConn = p.conns[w][0].conn
|
|
||||||
p.conns[w][0].usedAt = time.Now()
|
|
||||||
p.currentIdx.Store(p.conns[w][0].index)
|
|
||||||
return p.currentConn, nil
|
|
||||||
default: // > 1
|
|
||||||
i := rand.Intn(ln)
|
|
||||||
p.currentConn = p.conns[w][i].conn
|
|
||||||
p.conns[w][i].usedAt = time.Now()
|
|
||||||
p.currentIdx.Store(p.conns[w][i].index)
|
|
||||||
return p.currentConn, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
p.currentConn = nil
|
|
||||||
p.currentIdx.Store(-1)
|
|
||||||
|
|
||||||
if ctx.Err() != nil {
|
|
||||||
return nil, ctx.Err()
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil, errNoHealthyConnections
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *pool) isAlive(ctx context.Context, cur *grpc.ClientConn) error {
|
|
||||||
if cur == nil {
|
|
||||||
return errEmptyConnection
|
|
||||||
}
|
|
||||||
|
|
||||||
switch st := cur.GetState(); st {
|
|
||||||
case connectivity.Idle, connectivity.Ready, connectivity.Connecting:
|
|
||||||
ctx, cancel := context.WithTimeout(ctx, p.reqTimeout)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
return p.ping.Call(ctx, cur)
|
|
||||||
default:
|
|
||||||
return errors.New(st.String())
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,52 +0,0 @@
|
||||||
package pool
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"math"
|
|
||||||
|
|
||||||
"go.uber.org/zap"
|
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/token"
|
|
||||||
"google.golang.org/grpc"
|
|
||||||
)
|
|
||||||
|
|
||||||
// SessionToken returns session token for connection
|
|
||||||
func (p *pool) Token(ctx context.Context, conn *grpc.ClientConn) (*token.SessionToken, error) {
|
|
||||||
p.Lock()
|
|
||||||
defer p.Unlock()
|
|
||||||
|
|
||||||
if tkn, ok := p.tokens[conn.Target()]; ok && tkn != nil {
|
|
||||||
return tkn, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// prepare session token
|
|
||||||
tkn, err := p.prepareToken(ctx, conn)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// save token for current connection
|
|
||||||
p.tokens[conn.Target()] = tkn
|
|
||||||
|
|
||||||
return tkn, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// creates token using
|
|
||||||
func (p *pool) prepareToken(ctx context.Context, conn *grpc.ClientConn) (*token.SessionToken, error) {
|
|
||||||
cli, err := client.New(p.key, client.WithGRPCConnection(conn))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
tkn, err := cli.CreateSession(ctx, math.MaxUint64)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
p.log.Info("token created for connection",
|
|
||||||
zap.String("address", conn.Target()),
|
|
||||||
zap.Stringer("owner", tkn.OwnerID()))
|
|
||||||
|
|
||||||
return tkn, err
|
|
||||||
}
|
|
|
@ -1,36 +0,0 @@
|
||||||
package pool
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"crypto/ecdsa"
|
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/v2/client"
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/v2/netmap"
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/v2/signature"
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
"google.golang.org/grpc"
|
|
||||||
)
|
|
||||||
|
|
||||||
type v2Ping struct {
|
|
||||||
req *netmap.LocalNodeInfoRequest
|
|
||||||
}
|
|
||||||
|
|
||||||
func newV2Ping(key *ecdsa.PrivateKey) (Pinger, error) {
|
|
||||||
req := new(netmap.LocalNodeInfoRequest)
|
|
||||||
|
|
||||||
if err := signature.SignServiceMessage(key, req); err != nil {
|
|
||||||
return nil, errors.Wrap(err, "could not sign `PingRequest`")
|
|
||||||
}
|
|
||||||
|
|
||||||
return &v2Ping{req: req}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (v *v2Ping) Call(ctx context.Context, conn *grpc.ClientConn) error {
|
|
||||||
if cli, err := netmap.NewClient(netmap.WithGlobalOpts(client.WithGRPCConn(conn))); err != nil {
|
|
||||||
return err
|
|
||||||
} else if _, err := cli.LocalNodeInfo(ctx, v.req); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
Loading…
Reference in a new issue