// frostfs-node/pkg/morph/subscriber/subscriber.go
//
// 353 lines
// 8.4 KiB
// Go
// Raw Normal View History

package subscriber
import (
"context"
"errors"
"fmt"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/zap"
)
// NotificationChannels groups the receive-only channels through which the
// subscriber delivers events to its consumer.
type NotificationChannels struct {
	// BlockCh delivers new blocks.
	BlockCh <-chan *block.Block
	// NotificationsCh delivers contract execution notifications.
	NotificationsCh <-chan *state.ContainedNotificationEvent
	// NotaryRequestsCh delivers notary request events.
	NotaryRequestsCh <-chan *result.NotaryRequestEvent
}

// Subscriber is an interface of the NotificationEvent listener.
type Subscriber interface {
	// SubscribeForNotification subscribes to execution notifications
	// of the given contracts.
	SubscribeForNotification(...util.Uint160) error
	// BlockNotifications subscribes to new block events.
	BlockNotifications() error
	// SubscribeForNotaryRequests subscribes to notary request events
	// for the given main transaction signer.
	SubscribeForNotaryRequests(mainTXSigner util.Uint160) error
	// NotificationChannels returns the channels events are delivered to.
	NotificationChannels() NotificationChannels
	// Close releases the resources held by the subscriber.
	Close()
}

// subChannels is the set of channels handed to the client's Receive* calls.
// The whole set is replaced with a fresh one when the RPC endpoint is switched.
type subChannels struct {
	NotifyChan chan *state.ContainedNotificationEvent
	BlockChan  chan *block.Block
	NotaryChan chan *result.NotaryRequestEvent
}

// subscriber is the Subscriber implementation built on top of the morph client.
// The embedded RWMutex guards current and the cached subscription information.
type subscriber struct {
	sync.RWMutex

	log    *logger.Logger
	client *client.Client

	// Permanent output channels: exposed through NotificationChannels,
	// fed by routeNotifications and closed when it exits.
	notifyChan chan *state.ContainedNotificationEvent
	blockChan  chan *block.Block
	notaryChan chan *result.NotaryRequestEvent

	// current holds the channels the client currently writes events to.
	current subChannels

	// cached subscription information, used to restore subscriptions
	// after an RPC endpoint switch
	subscribedEvents       map[util.Uint160]bool
	subscribedNotaryEvents map[util.Uint160]bool
	subscribedToNewBlocks  bool
}

// Params is a group of Subscriber constructor parameters.
type Params struct {
	// Log is the logger used by the subscriber; must not be nil.
	Log *logger.Logger
	// StartFromBlock, when non-zero, is the minimal block height the
	// RPC node must have reached for New to succeed.
	StartFromBlock uint32
	// Client is the morph client used for subscriptions; must not be nil.
	Client *client.Client
}
// NotificationChannels returns receive-only views of the subscriber's
// permanent output channels for blocks, contract notifications and
// notary requests.
func (s *subscriber) NotificationChannels() NotificationChannels {
	var out NotificationChannels

	out.NotificationsCh = s.notifyChan
	out.BlockCh = s.blockChan
	out.NotaryRequestsCh = s.notaryChan

	return out
}
// errNilParams is returned by the constructor when no Params are given.
var errNilParams = errors.New("chain/subscriber: config was not provided to the constructor")

// errNilLogger is returned by the constructor when Params.Log is nil.
var errNilLogger = errors.New("chain/subscriber: logger was not provided to the constructor")

// errNilClient is returned by the constructor when Params.Client is nil.
var errNilClient = errors.New("chain/subscriber: client was not provided to the constructor")
// SubscribeForNotification subscribes to execution notifications of the
// given contracts. Contracts that are already subscribed are skipped, and a
// contract hash repeated within one call is subscribed only once.
//
// If any subscription fails, the subscriptions made during this call are
// cancelled (best effort) and the error is returned; the cached subscription
// state is left untouched in that case.
func (s *subscriber) SubscribeForNotification(contracts ...util.Uint160) error {
	s.Lock()
	defer s.Unlock()

	notifyIDs := make([]string, 0, len(contracts))
	// Contracts subscribed during this call. They are tracked separately so
	// that duplicates in the argument list do not create a second
	// subscription, and so that the cache is only updated on full success.
	added := make(map[util.Uint160]struct{}, len(contracts))

	for _, contract := range contracts {
		if s.subscribedEvents[contract] {
			continue
		}
		if _, dup := added[contract]; dup {
			continue
		}

		// subscribe to contract notifications
		id, err := s.client.ReceiveExecutionNotifications(contract, s.current.NotifyChan)
		if err != nil {
			// if there is some error, undo all subscriptions and return error;
			// unsubscribe failures are deliberately ignored (best-effort rollback)
			for _, subID := range notifyIDs {
				_ = s.client.Unsubscribe(subID)
			}

			return err
		}

		// save notification id
		notifyIDs = append(notifyIDs, id)
		added[contract] = struct{}{}
	}

	for contract := range added {
		s.subscribedEvents[contract] = true
	}

	return nil
}
// 2020-07-24 13:54:03 +00:00

// Close closes the underlying morph client.
func (s *subscriber) Close() {
	s.client.Close()
}
// BlockNotifications subscribes to new block events. The call is a no-op
// when the subscription has already been made; on success the fact is cached
// so that the subscription can be restored after an RPC endpoint switch.
func (s *subscriber) BlockNotifications() error {
	s.Lock()
	defer s.Unlock()

	if !s.subscribedToNewBlocks {
		_, err := s.client.ReceiveBlocks(s.current.BlockChan)
		if err != nil {
			return fmt.Errorf("could not subscribe for new block events: %w", err)
		}

		s.subscribedToNewBlocks = true
	}

	return nil
}
// SubscribeForNotaryRequests subscribes to notary request events for the
// given main transaction signer. The call is a no-op when that signer is
// already subscribed; on success the signer is cached so that the
// subscription can be restored after an RPC endpoint switch.
func (s *subscriber) SubscribeForNotaryRequests(mainTXSigner util.Uint160) error {
	s.Lock()
	defer s.Unlock()

	if already := s.subscribedNotaryEvents[mainTXSigner]; !already {
		_, err := s.client.ReceiveNotaryRequests(mainTXSigner, s.current.NotaryChan)
		if err != nil {
			return fmt.Errorf("could not subscribe for notary request events: %w", err)
		}

		s.subscribedNotaryEvents[mainTXSigner] = true
	}

	return nil
}
// New constructs a Neo:Morph event listener and returns it as a Subscriber.
//
// It fails if p, p.Log or p.Client is nil, or if the RPC node behind the
// client has not reached p.StartFromBlock. On success a background goroutine
// is started that routes events until ctx is done or the connection cannot
// be restored.
func New(ctx context.Context, p *Params) (Subscriber, error) {
	if p == nil {
		return nil, errNilParams
	}

	if p.Log == nil {
		return nil, errNilLogger
	}

	if p.Client == nil {
		return nil, errNilClient
	}

	if err := awaitHeight(p.Client, p.StartFromBlock); err != nil {
		return nil, err
	}

	sub := new(subscriber)
	sub.log = p.Log
	sub.client = p.Client
	sub.notifyChan = make(chan *state.ContainedNotificationEvent)
	sub.blockChan = make(chan *block.Block)
	sub.notaryChan = make(chan *result.NotaryRequestEvent)
	sub.current = newSubChannels()
	sub.subscribedEvents = make(map[util.Uint160]bool)
	sub.subscribedNotaryEvents = make(map[util.Uint160]bool)

	// The worker listens to all events from the temporary NeoGo channels
	// and puts them into the corresponding permanent channels.
	go sub.routeNotifications(ctx)

	return sub, nil
}
// routeNotifications forwards events from the current client-facing channels
// into the permanent output channels and updates the client metrics.
//
// When one of the current channels is closed, the connection is considered
// lost and an RPC endpoint switch is started; the result of the subscription
// restoration arrives through restoreCh. The loop stops when ctx is done or
// when the endpoint switch fails, after which the output channels are closed.
func (s *subscriber) routeNotifications(ctx context.Context) {
	// restoreCh has a buffer of one: at most one restoration runs at a time
	// (guarded by restoreInProgress), so its goroutine can always deliver the
	// result and exit even if this loop has already stopped.
	var (
		restoreCh         = make(chan bool, 1)
		restoreInProgress bool
	)

routeloop:
	for {
		var connLost bool

		// Take a snapshot of the current channels: they may be replaced
		// by switchEndpoint or reset to nil below.
		s.RLock()
		curr := s.current
		s.RUnlock()

		select {
		case <-ctx.Done():
			break routeloop
		case ev, ok := <-curr.NotifyChan:
			if !ok {
				connLost = true
				break
			}

			s.client.Metrics().IncNotificationCount("notify")

			// Do not block forever on a consumer that stopped reading.
			select {
			case s.notifyChan <- ev:
			case <-ctx.Done():
				break routeloop
			}
		case ev, ok := <-curr.BlockChan:
			if !ok {
				connLost = true
				break
			}

			s.client.Metrics().IncNotificationCount("block")
			s.client.Metrics().SetLastBlock(ev.Index)

			select {
			case s.blockChan <- ev:
			case <-ctx.Done():
				break routeloop
			}
		case ev, ok := <-curr.NotaryChan:
			if !ok {
				connLost = true
				break
			}

			s.client.Metrics().IncNotificationCount("notary")

			select {
			case s.notaryChan <- ev:
			case <-ctx.Done():
				break routeloop
			}
		case ok := <-restoreCh:
			restoreInProgress = false
			if !ok {
				connLost = true
			}
		}

		if !connLost {
			continue
		}

		if restoreInProgress {
			// Avoid getting additional !ok events: a nil channel is never
			// ready, so the closed channels stop waking the select up.
			s.Lock()
			s.current.NotifyChan = nil
			s.current.BlockChan = nil
			s.current.NotaryChan = nil
			s.Unlock()

			continue
		}

		restoreInProgress = s.switchEndpoint(ctx, restoreCh)
		if !restoreInProgress {
			break routeloop
		}

		// Discard whatever is left in the snapshot taken before the switch.
		curr.drain()
	}

	close(s.notifyChan)
	close(s.blockChan)
	close(s.notaryChan)
}
// switchEndpoint switches the client to another RPC endpoint and starts
// restoring the cached subscriptions on a fresh set of channels.
//
// It returns false if the client could not switch. Otherwise the restoration
// runs in a separate goroutine that reports its outcome through finishCh,
// the fresh channels become the current ones, and true is returned.
func (s *subscriber) switchEndpoint(ctx context.Context, finishCh chan<- bool) bool {
	s.log.Info(logs.RPConnectionLost)

	if switched := s.client.SwitchRPC(ctx); !switched {
		s.log.Error(logs.RPCNodeSwitchFailure)
		return false
	}

	fresh := newSubChannels()

	s.Lock()
	go func(chs subChannels) {
		finishCh <- s.restoreSubscriptions(chs.NotifyChan, chs.BlockChan, chs.NotaryChan)
	}(fresh)
	s.current = fresh
	s.Unlock()

	s.client.Metrics().IncSwitchCount()

	return true
}
// newSubChannels allocates a fresh set of unbuffered channels for
// notification, block and notary request events.
func newSubChannels() subChannels {
	var chs subChannels

	chs.NotifyChan = make(chan *state.ContainedNotificationEvent)
	chs.BlockChan = make(chan *block.Block)
	chs.NotaryChan = make(chan *result.NotaryRequestEvent)

	return chs
}
// drain discards everything that can be received from the channels without
// blocking. A channel found closed is replaced with nil so that it is not
// selected again; the method returns as soon as no channel is ready.
func (s *subChannels) drain() {
	for {
		select {
		case _, open := <-s.NotifyChan:
			if !open {
				s.NotifyChan = nil
			}
		case _, open := <-s.BlockChan:
			if !open {
				s.BlockChan = nil
			}
		case _, open := <-s.NotaryChan:
			if !open {
				s.NotaryChan = nil
			}
		default:
			// nothing is ready: draining is complete
			return
		}
	}
}
// restoreSubscriptions restores subscriptions according to
// cached information about them.
//
// Block, contract notification and notary request subscriptions are
// re-created on the given channels, in that order. It returns false (after
// logging the error) as soon as one of them fails, and true otherwise.
func (s *subscriber) restoreSubscriptions(notifCh chan<- *state.ContainedNotificationEvent,
	blCh chan<- *block.Block, notaryCh chan<- *result.NotaryRequestEvent,
) bool {
	// This method runs in its own goroutine while the Subscribe* methods may
	// modify the cached state under the mutex, so take a consistent snapshot
	// under the read lock. The lock is released before any RPC call is made.
	s.RLock()
	restoreBlocks := s.subscribedToNewBlocks

	contracts := make([]util.Uint160, 0, len(s.subscribedEvents))
	for contract := range s.subscribedEvents {
		contracts = append(contracts, contract)
	}

	signers := make([]util.Uint160, 0, len(s.subscribedNotaryEvents))
	for signer := range s.subscribedNotaryEvents {
		signers = append(signers, signer)
	}
	s.RUnlock()

	// new block events restoration
	if restoreBlocks {
		if _, err := s.client.ReceiveBlocks(blCh); err != nil {
			s.log.Error(logs.ClientCouldNotRestoreBlockSubscriptionAfterRPCSwitch, zap.Error(err))
			return false
		}
	}

	// notification events restoration
	for _, contract := range contracts {
		if _, err := s.client.ReceiveExecutionNotifications(contract, notifCh); err != nil {
			s.log.Error(logs.ClientCouldNotRestoreNotificationSubscriptionAfterRPCSwitch, zap.Error(err))
			return false
		}
	}

	// notary notification events restoration
	for _, signer := range signers {
		if _, err := s.client.ReceiveNotaryRequests(signer, notaryCh); err != nil {
			s.log.Error(logs.ClientCouldNotRestoreNotaryNotificationSubscriptionAfterRPCSwitch, zap.Error(err))
			return false
		}
	}

	return true
}
// awaitHeight verifies that the remote client has reached at least the
// expected block height. The height is queried once; an error is returned if
// the query fails or the height is below startFrom. A zero startFrom disables
// the check.
// This function is required to avoid connections to unsynced RPC nodes, because
// they can produce events from the past that should not be processed by
// FrostFS nodes.
func awaitHeight(cli *client.Client, startFrom uint32) error {
	if startFrom != 0 {
		height, err := cli.BlockCount()

		switch {
		case err != nil:
			return fmt.Errorf("could not get block height: %w", err)
		case height < startFrom:
			return fmt.Errorf("RPC block counter %d didn't reach expected height %d", height, startFrom)
		}
	}

	return nil
}