Adopt new neo-go API in morph, drop notaryless events #337
21 changed files with 547 additions and 1479 deletions
|
@ -195,7 +195,6 @@ const (
|
|||
EventIgnoreNilNotaryEventHandler = "ignore nil notary event handler" // Warn in ../node/pkg/morph/event/listener.go
|
||||
EventIgnoreHandlerOfNotaryEventWoParser = "ignore handler of notary event w/o parser" // Warn in ../node/pkg/morph/event/listener.go
|
||||
EventIgnoreNilBlockHandler = "ignore nil block handler" // Warn in ../node/pkg/morph/event/listener.go
|
||||
SubscriberUnsubscribeForNotification = "unsubscribe for notification" // Error in ../node/pkg/morph/subscriber/subscriber.go
|
||||
SubscriberRemoteNotificationChannelHasBeenClosed = "remote notification channel has been closed" // Warn in ../node/pkg/morph/subscriber/subscriber.go
|
||||
SubscriberCantCastNotifyEventValueToTheNotifyStruct = "can't cast notify event value to the notify struct" // Error in ../node/pkg/morph/subscriber/subscriber.go
|
||||
SubscriberNewNotificationEventFromSidechain = "new notification event from sidechain" // Debug in ../node/pkg/morph/subscriber/subscriber.go
|
||||
|
|
|
@ -79,13 +79,6 @@ type NetworkState interface {
|
|||
HomomorphicHashDisabled() (bool, error)
|
||||
}
|
||||
|
||||
const (
|
||||
putNotification = "containerPut"
|
||||
deleteNotification = "containerDelete"
|
||||
|
||||
setEACLNotification = "setEACL"
|
||||
)
|
||||
|
||||
// New creates a container contract processor instance.
|
||||
func New(p *Params) (*Processor, error) {
|
||||
switch {
|
||||
|
@ -121,66 +114,12 @@ func New(p *Params) (*Processor, error) {
|
|||
|
||||
// ListenerNotificationParsers for the 'event.Listener' event producer.
|
||||
func (cp *Processor) ListenerNotificationParsers() []event.NotificationParserInfo {
|
||||
if !cp.notaryDisabled {
|
||||
return nil
|
||||
}
|
||||
|
||||
var (
|
||||
parsers = make([]event.NotificationParserInfo, 0, 3)
|
||||
|
||||
p event.NotificationParserInfo
|
||||
)
|
||||
|
||||
p.SetScriptHash(cp.cnrClient.ContractAddress())
|
||||
|
||||
// container put
|
||||
p.SetType(event.TypeFromString(putNotification))
|
||||
p.SetParser(containerEvent.ParsePut)
|
||||
parsers = append(parsers, p)
|
||||
|
||||
// container delete
|
||||
p.SetType(event.TypeFromString(deleteNotification))
|
||||
p.SetParser(containerEvent.ParseDelete)
|
||||
parsers = append(parsers, p)
|
||||
|
||||
// set eACL
|
||||
p.SetType(event.TypeFromString(setEACLNotification))
|
||||
p.SetParser(containerEvent.ParseSetEACL)
|
||||
parsers = append(parsers, p)
|
||||
|
||||
return parsers
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListenerNotificationHandlers for the 'event.Listener' event producer.
|
||||
func (cp *Processor) ListenerNotificationHandlers() []event.NotificationHandlerInfo {
|
||||
if !cp.notaryDisabled {
|
||||
return nil
|
||||
}
|
||||
|
||||
var (
|
||||
handlers = make([]event.NotificationHandlerInfo, 0, 3)
|
||||
|
||||
h event.NotificationHandlerInfo
|
||||
)
|
||||
|
||||
h.SetScriptHash(cp.cnrClient.ContractAddress())
|
||||
|
||||
// container put
|
||||
h.SetType(event.TypeFromString(putNotification))
|
||||
h.SetHandler(cp.handlePut)
|
||||
handlers = append(handlers, h)
|
||||
|
||||
// container delete
|
||||
h.SetType(event.TypeFromString(deleteNotification))
|
||||
h.SetHandler(cp.handleDelete)
|
||||
handlers = append(handlers, h)
|
||||
|
||||
// set eACL
|
||||
h.SetType(event.TypeFromString(setEACLNotification))
|
||||
h.SetHandler(cp.handleSetEACL)
|
||||
handlers = append(handlers, h)
|
||||
|
||||
return handlers
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListenerNotaryParsers for the 'event.Listener' notary event producer.
|
||||
|
|
|
@ -119,9 +119,7 @@ type (
|
|||
)
|
||||
|
||||
const (
|
||||
newEpochNotification = "NewEpoch"
|
||||
addPeerNotification = "AddPeer"
|
||||
updatePeerStateNotification = "UpdateState"
|
||||
newEpochNotification = "NewEpoch"
|
||||
)
|
||||
|
||||
// New creates network map contract processor instance.
|
||||
|
@ -189,20 +187,6 @@ func (np *Processor) ListenerNotificationParsers() []event.NotificationParserInf
|
|||
p.SetParser(netmapEvent.ParseNewEpoch)
|
||||
parsers = append(parsers, p)
|
||||
|
||||
if !np.notaryDisabled {
|
||||
return parsers
|
||||
}
|
||||
|
||||
// new peer event
|
||||
p.SetType(addPeerNotification)
|
||||
p.SetParser(netmapEvent.ParseAddPeer)
|
||||
parsers = append(parsers, p)
|
||||
|
||||
// update peer event
|
||||
p.SetType(updatePeerStateNotification)
|
||||
p.SetParser(netmapEvent.ParseUpdatePeer)
|
||||
parsers = append(parsers, p)
|
||||
|
||||
return parsers
|
||||
}
|
||||
|
||||
|
@ -219,20 +203,6 @@ func (np *Processor) ListenerNotificationHandlers() []event.NotificationHandlerI
|
|||
i.SetHandler(np.handleNewEpoch)
|
||||
handlers = append(handlers, i)
|
||||
|
||||
if !np.notaryDisabled {
|
||||
return handlers
|
||||
}
|
||||
|
||||
// new peer handler
|
||||
i.SetType(addPeerNotification)
|
||||
i.SetHandler(np.handleAddPeer)
|
||||
handlers = append(handlers, i)
|
||||
|
||||
// update peer handler
|
||||
i.SetType(updatePeerStateNotification)
|
||||
i.SetHandler(np.handleUpdateState)
|
||||
handlers = append(handlers, i)
|
||||
|
||||
return handlers
|
||||
}
|
||||
|
||||
|
|
|
@ -69,9 +69,6 @@ type Client struct {
|
|||
// on every normal call.
|
||||
switchLock *sync.RWMutex
|
||||
|
||||
notifications chan rpcclient.Notification
|
||||
subsInfo // protected with switchLock
|
||||
|
||||
// channel for internal stop
|
||||
closeChan chan struct{}
|
||||
|
||||
|
@ -156,8 +153,6 @@ func (e *notHaltStateError) Error() string {
|
|||
)
|
||||
}
|
||||
|
||||
var errEmptyInvocationScript = errors.New("got empty invocation script from neo node")
|
||||
|
||||
// implementation of error interface for FrostFS-specific errors.
|
||||
type frostfsError struct {
|
||||
err error
|
||||
|
@ -566,26 +561,11 @@ func (c *Client) IsValidScript(script []byte, signers []transaction.Signer) (val
|
|||
|
||||
// NotificationChannel returns channel than receives subscribed
|
||||
// notification from the connected RPC node.
|
||||
// Channel is closed when connection to the RPC node has been
|
||||
// lost without the possibility of recovery.
|
||||
// Channel is closed when connection to the RPC node is lost.
|
||||
func (c *Client) NotificationChannel() <-chan rpcclient.Notification {
|
||||
return c.notifications
|
||||
}
|
||||
|
||||
// inactiveMode switches Client to an inactive mode:
|
||||
// - notification channel is closed;
|
||||
// - all the new RPC request would return ErrConnectionLost;
|
||||
// - inactiveModeCb is called if not nil.
|
||||
func (c *Client) inactiveMode() {
|
||||
c.switchLock.Lock()
|
||||
defer c.switchLock.Unlock()
|
||||
|
||||
close(c.notifications)
|
||||
c.inactive = true
|
||||
|
||||
if c.cfg.inactiveModeCb != nil {
|
||||
c.cfg.inactiveModeCb()
|
||||
}
|
||||
c.switchLock.RLock()
|
||||
defer c.switchLock.RUnlock()
|
||||
return c.client.Notifications //lint:ignore SA1019 waits for neo-go v0.102.0 https://github.com/nspcc-dev/neo-go/pull/2980
|
||||
}
|
||||
|
||||
func (c *Client) setActor(act *actor.Actor) {
|
||||
|
|
|
@ -10,11 +10,8 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
lru "github.com/hashicorp/golang-lru/v2"
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
|
||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
|
||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
|
||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||
|
@ -108,21 +105,13 @@ func New(ctx context.Context, key *keys.PrivateKey, opts ...Option) (*Client, er
|
|||
}
|
||||
|
||||
cli := &Client{
|
||||
cache: newClientCache(),
|
||||
logger: cfg.logger,
|
||||
acc: acc,
|
||||
accAddr: accAddr,
|
||||
cfg: *cfg,
|
||||
switchLock: &sync.RWMutex{},
|
||||
notifications: make(chan rpcclient.Notification),
|
||||
subsInfo: subsInfo{
|
||||
blockRcv: make(chan *block.Block),
|
||||
notificationRcv: make(chan *state.ContainedNotificationEvent),
|
||||
notaryReqRcv: make(chan *result.NotaryRequestEvent),
|
||||
subscribedEvents: make(map[util.Uint160]string),
|
||||
subscribedNotaryEvents: make(map[util.Uint160]string),
|
||||
},
|
||||
closeChan: make(chan struct{}),
|
||||
cache: newClientCache(),
|
||||
logger: cfg.logger,
|
||||
acc: acc,
|
||||
accAddr: accAddr,
|
||||
cfg: *cfg,
|
||||
switchLock: &sync.RWMutex{},
|
||||
closeChan: make(chan struct{}),
|
||||
}
|
||||
|
||||
cli.endpoints.init(cfg.endpoints)
|
||||
|
@ -162,7 +151,7 @@ func New(ctx context.Context, key *keys.PrivateKey, opts ...Option) (*Client, er
|
|||
}
|
||||
cli.setActor(act)
|
||||
|
||||
go cli.notificationLoop(ctx)
|
||||
go cli.closeWaiter(ctx)
|
||||
|
||||
return cli, nil
|
||||
}
|
||||
|
|
|
@ -6,11 +6,6 @@ import (
|
|||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
||||
"github.com/nspcc-dev/neo-go/pkg/neorpc"
|
||||
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
|
||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -34,7 +29,8 @@ func (e *endpoints) init(ee []Endpoint) {
|
|||
e.list = ee
|
||||
}
|
||||
|
||||
func (c *Client) switchRPC(ctx context.Context) bool {
|
||||
// SwitchRPC performs reconnection and returns true if it was successful.
|
||||
func (c *Client) SwitchRPC(ctx context.Context) bool {
|
||||
c.switchLock.Lock()
|
||||
defer c.switchLock.Unlock()
|
||||
|
||||
|
@ -58,20 +54,8 @@ func (c *Client) switchRPC(ctx context.Context) bool {
|
|||
c.logger.Info(logs.ClientConnectionToTheNewRPCNodeHasBeenEstablished,
|
||||
zap.String("endpoint", newEndpoint))
|
||||
|
||||
subs, ok := c.restoreSubscriptions(ctx, cli, newEndpoint, false)
|
||||
if !ok {
|
||||
// new WS client does not allow
|
||||
// restoring subscription, client
|
||||
// could not work correctly =>
|
||||
// closing connection to RPC node
|
||||
// to switch to another one
|
||||
cli.Close()
|
||||
continue
|
||||
}
|
||||
|
||||
c.client = cli
|
||||
c.setActor(act)
|
||||
c.subsInfo = subs
|
||||
|
||||
if c.cfg.switchInterval != 0 && !c.switchIsActive.Load() &&
|
||||
c.endpoints.list[c.endpoints.curr].Priority != c.endpoints.list[0].Priority {
|
||||
|
@ -82,97 +66,21 @@ func (c *Client) switchRPC(ctx context.Context) bool {
|
|||
return true
|
||||
}
|
||||
|
||||
c.inactive = true
|
||||
|
||||
if c.cfg.inactiveModeCb != nil {
|
||||
c.cfg.inactiveModeCb()
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (c *Client) notificationLoop(ctx context.Context) {
|
||||
var e any
|
||||
var ok bool
|
||||
|
||||
for {
|
||||
c.switchLock.RLock()
|
||||
bChan := c.blockRcv
|
||||
nChan := c.notificationRcv
|
||||
nrChan := c.notaryReqRcv
|
||||
c.switchLock.RUnlock()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
_ = c.UnsubscribeAll()
|
||||
c.close()
|
||||
|
||||
return
|
||||
case <-c.closeChan:
|
||||
_ = c.UnsubscribeAll()
|
||||
c.close()
|
||||
|
||||
return
|
||||
case e, ok = <-bChan:
|
||||
case e, ok = <-nChan:
|
||||
case e, ok = <-nrChan:
|
||||
}
|
||||
|
||||
if ok {
|
||||
c.routeEvent(ctx, e)
|
||||
continue
|
||||
}
|
||||
|
||||
if !c.reconnect(ctx) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) routeEvent(ctx context.Context, e any) {
|
||||
typedNotification := rpcclient.Notification{Value: e}
|
||||
|
||||
switch e.(type) {
|
||||
case *block.Block:
|
||||
typedNotification.Type = neorpc.BlockEventID
|
||||
case *state.ContainedNotificationEvent:
|
||||
typedNotification.Type = neorpc.NotificationEventID
|
||||
case *result.NotaryRequestEvent:
|
||||
typedNotification.Type = neorpc.NotaryRequestEventID
|
||||
}
|
||||
|
||||
func (c *Client) closeWaiter(ctx context.Context) {
|
||||
select {
|
||||
case c.notifications <- typedNotification:
|
||||
case <-ctx.Done():
|
||||
_ = c.UnsubscribeAll()
|
||||
c.close()
|
||||
case <-c.closeChan:
|
||||
_ = c.UnsubscribeAll()
|
||||
c.close()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) reconnect(ctx context.Context) bool {
|
||||
if closeErr := c.client.GetError(); closeErr != nil {
|
||||
c.logger.Warn(logs.ClientSwitchingToTheNextRPCNode,
|
||||
zap.String("reason", closeErr.Error()),
|
||||
)
|
||||
} else {
|
||||
// neo-go client was closed by calling `Close`
|
||||
// method, that happens only when a client has
|
||||
// switched to the more prioritized RPC
|
||||
return true
|
||||
}
|
||||
|
||||
if !c.switchRPC(ctx) {
|
||||
c.logger.Error(logs.ClientCouldNotEstablishConnectionToAnyRPCNode)
|
||||
|
||||
// could not connect to all endpoints =>
|
||||
// switch client to inactive mode
|
||||
c.inactiveMode()
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// TODO(@carpawell): call here some callback retrieved in constructor
|
||||
// of the client to allow checking chain state since during switch
|
||||
// process some notification could be lost
|
||||
|
||||
return true
|
||||
_ = c.UnsubscribeAll()
|
||||
c.close()
|
||||
}
|
||||
|
||||
func (c *Client) switchToMostPrioritized(ctx context.Context) {
|
||||
|
@ -218,36 +126,28 @@ mainLoop:
|
|||
continue
|
||||
}
|
||||
|
||||
if subs, ok := c.restoreSubscriptions(ctx, cli, tryE, true); ok {
|
||||
c.switchLock.Lock()
|
||||
|
||||
// higher priority node could have been
|
||||
// connected in the other goroutine
|
||||
if e.Priority >= c.endpoints.list[c.endpoints.curr].Priority {
|
||||
cli.Close()
|
||||
c.switchLock.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
c.client.Close()
|
||||
c.cache.invalidate()
|
||||
c.client = cli
|
||||
c.setActor(act)
|
||||
c.subsInfo = subs
|
||||
c.endpoints.curr = i
|
||||
c.switchLock.Lock()
|
||||
|
||||
// higher priority node could have been
|
||||
// connected in the other goroutine
|
||||
if e.Priority >= c.endpoints.list[c.endpoints.curr].Priority {
|
||||
cli.Close()
|
||||
c.switchLock.Unlock()
|
||||
|
||||
c.logger.Info(logs.ClientSwitchedToTheHigherPriorityRPC,
|
||||
zap.String("endpoint", tryE))
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
c.logger.Warn(logs.ClientCouldNotRestoreSideChainSubscriptionsUsingNode,
|
||||
zap.String("endpoint", tryE),
|
||||
zap.Error(err),
|
||||
)
|
||||
c.client.Close()
|
||||
c.cache.invalidate()
|
||||
c.client = cli
|
||||
c.setActor(act)
|
||||
c.endpoints.curr = i
|
||||
|
||||
c.switchLock.Unlock()
|
||||
|
||||
c.logger.Info(logs.ClientSwitchedToTheHigherPriorityRPC,
|
||||
zap.String("endpoint", tryE))
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -255,6 +155,7 @@ mainLoop:
|
|||
|
||||
// close closes notification channel and wrapped WS client.
|
||||
func (c *Client) close() {
|
||||
|
||||
close(c.notifications)
|
||||
c.switchLock.RLock()
|
||||
defer c.switchLock.RUnlock()
|
||||
c.client.Close()
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"crypto/elliptic"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
@ -18,19 +19,20 @@ import (
|
|||
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
|
||||
"github.com/nspcc-dev/neo-go/pkg/neorpc"
|
||||
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
|
||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
|
||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient/notary"
|
||||
sc "github.com/nspcc-dev/neo-go/pkg/smartcontract"
|
||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||
"github.com/nspcc-dev/neo-go/pkg/vm/opcode"
|
||||
"github.com/nspcc-dev/neo-go/pkg/vm"
|
||||
"github.com/nspcc-dev/neo-go/pkg/vm/vmstate"
|
||||
"github.com/nspcc-dev/neo-go/pkg/wallet"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type (
|
||||
notaryInfo struct {
|
||||
txValidTime uint32 // minimum amount of blocks when mainTx will be valid
|
||||
roundTime uint32 // extra amount of blocks to synchronize sidechain height diff of inner ring nodes
|
||||
fallbackTime uint32 // mainTx's ValidUntilBlock - fallbackTime + 1 is when fallbackTx is sent
|
||||
txValidTime uint32 // minimum amount of blocks when mainTx will be valid
|
||||
roundTime uint32 // extra amount of blocks to synchronize sidechain height diff of inner ring nodes
|
||||
|
||||
alphabetSource AlphabetKeys // source of alphabet node keys to prepare witness
|
||||
|
||||
|
@ -41,7 +43,7 @@ type (
|
|||
notaryCfg struct {
|
||||
proxy util.Uint160
|
||||
|
||||
txValidTime, roundTime, fallbackTime uint32
|
||||
txValidTime, roundTime uint32
|
||||
|
||||
alphabetSource AlphabetKeys
|
||||
}
|
||||
|
@ -51,9 +53,8 @@ type (
|
|||
)
|
||||
|
||||
const (
|
||||
defaultNotaryValidTime = 50
|
||||
defaultNotaryRoundTime = 100
|
||||
defaultNotaryFallbackTime = 40
|
||||
defaultNotaryValidTime = 50
|
||||
defaultNotaryRoundTime = 100
|
||||
|
||||
notaryBalanceOfMethod = "balanceOf"
|
||||
notaryExpirationOfMethod = "expirationOf"
|
||||
|
@ -69,7 +70,6 @@ func defaultNotaryConfig(c *Client) *notaryCfg {
|
|||
return ¬aryCfg{
|
||||
txValidTime: defaultNotaryValidTime,
|
||||
roundTime: defaultNotaryRoundTime,
|
||||
fallbackTime: defaultNotaryFallbackTime,
|
||||
alphabetSource: c.Committee,
|
||||
}
|
||||
}
|
||||
|
@ -104,7 +104,6 @@ func (c *Client) EnableNotarySupport(opts ...NotaryOption) error {
|
|||
proxy: cfg.proxy,
|
||||
txValidTime: cfg.txValidTime,
|
||||
roundTime: cfg.roundTime,
|
||||
fallbackTime: cfg.fallbackTime,
|
||||
alphabetSource: cfg.alphabetSource,
|
||||
notary: notary.Hash,
|
||||
}
|
||||
|
@ -408,32 +407,32 @@ func (c *Client) NotarySignAndInvokeTX(mainTx *transaction.Transaction) error {
|
|||
return fmt.Errorf("could not fetch current alphabet keys: %w", err)
|
||||
}
|
||||
|
||||
multiaddrAccount, err := c.notaryMultisigAccount(alphabetList, false, true)
|
||||
cosigners, err := c.notaryCosignersFromTx(mainTx, alphabetList)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// mainTX is expected to be pre-validated: second witness must exist and be empty
|
||||
mainTx.Scripts[1].VerificationScript = multiaddrAccount.GetVerificationScript()
|
||||
mainTx.Scripts[1].InvocationScript = append(
|
||||
[]byte{byte(opcode.PUSHDATA1), 64},
|
||||
multiaddrAccount.SignHashable(c.rpcActor.GetNetwork(), mainTx)...,
|
||||
)
|
||||
nAct, err := notary.NewActor(c.client, cosigners, c.acc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Sign exactly the same transaction we've got from the received Notary request.
|
||||
err = nAct.Sign(mainTx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("faield to sign notary request: %w", err)
|
||||
}
|
||||
|
||||
mainH, fbH, untilActual, err := nAct.Notarize(mainTx, nil)
|
||||
|
||||
//lint:ignore SA1019 https://git.frostfs.info/TrueCloudLab/frostfs-node/issues/202
|
||||
resp, err := c.client.SignAndPushP2PNotaryRequest(mainTx,
|
||||
[]byte{byte(opcode.RET)},
|
||||
-1,
|
||||
0,
|
||||
c.notary.fallbackTime,
|
||||
c.acc)
|
||||
if err != nil && !alreadyOnChainError(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
c.logger.Debug(logs.ClientNotaryRequestWithPreparedMainTXInvoked,
|
||||
zap.Uint32("fallback_valid_for", c.notary.fallbackTime),
|
||||
zap.Stringer("tx_hash", resp.Hash().Reverse()))
|
||||
zap.String("tx_hash", mainH.StringLE()),
|
||||
zap.Uint32("valid_until_block", untilActual),
|
||||
zap.String("fallback_hash", fbH.StringLE()))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -449,70 +448,147 @@ func (c *Client) notaryInvoke(committee, invokedByAlpha bool, contract util.Uint
|
|||
return err
|
||||
}
|
||||
|
||||
cosigners, err := c.notaryCosigners(invokedByAlpha, alphabetList, committee)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
params, err := invocationParams(args...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
test, err := c.makeTestInvocation(contract, method, params, cosigners)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
multiaddrAccount, err := c.notaryMultisigAccount(alphabetList, committee, invokedByAlpha)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
until, err := c.getUntilValue(vub)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
mainTx, err := c.buildMainTx(invokedByAlpha, nonce, alphabetList, test, cosigners, multiaddrAccount, until)
|
||||
cosigners, err := c.notaryCosigners(invokedByAlpha, alphabetList, committee)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
//lint:ignore SA1019 https://git.frostfs.info/TrueCloudLab/frostfs-node/issues/202
|
||||
resp, err := c.client.SignAndPushP2PNotaryRequest(mainTx,
|
||||
[]byte{byte(opcode.RET)},
|
||||
-1,
|
||||
0,
|
||||
c.notary.fallbackTime,
|
||||
c.acc)
|
||||
nAct, err := notary.NewActor(c.client, cosigners, c.acc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
mainH, fbH, untilActual, err := nAct.Notarize(nAct.MakeTunedCall(contract, method, nil, func(r *result.Invoke, t *transaction.Transaction) error {
|
||||
if r.State != vmstate.Halt.String() {
|
||||
return wrapFrostFSError(¬HaltStateError{state: r.State, exception: r.FaultException})
|
||||
}
|
||||
|
||||
t.ValidUntilBlock = until
|
||||
t.Nonce = nonce
|
||||
|
||||
return nil
|
||||
}, args...))
|
||||
|
||||
if err != nil && !alreadyOnChainError(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
c.logger.Debug(logs.ClientNotaryRequestInvoked,
|
||||
zap.String("method", method),
|
||||
zap.Uint32("valid_until_block", until),
|
||||
zap.Uint32("fallback_valid_for", c.notary.fallbackTime),
|
||||
zap.Stringer("tx_hash", resp.Hash().Reverse()))
|
||||
zap.Uint32("valid_until_block", untilActual),
|
||||
zap.String("tx_hash", mainH.StringLE()),
|
||||
zap.String("fallback_hash", fbH.StringLE()))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) makeTestInvocation(contract util.Uint160, method string, params []sc.Parameter, cosigners []transaction.Signer) (*result.Invoke, error) {
|
||||
test, err := c.client.InvokeFunction(contract, method, params, cosigners)
|
||||
func (c *Client) notaryCosignersFromTx(mainTx *transaction.Transaction, alphabetList keys.PublicKeys) ([]actor.SignerAccount, error) {
|
||||
multiaddrAccount, err := c.notaryMultisigAccount(alphabetList, false, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if test.State != HaltState {
|
||||
return nil, wrapFrostFSError(¬HaltStateError{state: test.State, exception: test.FaultException})
|
||||
// Here we need to add a committee signature (second witness) to the pre-validated
|
||||
// main transaction without creating a new one. However, Notary actor demands the
|
||||
// proper set of signers for constructor, thus, fill it from the main transaction's signers list.
|
||||
s := make([]actor.SignerAccount, 2, 3)
|
||||
s[0] = actor.SignerAccount{
|
||||
// Proxy contract that will pay for the execution.
|
||||
Signer: mainTx.Signers[0],
|
||||
Account: notary.FakeContractAccount(mainTx.Signers[0].Account),
|
||||
}
|
||||
s[1] = actor.SignerAccount{
|
||||
// Inner ring multisignature.
|
||||
Signer: mainTx.Signers[1],
|
||||
Account: multiaddrAccount,
|
||||
}
|
||||
if len(mainTx.Signers) > 3 {
|
||||
// Invoker signature (simple signature account of storage node is expected).
|
||||
var acc *wallet.Account
|
||||
script := mainTx.Scripts[2].VerificationScript
|
||||
if len(script) == 0 {
|
||||
acc = notary.FakeContractAccount(mainTx.Signers[2].Account)
|
||||
} else {
|
||||
pubBytes, ok := vm.ParseSignatureContract(script)
|
||||
if ok {
|
||||
pub, err := keys.NewPublicKeyFromBytes(pubBytes, elliptic.P256())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse verification script of signer #2: invalid public key: %w", err)
|
||||
}
|
||||
acc = notary.FakeSimpleAccount(pub)
|
||||
} else {
|
||||
m, pubsBytes, ok := vm.ParseMultiSigContract(script)
|
||||
if !ok {
|
||||
return nil, errors.New("failed to parse verification script of signer #2: unknown witness type")
|
||||
}
|
||||
pubs := make(keys.PublicKeys, len(pubsBytes))
|
||||
for i := range pubs {
|
||||
pubs[i], err = keys.NewPublicKeyFromBytes(pubsBytes[i], elliptic.P256())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse verification script of signer #2: invalid public key #%d: %w", i, err)
|
||||
}
|
||||
}
|
||||
acc, err = notary.FakeMultisigAccount(m, pubs)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create fake account for signer #2: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
s = append(s, actor.SignerAccount{
|
||||
Signer: mainTx.Signers[2],
|
||||
Account: acc,
|
||||
})
|
||||
}
|
||||
|
||||
if len(test.Script) == 0 {
|
||||
return nil, wrapFrostFSError(errEmptyInvocationScript)
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (c *Client) notaryCosigners(invokedByAlpha bool, ir []*keys.PublicKey, committee bool) ([]actor.SignerAccount, error) {
|
||||
multiaddrAccount, err := c.notaryMultisigAccount(ir, committee, invokedByAlpha)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return test, nil
|
||||
s := make([]actor.SignerAccount, 2, 3)
|
||||
// Proxy contract that will pay for the execution.
|
||||
s[0] = actor.SignerAccount{
|
||||
Signer: transaction.Signer{
|
||||
Account: c.notary.proxy,
|
||||
Scopes: transaction.None,
|
||||
},
|
||||
Account: notary.FakeContractAccount(c.notary.proxy),
|
||||
}
|
||||
// Inner ring multisignature.
|
||||
s[1] = actor.SignerAccount{
|
||||
Signer: transaction.Signer{
|
||||
Account: multiaddrAccount.ScriptHash(),
|
||||
Scopes: c.cfg.signer.Scopes,
|
||||
AllowedContracts: c.cfg.signer.AllowedContracts,
|
||||
AllowedGroups: c.cfg.signer.AllowedGroups,
|
||||
},
|
||||
Account: multiaddrAccount,
|
||||
}
|
||||
|
||||
if !invokedByAlpha {
|
||||
// Invoker signature.
|
||||
s = append(s, actor.SignerAccount{
|
||||
Signer: transaction.Signer{
|
||||
Account: hash.Hash160(c.acc.GetVerificationScript()),
|
||||
Scopes: c.cfg.signer.Scopes,
|
||||
AllowedContracts: c.cfg.signer.AllowedContracts,
|
||||
AllowedGroups: c.cfg.signer.AllowedGroups,
|
||||
},
|
||||
Account: c.acc,
|
||||
})
|
||||
}
|
||||
|
||||
// The last one is Notary contract that will be added to the signers list
|
||||
// by Notary actor automatically.
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (c *Client) getUntilValue(vub *uint32) (uint32, error) {
|
||||
|
@ -522,195 +598,6 @@ func (c *Client) getUntilValue(vub *uint32) (uint32, error) {
|
|||
return c.notaryTxValidationLimit()
|
||||
}
|
||||
|
||||
func (c *Client) buildMainTx(invokedByAlpha bool, nonce uint32, alphabetList keys.PublicKeys, test *result.Invoke,
|
||||
cosigners []transaction.Signer, multiaddrAccount *wallet.Account, until uint32) (*transaction.Transaction, error) {
|
||||
// after test invocation we build main multisig transaction
|
||||
|
||||
u8n := uint8(len(alphabetList))
|
||||
|
||||
if !invokedByAlpha {
|
||||
u8n++
|
||||
}
|
||||
|
||||
// prepare main tx
|
||||
mainTx := &transaction.Transaction{
|
||||
Nonce: nonce,
|
||||
SystemFee: test.GasConsumed,
|
||||
ValidUntilBlock: until,
|
||||
Script: test.Script,
|
||||
Attributes: []transaction.Attribute{
|
||||
{
|
||||
Type: transaction.NotaryAssistedT,
|
||||
Value: &transaction.NotaryAssisted{NKeys: u8n},
|
||||
},
|
||||
},
|
||||
Signers: cosigners,
|
||||
}
|
||||
|
||||
// calculate notary fee
|
||||
//lint:ignore SA1019 https://git.frostfs.info/TrueCloudLab/frostfs-node/issues/202
|
||||
notaryFee, err := c.client.CalculateNotaryFee(u8n)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// add network fee for cosigners
|
||||
//nolint:staticcheck // waits for neo-go v0.99.3 with notary actors
|
||||
//lint:ignore SA1019 https://git.frostfs.info/TrueCloudLab/frostfs-node/issues/202
|
||||
err = c.client.AddNetworkFee(
|
||||
mainTx,
|
||||
notaryFee,
|
||||
c.notaryAccounts(invokedByAlpha, multiaddrAccount)...,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// define witnesses
|
||||
mainTx.Scripts = c.notaryWitnesses(invokedByAlpha, multiaddrAccount, mainTx)
|
||||
|
||||
return mainTx, nil
|
||||
}
|
||||
|
||||
func (c *Client) notaryCosigners(invokedByAlpha bool, ir []*keys.PublicKey, committee bool) ([]transaction.Signer, error) {
|
||||
s := make([]transaction.Signer, 0, 4)
|
||||
|
||||
// first we have proxy contract signature, as it will pay for the execution
|
||||
s = append(s, transaction.Signer{
|
||||
Account: c.notary.proxy,
|
||||
Scopes: transaction.None,
|
||||
})
|
||||
|
||||
// then we have inner ring multiaddress signature
|
||||
m := sigCount(ir, committee)
|
||||
|
||||
multisigScript, err := sc.CreateMultiSigRedeemScript(m, ir)
|
||||
if err != nil {
|
||||
// wrap error as FrostFS-specific since the call is not related to any client
|
||||
return nil, wrapFrostFSError(fmt.Errorf("can't create ir multisig redeem script: %w", err))
|
||||
}
|
||||
|
||||
s = append(s, transaction.Signer{
|
||||
Account: hash.Hash160(multisigScript),
|
||||
Scopes: c.cfg.signer.Scopes,
|
||||
AllowedContracts: c.cfg.signer.AllowedContracts,
|
||||
AllowedGroups: c.cfg.signer.AllowedGroups,
|
||||
})
|
||||
|
||||
if !invokedByAlpha {
|
||||
// then we have invoker signature
|
||||
s = append(s, transaction.Signer{
|
||||
Account: hash.Hash160(c.acc.GetVerificationScript()),
|
||||
Scopes: c.cfg.signer.Scopes,
|
||||
AllowedContracts: c.cfg.signer.AllowedContracts,
|
||||
AllowedGroups: c.cfg.signer.AllowedGroups,
|
||||
})
|
||||
}
|
||||
|
||||
// last one is a placeholder for notary contract signature
|
||||
s = append(s, transaction.Signer{
|
||||
Account: c.notary.notary,
|
||||
Scopes: transaction.None,
|
||||
})
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (c *Client) notaryAccounts(invokedByAlpha bool, multiaddr *wallet.Account) []*wallet.Account {
|
||||
if multiaddr == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
a := make([]*wallet.Account, 0, 4)
|
||||
|
||||
// first we have proxy account, as it will pay for the execution
|
||||
a = append(a, notary.FakeContractAccount(c.notary.proxy))
|
||||
|
||||
// then we have inner ring multiaddress account
|
||||
a = append(a, multiaddr)
|
||||
|
||||
if !invokedByAlpha {
|
||||
// then we have invoker account
|
||||
a = append(a, c.acc)
|
||||
}
|
||||
|
||||
// last one is a placeholder for notary contract account
|
||||
a = append(a, &wallet.Account{
|
||||
Contract: &wallet.Contract{},
|
||||
})
|
||||
|
||||
return a
|
||||
}
|
||||
|
||||
func (c *Client) notaryWitnesses(invokedByAlpha bool, multiaddr *wallet.Account, tx *transaction.Transaction) []transaction.Witness {
|
||||
if multiaddr == nil || tx == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
w := make([]transaction.Witness, 0, 4)
|
||||
|
||||
// first we have empty proxy witness, because notary will execute `Verify`
|
||||
// method on the proxy contract to check witness
|
||||
w = append(w, transaction.Witness{
|
||||
InvocationScript: []byte{},
|
||||
VerificationScript: []byte{},
|
||||
})
|
||||
|
||||
// then we have inner ring multiaddress witness
|
||||
|
||||
// invocation script should be of the form:
|
||||
// { PUSHDATA1, 64, signatureBytes... }
|
||||
// to pass Notary module verification
|
||||
var invokeScript []byte
|
||||
|
||||
magicNumber := c.rpcActor.GetNetwork()
|
||||
|
||||
if invokedByAlpha {
|
||||
invokeScript = append(
|
||||
[]byte{byte(opcode.PUSHDATA1), 64},
|
||||
multiaddr.SignHashable(magicNumber, tx)...,
|
||||
)
|
||||
} else {
|
||||
// we can't provide alphabet node signature
|
||||
// because Storage Node doesn't own alphabet's
|
||||
// private key. Thus, add dummy witness with
|
||||
// empty bytes instead of signature
|
||||
invokeScript = append(
|
||||
[]byte{byte(opcode.PUSHDATA1), 64},
|
||||
make([]byte, 64)...,
|
||||
)
|
||||
}
|
||||
|
||||
w = append(w, transaction.Witness{
|
||||
InvocationScript: invokeScript,
|
||||
VerificationScript: multiaddr.GetVerificationScript(),
|
||||
})
|
||||
|
||||
if !invokedByAlpha {
|
||||
// then we have invoker witness
|
||||
invokeScript = append(
|
||||
[]byte{byte(opcode.PUSHDATA1), 64},
|
||||
c.acc.SignHashable(magicNumber, tx)...,
|
||||
)
|
||||
|
||||
w = append(w, transaction.Witness{
|
||||
InvocationScript: invokeScript,
|
||||
VerificationScript: c.acc.GetVerificationScript(),
|
||||
})
|
||||
}
|
||||
|
||||
// last one is a placeholder for notary contract witness
|
||||
w = append(w, transaction.Witness{
|
||||
InvocationScript: append(
|
||||
[]byte{byte(opcode.PUSHDATA1), 64},
|
||||
make([]byte, 64)...,
|
||||
),
|
||||
VerificationScript: []byte{},
|
||||
})
|
||||
|
||||
return w
|
||||
}
|
||||
|
||||
func (c *Client) notaryMultisigAccount(ir []*keys.PublicKey, committee, invokedByAlpha bool) (*wallet.Account, error) {
|
||||
m := sigCount(ir, committee)
|
||||
|
||||
|
@ -767,21 +654,6 @@ func (c *Client) depositExpirationOf() (int64, error) {
|
|||
return currentTillBig.Int64(), nil
|
||||
}
|
||||
|
||||
func invocationParams(args ...any) ([]sc.Parameter, error) {
|
||||
params := make([]sc.Parameter, 0, len(args))
|
||||
|
||||
for i := range args {
|
||||
param, err := toStackParameter(args[i])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
params = append(params, param)
|
||||
}
|
||||
|
||||
return params, nil
|
||||
}
|
||||
|
||||
// sigCount returns the number of required signature.
|
||||
// For FrostFS Alphabet M is a 2/3+1 of it (like in dBFT).
|
||||
// If committee is true, returns M as N/2+1.
|
||||
|
@ -809,15 +681,6 @@ func WithRoundTime(t uint32) NotaryOption {
|
|||
}
|
||||
}
|
||||
|
||||
// WithFallbackTime returns a notary support option for client
|
||||
// that specifies amount of blocks before fallbackTx will be sent.
|
||||
// Should be less than TxValidTime.
|
||||
func WithFallbackTime(t uint32) NotaryOption {
|
||||
return func(c *notaryCfg) {
|
||||
c.fallbackTime = t
|
||||
}
|
||||
}
|
||||
|
||||
// WithAlphabetSource returns a notary support option for client
|
||||
// that specifies function to return list of alphabet node keys.
|
||||
// By default notary subsystem uses committee as a source. This is
|
||||
|
|
|
@ -1,16 +1,11 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
||||
"github.com/nspcc-dev/neo-go/pkg/neorpc"
|
||||
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
|
||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
|
||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Close closes connection to the remote side making
|
||||
|
@ -24,71 +19,46 @@ func (c *Client) Close() {
|
|||
close(c.closeChan)
|
||||
}
|
||||
|
||||
// SubscribeForExecutionNotifications adds subscription for notifications
|
||||
// generated during contract transaction execution to this instance of client.
|
||||
// ReceiveExecutionNotifications performs subscription for notifications
|
||||
// generated during contract execution. Events are sent to the specified channel.
|
||||
//
|
||||
// Returns ErrConnectionLost if client has not been able to establish
|
||||
// connection to any of passed RPC endpoints.
|
||||
func (c *Client) SubscribeForExecutionNotifications(contract util.Uint160) error {
|
||||
func (c *Client) ReceiveExecutionNotifications(contract util.Uint160, ch chan<- *state.ContainedNotificationEvent) (string, error) {
|
||||
c.switchLock.Lock()
|
||||
defer c.switchLock.Unlock()
|
||||
|
||||
if c.inactive {
|
||||
return ErrConnectionLost
|
||||
return "", ErrConnectionLost
|
||||
}
|
||||
|
||||
_, subscribed := c.subscribedEvents[contract]
|
||||
if subscribed {
|
||||
// no need to subscribe one more time
|
||||
return nil
|
||||
}
|
||||
|
||||
id, err := c.client.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, c.notificationRcv)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.subscribedEvents[contract] = id
|
||||
|
||||
return nil
|
||||
return c.client.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, ch)
|
||||
}
|
||||
|
||||
// SubscribeForNewBlocks adds subscription for new block events to this
|
||||
// instance of client.
|
||||
// ReceiveBlocks performs subscription for new block events. Events are sent
|
||||
// to the specified channel.
|
||||
//
|
||||
// Returns ErrConnectionLost if client has not been able to establish
|
||||
// connection to any of passed RPC endpoints.
|
||||
func (c *Client) SubscribeForNewBlocks() error {
|
||||
func (c *Client) ReceiveBlocks(ch chan<- *block.Block) (string, error) {
|
||||
c.switchLock.Lock()
|
||||
defer c.switchLock.Unlock()
|
||||
|
||||
if c.inactive {
|
||||
return ErrConnectionLost
|
||||
return "", ErrConnectionLost
|
||||
}
|
||||
|
||||
if c.subscribedToBlocks {
|
||||
// no need to subscribe one more time
|
||||
return nil
|
||||
}
|
||||
|
||||
_, err := c.client.ReceiveBlocks(nil, c.blockRcv)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.subscribedToBlocks = true
|
||||
|
||||
return nil
|
||||
return c.client.ReceiveBlocks(nil, ch)
|
||||
}
|
||||
|
||||
// SubscribeForNotaryRequests adds subscription for notary request payloads
|
||||
// ReceiveNotaryRequests performsn subscription for notary request payloads
|
||||
// addition or removal events to this instance of client. Passed txSigner is
|
||||
// used as filter: subscription is only for the notary requests that must be
|
||||
// signed by txSigner.
|
||||
// signed by txSigner. Events are sent to the specified channel.
|
||||
//
|
||||
// Returns ErrConnectionLost if client has not been able to establish
|
||||
// connection to any of passed RPC endpoints.
|
||||
func (c *Client) SubscribeForNotaryRequests(txSigner util.Uint160) error {
|
||||
func (c *Client) ReceiveNotaryRequests(txSigner util.Uint160, ch chan<- *result.NotaryRequestEvent) (string, error) {
|
||||
if c.notary == nil {
|
||||
panic(notaryNotEnabledPanicMsg)
|
||||
}
|
||||
|
@ -97,30 +67,17 @@ func (c *Client) SubscribeForNotaryRequests(txSigner util.Uint160) error {
|
|||
defer c.switchLock.Unlock()
|
||||
|
||||
if c.inactive {
|
||||
return ErrConnectionLost
|
||||
return "", ErrConnectionLost
|
||||
}
|
||||
|
||||
_, subscribed := c.subscribedNotaryEvents[txSigner]
|
||||
if subscribed {
|
||||
// no need to subscribe one more time
|
||||
return nil
|
||||
}
|
||||
|
||||
id, err := c.client.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &txSigner}, c.notaryReqRcv)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.subscribedNotaryEvents[txSigner] = id
|
||||
|
||||
return nil
|
||||
return c.client.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &txSigner}, ch)
|
||||
}
|
||||
|
||||
// UnsubscribeContract removes subscription for given contract event stream.
|
||||
// Unsubscribe performs unsubscription for the given subscription ID.
|
||||
//
|
||||
// Returns ErrConnectionLost if client has not been able to establish
|
||||
// connection to any of passed RPC endpoints.
|
||||
func (c *Client) UnsubscribeContract(contract util.Uint160) error {
|
||||
func (c *Client) Unsubscribe(subID string) error {
|
||||
c.switchLock.Lock()
|
||||
defer c.switchLock.Unlock()
|
||||
|
||||
|
@ -128,55 +85,7 @@ func (c *Client) UnsubscribeContract(contract util.Uint160) error {
|
|||
return ErrConnectionLost
|
||||
}
|
||||
|
||||
_, subscribed := c.subscribedEvents[contract]
|
||||
if !subscribed {
|
||||
// no need to unsubscribe contract
|
||||
// without subscription
|
||||
return nil
|
||||
}
|
||||
|
||||
err := c.client.Unsubscribe(c.subscribedEvents[contract])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
delete(c.subscribedEvents, contract)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// UnsubscribeNotaryRequest removes subscription for given notary requests
|
||||
// signer.
|
||||
//
|
||||
// Returns ErrConnectionLost if client has not been able to establish
|
||||
// connection to any of passed RPC endpoints.
|
||||
func (c *Client) UnsubscribeNotaryRequest(signer util.Uint160) error {
|
||||
if c.notary == nil {
|
||||
panic(notaryNotEnabledPanicMsg)
|
||||
}
|
||||
|
||||
c.switchLock.Lock()
|
||||
defer c.switchLock.Unlock()
|
||||
|
||||
if c.inactive {
|
||||
return ErrConnectionLost
|
||||
}
|
||||
|
||||
_, subscribed := c.subscribedNotaryEvents[signer]
|
||||
if !subscribed {
|
||||
// no need to unsubscribe signer's
|
||||
// requests without subscription
|
||||
return nil
|
||||
}
|
||||
|
||||
err := c.client.Unsubscribe(c.subscribedNotaryEvents[signer])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
delete(c.subscribedNotaryEvents, signer)
|
||||
|
||||
return nil
|
||||
return c.client.Unsubscribe(subID)
|
||||
}
|
||||
|
||||
// UnsubscribeAll removes all active subscriptions of current client.
|
||||
|
@ -191,163 +100,10 @@ func (c *Client) UnsubscribeAll() error {
|
|||
return ErrConnectionLost
|
||||
}
|
||||
|
||||
// no need to unsubscribe if there are
|
||||
// no active subscriptions
|
||||
if len(c.subscribedEvents) == 0 && len(c.subscribedNotaryEvents) == 0 &&
|
||||
!c.subscribedToBlocks {
|
||||
return nil
|
||||
}
|
||||
|
||||
err := c.client.UnsubscribeAll()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.subscribedEvents = make(map[util.Uint160]string)
|
||||
c.subscribedNotaryEvents = make(map[util.Uint160]string)
|
||||
c.subscribedToBlocks = false
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// subsInfo includes channels for ws notifications;
|
||||
// cached subscription information.
|
||||
type subsInfo struct {
|
||||
blockRcv chan *block.Block
|
||||
notificationRcv chan *state.ContainedNotificationEvent
|
||||
notaryReqRcv chan *result.NotaryRequestEvent
|
||||
|
||||
subscribedToBlocks bool
|
||||
subscribedEvents map[util.Uint160]string
|
||||
subscribedNotaryEvents map[util.Uint160]string
|
||||
}
|
||||
|
||||
// restoreSubscriptions restores subscriptions according to cached
|
||||
// information about them.
|
||||
//
|
||||
// If it is NOT a background operation switchLock MUST be held.
|
||||
// Returns a pair: the second is a restoration status and the first
|
||||
// one contains subscription information applied to the passed cli
|
||||
// and receivers for the updated subscriptions.
|
||||
// Does not change Client instance.
|
||||
func (c *Client) restoreSubscriptions(ctx context.Context, cli *rpcclient.WSClient, endpoint string, background bool) (si subsInfo, ok bool) {
|
||||
var (
|
||||
err error
|
||||
id string
|
||||
)
|
||||
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
|
||||
blockRcv := make(chan *block.Block)
|
||||
notificationRcv := make(chan *state.ContainedNotificationEvent)
|
||||
notaryReqRcv := make(chan *result.NotaryRequestEvent)
|
||||
|
||||
c.startListen(ctx, stopCh, blockRcv, notificationRcv, notaryReqRcv, background)
|
||||
|
||||
if background {
|
||||
c.switchLock.RLock()
|
||||
defer c.switchLock.RUnlock()
|
||||
}
|
||||
|
||||
si.subscribedToBlocks = c.subscribedToBlocks
|
||||
si.subscribedEvents = copySubsMap(c.subscribedEvents)
|
||||
si.subscribedNotaryEvents = copySubsMap(c.subscribedNotaryEvents)
|
||||
si.blockRcv = blockRcv
|
||||
si.notificationRcv = notificationRcv
|
||||
si.notaryReqRcv = notaryReqRcv
|
||||
|
||||
// new block events restoration
|
||||
if si.subscribedToBlocks {
|
||||
_, err = cli.ReceiveBlocks(nil, blockRcv)
|
||||
if err != nil {
|
||||
c.logger.Error(logs.ClientCouldNotRestoreBlockSubscriptionAfterRPCSwitch,
|
||||
zap.String("endpoint", endpoint),
|
||||
zap.Error(err),
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// notification events restoration
|
||||
for contract := range si.subscribedEvents {
|
||||
contract := contract // See https://github.com/nspcc-dev/neo-go/issues/2890
|
||||
id, err = cli.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, notificationRcv)
|
||||
if err != nil {
|
||||
c.logger.Error(logs.ClientCouldNotRestoreNotificationSubscriptionAfterRPCSwitch,
|
||||
zap.String("endpoint", endpoint),
|
||||
zap.Error(err),
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
si.subscribedEvents[contract] = id
|
||||
}
|
||||
|
||||
// notary notification events restoration
|
||||
if c.notary != nil {
|
||||
for signer := range si.subscribedNotaryEvents {
|
||||
signer := signer // See https://github.com/nspcc-dev/neo-go/issues/2890
|
||||
id, err = cli.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &signer}, notaryReqRcv)
|
||||
if err != nil {
|
||||
c.logger.Error(logs.ClientCouldNotRestoreNotaryNotificationSubscriptionAfterRPCSwitch,
|
||||
zap.String("endpoint", endpoint),
|
||||
zap.Error(err),
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
si.subscribedNotaryEvents[signer] = id
|
||||
}
|
||||
}
|
||||
|
||||
return si, true
|
||||
}
|
||||
|
||||
func (c *Client) startListen(ctx context.Context, stopCh <-chan struct{}, blockRcv <-chan *block.Block,
|
||||
notificationRcv <-chan *state.ContainedNotificationEvent, notaryReqRcv <-chan *result.NotaryRequestEvent, background bool) {
|
||||
// neo-go WS client says to _always_ read notifications
|
||||
// from its channel. Subscribing to any notification
|
||||
// while not reading them in another goroutine may
|
||||
// lead to a dead-lock, thus that async side notification
|
||||
// listening while restoring subscriptions
|
||||
|
||||
go func() {
|
||||
var e any
|
||||
var ok bool
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-stopCh:
|
||||
return
|
||||
case e, ok = <-blockRcv:
|
||||
case e, ok = <-notificationRcv:
|
||||
case e, ok = <-notaryReqRcv:
|
||||
}
|
||||
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
if background {
|
||||
// background client (test) switch, no need to send
|
||||
// any notification, just preventing dead-lock
|
||||
continue
|
||||
}
|
||||
|
||||
c.routeEvent(ctx, e)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func copySubsMap(m map[util.Uint160]string) map[util.Uint160]string {
|
||||
newM := make(map[util.Uint160]string, len(m))
|
||||
for k, v := range m {
|
||||
newM[k] = v
|
||||
}
|
||||
|
||||
return newM
|
||||
}
|
||||
|
|
|
@ -44,45 +44,6 @@ func (d Delete) NotaryRequest() *payload.P2PNotaryRequest {
|
|||
|
||||
const expectedItemNumDelete = 3
|
||||
|
||||
// ParseDelete from notification into container event structure.
|
||||
//
|
||||
// Expects 3 stack items.
|
||||
func ParseDelete(e *state.ContainedNotificationEvent) (event.Event, error) {
|
||||
var (
|
||||
ev Delete
|
||||
err error
|
||||
)
|
||||
|
||||
params, err := event.ParseStackArray(e)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not parse stack items from notify event: %w", err)
|
||||
}
|
||||
|
||||
if ln := len(params); ln != expectedItemNumDelete {
|
||||
return nil, event.WrongNumberOfParameters(expectedItemNumDelete, ln)
|
||||
}
|
||||
|
||||
// parse container
|
||||
ev.ContainerIDValue, err = client.BytesFromStackItem(params[0])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not get container: %w", err)
|
||||
}
|
||||
|
||||
// parse signature
|
||||
ev.SignatureValue, err = client.BytesFromStackItem(params[1])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not get signature: %w", err)
|
||||
}
|
||||
|
||||
// parse session token
|
||||
ev.TokenValue, err = client.BytesFromStackItem(params[2])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not get session token: %w", err)
|
||||
}
|
||||
|
||||
return ev, nil
|
||||
}
|
||||
|
||||
// DeleteSuccess structures notification event of successful container removal
|
||||
// thrown by Container contract.
|
||||
type DeleteSuccess struct {
|
||||
|
|
|
@ -10,66 +10,6 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestParseDelete(t *testing.T) {
|
||||
var (
|
||||
containerID = []byte("containreID")
|
||||
signature = []byte("signature")
|
||||
token = []byte("token")
|
||||
)
|
||||
|
||||
t.Run("wrong number of parameters", func(t *testing.T) {
|
||||
prms := []stackitem.Item{
|
||||
stackitem.NewMap(),
|
||||
}
|
||||
|
||||
_, err := ParseDelete(createNotifyEventFromItems(prms))
|
||||
require.EqualError(t, err, event.WrongNumberOfParameters(3, len(prms)).Error())
|
||||
})
|
||||
|
||||
t.Run("wrong container parameter", func(t *testing.T) {
|
||||
_, err := ParseDelete(createNotifyEventFromItems([]stackitem.Item{
|
||||
stackitem.NewMap(),
|
||||
}))
|
||||
|
||||
require.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("wrong signature parameter", func(t *testing.T) {
|
||||
_, err := ParseDelete(createNotifyEventFromItems([]stackitem.Item{
|
||||
stackitem.NewByteArray(containerID),
|
||||
stackitem.NewMap(),
|
||||
}))
|
||||
|
||||
require.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("wrong session token parameter", func(t *testing.T) {
|
||||
_, err := ParseDelete(createNotifyEventFromItems([]stackitem.Item{
|
||||
stackitem.NewByteArray(containerID),
|
||||
stackitem.NewByteArray(signature),
|
||||
stackitem.NewMap(),
|
||||
}))
|
||||
|
||||
require.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("correct behavior", func(t *testing.T) {
|
||||
ev, err := ParseDelete(createNotifyEventFromItems([]stackitem.Item{
|
||||
stackitem.NewByteArray(containerID),
|
||||
stackitem.NewByteArray(signature),
|
||||
stackitem.NewByteArray(token),
|
||||
}))
|
||||
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, Delete{
|
||||
ContainerIDValue: containerID,
|
||||
SignatureValue: signature,
|
||||
TokenValue: token,
|
||||
}, ev)
|
||||
})
|
||||
}
|
||||
|
||||
func TestParseDeleteSuccess(t *testing.T) {
|
||||
t.Run("wrong number of parameters", func(t *testing.T) {
|
||||
prms := []stackitem.Item{
|
||||
|
|
|
@ -1,11 +1,6 @@
|
|||
package container
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
||||
"github.com/nspcc-dev/neo-go/pkg/network/payload"
|
||||
)
|
||||
|
||||
|
@ -54,48 +49,3 @@ func (x SetEACL) NotaryRequest() *payload.P2PNotaryRequest {
|
|||
}
|
||||
|
||||
const expectedItemNumEACL = 4
|
||||
|
||||
// ParseSetEACL parses SetEACL notification event from list of stack items.
|
||||
//
|
||||
// Expects 4 stack items.
|
||||
func ParseSetEACL(e *state.ContainedNotificationEvent) (event.Event, error) {
|
||||
var (
|
||||
ev SetEACL
|
||||
err error
|
||||
)
|
||||
|
||||
params, err := event.ParseStackArray(e)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not parse stack items from notify event: %w", err)
|
||||
}
|
||||
|
||||
if ln := len(params); ln != expectedItemNumEACL {
|
||||
return nil, event.WrongNumberOfParameters(expectedItemNumEACL, ln)
|
||||
}
|
||||
|
||||
// parse table
|
||||
ev.TableValue, err = client.BytesFromStackItem(params[0])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not parse binary table: %w", err)
|
||||
}
|
||||
|
||||
// parse signature
|
||||
ev.SignatureValue, err = client.BytesFromStackItem(params[1])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not parse table signature: %w", err)
|
||||
}
|
||||
|
||||
// parse public key
|
||||
ev.PublicKeyValue, err = client.BytesFromStackItem(params[2])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not parse binary public key: %w", err)
|
||||
}
|
||||
|
||||
// parse session token
|
||||
ev.TokenValue, err = client.BytesFromStackItem(params[3])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not get session token: %w", err)
|
||||
}
|
||||
|
||||
return ev, nil
|
||||
}
|
||||
|
|
|
@ -1,90 +1,10 @@
|
|||
package container
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
||||
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestParseEACL(t *testing.T) {
|
||||
var (
|
||||
binaryTable = []byte("table")
|
||||
signature = []byte("signature")
|
||||
publicKey = []byte("pubkey")
|
||||
token = []byte("token")
|
||||
)
|
||||
|
||||
t.Run("wrong number of parameters", func(t *testing.T) {
|
||||
items := []stackitem.Item{
|
||||
stackitem.NewMap(),
|
||||
stackitem.NewMap(),
|
||||
}
|
||||
|
||||
_, err := ParseSetEACL(createNotifyEventFromItems(items))
|
||||
require.EqualError(t, err, event.WrongNumberOfParameters(4, len(items)).Error())
|
||||
})
|
||||
|
||||
t.Run("wrong container parameter", func(t *testing.T) {
|
||||
_, err := ParseSetEACL(createNotifyEventFromItems([]stackitem.Item{
|
||||
stackitem.NewMap(),
|
||||
stackitem.NewMap(),
|
||||
stackitem.NewMap(),
|
||||
}))
|
||||
|
||||
require.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("wrong signature parameter", func(t *testing.T) {
|
||||
_, err := ParseSetEACL(createNotifyEventFromItems([]stackitem.Item{
|
||||
stackitem.NewByteArray(binaryTable),
|
||||
stackitem.NewMap(),
|
||||
}))
|
||||
|
||||
require.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("wrong key parameter", func(t *testing.T) {
|
||||
_, err := ParseSetEACL(createNotifyEventFromItems([]stackitem.Item{
|
||||
stackitem.NewByteArray(binaryTable),
|
||||
stackitem.NewByteArray(signature),
|
||||
stackitem.NewMap(),
|
||||
}))
|
||||
|
||||
require.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("wrong session token parameter", func(t *testing.T) {
|
||||
_, err := ParseSetEACL(createNotifyEventFromItems([]stackitem.Item{
|
||||
stackitem.NewByteArray(binaryTable),
|
||||
stackitem.NewByteArray(signature),
|
||||
stackitem.NewByteArray(publicKey),
|
||||
stackitem.NewMap(),
|
||||
}))
|
||||
|
||||
require.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("correct behavior", func(t *testing.T) {
|
||||
ev, err := ParseSetEACL(createNotifyEventFromItems([]stackitem.Item{
|
||||
stackitem.NewByteArray(binaryTable),
|
||||
stackitem.NewByteArray(signature),
|
||||
stackitem.NewByteArray(publicKey),
|
||||
stackitem.NewByteArray(token),
|
||||
}))
|
||||
require.NoError(t, err)
|
||||
|
||||
e := ev.(SetEACL)
|
||||
|
||||
require.Equal(t, binaryTable, e.Table())
|
||||
require.Equal(t, signature, e.Signature())
|
||||
require.Equal(t, publicKey, e.PublicKey())
|
||||
require.Equal(t, token, e.SessionToken())
|
||||
})
|
||||
}
|
||||
|
||||
func createNotifyEventFromItems(items []stackitem.Item) *state.ContainedNotificationEvent {
|
||||
return &state.ContainedNotificationEvent{
|
||||
NotificationEvent: state.NotificationEvent{
|
||||
|
|
|
@ -65,49 +65,6 @@ func (x PutNamed) Zone() string {
|
|||
return x.zone
|
||||
}
|
||||
|
||||
// ParsePut from notification into container event structure.
|
||||
func ParsePut(e *state.ContainedNotificationEvent) (event.Event, error) {
|
||||
var (
|
||||
ev Put
|
||||
err error
|
||||
)
|
||||
|
||||
params, err := event.ParseStackArray(e)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not parse stack items from notify event: %w", err)
|
||||
}
|
||||
|
||||
if ln := len(params); ln != expectedItemNumPut {
|
||||
return nil, event.WrongNumberOfParameters(expectedItemNumPut, ln)
|
||||
}
|
||||
|
||||
// parse container
|
||||
ev.rawContainer, err = client.BytesFromStackItem(params[0])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not get container: %w", err)
|
||||
}
|
||||
|
||||
// parse signature
|
||||
ev.signature, err = client.BytesFromStackItem(params[1])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not get signature: %w", err)
|
||||
}
|
||||
|
||||
// parse public key
|
||||
ev.publicKey, err = client.BytesFromStackItem(params[2])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not get public key: %w", err)
|
||||
}
|
||||
|
||||
// parse session token
|
||||
ev.token, err = client.BytesFromStackItem(params[3])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not get sesison token: %w", err)
|
||||
}
|
||||
|
||||
return ev, nil
|
||||
}
|
||||
|
||||
// PutSuccess structures notification event of successful container creation
|
||||
// thrown by Container contract.
|
||||
type PutSuccess struct {
|
||||
|
|
|
@ -10,80 +10,6 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestParsePut(t *testing.T) {
|
||||
var (
|
||||
containerData = []byte("containerData")
|
||||
signature = []byte("signature")
|
||||
publicKey = []byte("pubkey")
|
||||
token = []byte("token")
|
||||
)
|
||||
|
||||
t.Run("wrong number of parameters", func(t *testing.T) {
|
||||
prms := []stackitem.Item{
|
||||
stackitem.NewMap(),
|
||||
stackitem.NewMap(),
|
||||
}
|
||||
|
||||
_, err := ParsePut(createNotifyEventFromItems(prms))
|
||||
require.EqualError(t, err, event.WrongNumberOfParameters(expectedItemNumPut, len(prms)).Error())
|
||||
})
|
||||
|
||||
t.Run("wrong container parameter", func(t *testing.T) {
|
||||
_, err := ParsePut(createNotifyEventFromItems([]stackitem.Item{
|
||||
stackitem.NewMap(),
|
||||
}))
|
||||
|
||||
require.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("wrong signature parameter", func(t *testing.T) {
|
||||
_, err := ParsePut(createNotifyEventFromItems([]stackitem.Item{
|
||||
stackitem.NewByteArray(containerData),
|
||||
stackitem.NewMap(),
|
||||
}))
|
||||
|
||||
require.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("wrong key parameter", func(t *testing.T) {
|
||||
_, err := ParsePut(createNotifyEventFromItems([]stackitem.Item{
|
||||
stackitem.NewByteArray(containerData),
|
||||
stackitem.NewByteArray(signature),
|
||||
stackitem.NewMap(),
|
||||
}))
|
||||
|
||||
require.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("wrong session token parameter", func(t *testing.T) {
|
||||
_, err := ParsePut(createNotifyEventFromItems([]stackitem.Item{
|
||||
stackitem.NewByteArray(containerData),
|
||||
stackitem.NewByteArray(signature),
|
||||
stackitem.NewByteArray(publicKey),
|
||||
stackitem.NewMap(),
|
||||
}))
|
||||
|
||||
require.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("correct behavior", func(t *testing.T) {
|
||||
ev, err := ParsePut(createNotifyEventFromItems([]stackitem.Item{
|
||||
stackitem.NewByteArray(containerData),
|
||||
stackitem.NewByteArray(signature),
|
||||
stackitem.NewByteArray(publicKey),
|
||||
stackitem.NewByteArray(token),
|
||||
}))
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, Put{
|
||||
rawContainer: containerData,
|
||||
signature: signature,
|
||||
publicKey: publicKey,
|
||||
token: token,
|
||||
}, ev)
|
||||
})
|
||||
}
|
||||
|
||||
func TestParsePutSuccess(t *testing.T) {
|
||||
t.Run("wrong number of parameters", func(t *testing.T) {
|
||||
prms := []stackitem.Item{
|
||||
|
|
|
@ -1,11 +1,6 @@
|
|||
package netmap
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
||||
"github.com/nspcc-dev/neo-go/pkg/network/payload"
|
||||
)
|
||||
|
||||
|
@ -31,26 +26,3 @@ func (s AddPeer) NotaryRequest() *payload.P2PNotaryRequest {
|
|||
}
|
||||
|
||||
const expectedItemNumAddPeer = 1
|
||||
|
||||
func ParseAddPeer(e *state.ContainedNotificationEvent) (event.Event, error) {
|
||||
var (
|
||||
ev AddPeer
|
||||
err error
|
||||
)
|
||||
|
||||
params, err := event.ParseStackArray(e)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not parse stack items from notify event: %w", err)
|
||||
}
|
||||
|
||||
if ln := len(params); ln != expectedItemNumAddPeer {
|
||||
return nil, event.WrongNumberOfParameters(expectedItemNumAddPeer, ln)
|
||||
}
|
||||
|
||||
ev.NodeBytes, err = client.BytesFromStackItem(params[0])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not get raw nodeinfo: %w", err)
|
||||
}
|
||||
|
||||
return ev, nil
|
||||
}
|
||||
|
|
|
@ -1,47 +1,10 @@
|
|||
package netmap
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
||||
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestParseAddPeer(t *testing.T) {
|
||||
t.Run("wrong number of parameters", func(t *testing.T) {
|
||||
prms := []stackitem.Item{
|
||||
stackitem.NewMap(),
|
||||
stackitem.NewMap(),
|
||||
}
|
||||
|
||||
_, err := ParseAddPeer(createNotifyEventFromItems(prms))
|
||||
require.EqualError(t, err, event.WrongNumberOfParameters(1, len(prms)).Error())
|
||||
})
|
||||
|
||||
t.Run("wrong first parameter type", func(t *testing.T) {
|
||||
_, err := ParseAddPeer(createNotifyEventFromItems([]stackitem.Item{
|
||||
stackitem.NewMap(),
|
||||
}))
|
||||
|
||||
require.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("correct behavior", func(t *testing.T) {
|
||||
info := []byte{1, 2, 3}
|
||||
|
||||
ev, err := ParseAddPeer(createNotifyEventFromItems([]stackitem.Item{
|
||||
stackitem.NewByteArray(info),
|
||||
}))
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, AddPeer{
|
||||
NodeBytes: info,
|
||||
}, ev)
|
||||
})
|
||||
}
|
||||
|
||||
func createNotifyEventFromItems(items []stackitem.Item) *state.ContainedNotificationEvent {
|
||||
return &state.ContainedNotificationEvent{
|
||||
NotificationEvent: state.NotificationEvent{
|
||||
|
|
|
@ -1,13 +1,9 @@
|
|||
package netmap
|
||||
|
||||
import (
|
||||
"crypto/elliptic"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-contract/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"github.com/nspcc-dev/neo-go/pkg/network/payload"
|
||||
)
|
||||
|
@ -60,43 +56,3 @@ func (s *UpdatePeer) decodeState(state int64) error {
|
|||
}
|
||||
|
||||
const expectedItemNumUpdatePeer = 2
|
||||
|
||||
func ParseUpdatePeer(e *state.ContainedNotificationEvent) (event.Event, error) {
|
||||
var (
|
||||
ev UpdatePeer
|
||||
err error
|
||||
)
|
||||
|
||||
params, err := event.ParseStackArray(e)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not parse stack items from notify event: %w", err)
|
||||
}
|
||||
|
||||
if ln := len(params); ln != expectedItemNumUpdatePeer {
|
||||
return nil, event.WrongNumberOfParameters(expectedItemNumUpdatePeer, ln)
|
||||
}
|
||||
|
||||
// parse public key
|
||||
key, err := client.BytesFromStackItem(params[1])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not get public key: %w", err)
|
||||
}
|
||||
|
||||
ev.PubKey, err = keys.NewPublicKeyFromBytes(key, elliptic.P256())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not parse public key: %w", err)
|
||||
}
|
||||
|
||||
// parse node status
|
||||
st, err := client.IntFromStackItem(params[0])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not get node status: %w", err)
|
||||
}
|
||||
|
||||
err = ev.decodeState(st)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return ev, nil
|
||||
}
|
||||
|
|
|
@ -1,59 +0,0 @@
|
|||
package netmap
|
||||
|
||||
import (
|
||||
"math/big"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-contract/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestParseUpdatePeer(t *testing.T) {
|
||||
priv, err := keys.NewPrivateKey()
|
||||
require.NoError(t, err)
|
||||
|
||||
publicKey := priv.PublicKey()
|
||||
|
||||
t.Run("wrong number of parameters", func(t *testing.T) {
|
||||
prms := []stackitem.Item{
|
||||
stackitem.NewMap(),
|
||||
}
|
||||
|
||||
_, err := ParseUpdatePeer(createNotifyEventFromItems(prms))
|
||||
require.EqualError(t, err, event.WrongNumberOfParameters(2, len(prms)).Error())
|
||||
})
|
||||
|
||||
t.Run("wrong first parameter type", func(t *testing.T) {
|
||||
_, err := ParseUpdatePeer(createNotifyEventFromItems([]stackitem.Item{
|
||||
stackitem.NewMap(),
|
||||
}))
|
||||
|
||||
require.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("wrong second parameter type", func(t *testing.T) {
|
||||
_, err := ParseUpdatePeer(createNotifyEventFromItems([]stackitem.Item{
|
||||
stackitem.NewByteArray(publicKey.Bytes()),
|
||||
stackitem.NewMap(),
|
||||
}))
|
||||
|
||||
require.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("correct behavior", func(t *testing.T) {
|
||||
const state = netmap.NodeStateMaintenance
|
||||
ev, err := ParseUpdatePeer(createNotifyEventFromItems([]stackitem.Item{
|
||||
stackitem.NewBigInteger(big.NewInt(int64(state))),
|
||||
stackitem.NewByteArray(publicKey.Bytes()),
|
||||
}))
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, UpdatePeer{
|
||||
PubKey: publicKey,
|
||||
State: state,
|
||||
}, ev)
|
||||
})
|
||||
}
|
|
@ -185,15 +185,15 @@ func (p Preparator) validateNotaryRequest(nr *payload.P2PNotaryRequest) error {
|
|||
}
|
||||
invokerWitness := ln == 4
|
||||
|
||||
multiInvScript := nr.MainTransaction.Scripts[1].InvocationScript
|
||||
|
||||
// alphabet node should handle only notary requests
|
||||
// that have been sent unsigned (by storage nodes) =>
|
||||
// such main TXs should have either a dummy or an
|
||||
// empty script as an invocation script
|
||||
// alphabet node should handle only notary requests that do not yet have inner
|
||||
// ring multisignature filled => such main TXs either have empty invocation script
|
||||
// of the inner ring witness (in case if Notary Actor is used to create request)
|
||||
// or have it filled with dummy bytes (if request was created manually with the old
|
||||
// neo-go API)
|
||||
//
|
||||
// this check prevents notary flow recursion
|
||||
if len(multiInvScript) > 0 && !bytes.Equal(nr.MainTransaction.Scripts[1].InvocationScript, p.dummyInvocationScript) {
|
||||
if !(len(nr.MainTransaction.Scripts[1].InvocationScript) == 0 ||
|
||||
bytes.Equal(nr.MainTransaction.Scripts[1].InvocationScript, p.dummyInvocationScript)) { // compatibility with old version
|
||||
return ErrTXAlreadyHandled
|
||||
}
|
||||
|
||||
|
@ -220,12 +220,7 @@ func (p Preparator) validateNotaryRequest(nr *payload.P2PNotaryRequest) error {
|
|||
}
|
||||
|
||||
// validate main TX expiration
|
||||
err = p.validateExpiration(nr.FallbackTransaction)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
return p.validateExpiration(nr.FallbackTransaction)
|
||||
}
|
||||
|
||||
func (p Preparator) validateParameterOpcodes(ops []Op) error {
|
||||
|
@ -363,7 +358,9 @@ func (p Preparator) validateWitnesses(w []transaction.Witness, alphaKeys keys.Pu
|
|||
|
||||
// the last one must be a placeholder for notary contract witness
|
||||
last := len(w) - 1
|
||||
if !bytes.Equal(w[last].InvocationScript, p.dummyInvocationScript) || len(w[last].VerificationScript) != 0 {
|
||||
if !(len(w[last].InvocationScript) == 0 || // https://github.com/nspcc-dev/neo-go/pull/2981
|
||||
bytes.Equal(w[last].InvocationScript, p.dummyInvocationScript)) || // compatibility with old version
|
||||
len(w[last].VerificationScript) != 0 {
|
||||
return errIncorrectNotaryPlaceholder
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package event
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/nspcc-dev/neo-go/pkg/vm"
|
||||
|
@ -24,8 +25,9 @@ var (
|
|||
alphaKeys keys.PublicKeys
|
||||
wrongAlphaKeys keys.PublicKeys
|
||||
|
||||
dummyInvocationScript = append([]byte{byte(opcode.PUSHDATA1), 64}, make([]byte, 64)...)
|
||||
wrongDummyInvocationScript = append([]byte{byte(opcode.PUSHDATA1), 64, 1}, make([]byte, 63)...)
|
||||
dummyAlphabetInvocationScript = []byte{} // expected to be empty if generated by Notary Actor, as requester can't fill it in
|
||||
dummyAlphabetInvocationScriptOld = append([]byte{byte(opcode.PUSHDATA1), 64}, make([]byte, 64)...) // expected to be dummy if generated manually
|
||||
wrongDummyInvocationScript = append([]byte{byte(opcode.PUSHDATA1), 64, 1}, make([]byte, 63)...)
|
||||
|
||||
scriptHash util.Uint160
|
||||
)
|
||||
|
@ -61,35 +63,37 @@ func TestPrepare_IncorrectScript(t *testing.T) {
|
|||
},
|
||||
)
|
||||
|
||||
t.Run("not contract call", func(t *testing.T) {
|
||||
bw := io.NewBufBinWriter()
|
||||
for _, dummyMultisig := range []bool{true, false} { // try both empty and dummy multisig/Notary invocation witness script
|
||||
t.Run(fmt.Sprintf("not contract call, compat: %t", dummyMultisig), func(t *testing.T) {
|
||||
bw := io.NewBufBinWriter()
|
||||
|
||||
emit.Int(bw.BinWriter, 4)
|
||||
emit.String(bw.BinWriter, "test")
|
||||
emit.Bytes(bw.BinWriter, scriptHash.BytesBE())
|
||||
emit.Syscall(bw.BinWriter, interopnames.SystemContractCallNative) // any != interopnames.SystemContractCall
|
||||
emit.Int(bw.BinWriter, 4)
|
||||
emit.String(bw.BinWriter, "test")
|
||||
emit.Bytes(bw.BinWriter, scriptHash.BytesBE())
|
||||
emit.Syscall(bw.BinWriter, interopnames.SystemContractCallNative) // any != interopnames.SystemContractCall
|
||||
|
||||
nr := correctNR(bw.Bytes(), false)
|
||||
nr := correctNR(bw.Bytes(), dummyMultisig, false)
|
||||
|
||||
_, err := preparator.Prepare(nr)
|
||||
_, err := preparator.Prepare(nr)
|
||||
|
||||
require.EqualError(t, err, errNotContractCall.Error())
|
||||
})
|
||||
require.EqualError(t, err, errNotContractCall.Error())
|
||||
})
|
||||
|
||||
t.Run("incorrect ", func(t *testing.T) {
|
||||
bw := io.NewBufBinWriter()
|
||||
t.Run(fmt.Sprintf("incorrect, compat: %t", dummyMultisig), func(t *testing.T) {
|
||||
bw := io.NewBufBinWriter()
|
||||
|
||||
emit.Int(bw.BinWriter, -1)
|
||||
emit.String(bw.BinWriter, "test")
|
||||
emit.Bytes(bw.BinWriter, scriptHash.BytesBE())
|
||||
emit.Syscall(bw.BinWriter, interopnames.SystemContractCall)
|
||||
emit.Int(bw.BinWriter, -1)
|
||||
emit.String(bw.BinWriter, "test")
|
||||
emit.Bytes(bw.BinWriter, scriptHash.BytesBE())
|
||||
emit.Syscall(bw.BinWriter, interopnames.SystemContractCall)
|
||||
|
||||
nr := correctNR(bw.Bytes(), false)
|
||||
nr := correctNR(bw.Bytes(), dummyMultisig, false)
|
||||
|
||||
_, err := preparator.Prepare(nr)
|
||||
_, err := preparator.Prepare(nr)
|
||||
|
||||
require.EqualError(t, err, errIncorrectCallFlag.Error())
|
||||
})
|
||||
require.EqualError(t, err, errIncorrectCallFlag.Error())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestPrepare_IncorrectNR(t *testing.T) {
|
||||
|
@ -209,7 +213,23 @@ func TestPrepare_IncorrectNR(t *testing.T) {
|
|||
InvocationScript: make([]byte, 1),
|
||||
},
|
||||
{
|
||||
InvocationScript: dummyInvocationScript,
|
||||
InvocationScript: dummyAlphabetInvocationScript,
|
||||
},
|
||||
{},
|
||||
},
|
||||
},
|
||||
expErr: errIncorrectProxyWitnesses,
|
||||
},
|
||||
{
|
||||
name: "incorrect main TX proxy witness compat",
|
||||
addW: false,
|
||||
mTX: mTX{
|
||||
scripts: []transaction.Witness{
|
||||
{
|
||||
InvocationScript: make([]byte, 1),
|
||||
},
|
||||
{
|
||||
InvocationScript: dummyAlphabetInvocationScriptOld,
|
||||
},
|
||||
{},
|
||||
},
|
||||
|
@ -224,7 +244,22 @@ func TestPrepare_IncorrectNR(t *testing.T) {
|
|||
{},
|
||||
{
|
||||
VerificationScript: wrongAlphaVerificationScript,
|
||||
InvocationScript: dummyInvocationScript,
|
||||
InvocationScript: dummyAlphabetInvocationScript,
|
||||
},
|
||||
{},
|
||||
},
|
||||
},
|
||||
expErr: errIncorrectAlphabet,
|
||||
},
|
||||
{
|
||||
name: "incorrect main TX Alphabet witness compat",
|
||||
addW: false,
|
||||
mTX: mTX{
|
||||
scripts: []transaction.Witness{
|
||||
{},
|
||||
{
|
||||
VerificationScript: wrongAlphaVerificationScript,
|
||||
InvocationScript: dummyAlphabetInvocationScriptOld,
|
||||
},
|
||||
{},
|
||||
},
|
||||
|
@ -239,7 +274,24 @@ func TestPrepare_IncorrectNR(t *testing.T) {
|
|||
{},
|
||||
{
|
||||
VerificationScript: alphaVerificationScript,
|
||||
InvocationScript: dummyInvocationScript,
|
||||
InvocationScript: dummyAlphabetInvocationScript,
|
||||
},
|
||||
{
|
||||
InvocationScript: wrongDummyInvocationScript,
|
||||
},
|
||||
},
|
||||
},
|
||||
expErr: errIncorrectNotaryPlaceholder,
|
||||
},
|
||||
{
|
||||
name: "incorrect main TX Notary witness compat",
|
||||
addW: false,
|
||||
mTX: mTX{
|
||||
scripts: []transaction.Witness{
|
||||
{},
|
||||
{
|
||||
VerificationScript: alphaVerificationScript,
|
||||
InvocationScript: dummyAlphabetInvocationScriptOld,
|
||||
},
|
||||
{
|
||||
InvocationScript: wrongDummyInvocationScript,
|
||||
|
@ -289,7 +341,23 @@ func TestPrepare_IncorrectNR(t *testing.T) {
|
|||
{},
|
||||
{
|
||||
VerificationScript: alphaVerificationScript,
|
||||
InvocationScript: dummyInvocationScript,
|
||||
InvocationScript: dummyAlphabetInvocationScript,
|
||||
},
|
||||
{},
|
||||
{},
|
||||
},
|
||||
},
|
||||
expErr: errIncorrectInvokerWitnesses,
|
||||
},
|
||||
{
|
||||
name: "incorrect invoker TX Alphabet witness compat",
|
||||
addW: true,
|
||||
mTX: mTX{
|
||||
scripts: []transaction.Witness{
|
||||
{},
|
||||
{
|
||||
VerificationScript: alphaVerificationScript,
|
||||
InvocationScript: dummyAlphabetInvocationScriptOld,
|
||||
},
|
||||
{},
|
||||
{},
|
||||
|
@ -327,7 +395,7 @@ func TestPrepare_IncorrectNR(t *testing.T) {
|
|||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
correctNR := correctNR(nil, test.addW)
|
||||
correctNR := correctNR(nil, false, test.addW)
|
||||
incorrectNR = setIncorrectFields(*correctNR, test.mTX, test.fbTX)
|
||||
|
||||
_, err = preparator.Prepare(&incorrectNR)
|
||||
|
@ -372,40 +440,42 @@ func TestPrepare_CorrectNR(t *testing.T) {
|
|||
|
||||
for _, test := range tests {
|
||||
for i := 0; i < 1; i++ { // run tests against 3 and 4 witness NR
|
||||
additionalWitness := i == 0
|
||||
nr := correctNR(script(test.hash, test.method, test.args...), additionalWitness)
|
||||
for _, dummyMultisig := range []bool{true, false} { // run tests against empty and dummy multisig/Notary witness
|
||||
additionalWitness := i == 0
|
||||
nr := correctNR(script(test.hash, test.method, test.args...), dummyMultisig, additionalWitness)
|
||||
|
||||
event, err := preparator.Prepare(nr)
|
||||
event, err := preparator.Prepare(nr)
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, test.method, event.Type().String())
|
||||
require.Equal(t, test.hash.StringLE(), event.ScriptHash().StringLE())
|
||||
|
||||
// check args parsing
|
||||
bw := io.NewBufBinWriter()
|
||||
emit.Array(bw.BinWriter, test.args...)
|
||||
|
||||
ctx := vm.NewContext(bw.Bytes())
|
||||
|
||||
opCode, param, err := ctx.Next()
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, opGot := range event.Params() {
|
||||
require.Equal(t, opCode, opGot.code)
|
||||
require.Equal(t, param, opGot.param)
|
||||
|
||||
opCode, param, err = ctx.Next()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, test.method, event.Type().String())
|
||||
require.Equal(t, test.hash.StringLE(), event.ScriptHash().StringLE())
|
||||
|
||||
// check args parsing
|
||||
bw := io.NewBufBinWriter()
|
||||
emit.Array(bw.BinWriter, test.args...)
|
||||
|
||||
ctx := vm.NewContext(bw.Bytes())
|
||||
|
||||
opCode, param, err := ctx.Next()
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, opGot := range event.Params() {
|
||||
require.Equal(t, opCode, opGot.code)
|
||||
require.Equal(t, param, opGot.param)
|
||||
|
||||
opCode, param, err = ctx.Next()
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
_, _, err = ctx.Next() // PACK opcode
|
||||
require.NoError(t, err)
|
||||
_, _, err = ctx.Next() // packing len opcode
|
||||
require.NoError(t, err)
|
||||
|
||||
opCode, _, err = ctx.Next()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, opcode.RET, opCode)
|
||||
}
|
||||
|
||||
_, _, err = ctx.Next() // PACK opcode
|
||||
require.NoError(t, err)
|
||||
_, _, err = ctx.Next() // packing len opcode
|
||||
require.NoError(t, err)
|
||||
|
||||
opCode, _, err = ctx.Next()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, opcode.RET, opCode)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -428,7 +498,7 @@ func script(hash util.Uint160, method string, args ...any) []byte {
|
|||
return bw.Bytes()
|
||||
}
|
||||
|
||||
func correctNR(script []byte, additionalWitness bool) *payload.P2PNotaryRequest {
|
||||
func correctNR(script []byte, dummyMultisig, additionalWitness bool) *payload.P2PNotaryRequest {
|
||||
alphaVerificationScript, _ := smartcontract.CreateMultiSigRedeemScript(len(alphaKeys)*2/3+1, alphaKeys)
|
||||
|
||||
signers := []transaction.Signer{
|
||||
|
@ -443,20 +513,24 @@ func correctNR(script []byte, additionalWitness bool) *payload.P2PNotaryRequest
|
|||
signers[2] = transaction.Signer{Account: hash.Hash160(alphaVerificationScript)}
|
||||
}
|
||||
|
||||
multisigInv := dummyAlphabetInvocationScript
|
||||
if dummyMultisig {
|
||||
multisigInv = dummyAlphabetInvocationScriptOld
|
||||
}
|
||||
scripts := []transaction.Witness{
|
||||
{},
|
||||
{
|
||||
InvocationScript: dummyInvocationScript,
|
||||
InvocationScript: multisigInv,
|
||||
VerificationScript: alphaVerificationScript,
|
||||
},
|
||||
{
|
||||
InvocationScript: dummyInvocationScript,
|
||||
InvocationScript: multisigInv,
|
||||
},
|
||||
}
|
||||
if additionalWitness { // insert on element with index 2
|
||||
scripts = append(scripts[:2+1], scripts[2:]...)
|
||||
scripts[2] = transaction.Witness{
|
||||
InvocationScript: dummyInvocationScript,
|
||||
InvocationScript: multisigInv,
|
||||
VerificationScript: alphaVerificationScript,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -11,8 +11,8 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
||||
"github.com/nspcc-dev/neo-go/pkg/neorpc"
|
||||
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
|
||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
|
||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -27,7 +27,6 @@ type (
|
|||
// Subscriber is an interface of the NotificationEvent listener.
|
||||
Subscriber interface {
|
||||
SubscribeForNotification(...util.Uint160) error
|
||||
UnsubscribeForNotification()
|
||||
BlockNotifications() error
|
||||
SubscribeForNotaryRequests(mainTXSigner util.Uint160) error
|
||||
|
||||
|
@ -36,16 +35,27 @@ type (
|
|||
Close()
|
||||
}
|
||||
|
||||
subChannels struct {
|
||||
NotifyChan chan *state.ContainedNotificationEvent
|
||||
BlockChan chan *block.Block
|
||||
NotaryChan chan *result.NotaryRequestEvent
|
||||
}
|
||||
|
||||
subscriber struct {
|
||||
*sync.RWMutex
|
||||
log *logger.Logger
|
||||
client *client.Client
|
||||
|
||||
notifyChan chan *state.ContainedNotificationEvent
|
||||
|
||||
blockChan chan *block.Block
|
||||
|
||||
blockChan chan *block.Block
|
||||
notaryChan chan *result.NotaryRequestEvent
|
||||
|
||||
current subChannels
|
||||
|
||||
// cached subscription information
|
||||
subscribedEvents map[util.Uint160]bool
|
||||
subscribedNotaryEvents map[util.Uint160]bool
|
||||
subscribedToNewBlocks bool
|
||||
}
|
||||
|
||||
// Params is a group of Subscriber constructor parameters.
|
||||
|
@ -76,116 +86,66 @@ func (s *subscriber) SubscribeForNotification(contracts ...util.Uint160) error {
|
|||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
notifyIDs := make(map[util.Uint160]struct{}, len(contracts))
|
||||
notifyIDs := make([]string, 0, len(contracts))
|
||||
|
||||
for i := range contracts {
|
||||
if s.subscribedEvents[contracts[i]] {
|
||||
continue
|
||||
}
|
||||
// subscribe to contract notifications
|
||||
err := s.client.SubscribeForExecutionNotifications(contracts[i])
|
||||
id, err := s.client.ReceiveExecutionNotifications(contracts[i], s.current.NotifyChan)
|
||||
if err != nil {
|
||||
// if there is some error, undo all subscriptions and return error
|
||||
for hash := range notifyIDs {
|
||||
_ = s.client.UnsubscribeContract(hash)
|
||||
for _, id := range notifyIDs {
|
||||
_ = s.client.Unsubscribe(id)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// save notification id
|
||||
notifyIDs[contracts[i]] = struct{}{}
|
||||
notifyIDs = append(notifyIDs, id)
|
||||
}
|
||||
for i := range contracts {
|
||||
s.subscribedEvents[contracts[i]] = true
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *subscriber) UnsubscribeForNotification() {
|
||||
err := s.client.UnsubscribeAll()
|
||||
if err != nil {
|
||||
s.log.Error(logs.SubscriberUnsubscribeForNotification,
|
||||
zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
func (s *subscriber) Close() {
|
||||
s.client.Close()
|
||||
}
|
||||
|
||||
func (s *subscriber) BlockNotifications() error {
|
||||
if err := s.client.SubscribeForNewBlocks(); err != nil {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
if s.subscribedToNewBlocks {
|
||||
return nil
|
||||
}
|
||||
if _, err := s.client.ReceiveBlocks(s.current.BlockChan); err != nil {
|
||||
return fmt.Errorf("could not subscribe for new block events: %w", err)
|
||||
}
|
||||
|
||||
s.subscribedToNewBlocks = true
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *subscriber) SubscribeForNotaryRequests(mainTXSigner util.Uint160) error {
|
||||
if err := s.client.SubscribeForNotaryRequests(mainTXSigner); err != nil {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
if s.subscribedNotaryEvents[mainTXSigner] {
|
||||
return nil
|
||||
}
|
||||
if _, err := s.client.ReceiveNotaryRequests(mainTXSigner, s.current.NotaryChan); err != nil {
|
||||
return fmt.Errorf("could not subscribe for notary request events: %w", err)
|
||||
}
|
||||
|
||||
s.subscribedNotaryEvents[mainTXSigner] = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *subscriber) routeNotifications(ctx context.Context) {
|
||||
notificationChan := s.client.NotificationChannel()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case notification, ok := <-notificationChan:
|
||||
if !ok {
|
||||
s.log.Warn(logs.SubscriberRemoteNotificationChannelHasBeenClosed)
|
||||
close(s.notifyChan)
|
||||
close(s.blockChan)
|
||||
close(s.notaryChan)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
switch notification.Type {
|
||||
case neorpc.NotificationEventID:
|
||||
notifyEvent, ok := notification.Value.(*state.ContainedNotificationEvent)
|
||||
if !ok {
|
||||
s.log.Error(logs.SubscriberCantCastNotifyEventValueToTheNotifyStruct,
|
||||
zap.String("received type", fmt.Sprintf("%T", notification.Value)),
|
||||
)
|
||||
continue
|
||||
}
|
||||
|
||||
s.log.Debug(logs.SubscriberNewNotificationEventFromSidechain,
|
||||
zap.String("name", notifyEvent.Name),
|
||||
)
|
||||
|
||||
s.notifyChan <- notifyEvent
|
||||
case neorpc.BlockEventID:
|
||||
b, ok := notification.Value.(*block.Block)
|
||||
if !ok {
|
||||
s.log.Error(logs.SubscriberCantCastBlockEventValueToBlock,
|
||||
zap.String("received type", fmt.Sprintf("%T", notification.Value)),
|
||||
)
|
||||
continue
|
||||
}
|
||||
|
||||
s.blockChan <- b
|
||||
case neorpc.NotaryRequestEventID:
|
||||
notaryRequest, ok := notification.Value.(*result.NotaryRequestEvent)
|
||||
if !ok {
|
||||
s.log.Error(logs.SubscriberCantCastNotifyEventValueToTheNotaryRequestStruct,
|
||||
zap.String("received type", fmt.Sprintf("%T", notification.Value)),
|
||||
)
|
||||
continue
|
||||
}
|
||||
|
||||
s.notaryChan <- notaryRequest
|
||||
default:
|
||||
s.log.Debug(logs.SubscriberUnsupportedNotificationFromTheChain,
|
||||
zap.Uint8("type", uint8(notification.Type)),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// New is a constructs Neo:Morph event listener and returns Subscriber interface.
|
||||
func New(ctx context.Context, p *Params) (Subscriber, error) {
|
||||
switch {
|
||||
|
@ -209,16 +169,170 @@ func New(ctx context.Context, p *Params) (Subscriber, error) {
|
|||
notifyChan: make(chan *state.ContainedNotificationEvent),
|
||||
blockChan: make(chan *block.Block),
|
||||
notaryChan: make(chan *result.NotaryRequestEvent),
|
||||
}
|
||||
|
||||
// Worker listens all events from neo-go websocket and puts them
|
||||
// into corresponding channel. It may be notifications, transactions,
|
||||
// new blocks. For now only notifications.
|
||||
current: newSubChannels(),
|
||||
|
||||
subscribedEvents: make(map[util.Uint160]bool),
|
||||
subscribedNotaryEvents: make(map[util.Uint160]bool),
|
||||
}
|
||||
// Worker listens all events from temporary NeoGo channel and puts them
|
||||
// into corresponding permanent channels.
|
||||
go sub.routeNotifications(ctx)
|
||||
|
||||
return sub, nil
|
||||
}
|
||||
|
||||
func (s *subscriber) routeNotifications(ctx context.Context) {
|
||||
var (
|
||||
// TODO: not needed after nspcc-dev/neo-go#2980.
|
||||
cliCh = s.client.NotificationChannel()
|
||||
restoreCh = make(chan bool)
|
||||
restoreInProgress bool
|
||||
)
|
||||
|
||||
routeloop:
|
||||
for {
|
||||
var connLost bool
|
||||
s.RLock()
|
||||
curr := s.current
|
||||
s.RUnlock()
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
break routeloop
|
||||
case ev, ok := <-curr.NotifyChan:
|
||||
if ok {
|
||||
s.notifyChan <- ev
|
||||
} else {
|
||||
connLost = true
|
||||
}
|
||||
case ev, ok := <-curr.BlockChan:
|
||||
if ok {
|
||||
s.blockChan <- ev
|
||||
} else {
|
||||
connLost = true
|
||||
}
|
||||
case ev, ok := <-curr.NotaryChan:
|
||||
if ok {
|
||||
s.notaryChan <- ev
|
||||
} else {
|
||||
connLost = true
|
||||
}
|
||||
case _, ok := <-cliCh:
|
||||
connLost = !ok
|
||||
case ok := <-restoreCh:
|
||||
restoreInProgress = false
|
||||
if !ok {
|
||||
connLost = true
|
||||
}
|
||||
}
|
||||
if connLost {
|
||||
if !restoreInProgress {
|
||||
restoreInProgress, cliCh = s.switchEndpoint(ctx, restoreCh)
|
||||
if !restoreInProgress {
|
||||
break routeloop
|
||||
}
|
||||
curr.drain()
|
||||
} else { // Avoid getting additional !ok events.
|
||||
s.Lock()
|
||||
s.current.NotifyChan = nil
|
||||
s.current.BlockChan = nil
|
||||
s.current.NotaryChan = nil
|
||||
s.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
close(s.notifyChan)
|
||||
close(s.blockChan)
|
||||
close(s.notaryChan)
|
||||
}
|
||||
|
||||
func (s *subscriber) switchEndpoint(ctx context.Context, finishCh chan<- bool) (bool, <-chan rpcclient.Notification) {
|
||||
s.log.Info("RPC connection lost, attempting reconnect")
|
||||
if !s.client.SwitchRPC(ctx) {
|
||||
s.log.Error("can't switch RPC node")
|
||||
return false, nil
|
||||
}
|
||||
|
||||
cliCh := s.client.NotificationChannel()
|
||||
|
||||
s.Lock()
|
||||
chs := newSubChannels()
|
||||
go func() {
|
||||
finishCh <- s.restoreSubscriptions(chs.NotifyChan, chs.BlockChan, chs.NotaryChan)
|
||||
}()
|
||||
s.current = chs
|
||||
s.Unlock()
|
||||
|
||||
return true, cliCh
|
||||
}
|
||||
|
||||
func newSubChannels() subChannels {
|
||||
return subChannels{
|
||||
NotifyChan: make(chan *state.ContainedNotificationEvent),
|
||||
BlockChan: make(chan *block.Block),
|
||||
NotaryChan: make(chan *result.NotaryRequestEvent),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *subChannels) drain() {
|
||||
drainloop:
|
||||
for {
|
||||
select {
|
||||
case _, ok := <-s.NotifyChan:
|
||||
if !ok {
|
||||
s.NotifyChan = nil
|
||||
}
|
||||
case _, ok := <-s.BlockChan:
|
||||
if !ok {
|
||||
s.BlockChan = nil
|
||||
}
|
||||
case _, ok := <-s.NotaryChan:
|
||||
if !ok {
|
||||
s.NotaryChan = nil
|
||||
}
|
||||
default:
|
||||
break drainloop
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// restoreSubscriptions restores subscriptions according to
|
||||
// cached information about them.
|
||||
func (s *subscriber) restoreSubscriptions(notifCh chan<- *state.ContainedNotificationEvent,
|
||||
blCh chan<- *block.Block, notaryCh chan<- *result.NotaryRequestEvent) bool {
|
||||
var err error
|
||||
|
||||
// new block events restoration
|
||||
if s.subscribedToNewBlocks {
|
||||
_, err = s.client.ReceiveBlocks(blCh)
|
||||
if err != nil {
|
||||
s.log.Error(logs.ClientCouldNotRestoreBlockSubscriptionAfterRPCSwitch, zap.Error(err))
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// notification events restoration
|
||||
for contract := range s.subscribedEvents {
|
||||
contract := contract // See https://github.com/nspcc-dev/neo-go/issues/2890
|
||||
_, err = s.client.ReceiveExecutionNotifications(contract, notifCh)
|
||||
if err != nil {
|
||||
s.log.Error(logs.ClientCouldNotRestoreNotificationSubscriptionAfterRPCSwitch, zap.Error(err))
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// notary notification events restoration
|
||||
for signer := range s.subscribedNotaryEvents {
|
||||
signer := signer // See https://github.com/nspcc-dev/neo-go/issues/2890
|
||||
_, err = s.client.ReceiveNotaryRequests(signer, notaryCh)
|
||||
if err != nil {
|
||||
s.log.Error(logs.ClientCouldNotRestoreNotaryNotificationSubscriptionAfterRPCSwitch, zap.Error(err))
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// awaitHeight checks if remote client has least expected block height and
|
||||
// returns error if it is not reached that height after timeout duration.
|
||||
// This function is required to avoid connections to unsynced RPC nodes, because
|
||||
|
|
Loading…
Add table
Reference in a new issue
Why not locked with switchLock?
Fixed