forked from TrueCloudLab/frostfs-node
dadfd90dcd
Initial public review release v0.10.0
455 lines
11 KiB
Go
455 lines
11 KiB
Go
package peers
|
|
|
|
import (
|
|
"context"
|
|
"net"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/multiformats/go-multiaddr"
|
|
manet "github.com/multiformats/go-multiaddr-net"
|
|
"github.com/nspcc-dev/neofs-node/lib/transport"
|
|
"github.com/pkg/errors"
|
|
"go.uber.org/zap"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/connectivity"
|
|
"google.golang.org/grpc/keepalive"
|
|
"google.golang.org/grpc/status"
|
|
)
|
|
|
|
type (
|
|
// Interface is an interface of network connections controller.
|
|
Interface interface {
|
|
Shutdown() error
|
|
Job(context.Context)
|
|
Address() multiaddr.Multiaddr
|
|
RemoveConnection(maddr multiaddr.Multiaddr) error
|
|
Listen(maddr multiaddr.Multiaddr) (manet.Listener, error)
|
|
Connect(ctx context.Context, maddr multiaddr.Multiaddr) (manet.Conn, error)
|
|
GRPCConnector
|
|
}
|
|
|
|
// GRPCConnector is an interface of gRPC virtual connector.
|
|
GRPCConnector interface {
|
|
GRPCConnection(ctx context.Context, maddr multiaddr.Multiaddr, reset bool) (*grpc.ClientConn, error)
|
|
}
|
|
|
|
// Params groups the parameters of Interface.
|
|
Params struct {
|
|
Address multiaddr.Multiaddr
|
|
Transport transport.Transport
|
|
Logger *zap.Logger
|
|
Attempts int64
|
|
AttemptsTTL time.Duration
|
|
ConnectionTTL time.Duration
|
|
ConnectionIDLE time.Duration
|
|
MetricsTimeout time.Duration
|
|
KeepAliveTTL time.Duration
|
|
KeepAlivePingTTL time.Duration
|
|
}
|
|
|
|
connItem struct {
|
|
sync.RWMutex
|
|
conn *grpc.ClientConn
|
|
used time.Time
|
|
}
|
|
|
|
iface struct {
|
|
log *zap.Logger
|
|
addr multiaddr.Multiaddr // self address
|
|
tr transport.Transport
|
|
tick time.Duration
|
|
idle time.Duration
|
|
|
|
keepAlive time.Duration
|
|
pingTTL time.Duration
|
|
|
|
metricsTimeout time.Duration
|
|
|
|
grpc struct {
|
|
// globalMutex used by garbage collector and other high
|
|
globalMutex *sync.RWMutex
|
|
// bookMutex resolves concurrent access to the new connection
|
|
bookMutex *sync.RWMutex
|
|
// connBook contains connection info
|
|
// it's mutex resolves concurrent access to existed connection
|
|
connBook map[string]*connItem
|
|
}
|
|
|
|
cons struct {
|
|
*sync.RWMutex
|
|
items map[string]transport.Connection
|
|
}
|
|
|
|
lis struct {
|
|
*sync.RWMutex
|
|
items map[string]manet.Listener
|
|
}
|
|
}
|
|
)
|
|
|
|
const (
|
|
defaultAttemptsCount = 5
|
|
defaultAttemptsTTL = 30 * time.Second
|
|
defaultCloseTimer = 30 * time.Second
|
|
defaultConIdleTTL = 30 * time.Second
|
|
defaultKeepAliveTTL = 5 * time.Second
|
|
defaultMetricsTimeout = 5 * time.Second
|
|
defaultKeepAlivePingTTL = 50 * time.Millisecond
|
|
)
|
|
|
|
var (
|
|
// ErrDialToSelf is returned if we attempt to dial our own peer
|
|
ErrDialToSelf = errors.New("dial to self attempted")
|
|
// ErrEmptyAddress returns when you try to create Interface with empty address
|
|
ErrEmptyAddress = errors.New("self address could not be empty")
|
|
// ErrEmptyTransport returns when you try to create Interface with empty transport
|
|
ErrEmptyTransport = errors.New("transport could not be empty")
|
|
)
|
|
|
|
var errNilMultiaddr = errors.New("empty multi-address")
|
|
|
|
func (s *iface) Shutdown() error {
|
|
s.lis.Lock()
|
|
s.cons.Lock()
|
|
s.grpc.globalMutex.Lock()
|
|
|
|
defer func() {
|
|
s.lis.Unlock()
|
|
s.cons.Unlock()
|
|
s.grpc.globalMutex.Unlock()
|
|
}()
|
|
|
|
for addr := range s.cons.items {
|
|
if err := s.removeNetConnection(addr); err != nil {
|
|
return errors.Wrapf(err, "could not remove net connection `%s`", addr)
|
|
}
|
|
}
|
|
|
|
for addr := range s.grpc.connBook {
|
|
if err := s.removeGRPCConnection(addr); err != nil {
|
|
return errors.Wrapf(err, "could not remove net connection `%s`", addr)
|
|
}
|
|
}
|
|
|
|
for addr := range s.lis.items {
|
|
if err := s.removeListener(addr); err != nil {
|
|
return errors.Wrapf(err, "could not remove listener `%s`", addr)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// RemoveConnection from Interface.
|
|
// Used only in tests, consider removing.
|
|
func (s *iface) RemoveConnection(maddr multiaddr.Multiaddr) error {
|
|
addr, err := convertAddress(maddr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
s.cons.Lock()
|
|
s.grpc.globalMutex.Lock()
|
|
|
|
defer func() {
|
|
s.cons.Unlock()
|
|
s.grpc.globalMutex.Unlock()
|
|
}()
|
|
|
|
// Try to remove connection
|
|
if err := s.removeNetConnection(maddr.String()); err != nil {
|
|
return errors.Wrapf(err, "could not remove net connection `%s`", maddr.String())
|
|
}
|
|
|
|
// Try to remove gRPC connection
|
|
if err := s.removeGRPCConnection(addr); err != nil {
|
|
return errors.Wrapf(err, "could not remove gRPC connection `%s`", addr)
|
|
}
|
|
|
|
// TODO remove another connections
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *iface) removeListener(addr string) error {
|
|
if lis, ok := s.lis.items[addr]; ok {
|
|
if err := lis.Close(); err != nil {
|
|
return err
|
|
}
|
|
|
|
delete(s.lis.items, addr)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *iface) removeNetConnection(addr string) error {
|
|
// Try to remove simple connection
|
|
if con, ok := s.cons.items[addr]; ok {
|
|
if err := con.Close(); err != nil {
|
|
return err
|
|
}
|
|
|
|
delete(s.cons.items, addr)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *iface) removeGRPCConnection(addr string) error {
|
|
if gCon, ok := s.grpc.connBook[addr]; ok && gCon.conn != nil {
|
|
if err := gCon.conn.Close(); err != nil {
|
|
state, ok := status.FromError(err)
|
|
if !ok {
|
|
return err
|
|
}
|
|
|
|
s.log.Debug("error state",
|
|
zap.String("address", addr),
|
|
zap.Any("code", state.Code()),
|
|
zap.String("state", state.Message()),
|
|
zap.Any("details", state.Details()))
|
|
}
|
|
}
|
|
|
|
delete(s.grpc.connBook, addr)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Connect to address
|
|
// Used only in tests, consider removing.
|
|
func (s *iface) Connect(ctx context.Context, maddr multiaddr.Multiaddr) (manet.Conn, error) {
|
|
var (
|
|
err error
|
|
con transport.Connection
|
|
)
|
|
|
|
if maddr.Equal(s.addr) {
|
|
return nil, ErrDialToSelf
|
|
}
|
|
|
|
s.cons.RLock()
|
|
con, ok := s.cons.items[maddr.String()]
|
|
s.cons.RUnlock()
|
|
|
|
if ok && !con.Closed() {
|
|
return con, nil
|
|
}
|
|
|
|
if con, err = s.newConnection(ctx, maddr, false); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
s.cons.Lock()
|
|
s.cons.items[maddr.String()] = con
|
|
s.cons.Unlock()
|
|
|
|
return con, nil
|
|
}
|
|
|
|
// Listen try to find listener or creates new.
|
|
func (s *iface) Listen(maddr multiaddr.Multiaddr) (manet.Listener, error) {
|
|
// fixme: concurrency issue there, same as 5260f04d
|
|
// but it's not so bad, because `Listen()` used
|
|
// once during startup routine.
|
|
s.lis.RLock()
|
|
lis, ok := s.lis.items[maddr.String()]
|
|
s.lis.RUnlock()
|
|
|
|
if ok {
|
|
return lis, nil
|
|
}
|
|
|
|
lis, err := s.tr.Listen(maddr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
s.lis.Lock()
|
|
s.lis.items[maddr.String()] = lis
|
|
s.lis.Unlock()
|
|
|
|
return lis, nil
|
|
}
|
|
|
|
// Address of current Interface instance.
|
|
func (s *iface) Address() multiaddr.Multiaddr {
|
|
return s.addr
|
|
}
|
|
|
|
func isGRPCClosed(con *grpc.ClientConn) bool {
|
|
switch con.GetState() {
|
|
case connectivity.Idle, connectivity.Connecting, connectivity.Ready:
|
|
return false
|
|
default:
|
|
// connectivity.TransientFailure, connectivity.Shutdown
|
|
return true
|
|
}
|
|
}
|
|
|
|
func (s *iface) newConnection(ctx context.Context, addr multiaddr.Multiaddr, reset bool) (transport.Connection, error) {
|
|
return s.tr.Dial(ctx, addr, reset)
|
|
}
|
|
|
|
func gRPCKeepAlive(ping, ttl time.Duration) grpc.DialOption {
|
|
return grpc.WithKeepaliveParams(keepalive.ClientParameters{
|
|
Time: ping,
|
|
Timeout: ttl,
|
|
PermitWithoutStream: true,
|
|
})
|
|
}
|
|
|
|
func convertAddress(maddr multiaddr.Multiaddr) (string, error) {
|
|
if maddr == nil {
|
|
return "", errNilMultiaddr
|
|
}
|
|
|
|
addr, err := manet.ToNetAddr(maddr)
|
|
if err != nil {
|
|
return "", errors.Wrapf(err, "could not convert address `%s`", maddr)
|
|
}
|
|
|
|
return addr.String(), nil
|
|
}
|
|
|
|
// GRPCConnection creates gRPC connection over peers connection.
|
|
func (s *iface) GRPCConnection(ctx context.Context, maddr multiaddr.Multiaddr, reset bool) (*grpc.ClientConn, error) {
|
|
addr, err := convertAddress(maddr)
|
|
if err != nil {
|
|
return nil, errors.Wrapf(err, "could not convert `%v`", maddr)
|
|
}
|
|
|
|
// Get global mutex on read.
|
|
// All high level function e.g. peers garbage collector
|
|
// or shutdown must use globalMutex.Lock instead
|
|
s.grpc.globalMutex.RLock()
|
|
|
|
// Get connection item from connection book or create a new one.
|
|
// Concurrent map access resolved by bookMutex.
|
|
s.grpc.bookMutex.Lock()
|
|
|
|
item, ok := s.grpc.connBook[addr]
|
|
if !ok {
|
|
item = new(connItem)
|
|
s.grpc.connBook[addr] = item
|
|
}
|
|
|
|
s.grpc.bookMutex.Unlock()
|
|
|
|
// Now lock connection item.
|
|
// This denies concurrent access to the same address,
|
|
// but allows concurrent access to a different addresses.
|
|
item.Lock()
|
|
|
|
if item.conn != nil && !isGRPCClosed(item.conn) {
|
|
item.used = time.Now()
|
|
|
|
item.Unlock()
|
|
s.grpc.globalMutex.RUnlock()
|
|
|
|
return item.conn, nil
|
|
}
|
|
|
|
// Если вышеописанные строки переместить внутрь WithDialer,
|
|
// мы получим сломанный коннекшн, но ошибка не будет возвращена,
|
|
// поэтому мы сначала проверяем коннекшн и лишь потом возвращаем
|
|
// *gRPC.ClientConn
|
|
//
|
|
// Это будет работать с `grpc.WithBlock()`, см. ниже
|
|
conn, err := grpc.DialContext(ctx, maddr.String(),
|
|
gRPCKeepAlive(s.pingTTL, s.keepAlive),
|
|
// TODO: we must provide grpc.WithInsecure() or set credentials
|
|
grpc.WithInsecure(),
|
|
grpc.WithBlock(),
|
|
grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) {
|
|
return s.newConnection(ctx, maddr, reset)
|
|
}),
|
|
)
|
|
if err == nil {
|
|
item.conn = conn
|
|
item.used = time.Now()
|
|
}
|
|
|
|
item.Unlock()
|
|
s.grpc.globalMutex.RUnlock()
|
|
|
|
return conn, err
|
|
}
|
|
|
|
// New create iface instance and check arguments.
|
|
func New(p Params) (Interface, error) {
|
|
if p.Address == nil {
|
|
return nil, ErrEmptyAddress
|
|
}
|
|
|
|
if p.Transport == nil {
|
|
return nil, ErrEmptyTransport
|
|
}
|
|
|
|
if p.Attempts <= 0 {
|
|
p.Attempts = defaultAttemptsCount
|
|
}
|
|
|
|
if p.AttemptsTTL <= 0 {
|
|
p.AttemptsTTL = defaultAttemptsTTL
|
|
}
|
|
|
|
if p.ConnectionTTL <= 0 {
|
|
p.ConnectionTTL = defaultCloseTimer
|
|
}
|
|
|
|
if p.ConnectionIDLE <= 0 {
|
|
p.ConnectionIDLE = defaultConIdleTTL
|
|
}
|
|
|
|
if p.KeepAliveTTL <= 0 {
|
|
p.KeepAliveTTL = defaultKeepAliveTTL
|
|
}
|
|
|
|
if p.KeepAlivePingTTL <= 0 {
|
|
p.KeepAlivePingTTL = defaultKeepAlivePingTTL
|
|
}
|
|
|
|
if p.MetricsTimeout <= 0 {
|
|
p.MetricsTimeout = defaultMetricsTimeout
|
|
}
|
|
|
|
return &iface{
|
|
tick: p.ConnectionTTL,
|
|
idle: p.ConnectionIDLE,
|
|
|
|
keepAlive: p.KeepAliveTTL,
|
|
pingTTL: p.KeepAlivePingTTL,
|
|
|
|
metricsTimeout: p.MetricsTimeout,
|
|
|
|
log: p.Logger,
|
|
addr: p.Address,
|
|
tr: p.Transport,
|
|
grpc: struct {
|
|
globalMutex *sync.RWMutex
|
|
bookMutex *sync.RWMutex
|
|
connBook map[string]*connItem
|
|
}{
|
|
globalMutex: new(sync.RWMutex),
|
|
bookMutex: new(sync.RWMutex),
|
|
connBook: make(map[string]*connItem),
|
|
},
|
|
cons: struct {
|
|
*sync.RWMutex
|
|
items map[string]transport.Connection
|
|
}{
|
|
RWMutex: new(sync.RWMutex),
|
|
items: make(map[string]transport.Connection),
|
|
},
|
|
lis: struct {
|
|
*sync.RWMutex
|
|
items map[string]manet.Listener
|
|
}{
|
|
RWMutex: new(sync.RWMutex),
|
|
items: make(map[string]manet.Listener),
|
|
},
|
|
}, nil
|
|
}
|