Adopt new neo-go API in morph, drop notaryless events #337

Merged
fyrchik merged 6 commits from fyrchik/frostfs-node:update-deprecated into master 2023-05-12 09:19:39 +00:00
21 changed files with 547 additions and 1479 deletions

View file

@ -195,7 +195,6 @@ const (
EventIgnoreNilNotaryEventHandler = "ignore nil notary event handler" // Warn in ../node/pkg/morph/event/listener.go
EventIgnoreHandlerOfNotaryEventWoParser = "ignore handler of notary event w/o parser" // Warn in ../node/pkg/morph/event/listener.go
EventIgnoreNilBlockHandler = "ignore nil block handler" // Warn in ../node/pkg/morph/event/listener.go
SubscriberUnsubscribeForNotification = "unsubscribe for notification" // Error in ../node/pkg/morph/subscriber/subscriber.go
SubscriberRemoteNotificationChannelHasBeenClosed = "remote notification channel has been closed" // Warn in ../node/pkg/morph/subscriber/subscriber.go
SubscriberCantCastNotifyEventValueToTheNotifyStruct = "can't cast notify event value to the notify struct" // Error in ../node/pkg/morph/subscriber/subscriber.go
SubscriberNewNotificationEventFromSidechain = "new notification event from sidechain" // Debug in ../node/pkg/morph/subscriber/subscriber.go

View file

@ -79,13 +79,6 @@ type NetworkState interface {
HomomorphicHashDisabled() (bool, error)
}
const (
putNotification = "containerPut"
deleteNotification = "containerDelete"
setEACLNotification = "setEACL"
)
// New creates a container contract processor instance.
func New(p *Params) (*Processor, error) {
switch {
@ -121,66 +114,12 @@ func New(p *Params) (*Processor, error) {
// ListenerNotificationParsers for the 'event.Listener' event producer.
func (cp *Processor) ListenerNotificationParsers() []event.NotificationParserInfo {
if !cp.notaryDisabled {
return nil
}
var (
parsers = make([]event.NotificationParserInfo, 0, 3)
p event.NotificationParserInfo
)
p.SetScriptHash(cp.cnrClient.ContractAddress())
// container put
p.SetType(event.TypeFromString(putNotification))
p.SetParser(containerEvent.ParsePut)
parsers = append(parsers, p)
// container delete
p.SetType(event.TypeFromString(deleteNotification))
p.SetParser(containerEvent.ParseDelete)
parsers = append(parsers, p)
// set eACL
p.SetType(event.TypeFromString(setEACLNotification))
p.SetParser(containerEvent.ParseSetEACL)
parsers = append(parsers, p)
return parsers
return nil
}
// ListenerNotificationHandlers for the 'event.Listener' event producer.
func (cp *Processor) ListenerNotificationHandlers() []event.NotificationHandlerInfo {
if !cp.notaryDisabled {
return nil
}
var (
handlers = make([]event.NotificationHandlerInfo, 0, 3)
h event.NotificationHandlerInfo
)
h.SetScriptHash(cp.cnrClient.ContractAddress())
// container put
h.SetType(event.TypeFromString(putNotification))
h.SetHandler(cp.handlePut)
handlers = append(handlers, h)
// container delete
h.SetType(event.TypeFromString(deleteNotification))
h.SetHandler(cp.handleDelete)
handlers = append(handlers, h)
// set eACL
h.SetType(event.TypeFromString(setEACLNotification))
h.SetHandler(cp.handleSetEACL)
handlers = append(handlers, h)
return handlers
return nil
}
// ListenerNotaryParsers for the 'event.Listener' notary event producer.

View file

@ -119,9 +119,7 @@ type (
)
const (
newEpochNotification = "NewEpoch"
addPeerNotification = "AddPeer"
updatePeerStateNotification = "UpdateState"
newEpochNotification = "NewEpoch"
)
// New creates network map contract processor instance.
@ -189,20 +187,6 @@ func (np *Processor) ListenerNotificationParsers() []event.NotificationParserInf
p.SetParser(netmapEvent.ParseNewEpoch)
parsers = append(parsers, p)
if !np.notaryDisabled {
return parsers
}
// new peer event
p.SetType(addPeerNotification)
p.SetParser(netmapEvent.ParseAddPeer)
parsers = append(parsers, p)
// update peer event
p.SetType(updatePeerStateNotification)
p.SetParser(netmapEvent.ParseUpdatePeer)
parsers = append(parsers, p)
return parsers
}
@ -219,20 +203,6 @@ func (np *Processor) ListenerNotificationHandlers() []event.NotificationHandlerI
i.SetHandler(np.handleNewEpoch)
handlers = append(handlers, i)
if !np.notaryDisabled {
return handlers
}
// new peer handler
i.SetType(addPeerNotification)
i.SetHandler(np.handleAddPeer)
handlers = append(handlers, i)
// update peer handler
i.SetType(updatePeerStateNotification)
i.SetHandler(np.handleUpdateState)
handlers = append(handlers, i)
return handlers
}

View file

@ -69,9 +69,6 @@ type Client struct {
// on every normal call.
switchLock *sync.RWMutex
notifications chan rpcclient.Notification
subsInfo // protected with switchLock
// channel for internal stop
closeChan chan struct{}
@ -156,8 +153,6 @@ func (e *notHaltStateError) Error() string {
)
}
var errEmptyInvocationScript = errors.New("got empty invocation script from neo node")
// implementation of error interface for FrostFS-specific errors.
type frostfsError struct {
err error
@ -566,26 +561,11 @@ func (c *Client) IsValidScript(script []byte, signers []transaction.Signer) (val
// NotificationChannel returns channel than receives subscribed
// notification from the connected RPC node.
// Channel is closed when connection to the RPC node has been
// lost without the possibility of recovery.
// Channel is closed when connection to the RPC node is lost.
func (c *Client) NotificationChannel() <-chan rpcclient.Notification {
return c.notifications
}
// inactiveMode switches Client to an inactive mode:
// - notification channel is closed;
// - all the new RPC request would return ErrConnectionLost;
// - inactiveModeCb is called if not nil.
func (c *Client) inactiveMode() {
c.switchLock.Lock()
defer c.switchLock.Unlock()
close(c.notifications)
c.inactive = true
if c.cfg.inactiveModeCb != nil {
c.cfg.inactiveModeCb()
}
c.switchLock.RLock()
defer c.switchLock.RUnlock()
return c.client.Notifications //lint:ignore SA1019 waits for neo-go v0.102.0 https://github.com/nspcc-dev/neo-go/pull/2980
}
func (c *Client) setActor(act *actor.Actor) {

View file

@ -10,11 +10,8 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
"github.com/nspcc-dev/neo-go/pkg/util"
@ -108,21 +105,13 @@ func New(ctx context.Context, key *keys.PrivateKey, opts ...Option) (*Client, er
}
cli := &Client{
cache: newClientCache(),
logger: cfg.logger,
acc: acc,
accAddr: accAddr,
cfg: *cfg,
switchLock: &sync.RWMutex{},
notifications: make(chan rpcclient.Notification),
subsInfo: subsInfo{
blockRcv: make(chan *block.Block),
notificationRcv: make(chan *state.ContainedNotificationEvent),
notaryReqRcv: make(chan *result.NotaryRequestEvent),
subscribedEvents: make(map[util.Uint160]string),
subscribedNotaryEvents: make(map[util.Uint160]string),
},
closeChan: make(chan struct{}),
cache: newClientCache(),
logger: cfg.logger,
acc: acc,
accAddr: accAddr,
cfg: *cfg,
switchLock: &sync.RWMutex{},
closeChan: make(chan struct{}),
}
cli.endpoints.init(cfg.endpoints)
@ -162,7 +151,7 @@ func New(ctx context.Context, key *keys.PrivateKey, opts ...Option) (*Client, er
}
cli.setActor(act)
go cli.notificationLoop(ctx)
go cli.closeWaiter(ctx)
return cli, nil
}

View file

@ -6,11 +6,6 @@ import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"go.uber.org/zap"
)
@ -34,7 +29,8 @@ func (e *endpoints) init(ee []Endpoint) {
e.list = ee
}
func (c *Client) switchRPC(ctx context.Context) bool {
// SwitchRPC performs reconnection and returns true if it was successful.
func (c *Client) SwitchRPC(ctx context.Context) bool {
c.switchLock.Lock()
defer c.switchLock.Unlock()
@ -58,20 +54,8 @@ func (c *Client) switchRPC(ctx context.Context) bool {
c.logger.Info(logs.ClientConnectionToTheNewRPCNodeHasBeenEstablished,
zap.String("endpoint", newEndpoint))
subs, ok := c.restoreSubscriptions(ctx, cli, newEndpoint, false)
if !ok {
// new WS client does not allow
// restoring subscription, client
// could not work correctly =>
// closing connection to RPC node
// to switch to another one
cli.Close()
continue
}
c.client = cli
c.setActor(act)
c.subsInfo = subs
if c.cfg.switchInterval != 0 && !c.switchIsActive.Load() &&
c.endpoints.list[c.endpoints.curr].Priority != c.endpoints.list[0].Priority {
@ -82,97 +66,21 @@ func (c *Client) switchRPC(ctx context.Context) bool {
return true
}
c.inactive = true
if c.cfg.inactiveModeCb != nil {
c.cfg.inactiveModeCb()
}
return false
}
func (c *Client) notificationLoop(ctx context.Context) {
var e any
var ok bool
for {
c.switchLock.RLock()
bChan := c.blockRcv
nChan := c.notificationRcv
nrChan := c.notaryReqRcv
c.switchLock.RUnlock()
select {
case <-ctx.Done():
_ = c.UnsubscribeAll()
c.close()
return
case <-c.closeChan:
_ = c.UnsubscribeAll()
c.close()
return
case e, ok = <-bChan:
case e, ok = <-nChan:
case e, ok = <-nrChan:
}
if ok {
c.routeEvent(ctx, e)
continue
}
if !c.reconnect(ctx) {
return
}
}
}
func (c *Client) routeEvent(ctx context.Context, e any) {
typedNotification := rpcclient.Notification{Value: e}
switch e.(type) {
case *block.Block:
typedNotification.Type = neorpc.BlockEventID
case *state.ContainedNotificationEvent:
typedNotification.Type = neorpc.NotificationEventID
case *result.NotaryRequestEvent:
typedNotification.Type = neorpc.NotaryRequestEventID
}
func (c *Client) closeWaiter(ctx context.Context) {
select {
case c.notifications <- typedNotification:
case <-ctx.Done():
_ = c.UnsubscribeAll()
c.close()
case <-c.closeChan:
_ = c.UnsubscribeAll()
c.close()
}
}
func (c *Client) reconnect(ctx context.Context) bool {
if closeErr := c.client.GetError(); closeErr != nil {
c.logger.Warn(logs.ClientSwitchingToTheNextRPCNode,
zap.String("reason", closeErr.Error()),
)
} else {
// neo-go client was closed by calling `Close`
// method, that happens only when a client has
// switched to the more prioritized RPC
return true
}
if !c.switchRPC(ctx) {
c.logger.Error(logs.ClientCouldNotEstablishConnectionToAnyRPCNode)
// could not connect to all endpoints =>
// switch client to inactive mode
c.inactiveMode()
return false
}
// TODO(@carpawell): call here some callback retrieved in constructor
// of the client to allow checking chain state since during switch
// process some notification could be lost
return true
_ = c.UnsubscribeAll()
c.close()
}
func (c *Client) switchToMostPrioritized(ctx context.Context) {
@ -218,36 +126,28 @@ mainLoop:
continue
}
if subs, ok := c.restoreSubscriptions(ctx, cli, tryE, true); ok {
c.switchLock.Lock()
// higher priority node could have been
// connected in the other goroutine
if e.Priority >= c.endpoints.list[c.endpoints.curr].Priority {
cli.Close()
c.switchLock.Unlock()
return
}
c.client.Close()
c.cache.invalidate()
c.client = cli
c.setActor(act)
c.subsInfo = subs
c.endpoints.curr = i
c.switchLock.Lock()
// higher priority node could have been
// connected in the other goroutine
if e.Priority >= c.endpoints.list[c.endpoints.curr].Priority {
cli.Close()
c.switchLock.Unlock()
c.logger.Info(logs.ClientSwitchedToTheHigherPriorityRPC,
zap.String("endpoint", tryE))
return
}
c.logger.Warn(logs.ClientCouldNotRestoreSideChainSubscriptionsUsingNode,
zap.String("endpoint", tryE),
zap.Error(err),
)
c.client.Close()
c.cache.invalidate()
c.client = cli
c.setActor(act)
c.endpoints.curr = i
c.switchLock.Unlock()
c.logger.Info(logs.ClientSwitchedToTheHigherPriorityRPC,
zap.String("endpoint", tryE))
return
}
}
}
@ -255,6 +155,7 @@ mainLoop:
// close closes notification channel and wrapped WS client.
func (c *Client) close() {
Review

Why not locked with switchLock?

Why not locked with switchLock?
Review

Fixed

Fixed
close(c.notifications)
c.switchLock.RLock()
defer c.switchLock.RUnlock()
c.client.Close()
}

View file

@ -1,6 +1,7 @@
package client
import (
"crypto/elliptic"
"encoding/binary"
"errors"
"fmt"
@ -18,19 +19,20 @@ import (
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/notary"
sc "github.com/nspcc-dev/neo-go/pkg/smartcontract"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/vm/opcode"
"github.com/nspcc-dev/neo-go/pkg/vm"
"github.com/nspcc-dev/neo-go/pkg/vm/vmstate"
"github.com/nspcc-dev/neo-go/pkg/wallet"
"go.uber.org/zap"
)
type (
notaryInfo struct {
txValidTime uint32 // minimum amount of blocks when mainTx will be valid
roundTime uint32 // extra amount of blocks to synchronize sidechain height diff of inner ring nodes
fallbackTime uint32 // mainTx's ValidUntilBlock - fallbackTime + 1 is when fallbackTx is sent
txValidTime uint32 // minimum amount of blocks when mainTx will be valid
roundTime uint32 // extra amount of blocks to synchronize sidechain height diff of inner ring nodes
alphabetSource AlphabetKeys // source of alphabet node keys to prepare witness
@ -41,7 +43,7 @@ type (
notaryCfg struct {
proxy util.Uint160
txValidTime, roundTime, fallbackTime uint32
txValidTime, roundTime uint32
alphabetSource AlphabetKeys
}
@ -51,9 +53,8 @@ type (
)
const (
defaultNotaryValidTime = 50
defaultNotaryRoundTime = 100
defaultNotaryFallbackTime = 40
defaultNotaryValidTime = 50
defaultNotaryRoundTime = 100
notaryBalanceOfMethod = "balanceOf"
notaryExpirationOfMethod = "expirationOf"
@ -69,7 +70,6 @@ func defaultNotaryConfig(c *Client) *notaryCfg {
return &notaryCfg{
txValidTime: defaultNotaryValidTime,
roundTime: defaultNotaryRoundTime,
fallbackTime: defaultNotaryFallbackTime,
alphabetSource: c.Committee,
}
}
@ -104,7 +104,6 @@ func (c *Client) EnableNotarySupport(opts ...NotaryOption) error {
proxy: cfg.proxy,
txValidTime: cfg.txValidTime,
roundTime: cfg.roundTime,
fallbackTime: cfg.fallbackTime,
alphabetSource: cfg.alphabetSource,
notary: notary.Hash,
}
@ -408,32 +407,32 @@ func (c *Client) NotarySignAndInvokeTX(mainTx *transaction.Transaction) error {
return fmt.Errorf("could not fetch current alphabet keys: %w", err)
}
multiaddrAccount, err := c.notaryMultisigAccount(alphabetList, false, true)
cosigners, err := c.notaryCosignersFromTx(mainTx, alphabetList)
if err != nil {
return err
}
// mainTX is expected to be pre-validated: second witness must exist and be empty
mainTx.Scripts[1].VerificationScript = multiaddrAccount.GetVerificationScript()
mainTx.Scripts[1].InvocationScript = append(
[]byte{byte(opcode.PUSHDATA1), 64},
multiaddrAccount.SignHashable(c.rpcActor.GetNetwork(), mainTx)...,
)
nAct, err := notary.NewActor(c.client, cosigners, c.acc)
if err != nil {
return err
}
// Sign exactly the same transaction we've got from the received Notary request.
err = nAct.Sign(mainTx)
if err != nil {
return fmt.Errorf("faield to sign notary request: %w", err)
}
mainH, fbH, untilActual, err := nAct.Notarize(mainTx, nil)
//lint:ignore SA1019 https://git.frostfs.info/TrueCloudLab/frostfs-node/issues/202
resp, err := c.client.SignAndPushP2PNotaryRequest(mainTx,
[]byte{byte(opcode.RET)},
-1,
0,
c.notary.fallbackTime,
c.acc)
if err != nil && !alreadyOnChainError(err) {
return err
}
c.logger.Debug(logs.ClientNotaryRequestWithPreparedMainTXInvoked,
zap.Uint32("fallback_valid_for", c.notary.fallbackTime),
zap.Stringer("tx_hash", resp.Hash().Reverse()))
zap.String("tx_hash", mainH.StringLE()),
zap.Uint32("valid_until_block", untilActual),
zap.String("fallback_hash", fbH.StringLE()))
return nil
}
@ -449,70 +448,147 @@ func (c *Client) notaryInvoke(committee, invokedByAlpha bool, contract util.Uint
return err
}
cosigners, err := c.notaryCosigners(invokedByAlpha, alphabetList, committee)
if err != nil {
return err
}
params, err := invocationParams(args...)
if err != nil {
return err
}
test, err := c.makeTestInvocation(contract, method, params, cosigners)
if err != nil {
return err
}
multiaddrAccount, err := c.notaryMultisigAccount(alphabetList, committee, invokedByAlpha)
if err != nil {
return err
}
until, err := c.getUntilValue(vub)
if err != nil {
return err
}
mainTx, err := c.buildMainTx(invokedByAlpha, nonce, alphabetList, test, cosigners, multiaddrAccount, until)
cosigners, err := c.notaryCosigners(invokedByAlpha, alphabetList, committee)
if err != nil {
return err
}
//lint:ignore SA1019 https://git.frostfs.info/TrueCloudLab/frostfs-node/issues/202
resp, err := c.client.SignAndPushP2PNotaryRequest(mainTx,
[]byte{byte(opcode.RET)},
-1,
0,
c.notary.fallbackTime,
c.acc)
nAct, err := notary.NewActor(c.client, cosigners, c.acc)
if err != nil {
return err
}
mainH, fbH, untilActual, err := nAct.Notarize(nAct.MakeTunedCall(contract, method, nil, func(r *result.Invoke, t *transaction.Transaction) error {
if r.State != vmstate.Halt.String() {
return wrapFrostFSError(&notHaltStateError{state: r.State, exception: r.FaultException})
}
t.ValidUntilBlock = until
t.Nonce = nonce
return nil
}, args...))
if err != nil && !alreadyOnChainError(err) {
return err
}
c.logger.Debug(logs.ClientNotaryRequestInvoked,
zap.String("method", method),
zap.Uint32("valid_until_block", until),
zap.Uint32("fallback_valid_for", c.notary.fallbackTime),
zap.Stringer("tx_hash", resp.Hash().Reverse()))
zap.Uint32("valid_until_block", untilActual),
zap.String("tx_hash", mainH.StringLE()),
zap.String("fallback_hash", fbH.StringLE()))
return nil
}
func (c *Client) makeTestInvocation(contract util.Uint160, method string, params []sc.Parameter, cosigners []transaction.Signer) (*result.Invoke, error) {
test, err := c.client.InvokeFunction(contract, method, params, cosigners)
func (c *Client) notaryCosignersFromTx(mainTx *transaction.Transaction, alphabetList keys.PublicKeys) ([]actor.SignerAccount, error) {
multiaddrAccount, err := c.notaryMultisigAccount(alphabetList, false, true)
if err != nil {
return nil, err
}
if test.State != HaltState {
return nil, wrapFrostFSError(&notHaltStateError{state: test.State, exception: test.FaultException})
// Here we need to add a committee signature (second witness) to the pre-validated
// main transaction without creating a new one. However, Notary actor demands the
// proper set of signers for constructor, thus, fill it from the main transaction's signers list.
s := make([]actor.SignerAccount, 2, 3)
s[0] = actor.SignerAccount{
// Proxy contract that will pay for the execution.
Signer: mainTx.Signers[0],
Account: notary.FakeContractAccount(mainTx.Signers[0].Account),
}
s[1] = actor.SignerAccount{
// Inner ring multisignature.
Signer: mainTx.Signers[1],
Account: multiaddrAccount,
}
if len(mainTx.Signers) > 3 {
// Invoker signature (simple signature account of storage node is expected).
var acc *wallet.Account
script := mainTx.Scripts[2].VerificationScript
if len(script) == 0 {
acc = notary.FakeContractAccount(mainTx.Signers[2].Account)
} else {
pubBytes, ok := vm.ParseSignatureContract(script)
if ok {
pub, err := keys.NewPublicKeyFromBytes(pubBytes, elliptic.P256())
if err != nil {
return nil, fmt.Errorf("failed to parse verification script of signer #2: invalid public key: %w", err)
}
acc = notary.FakeSimpleAccount(pub)
} else {
m, pubsBytes, ok := vm.ParseMultiSigContract(script)
if !ok {
return nil, errors.New("failed to parse verification script of signer #2: unknown witness type")
}
pubs := make(keys.PublicKeys, len(pubsBytes))
for i := range pubs {
pubs[i], err = keys.NewPublicKeyFromBytes(pubsBytes[i], elliptic.P256())
if err != nil {
return nil, fmt.Errorf("failed to parse verification script of signer #2: invalid public key #%d: %w", i, err)
}
}
acc, err = notary.FakeMultisigAccount(m, pubs)
if err != nil {
return nil, fmt.Errorf("failed to create fake account for signer #2: %w", err)
}
}
}
s = append(s, actor.SignerAccount{
Signer: mainTx.Signers[2],
Account: acc,
})
}
if len(test.Script) == 0 {
return nil, wrapFrostFSError(errEmptyInvocationScript)
return s, nil
}
func (c *Client) notaryCosigners(invokedByAlpha bool, ir []*keys.PublicKey, committee bool) ([]actor.SignerAccount, error) {
multiaddrAccount, err := c.notaryMultisigAccount(ir, committee, invokedByAlpha)
if err != nil {
return nil, err
}
return test, nil
s := make([]actor.SignerAccount, 2, 3)
// Proxy contract that will pay for the execution.
s[0] = actor.SignerAccount{
Signer: transaction.Signer{
Account: c.notary.proxy,
Scopes: transaction.None,
},
Account: notary.FakeContractAccount(c.notary.proxy),
}
// Inner ring multisignature.
s[1] = actor.SignerAccount{
Signer: transaction.Signer{
Account: multiaddrAccount.ScriptHash(),
Scopes: c.cfg.signer.Scopes,
AllowedContracts: c.cfg.signer.AllowedContracts,
AllowedGroups: c.cfg.signer.AllowedGroups,
},
Account: multiaddrAccount,
}
if !invokedByAlpha {
// Invoker signature.
s = append(s, actor.SignerAccount{
Signer: transaction.Signer{
Account: hash.Hash160(c.acc.GetVerificationScript()),
Scopes: c.cfg.signer.Scopes,
AllowedContracts: c.cfg.signer.AllowedContracts,
AllowedGroups: c.cfg.signer.AllowedGroups,
},
Account: c.acc,
})
}
// The last one is Notary contract that will be added to the signers list
// by Notary actor automatically.
return s, nil
}
func (c *Client) getUntilValue(vub *uint32) (uint32, error) {
@ -522,195 +598,6 @@ func (c *Client) getUntilValue(vub *uint32) (uint32, error) {
return c.notaryTxValidationLimit()
}
func (c *Client) buildMainTx(invokedByAlpha bool, nonce uint32, alphabetList keys.PublicKeys, test *result.Invoke,
cosigners []transaction.Signer, multiaddrAccount *wallet.Account, until uint32) (*transaction.Transaction, error) {
// after test invocation we build main multisig transaction
u8n := uint8(len(alphabetList))
if !invokedByAlpha {
u8n++
}
// prepare main tx
mainTx := &transaction.Transaction{
Nonce: nonce,
SystemFee: test.GasConsumed,
ValidUntilBlock: until,
Script: test.Script,
Attributes: []transaction.Attribute{
{
Type: transaction.NotaryAssistedT,
Value: &transaction.NotaryAssisted{NKeys: u8n},
},
},
Signers: cosigners,
}
// calculate notary fee
//lint:ignore SA1019 https://git.frostfs.info/TrueCloudLab/frostfs-node/issues/202
notaryFee, err := c.client.CalculateNotaryFee(u8n)
if err != nil {
return nil, err
}
// add network fee for cosigners
//nolint:staticcheck // waits for neo-go v0.99.3 with notary actors
//lint:ignore SA1019 https://git.frostfs.info/TrueCloudLab/frostfs-node/issues/202
err = c.client.AddNetworkFee(
mainTx,
notaryFee,
c.notaryAccounts(invokedByAlpha, multiaddrAccount)...,
)
if err != nil {
return nil, err
}
// define witnesses
mainTx.Scripts = c.notaryWitnesses(invokedByAlpha, multiaddrAccount, mainTx)
return mainTx, nil
}
func (c *Client) notaryCosigners(invokedByAlpha bool, ir []*keys.PublicKey, committee bool) ([]transaction.Signer, error) {
s := make([]transaction.Signer, 0, 4)
// first we have proxy contract signature, as it will pay for the execution
s = append(s, transaction.Signer{
Account: c.notary.proxy,
Scopes: transaction.None,
})
// then we have inner ring multiaddress signature
m := sigCount(ir, committee)
multisigScript, err := sc.CreateMultiSigRedeemScript(m, ir)
if err != nil {
// wrap error as FrostFS-specific since the call is not related to any client
return nil, wrapFrostFSError(fmt.Errorf("can't create ir multisig redeem script: %w", err))
}
s = append(s, transaction.Signer{
Account: hash.Hash160(multisigScript),
Scopes: c.cfg.signer.Scopes,
AllowedContracts: c.cfg.signer.AllowedContracts,
AllowedGroups: c.cfg.signer.AllowedGroups,
})
if !invokedByAlpha {
// then we have invoker signature
s = append(s, transaction.Signer{
Account: hash.Hash160(c.acc.GetVerificationScript()),
Scopes: c.cfg.signer.Scopes,
AllowedContracts: c.cfg.signer.AllowedContracts,
AllowedGroups: c.cfg.signer.AllowedGroups,
})
}
// last one is a placeholder for notary contract signature
s = append(s, transaction.Signer{
Account: c.notary.notary,
Scopes: transaction.None,
})
return s, nil
}
func (c *Client) notaryAccounts(invokedByAlpha bool, multiaddr *wallet.Account) []*wallet.Account {
if multiaddr == nil {
return nil
}
a := make([]*wallet.Account, 0, 4)
// first we have proxy account, as it will pay for the execution
a = append(a, notary.FakeContractAccount(c.notary.proxy))
// then we have inner ring multiaddress account
a = append(a, multiaddr)
if !invokedByAlpha {
// then we have invoker account
a = append(a, c.acc)
}
// last one is a placeholder for notary contract account
a = append(a, &wallet.Account{
Contract: &wallet.Contract{},
})
return a
}
func (c *Client) notaryWitnesses(invokedByAlpha bool, multiaddr *wallet.Account, tx *transaction.Transaction) []transaction.Witness {
if multiaddr == nil || tx == nil {
return nil
}
w := make([]transaction.Witness, 0, 4)
// first we have empty proxy witness, because notary will execute `Verify`
// method on the proxy contract to check witness
w = append(w, transaction.Witness{
InvocationScript: []byte{},
VerificationScript: []byte{},
})
// then we have inner ring multiaddress witness
// invocation script should be of the form:
// { PUSHDATA1, 64, signatureBytes... }
// to pass Notary module verification
var invokeScript []byte
magicNumber := c.rpcActor.GetNetwork()
if invokedByAlpha {
invokeScript = append(
[]byte{byte(opcode.PUSHDATA1), 64},
multiaddr.SignHashable(magicNumber, tx)...,
)
} else {
// we can't provide alphabet node signature
// because Storage Node doesn't own alphabet's
// private key. Thus, add dummy witness with
// empty bytes instead of signature
invokeScript = append(
[]byte{byte(opcode.PUSHDATA1), 64},
make([]byte, 64)...,
)
}
w = append(w, transaction.Witness{
InvocationScript: invokeScript,
VerificationScript: multiaddr.GetVerificationScript(),
})
if !invokedByAlpha {
// then we have invoker witness
invokeScript = append(
[]byte{byte(opcode.PUSHDATA1), 64},
c.acc.SignHashable(magicNumber, tx)...,
)
w = append(w, transaction.Witness{
InvocationScript: invokeScript,
VerificationScript: c.acc.GetVerificationScript(),
})
}
// last one is a placeholder for notary contract witness
w = append(w, transaction.Witness{
InvocationScript: append(
[]byte{byte(opcode.PUSHDATA1), 64},
make([]byte, 64)...,
),
VerificationScript: []byte{},
})
return w
}
func (c *Client) notaryMultisigAccount(ir []*keys.PublicKey, committee, invokedByAlpha bool) (*wallet.Account, error) {
m := sigCount(ir, committee)
@ -767,21 +654,6 @@ func (c *Client) depositExpirationOf() (int64, error) {
return currentTillBig.Int64(), nil
}
func invocationParams(args ...any) ([]sc.Parameter, error) {
params := make([]sc.Parameter, 0, len(args))
for i := range args {
param, err := toStackParameter(args[i])
if err != nil {
return nil, err
}
params = append(params, param)
}
return params, nil
}
// sigCount returns the number of required signature.
// For FrostFS Alphabet M is a 2/3+1 of it (like in dBFT).
// If committee is true, returns M as N/2+1.
@ -809,15 +681,6 @@ func WithRoundTime(t uint32) NotaryOption {
}
}
// WithFallbackTime returns a notary support option for client
// that specifies amount of blocks before fallbackTx will be sent.
// Should be less than TxValidTime.
func WithFallbackTime(t uint32) NotaryOption {
return func(c *notaryCfg) {
c.fallbackTime = t
}
}
// WithAlphabetSource returns a notary support option for client
// that specifies function to return list of alphabet node keys.
// By default notary subsystem uses committee as a source. This is

View file

@ -1,16 +1,11 @@
package client
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/zap"
)
// Close closes connection to the remote side making
@ -24,71 +19,46 @@ func (c *Client) Close() {
close(c.closeChan)
}
// SubscribeForExecutionNotifications adds subscription for notifications
// generated during contract transaction execution to this instance of client.
// ReceiveExecutionNotifications performs subscription for notifications
// generated during contract execution. Events are sent to the specified channel.
//
// Returns ErrConnectionLost if client has not been able to establish
// connection to any of passed RPC endpoints.
func (c *Client) SubscribeForExecutionNotifications(contract util.Uint160) error {
func (c *Client) ReceiveExecutionNotifications(contract util.Uint160, ch chan<- *state.ContainedNotificationEvent) (string, error) {
c.switchLock.Lock()
defer c.switchLock.Unlock()
if c.inactive {
return ErrConnectionLost
return "", ErrConnectionLost
}
_, subscribed := c.subscribedEvents[contract]
if subscribed {
// no need to subscribe one more time
return nil
}
id, err := c.client.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, c.notificationRcv)
if err != nil {
return err
}
c.subscribedEvents[contract] = id
return nil
return c.client.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, ch)
}
// SubscribeForNewBlocks adds subscription for new block events to this
// instance of client.
// ReceiveBlocks performs subscription for new block events. Events are sent
// to the specified channel.
//
// Returns ErrConnectionLost if client has not been able to establish
// connection to any of passed RPC endpoints.
func (c *Client) SubscribeForNewBlocks() error {
func (c *Client) ReceiveBlocks(ch chan<- *block.Block) (string, error) {
c.switchLock.Lock()
defer c.switchLock.Unlock()
if c.inactive {
return ErrConnectionLost
return "", ErrConnectionLost
}
if c.subscribedToBlocks {
// no need to subscribe one more time
return nil
}
_, err := c.client.ReceiveBlocks(nil, c.blockRcv)
if err != nil {
return err
}
c.subscribedToBlocks = true
return nil
return c.client.ReceiveBlocks(nil, ch)
}
// SubscribeForNotaryRequests adds subscription for notary request payloads
// ReceiveNotaryRequests performsn subscription for notary request payloads
// addition or removal events to this instance of client. Passed txSigner is
// used as filter: subscription is only for the notary requests that must be
// signed by txSigner.
// signed by txSigner. Events are sent to the specified channel.
//
// Returns ErrConnectionLost if client has not been able to establish
// connection to any of passed RPC endpoints.
func (c *Client) SubscribeForNotaryRequests(txSigner util.Uint160) error {
func (c *Client) ReceiveNotaryRequests(txSigner util.Uint160, ch chan<- *result.NotaryRequestEvent) (string, error) {
if c.notary == nil {
panic(notaryNotEnabledPanicMsg)
}
@ -97,30 +67,17 @@ func (c *Client) SubscribeForNotaryRequests(txSigner util.Uint160) error {
defer c.switchLock.Unlock()
if c.inactive {
return ErrConnectionLost
return "", ErrConnectionLost
}
_, subscribed := c.subscribedNotaryEvents[txSigner]
if subscribed {
// no need to subscribe one more time
return nil
}
id, err := c.client.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &txSigner}, c.notaryReqRcv)
if err != nil {
return err
}
c.subscribedNotaryEvents[txSigner] = id
return nil
return c.client.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &txSigner}, ch)
}
// UnsubscribeContract removes subscription for given contract event stream.
// Unsubscribe performs unsubscription for the given subscription ID.
//
// Returns ErrConnectionLost if client has not been able to establish
// connection to any of passed RPC endpoints.
func (c *Client) UnsubscribeContract(contract util.Uint160) error {
func (c *Client) Unsubscribe(subID string) error {
c.switchLock.Lock()
defer c.switchLock.Unlock()
@ -128,55 +85,7 @@ func (c *Client) UnsubscribeContract(contract util.Uint160) error {
return ErrConnectionLost
}
_, subscribed := c.subscribedEvents[contract]
if !subscribed {
// no need to unsubscribe contract
// without subscription
return nil
}
err := c.client.Unsubscribe(c.subscribedEvents[contract])
if err != nil {
return err
}
delete(c.subscribedEvents, contract)
return nil
}
// UnsubscribeNotaryRequest removes subscription for given notary requests
// signer.
//
// Returns ErrConnectionLost if client has not been able to establish
// connection to any of passed RPC endpoints.
func (c *Client) UnsubscribeNotaryRequest(signer util.Uint160) error {
if c.notary == nil {
panic(notaryNotEnabledPanicMsg)
}
c.switchLock.Lock()
defer c.switchLock.Unlock()
if c.inactive {
return ErrConnectionLost
}
_, subscribed := c.subscribedNotaryEvents[signer]
if !subscribed {
// no need to unsubscribe signer's
// requests without subscription
return nil
}
err := c.client.Unsubscribe(c.subscribedNotaryEvents[signer])
if err != nil {
return err
}
delete(c.subscribedNotaryEvents, signer)
return nil
return c.client.Unsubscribe(subID)
}
// UnsubscribeAll removes all active subscriptions of current client.
@ -191,163 +100,10 @@ func (c *Client) UnsubscribeAll() error {
return ErrConnectionLost
}
// no need to unsubscribe if there are
// no active subscriptions
if len(c.subscribedEvents) == 0 && len(c.subscribedNotaryEvents) == 0 &&
!c.subscribedToBlocks {
return nil
}
err := c.client.UnsubscribeAll()
if err != nil {
return err
}
c.subscribedEvents = make(map[util.Uint160]string)
c.subscribedNotaryEvents = make(map[util.Uint160]string)
c.subscribedToBlocks = false
return nil
}
// subsInfo includes channels for ws notifications;
// cached subscription information.
type subsInfo struct {
blockRcv chan *block.Block
notificationRcv chan *state.ContainedNotificationEvent
notaryReqRcv chan *result.NotaryRequestEvent
subscribedToBlocks bool
subscribedEvents map[util.Uint160]string
subscribedNotaryEvents map[util.Uint160]string
}
// restoreSubscriptions restores subscriptions according to cached
// information about them.
//
// If it is NOT a background operation switchLock MUST be held.
// Returns a pair: the second is a restoration status and the first
// one contains subscription information applied to the passed cli
// and receivers for the updated subscriptions.
// Does not change Client instance.
func (c *Client) restoreSubscriptions(ctx context.Context, cli *rpcclient.WSClient, endpoint string, background bool) (si subsInfo, ok bool) {
var (
err error
id string
)
stopCh := make(chan struct{})
defer close(stopCh)
blockRcv := make(chan *block.Block)
notificationRcv := make(chan *state.ContainedNotificationEvent)
notaryReqRcv := make(chan *result.NotaryRequestEvent)
c.startListen(ctx, stopCh, blockRcv, notificationRcv, notaryReqRcv, background)
if background {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
}
si.subscribedToBlocks = c.subscribedToBlocks
si.subscribedEvents = copySubsMap(c.subscribedEvents)
si.subscribedNotaryEvents = copySubsMap(c.subscribedNotaryEvents)
si.blockRcv = blockRcv
si.notificationRcv = notificationRcv
si.notaryReqRcv = notaryReqRcv
// new block events restoration
if si.subscribedToBlocks {
_, err = cli.ReceiveBlocks(nil, blockRcv)
if err != nil {
c.logger.Error(logs.ClientCouldNotRestoreBlockSubscriptionAfterRPCSwitch,
zap.String("endpoint", endpoint),
zap.Error(err),
)
return
}
}
// notification events restoration
for contract := range si.subscribedEvents {
contract := contract // See https://github.com/nspcc-dev/neo-go/issues/2890
id, err = cli.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, notificationRcv)
if err != nil {
c.logger.Error(logs.ClientCouldNotRestoreNotificationSubscriptionAfterRPCSwitch,
zap.String("endpoint", endpoint),
zap.Error(err),
)
return
}
si.subscribedEvents[contract] = id
}
// notary notification events restoration
if c.notary != nil {
for signer := range si.subscribedNotaryEvents {
signer := signer // See https://github.com/nspcc-dev/neo-go/issues/2890
id, err = cli.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &signer}, notaryReqRcv)
if err != nil {
c.logger.Error(logs.ClientCouldNotRestoreNotaryNotificationSubscriptionAfterRPCSwitch,
zap.String("endpoint", endpoint),
zap.Error(err),
)
return
}
si.subscribedNotaryEvents[signer] = id
}
}
return si, true
}
func (c *Client) startListen(ctx context.Context, stopCh <-chan struct{}, blockRcv <-chan *block.Block,
notificationRcv <-chan *state.ContainedNotificationEvent, notaryReqRcv <-chan *result.NotaryRequestEvent, background bool) {
// neo-go WS client says to _always_ read notifications
// from its channel. Subscribing to any notification
// while not reading them in another goroutine may
// lead to a dead-lock, thus that async side notification
// listening while restoring subscriptions
go func() {
var e any
var ok bool
for {
select {
case <-stopCh:
return
case e, ok = <-blockRcv:
case e, ok = <-notificationRcv:
case e, ok = <-notaryReqRcv:
}
if !ok {
return
}
if background {
// background client (test) switch, no need to send
// any notification, just preventing dead-lock
continue
}
c.routeEvent(ctx, e)
}
}()
}
func copySubsMap(m map[util.Uint160]string) map[util.Uint160]string {
newM := make(map[util.Uint160]string, len(m))
for k, v := range m {
newM[k] = v
}
return newM
}

View file

@ -44,45 +44,6 @@ func (d Delete) NotaryRequest() *payload.P2PNotaryRequest {
const expectedItemNumDelete = 3
// ParseDelete from notification into container event structure.
//
// Expects 3 stack items.
func ParseDelete(e *state.ContainedNotificationEvent) (event.Event, error) {
var (
ev Delete
err error
)
params, err := event.ParseStackArray(e)
if err != nil {
return nil, fmt.Errorf("could not parse stack items from notify event: %w", err)
}
if ln := len(params); ln != expectedItemNumDelete {
return nil, event.WrongNumberOfParameters(expectedItemNumDelete, ln)
}
// parse container
ev.ContainerIDValue, err = client.BytesFromStackItem(params[0])
if err != nil {
return nil, fmt.Errorf("could not get container: %w", err)
}
// parse signature
ev.SignatureValue, err = client.BytesFromStackItem(params[1])
if err != nil {
return nil, fmt.Errorf("could not get signature: %w", err)
}
// parse session token
ev.TokenValue, err = client.BytesFromStackItem(params[2])
if err != nil {
return nil, fmt.Errorf("could not get session token: %w", err)
}
return ev, nil
}
// DeleteSuccess structures notification event of successful container removal
// thrown by Container contract.
type DeleteSuccess struct {

View file

@ -10,66 +10,6 @@ import (
"github.com/stretchr/testify/require"
)
func TestParseDelete(t *testing.T) {
var (
containerID = []byte("containreID")
signature = []byte("signature")
token = []byte("token")
)
t.Run("wrong number of parameters", func(t *testing.T) {
prms := []stackitem.Item{
stackitem.NewMap(),
}
_, err := ParseDelete(createNotifyEventFromItems(prms))
require.EqualError(t, err, event.WrongNumberOfParameters(3, len(prms)).Error())
})
t.Run("wrong container parameter", func(t *testing.T) {
_, err := ParseDelete(createNotifyEventFromItems([]stackitem.Item{
stackitem.NewMap(),
}))
require.Error(t, err)
})
t.Run("wrong signature parameter", func(t *testing.T) {
_, err := ParseDelete(createNotifyEventFromItems([]stackitem.Item{
stackitem.NewByteArray(containerID),
stackitem.NewMap(),
}))
require.Error(t, err)
})
t.Run("wrong session token parameter", func(t *testing.T) {
_, err := ParseDelete(createNotifyEventFromItems([]stackitem.Item{
stackitem.NewByteArray(containerID),
stackitem.NewByteArray(signature),
stackitem.NewMap(),
}))
require.Error(t, err)
})
t.Run("correct behavior", func(t *testing.T) {
ev, err := ParseDelete(createNotifyEventFromItems([]stackitem.Item{
stackitem.NewByteArray(containerID),
stackitem.NewByteArray(signature),
stackitem.NewByteArray(token),
}))
require.NoError(t, err)
require.Equal(t, Delete{
ContainerIDValue: containerID,
SignatureValue: signature,
TokenValue: token,
}, ev)
})
}
func TestParseDeleteSuccess(t *testing.T) {
t.Run("wrong number of parameters", func(t *testing.T) {
prms := []stackitem.Item{

View file

@ -1,11 +1,6 @@
package container
import (
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/network/payload"
)
@ -54,48 +49,3 @@ func (x SetEACL) NotaryRequest() *payload.P2PNotaryRequest {
}
const expectedItemNumEACL = 4
// ParseSetEACL parses SetEACL notification event from list of stack items.
//
// Expects 4 stack items.
func ParseSetEACL(e *state.ContainedNotificationEvent) (event.Event, error) {
var (
ev SetEACL
err error
)
params, err := event.ParseStackArray(e)
if err != nil {
return nil, fmt.Errorf("could not parse stack items from notify event: %w", err)
}
if ln := len(params); ln != expectedItemNumEACL {
return nil, event.WrongNumberOfParameters(expectedItemNumEACL, ln)
}
// parse table
ev.TableValue, err = client.BytesFromStackItem(params[0])
if err != nil {
return nil, fmt.Errorf("could not parse binary table: %w", err)
}
// parse signature
ev.SignatureValue, err = client.BytesFromStackItem(params[1])
if err != nil {
return nil, fmt.Errorf("could not parse table signature: %w", err)
}
// parse public key
ev.PublicKeyValue, err = client.BytesFromStackItem(params[2])
if err != nil {
return nil, fmt.Errorf("could not parse binary public key: %w", err)
}
// parse session token
ev.TokenValue, err = client.BytesFromStackItem(params[3])
if err != nil {
return nil, fmt.Errorf("could not get session token: %w", err)
}
return ev, nil
}

View file

@ -1,90 +1,10 @@
package container
import (
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
"github.com/stretchr/testify/require"
)
func TestParseEACL(t *testing.T) {
var (
binaryTable = []byte("table")
signature = []byte("signature")
publicKey = []byte("pubkey")
token = []byte("token")
)
t.Run("wrong number of parameters", func(t *testing.T) {
items := []stackitem.Item{
stackitem.NewMap(),
stackitem.NewMap(),
}
_, err := ParseSetEACL(createNotifyEventFromItems(items))
require.EqualError(t, err, event.WrongNumberOfParameters(4, len(items)).Error())
})
t.Run("wrong container parameter", func(t *testing.T) {
_, err := ParseSetEACL(createNotifyEventFromItems([]stackitem.Item{
stackitem.NewMap(),
stackitem.NewMap(),
stackitem.NewMap(),
}))
require.Error(t, err)
})
t.Run("wrong signature parameter", func(t *testing.T) {
_, err := ParseSetEACL(createNotifyEventFromItems([]stackitem.Item{
stackitem.NewByteArray(binaryTable),
stackitem.NewMap(),
}))
require.Error(t, err)
})
t.Run("wrong key parameter", func(t *testing.T) {
_, err := ParseSetEACL(createNotifyEventFromItems([]stackitem.Item{
stackitem.NewByteArray(binaryTable),
stackitem.NewByteArray(signature),
stackitem.NewMap(),
}))
require.Error(t, err)
})
t.Run("wrong session token parameter", func(t *testing.T) {
_, err := ParseSetEACL(createNotifyEventFromItems([]stackitem.Item{
stackitem.NewByteArray(binaryTable),
stackitem.NewByteArray(signature),
stackitem.NewByteArray(publicKey),
stackitem.NewMap(),
}))
require.Error(t, err)
})
t.Run("correct behavior", func(t *testing.T) {
ev, err := ParseSetEACL(createNotifyEventFromItems([]stackitem.Item{
stackitem.NewByteArray(binaryTable),
stackitem.NewByteArray(signature),
stackitem.NewByteArray(publicKey),
stackitem.NewByteArray(token),
}))
require.NoError(t, err)
e := ev.(SetEACL)
require.Equal(t, binaryTable, e.Table())
require.Equal(t, signature, e.Signature())
require.Equal(t, publicKey, e.PublicKey())
require.Equal(t, token, e.SessionToken())
})
}
func createNotifyEventFromItems(items []stackitem.Item) *state.ContainedNotificationEvent {
return &state.ContainedNotificationEvent{
NotificationEvent: state.NotificationEvent{

View file

@ -65,49 +65,6 @@ func (x PutNamed) Zone() string {
return x.zone
}
// ParsePut from notification into container event structure.
func ParsePut(e *state.ContainedNotificationEvent) (event.Event, error) {
var (
ev Put
err error
)
params, err := event.ParseStackArray(e)
if err != nil {
return nil, fmt.Errorf("could not parse stack items from notify event: %w", err)
}
if ln := len(params); ln != expectedItemNumPut {
return nil, event.WrongNumberOfParameters(expectedItemNumPut, ln)
}
// parse container
ev.rawContainer, err = client.BytesFromStackItem(params[0])
if err != nil {
return nil, fmt.Errorf("could not get container: %w", err)
}
// parse signature
ev.signature, err = client.BytesFromStackItem(params[1])
if err != nil {
return nil, fmt.Errorf("could not get signature: %w", err)
}
// parse public key
ev.publicKey, err = client.BytesFromStackItem(params[2])
if err != nil {
return nil, fmt.Errorf("could not get public key: %w", err)
}
// parse session token
ev.token, err = client.BytesFromStackItem(params[3])
if err != nil {
return nil, fmt.Errorf("could not get sesison token: %w", err)
}
return ev, nil
}
// PutSuccess structures notification event of successful container creation
// thrown by Container contract.
type PutSuccess struct {

View file

@ -10,80 +10,6 @@ import (
"github.com/stretchr/testify/require"
)
func TestParsePut(t *testing.T) {
var (
containerData = []byte("containerData")
signature = []byte("signature")
publicKey = []byte("pubkey")
token = []byte("token")
)
t.Run("wrong number of parameters", func(t *testing.T) {
prms := []stackitem.Item{
stackitem.NewMap(),
stackitem.NewMap(),
}
_, err := ParsePut(createNotifyEventFromItems(prms))
require.EqualError(t, err, event.WrongNumberOfParameters(expectedItemNumPut, len(prms)).Error())
})
t.Run("wrong container parameter", func(t *testing.T) {
_, err := ParsePut(createNotifyEventFromItems([]stackitem.Item{
stackitem.NewMap(),
}))
require.Error(t, err)
})
t.Run("wrong signature parameter", func(t *testing.T) {
_, err := ParsePut(createNotifyEventFromItems([]stackitem.Item{
stackitem.NewByteArray(containerData),
stackitem.NewMap(),
}))
require.Error(t, err)
})
t.Run("wrong key parameter", func(t *testing.T) {
_, err := ParsePut(createNotifyEventFromItems([]stackitem.Item{
stackitem.NewByteArray(containerData),
stackitem.NewByteArray(signature),
stackitem.NewMap(),
}))
require.Error(t, err)
})
t.Run("wrong session token parameter", func(t *testing.T) {
_, err := ParsePut(createNotifyEventFromItems([]stackitem.Item{
stackitem.NewByteArray(containerData),
stackitem.NewByteArray(signature),
stackitem.NewByteArray(publicKey),
stackitem.NewMap(),
}))
require.Error(t, err)
})
t.Run("correct behavior", func(t *testing.T) {
ev, err := ParsePut(createNotifyEventFromItems([]stackitem.Item{
stackitem.NewByteArray(containerData),
stackitem.NewByteArray(signature),
stackitem.NewByteArray(publicKey),
stackitem.NewByteArray(token),
}))
require.NoError(t, err)
require.Equal(t, Put{
rawContainer: containerData,
signature: signature,
publicKey: publicKey,
token: token,
}, ev)
})
}
func TestParsePutSuccess(t *testing.T) {
t.Run("wrong number of parameters", func(t *testing.T) {
prms := []stackitem.Item{

View file

@ -1,11 +1,6 @@
package netmap
import (
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/network/payload"
)
@ -31,26 +26,3 @@ func (s AddPeer) NotaryRequest() *payload.P2PNotaryRequest {
}
const expectedItemNumAddPeer = 1
func ParseAddPeer(e *state.ContainedNotificationEvent) (event.Event, error) {
var (
ev AddPeer
err error
)
params, err := event.ParseStackArray(e)
if err != nil {
return nil, fmt.Errorf("could not parse stack items from notify event: %w", err)
}
if ln := len(params); ln != expectedItemNumAddPeer {
return nil, event.WrongNumberOfParameters(expectedItemNumAddPeer, ln)
}
ev.NodeBytes, err = client.BytesFromStackItem(params[0])
if err != nil {
return nil, fmt.Errorf("could not get raw nodeinfo: %w", err)
}
return ev, nil
}

View file

@ -1,47 +1,10 @@
package netmap
import (
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
"github.com/stretchr/testify/require"
)
func TestParseAddPeer(t *testing.T) {
t.Run("wrong number of parameters", func(t *testing.T) {
prms := []stackitem.Item{
stackitem.NewMap(),
stackitem.NewMap(),
}
_, err := ParseAddPeer(createNotifyEventFromItems(prms))
require.EqualError(t, err, event.WrongNumberOfParameters(1, len(prms)).Error())
})
t.Run("wrong first parameter type", func(t *testing.T) {
_, err := ParseAddPeer(createNotifyEventFromItems([]stackitem.Item{
stackitem.NewMap(),
}))
require.Error(t, err)
})
t.Run("correct behavior", func(t *testing.T) {
info := []byte{1, 2, 3}
ev, err := ParseAddPeer(createNotifyEventFromItems([]stackitem.Item{
stackitem.NewByteArray(info),
}))
require.NoError(t, err)
require.Equal(t, AddPeer{
NodeBytes: info,
}, ev)
})
}
func createNotifyEventFromItems(items []stackitem.Item) *state.ContainedNotificationEvent {
return &state.ContainedNotificationEvent{
NotificationEvent: state.NotificationEvent{

View file

@ -1,13 +1,9 @@
package netmap
import (
"crypto/elliptic"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-contract/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/network/payload"
)
@ -60,43 +56,3 @@ func (s *UpdatePeer) decodeState(state int64) error {
}
const expectedItemNumUpdatePeer = 2
func ParseUpdatePeer(e *state.ContainedNotificationEvent) (event.Event, error) {
var (
ev UpdatePeer
err error
)
params, err := event.ParseStackArray(e)
if err != nil {
return nil, fmt.Errorf("could not parse stack items from notify event: %w", err)
}
if ln := len(params); ln != expectedItemNumUpdatePeer {
return nil, event.WrongNumberOfParameters(expectedItemNumUpdatePeer, ln)
}
// parse public key
key, err := client.BytesFromStackItem(params[1])
if err != nil {
return nil, fmt.Errorf("could not get public key: %w", err)
}
ev.PubKey, err = keys.NewPublicKeyFromBytes(key, elliptic.P256())
if err != nil {
return nil, fmt.Errorf("could not parse public key: %w", err)
}
// parse node status
st, err := client.IntFromStackItem(params[0])
if err != nil {
return nil, fmt.Errorf("could not get node status: %w", err)
}
err = ev.decodeState(st)
if err != nil {
return nil, err
}
return ev, nil
}

View file

@ -1,59 +0,0 @@
package netmap
import (
"math/big"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-contract/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
"github.com/stretchr/testify/require"
)
func TestParseUpdatePeer(t *testing.T) {
priv, err := keys.NewPrivateKey()
require.NoError(t, err)
publicKey := priv.PublicKey()
t.Run("wrong number of parameters", func(t *testing.T) {
prms := []stackitem.Item{
stackitem.NewMap(),
}
_, err := ParseUpdatePeer(createNotifyEventFromItems(prms))
require.EqualError(t, err, event.WrongNumberOfParameters(2, len(prms)).Error())
})
t.Run("wrong first parameter type", func(t *testing.T) {
_, err := ParseUpdatePeer(createNotifyEventFromItems([]stackitem.Item{
stackitem.NewMap(),
}))
require.Error(t, err)
})
t.Run("wrong second parameter type", func(t *testing.T) {
_, err := ParseUpdatePeer(createNotifyEventFromItems([]stackitem.Item{
stackitem.NewByteArray(publicKey.Bytes()),
stackitem.NewMap(),
}))
require.Error(t, err)
})
t.Run("correct behavior", func(t *testing.T) {
const state = netmap.NodeStateMaintenance
ev, err := ParseUpdatePeer(createNotifyEventFromItems([]stackitem.Item{
stackitem.NewBigInteger(big.NewInt(int64(state))),
stackitem.NewByteArray(publicKey.Bytes()),
}))
require.NoError(t, err)
require.Equal(t, UpdatePeer{
PubKey: publicKey,
State: state,
}, ev)
})
}

View file

@ -185,15 +185,15 @@ func (p Preparator) validateNotaryRequest(nr *payload.P2PNotaryRequest) error {
}
invokerWitness := ln == 4
multiInvScript := nr.MainTransaction.Scripts[1].InvocationScript
// alphabet node should handle only notary requests
// that have been sent unsigned (by storage nodes) =>
// such main TXs should have either a dummy or an
// empty script as an invocation script
// alphabet node should handle only notary requests that do not yet have inner
// ring multisignature filled => such main TXs either have empty invocation script
// of the inner ring witness (in case if Notary Actor is used to create request)
// or have it filled with dummy bytes (if request was created manually with the old
// neo-go API)
//
// this check prevents notary flow recursion
if len(multiInvScript) > 0 && !bytes.Equal(nr.MainTransaction.Scripts[1].InvocationScript, p.dummyInvocationScript) {
if !(len(nr.MainTransaction.Scripts[1].InvocationScript) == 0 ||
bytes.Equal(nr.MainTransaction.Scripts[1].InvocationScript, p.dummyInvocationScript)) { // compatibility with old version
return ErrTXAlreadyHandled
}
@ -220,12 +220,7 @@ func (p Preparator) validateNotaryRequest(nr *payload.P2PNotaryRequest) error {
}
// validate main TX expiration
err = p.validateExpiration(nr.FallbackTransaction)
if err != nil {
return err
}
return nil
return p.validateExpiration(nr.FallbackTransaction)
}
func (p Preparator) validateParameterOpcodes(ops []Op) error {
@ -363,7 +358,9 @@ func (p Preparator) validateWitnesses(w []transaction.Witness, alphaKeys keys.Pu
// the last one must be a placeholder for notary contract witness
last := len(w) - 1
if !bytes.Equal(w[last].InvocationScript, p.dummyInvocationScript) || len(w[last].VerificationScript) != 0 {
if !(len(w[last].InvocationScript) == 0 || // https://github.com/nspcc-dev/neo-go/pull/2981
bytes.Equal(w[last].InvocationScript, p.dummyInvocationScript)) || // compatibility with old version
len(w[last].VerificationScript) != 0 {
return errIncorrectNotaryPlaceholder
}

View file

@ -1,6 +1,7 @@
package event
import (
"fmt"
"testing"
"github.com/nspcc-dev/neo-go/pkg/vm"
@ -24,8 +25,9 @@ var (
alphaKeys keys.PublicKeys
wrongAlphaKeys keys.PublicKeys
dummyInvocationScript = append([]byte{byte(opcode.PUSHDATA1), 64}, make([]byte, 64)...)
wrongDummyInvocationScript = append([]byte{byte(opcode.PUSHDATA1), 64, 1}, make([]byte, 63)...)
dummyAlphabetInvocationScript = []byte{} // expected to be empty if generated by Notary Actor, as requester can't fill it in
dummyAlphabetInvocationScriptOld = append([]byte{byte(opcode.PUSHDATA1), 64}, make([]byte, 64)...) // expected to be dummy if generated manually
wrongDummyInvocationScript = append([]byte{byte(opcode.PUSHDATA1), 64, 1}, make([]byte, 63)...)
scriptHash util.Uint160
)
@ -61,35 +63,37 @@ func TestPrepare_IncorrectScript(t *testing.T) {
},
)
t.Run("not contract call", func(t *testing.T) {
bw := io.NewBufBinWriter()
for _, dummyMultisig := range []bool{true, false} { // try both empty and dummy multisig/Notary invocation witness script
t.Run(fmt.Sprintf("not contract call, compat: %t", dummyMultisig), func(t *testing.T) {
bw := io.NewBufBinWriter()
emit.Int(bw.BinWriter, 4)
emit.String(bw.BinWriter, "test")
emit.Bytes(bw.BinWriter, scriptHash.BytesBE())
emit.Syscall(bw.BinWriter, interopnames.SystemContractCallNative) // any != interopnames.SystemContractCall
emit.Int(bw.BinWriter, 4)
emit.String(bw.BinWriter, "test")
emit.Bytes(bw.BinWriter, scriptHash.BytesBE())
emit.Syscall(bw.BinWriter, interopnames.SystemContractCallNative) // any != interopnames.SystemContractCall
nr := correctNR(bw.Bytes(), false)
nr := correctNR(bw.Bytes(), dummyMultisig, false)
_, err := preparator.Prepare(nr)
_, err := preparator.Prepare(nr)
require.EqualError(t, err, errNotContractCall.Error())
})
require.EqualError(t, err, errNotContractCall.Error())
})
t.Run("incorrect ", func(t *testing.T) {
bw := io.NewBufBinWriter()
t.Run(fmt.Sprintf("incorrect, compat: %t", dummyMultisig), func(t *testing.T) {
bw := io.NewBufBinWriter()
emit.Int(bw.BinWriter, -1)
emit.String(bw.BinWriter, "test")
emit.Bytes(bw.BinWriter, scriptHash.BytesBE())
emit.Syscall(bw.BinWriter, interopnames.SystemContractCall)
emit.Int(bw.BinWriter, -1)
emit.String(bw.BinWriter, "test")
emit.Bytes(bw.BinWriter, scriptHash.BytesBE())
emit.Syscall(bw.BinWriter, interopnames.SystemContractCall)
nr := correctNR(bw.Bytes(), false)
nr := correctNR(bw.Bytes(), dummyMultisig, false)
_, err := preparator.Prepare(nr)
_, err := preparator.Prepare(nr)
require.EqualError(t, err, errIncorrectCallFlag.Error())
})
require.EqualError(t, err, errIncorrectCallFlag.Error())
})
}
}
func TestPrepare_IncorrectNR(t *testing.T) {
@ -209,7 +213,23 @@ func TestPrepare_IncorrectNR(t *testing.T) {
InvocationScript: make([]byte, 1),
},
{
InvocationScript: dummyInvocationScript,
InvocationScript: dummyAlphabetInvocationScript,
},
{},
},
},
expErr: errIncorrectProxyWitnesses,
},
{
name: "incorrect main TX proxy witness compat",
addW: false,
mTX: mTX{
scripts: []transaction.Witness{
{
InvocationScript: make([]byte, 1),
},
{
InvocationScript: dummyAlphabetInvocationScriptOld,
},
{},
},
@ -224,7 +244,22 @@ func TestPrepare_IncorrectNR(t *testing.T) {
{},
{
VerificationScript: wrongAlphaVerificationScript,
InvocationScript: dummyInvocationScript,
InvocationScript: dummyAlphabetInvocationScript,
},
{},
},
},
expErr: errIncorrectAlphabet,
},
{
name: "incorrect main TX Alphabet witness compat",
addW: false,
mTX: mTX{
scripts: []transaction.Witness{
{},
{
VerificationScript: wrongAlphaVerificationScript,
InvocationScript: dummyAlphabetInvocationScriptOld,
},
{},
},
@ -239,7 +274,24 @@ func TestPrepare_IncorrectNR(t *testing.T) {
{},
{
VerificationScript: alphaVerificationScript,
InvocationScript: dummyInvocationScript,
InvocationScript: dummyAlphabetInvocationScript,
},
{
InvocationScript: wrongDummyInvocationScript,
},
},
},
expErr: errIncorrectNotaryPlaceholder,
},
{
name: "incorrect main TX Notary witness compat",
addW: false,
mTX: mTX{
scripts: []transaction.Witness{
{},
{
VerificationScript: alphaVerificationScript,
InvocationScript: dummyAlphabetInvocationScriptOld,
},
{
InvocationScript: wrongDummyInvocationScript,
@ -289,7 +341,23 @@ func TestPrepare_IncorrectNR(t *testing.T) {
{},
{
VerificationScript: alphaVerificationScript,
InvocationScript: dummyInvocationScript,
InvocationScript: dummyAlphabetInvocationScript,
},
{},
{},
},
},
expErr: errIncorrectInvokerWitnesses,
},
{
name: "incorrect invoker TX Alphabet witness compat",
addW: true,
mTX: mTX{
scripts: []transaction.Witness{
{},
{
VerificationScript: alphaVerificationScript,
InvocationScript: dummyAlphabetInvocationScriptOld,
},
{},
{},
@ -327,7 +395,7 @@ func TestPrepare_IncorrectNR(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
correctNR := correctNR(nil, test.addW)
correctNR := correctNR(nil, false, test.addW)
incorrectNR = setIncorrectFields(*correctNR, test.mTX, test.fbTX)
_, err = preparator.Prepare(&incorrectNR)
@ -372,40 +440,42 @@ func TestPrepare_CorrectNR(t *testing.T) {
for _, test := range tests {
for i := 0; i < 1; i++ { // run tests against 3 and 4 witness NR
additionalWitness := i == 0
nr := correctNR(script(test.hash, test.method, test.args...), additionalWitness)
for _, dummyMultisig := range []bool{true, false} { // run tests against empty and dummy multisig/Notary witness
additionalWitness := i == 0
nr := correctNR(script(test.hash, test.method, test.args...), dummyMultisig, additionalWitness)
event, err := preparator.Prepare(nr)
event, err := preparator.Prepare(nr)
require.NoError(t, err)
require.Equal(t, test.method, event.Type().String())
require.Equal(t, test.hash.StringLE(), event.ScriptHash().StringLE())
// check args parsing
bw := io.NewBufBinWriter()
emit.Array(bw.BinWriter, test.args...)
ctx := vm.NewContext(bw.Bytes())
opCode, param, err := ctx.Next()
require.NoError(t, err)
for _, opGot := range event.Params() {
require.Equal(t, opCode, opGot.code)
require.Equal(t, param, opGot.param)
opCode, param, err = ctx.Next()
require.NoError(t, err)
require.Equal(t, test.method, event.Type().String())
require.Equal(t, test.hash.StringLE(), event.ScriptHash().StringLE())
// check args parsing
bw := io.NewBufBinWriter()
emit.Array(bw.BinWriter, test.args...)
ctx := vm.NewContext(bw.Bytes())
opCode, param, err := ctx.Next()
require.NoError(t, err)
for _, opGot := range event.Params() {
require.Equal(t, opCode, opGot.code)
require.Equal(t, param, opGot.param)
opCode, param, err = ctx.Next()
require.NoError(t, err)
}
_, _, err = ctx.Next() // PACK opcode
require.NoError(t, err)
_, _, err = ctx.Next() // packing len opcode
require.NoError(t, err)
opCode, _, err = ctx.Next()
require.NoError(t, err)
require.Equal(t, opcode.RET, opCode)
}
_, _, err = ctx.Next() // PACK opcode
require.NoError(t, err)
_, _, err = ctx.Next() // packing len opcode
require.NoError(t, err)
opCode, _, err = ctx.Next()
require.NoError(t, err)
require.Equal(t, opcode.RET, opCode)
}
}
}
@ -428,7 +498,7 @@ func script(hash util.Uint160, method string, args ...any) []byte {
return bw.Bytes()
}
func correctNR(script []byte, additionalWitness bool) *payload.P2PNotaryRequest {
func correctNR(script []byte, dummyMultisig, additionalWitness bool) *payload.P2PNotaryRequest {
alphaVerificationScript, _ := smartcontract.CreateMultiSigRedeemScript(len(alphaKeys)*2/3+1, alphaKeys)
signers := []transaction.Signer{
@ -443,20 +513,24 @@ func correctNR(script []byte, additionalWitness bool) *payload.P2PNotaryRequest
signers[2] = transaction.Signer{Account: hash.Hash160(alphaVerificationScript)}
}
multisigInv := dummyAlphabetInvocationScript
if dummyMultisig {
multisigInv = dummyAlphabetInvocationScriptOld
}
scripts := []transaction.Witness{
{},
{
InvocationScript: dummyInvocationScript,
InvocationScript: multisigInv,
VerificationScript: alphaVerificationScript,
},
{
InvocationScript: dummyInvocationScript,
InvocationScript: multisigInv,
},
}
if additionalWitness { // insert on element with index 2
scripts = append(scripts[:2+1], scripts[2:]...)
scripts[2] = transaction.Witness{
InvocationScript: dummyInvocationScript,
InvocationScript: multisigInv,
VerificationScript: alphaVerificationScript,
}
}

View file

@ -11,8 +11,8 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/zap"
)
@ -27,7 +27,6 @@ type (
// Subscriber is an interface of the NotificationEvent listener.
Subscriber interface {
SubscribeForNotification(...util.Uint160) error
UnsubscribeForNotification()
BlockNotifications() error
SubscribeForNotaryRequests(mainTXSigner util.Uint160) error
@ -36,16 +35,27 @@ type (
Close()
}
subChannels struct {
NotifyChan chan *state.ContainedNotificationEvent
BlockChan chan *block.Block
NotaryChan chan *result.NotaryRequestEvent
}
subscriber struct {
*sync.RWMutex
log *logger.Logger
client *client.Client
notifyChan chan *state.ContainedNotificationEvent
blockChan chan *block.Block
blockChan chan *block.Block
notaryChan chan *result.NotaryRequestEvent
current subChannels
// cached subscription information
subscribedEvents map[util.Uint160]bool
subscribedNotaryEvents map[util.Uint160]bool
subscribedToNewBlocks bool
}
// Params is a group of Subscriber constructor parameters.
@ -76,116 +86,66 @@ func (s *subscriber) SubscribeForNotification(contracts ...util.Uint160) error {
s.Lock()
defer s.Unlock()
notifyIDs := make(map[util.Uint160]struct{}, len(contracts))
notifyIDs := make([]string, 0, len(contracts))
for i := range contracts {
if s.subscribedEvents[contracts[i]] {
continue
}
// subscribe to contract notifications
err := s.client.SubscribeForExecutionNotifications(contracts[i])
id, err := s.client.ReceiveExecutionNotifications(contracts[i], s.current.NotifyChan)
if err != nil {
// if there is some error, undo all subscriptions and return error
for hash := range notifyIDs {
_ = s.client.UnsubscribeContract(hash)
for _, id := range notifyIDs {
_ = s.client.Unsubscribe(id)
}
return err
}
// save notification id
notifyIDs[contracts[i]] = struct{}{}
notifyIDs = append(notifyIDs, id)
}
for i := range contracts {
s.subscribedEvents[contracts[i]] = true
}
return nil
}
func (s *subscriber) UnsubscribeForNotification() {
err := s.client.UnsubscribeAll()
if err != nil {
s.log.Error(logs.SubscriberUnsubscribeForNotification,
zap.Error(err))
}
}
func (s *subscriber) Close() {
s.client.Close()
}
func (s *subscriber) BlockNotifications() error {
if err := s.client.SubscribeForNewBlocks(); err != nil {
s.Lock()
defer s.Unlock()
if s.subscribedToNewBlocks {
return nil
}
if _, err := s.client.ReceiveBlocks(s.current.BlockChan); err != nil {
return fmt.Errorf("could not subscribe for new block events: %w", err)
}
s.subscribedToNewBlocks = true
return nil
}
func (s *subscriber) SubscribeForNotaryRequests(mainTXSigner util.Uint160) error {
if err := s.client.SubscribeForNotaryRequests(mainTXSigner); err != nil {
s.Lock()
defer s.Unlock()
if s.subscribedNotaryEvents[mainTXSigner] {
return nil
}
if _, err := s.client.ReceiveNotaryRequests(mainTXSigner, s.current.NotaryChan); err != nil {
return fmt.Errorf("could not subscribe for notary request events: %w", err)
}
s.subscribedNotaryEvents[mainTXSigner] = true
return nil
}
func (s *subscriber) routeNotifications(ctx context.Context) {
notificationChan := s.client.NotificationChannel()
for {
select {
case <-ctx.Done():
return
case notification, ok := <-notificationChan:
if !ok {
s.log.Warn(logs.SubscriberRemoteNotificationChannelHasBeenClosed)
close(s.notifyChan)
close(s.blockChan)
close(s.notaryChan)
return
}
switch notification.Type {
case neorpc.NotificationEventID:
notifyEvent, ok := notification.Value.(*state.ContainedNotificationEvent)
if !ok {
s.log.Error(logs.SubscriberCantCastNotifyEventValueToTheNotifyStruct,
zap.String("received type", fmt.Sprintf("%T", notification.Value)),
)
continue
}
s.log.Debug(logs.SubscriberNewNotificationEventFromSidechain,
zap.String("name", notifyEvent.Name),
)
s.notifyChan <- notifyEvent
case neorpc.BlockEventID:
b, ok := notification.Value.(*block.Block)
if !ok {
s.log.Error(logs.SubscriberCantCastBlockEventValueToBlock,
zap.String("received type", fmt.Sprintf("%T", notification.Value)),
)
continue
}
s.blockChan <- b
case neorpc.NotaryRequestEventID:
notaryRequest, ok := notification.Value.(*result.NotaryRequestEvent)
if !ok {
s.log.Error(logs.SubscriberCantCastNotifyEventValueToTheNotaryRequestStruct,
zap.String("received type", fmt.Sprintf("%T", notification.Value)),
)
continue
}
s.notaryChan <- notaryRequest
default:
s.log.Debug(logs.SubscriberUnsupportedNotificationFromTheChain,
zap.Uint8("type", uint8(notification.Type)),
)
}
}
}
}
// New is a constructs Neo:Morph event listener and returns Subscriber interface.
func New(ctx context.Context, p *Params) (Subscriber, error) {
switch {
@ -209,16 +169,170 @@ func New(ctx context.Context, p *Params) (Subscriber, error) {
notifyChan: make(chan *state.ContainedNotificationEvent),
blockChan: make(chan *block.Block),
notaryChan: make(chan *result.NotaryRequestEvent),
}
// Worker listens all events from neo-go websocket and puts them
// into corresponding channel. It may be notifications, transactions,
// new blocks. For now only notifications.
current: newSubChannels(),
subscribedEvents: make(map[util.Uint160]bool),
subscribedNotaryEvents: make(map[util.Uint160]bool),
}
// Worker listens all events from temporary NeoGo channel and puts them
// into corresponding permanent channels.
go sub.routeNotifications(ctx)
return sub, nil
}
func (s *subscriber) routeNotifications(ctx context.Context) {
var (
// TODO: not needed after nspcc-dev/neo-go#2980.
cliCh = s.client.NotificationChannel()
restoreCh = make(chan bool)
restoreInProgress bool
)
routeloop:
for {
var connLost bool
s.RLock()
curr := s.current
s.RUnlock()
select {
case <-ctx.Done():
break routeloop
case ev, ok := <-curr.NotifyChan:
if ok {
s.notifyChan <- ev
} else {
connLost = true
}
case ev, ok := <-curr.BlockChan:
if ok {
s.blockChan <- ev
} else {
connLost = true
}
case ev, ok := <-curr.NotaryChan:
if ok {
s.notaryChan <- ev
} else {
connLost = true
}
case _, ok := <-cliCh:
connLost = !ok
case ok := <-restoreCh:
restoreInProgress = false
if !ok {
connLost = true
}
}
if connLost {
if !restoreInProgress {
restoreInProgress, cliCh = s.switchEndpoint(ctx, restoreCh)
if !restoreInProgress {
break routeloop
}
curr.drain()
} else { // Avoid getting additional !ok events.
s.Lock()
s.current.NotifyChan = nil
s.current.BlockChan = nil
s.current.NotaryChan = nil
s.Unlock()
}
}
}
close(s.notifyChan)
close(s.blockChan)
close(s.notaryChan)
}
func (s *subscriber) switchEndpoint(ctx context.Context, finishCh chan<- bool) (bool, <-chan rpcclient.Notification) {
s.log.Info("RPC connection lost, attempting reconnect")
if !s.client.SwitchRPC(ctx) {
s.log.Error("can't switch RPC node")
return false, nil
}
cliCh := s.client.NotificationChannel()
s.Lock()
chs := newSubChannels()
go func() {
finishCh <- s.restoreSubscriptions(chs.NotifyChan, chs.BlockChan, chs.NotaryChan)
}()
s.current = chs
s.Unlock()
return true, cliCh
}
func newSubChannels() subChannels {
return subChannels{
NotifyChan: make(chan *state.ContainedNotificationEvent),
BlockChan: make(chan *block.Block),
NotaryChan: make(chan *result.NotaryRequestEvent),
}
}
func (s *subChannels) drain() {
drainloop:
for {
select {
case _, ok := <-s.NotifyChan:
if !ok {
s.NotifyChan = nil
}
case _, ok := <-s.BlockChan:
if !ok {
s.BlockChan = nil
}
case _, ok := <-s.NotaryChan:
if !ok {
s.NotaryChan = nil
}
default:
break drainloop
}
}
}
// restoreSubscriptions restores subscriptions according to
// cached information about them.
func (s *subscriber) restoreSubscriptions(notifCh chan<- *state.ContainedNotificationEvent,
blCh chan<- *block.Block, notaryCh chan<- *result.NotaryRequestEvent) bool {
var err error
// new block events restoration
if s.subscribedToNewBlocks {
_, err = s.client.ReceiveBlocks(blCh)
if err != nil {
s.log.Error(logs.ClientCouldNotRestoreBlockSubscriptionAfterRPCSwitch, zap.Error(err))
return false
}
}
// notification events restoration
for contract := range s.subscribedEvents {
contract := contract // See https://github.com/nspcc-dev/neo-go/issues/2890
_, err = s.client.ReceiveExecutionNotifications(contract, notifCh)
if err != nil {
s.log.Error(logs.ClientCouldNotRestoreNotificationSubscriptionAfterRPCSwitch, zap.Error(err))
return false
}
}
// notary notification events restoration
for signer := range s.subscribedNotaryEvents {
signer := signer // See https://github.com/nspcc-dev/neo-go/issues/2890
_, err = s.client.ReceiveNotaryRequests(signer, notaryCh)
if err != nil {
s.log.Error(logs.ClientCouldNotRestoreNotaryNotificationSubscriptionAfterRPCSwitch, zap.Error(err))
return false
}
}
return true
}
// awaitHeight checks if remote client has least expected block height and
// returns error if it is not reached that height after timeout duration.
// This function is required to avoid connections to unsynced RPC nodes, because