frostfs-node/pkg/morph/client/client.go
Evgenii Stratonikov 4b13b85173 [#1000] morph: Fix batch size in TraverseIterator()
Initial prefetch size can be arbitrary an restricted only by VM/RPC
limits. For TraverseIterator() there is an explicit check on the
server-side, though.
Introduced in df055fead5.
Refs #931.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-02-23 06:24:40 +00:00

581 lines
14 KiB
Go

package client
import (
"context"
"errors"
"fmt"
"math/big"
"sync"
"sync/atomic"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics"
morphmetrics "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"github.com/google/uuid"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/nspcc-dev/neo-go/pkg/core/native/noderoles"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/gas"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/nep17"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/rolemgmt"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/unwrap"
"github.com/nspcc-dev/neo-go/pkg/smartcontract"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
"github.com/nspcc-dev/neo-go/pkg/vm/vmstate"
"github.com/nspcc-dev/neo-go/pkg/wallet"
"go.uber.org/zap"
)
// Client is a wrapper over web socket neo-go client
// that provides smart-contract invocation interface
// and notification subscription functionality.
//
// On connection lost tries establishing new connection
// to the next RPC (if any). If no RPC node available,
// switches to inactive mode: any RPC call leads to immediate
// return with ErrConnectionLost error, notification channel
// returned from Client.NotificationChannel is closed.
//
// Working client must be created via constructor New.
// Using the Client that has been created with new(Client)
// expression (or just declaring a Client variable) is unsafe
// and can lead to panic.
type Client struct {
cache cache
logger *logger.Logger // logging component
metrics morphmetrics.Register
client *rpcclient.WSClient // neo-go websocket client
rpcActor *actor.Actor // neo-go RPC actor
gasToken *nep17.Token // neo-go GAS token wrapper
rolemgmt *rolemgmt.Contract // neo-go Designation contract wrapper
acc *wallet.Account // neo account
accAddr util.Uint160 // account's address
notary *notaryInfo
cfg cfg
endpoints endpoints
// switchLock protects endpoints, inactive, and subscription-related fields.
// It is taken exclusively during endpoint switch and locked in shared mode
// on every normal call.
switchLock sync.RWMutex
// channel for internal stop
closeChan chan struct{}
closed atomic.Bool
wg sync.WaitGroup
// indicates that Client is not able to
// establish connection to any of the
// provided RPC endpoints
inactive bool
// indicates that Client has already started
// goroutine that tries to switch to the higher
// priority RPC node
switchIsActive atomic.Bool
}
type cache struct {
m sync.RWMutex
nnsHash *util.Uint160
gKey *keys.PublicKey
txHeights *lru.Cache[util.Uint256, uint32]
metrics metrics.MorphCacheMetrics
}
func (c *cache) nns() *util.Uint160 {
c.m.RLock()
defer c.m.RUnlock()
return c.nnsHash
}
func (c *cache) setNNSHash(nnsHash util.Uint160) {
c.m.Lock()
defer c.m.Unlock()
c.nnsHash = &nnsHash
}
func (c *cache) groupKey() *keys.PublicKey {
c.m.RLock()
defer c.m.RUnlock()
return c.gKey
}
func (c *cache) setGroupKey(groupKey *keys.PublicKey) {
c.m.Lock()
defer c.m.Unlock()
c.gKey = groupKey
}
func (c *cache) invalidate() {
c.m.Lock()
defer c.m.Unlock()
c.nnsHash = nil
c.gKey = nil
c.txHeights.Purge()
}
var (
// ErrNilClient is returned by functions that expect
// a non-nil Client pointer, but received nil.
ErrNilClient = errors.New("client is nil")
// ErrConnectionLost is returned when client lost web socket connection
// to the RPC node and has not been able to establish a new one since.
ErrConnectionLost = errors.New("connection to the RPC node has been lost")
)
// HaltState returned if TestInvoke function processed without panic.
const HaltState = "HALT"
type notHaltStateError struct {
state, exception string
}
func (e *notHaltStateError) Error() string {
return fmt.Sprintf(
"chain/client: contract execution finished with state %s; exception: %s",
e.state,
e.exception,
)
}
// implementation of error interface for FrostFS-specific errors.
type frostfsError struct {
err error
}
func (e frostfsError) Error() string {
return fmt.Sprintf("frostfs error: %v", e.err)
}
// wraps FrostFS-specific error into frostfsError. Arg must not be nil.
func wrapFrostFSError(err error) error {
return frostfsError{err}
}
// Invoke invokes contract method by sending transaction into blockchain.
// Returns valid until block value.
// Supported args types: int64, string, util.Uint160, []byte and bool.
func (c *Client) Invoke(contract util.Uint160, fee fixedn.Fixed8, method string, args ...any) (uint32, error) {
start := time.Now()
success := false
defer func() {
c.metrics.ObserveInvoke("Invoke", contract.String(), method, success, time.Since(start))
}()
c.switchLock.RLock()
defer c.switchLock.RUnlock()
if c.inactive {
return 0, ErrConnectionLost
}
txHash, vub, err := c.rpcActor.SendTunedCall(contract, method, nil, addFeeCheckerModifier(int64(fee)), args...)
if err != nil {
return 0, fmt.Errorf("could not invoke %s: %w", method, err)
}
c.logger.Debug(logs.ClientNeoClientInvoke,
zap.String("method", method),
zap.Uint32("vub", vub),
zap.Stringer("tx_hash", txHash.Reverse()))
success = true
return vub, nil
}
// TestInvokeIterator invokes contract method returning an iterator and executes cb on each element.
// If cb returns an error, the session is closed and this error is returned as-is.
// If the remove neo-go node does not support sessions, `unwrap.ErrNoSessionID` is returned.
// batchSize is the number of items to prefetch: if the number of items in the iterator is less than batchSize, no session will be created.
// The default batchSize is 100, the default limit from neo-go.
func (c *Client) TestInvokeIterator(cb func(stackitem.Item) error, batchSize int, contract util.Uint160, method string, args ...interface{}) error {
start := time.Now()
success := false
defer func() {
c.metrics.ObserveInvoke("TestInvokeIterator", contract.String(), method, success, time.Since(start))
}()
if batchSize <= 0 {
batchSize = invoker.DefaultIteratorResultItems
}
c.switchLock.RLock()
defer c.switchLock.RUnlock()
if c.inactive {
return ErrConnectionLost
}
script, err := smartcontract.CreateCallAndPrefetchIteratorScript(contract, method, batchSize, args...)
if err != nil {
return err
}
val, err := c.rpcActor.Run(script)
if err != nil {
return err
} else if val.State != HaltState {
return wrapFrostFSError(&notHaltStateError{state: val.State, exception: val.FaultException})
}
arr, sid, r, err := unwrap.ArrayAndSessionIterator(val, err)
if err != nil {
return err
}
for i := range arr {
if err := cb(arr[i]); err != nil {
return err
}
}
if (sid == uuid.UUID{}) {
success = true
return nil
}
defer func() {
_ = c.rpcActor.TerminateSession(sid)
}()
// Batch size for TraverseIterator() can restricted on the server-side.
traverseBatchSize := batchSize
if invoker.DefaultIteratorResultItems < traverseBatchSize {
traverseBatchSize = invoker.DefaultIteratorResultItems
}
for {
items, err := c.rpcActor.TraverseIterator(sid, &r, traverseBatchSize)
if err != nil {
return err
}
for i := range items {
if err := cb(items[i]); err != nil {
return err
}
}
if len(items) < traverseBatchSize {
break
}
}
success = true
return nil
}
// TestInvoke invokes contract method locally in neo-go node. This method should
// be used to read data from smart-contract.
func (c *Client) TestInvoke(contract util.Uint160, method string, args ...any) (res []stackitem.Item, err error) {
start := time.Now()
success := false
defer func() {
c.metrics.ObserveInvoke("TestInvoke", contract.String(), method, success, time.Since(start))
}()
c.switchLock.RLock()
defer c.switchLock.RUnlock()
if c.inactive {
return nil, ErrConnectionLost
}
val, err := c.rpcActor.Call(contract, method, args...)
if err != nil {
return nil, err
}
if val.State != HaltState {
return nil, wrapFrostFSError(&notHaltStateError{state: val.State, exception: val.FaultException})
}
success = true
return val.Stack, nil
}
// TransferGas to the receiver from local wallet.
func (c *Client) TransferGas(receiver util.Uint160, amount fixedn.Fixed8) error {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
if c.inactive {
return ErrConnectionLost
}
txHash, vub, err := c.gasToken.Transfer(c.accAddr, receiver, big.NewInt(int64(amount)), nil)
if err != nil {
return err
}
c.logger.Debug(logs.ClientNativeGasTransferInvoke,
zap.String("to", receiver.StringLE()),
zap.Stringer("tx_hash", txHash.Reverse()),
zap.Uint32("vub", vub))
return nil
}
func (c *Client) BatchTransferGas(receivers []util.Uint160, amount fixedn.Fixed8) error {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
if c.inactive {
return ErrConnectionLost
}
transferParams := make([]nep17.TransferParameters, len(receivers))
receiversLog := make([]string, len(receivers))
for i, receiver := range receivers {
transferParams[i] = nep17.TransferParameters{
From: c.accAddr,
To: receiver,
Amount: big.NewInt(int64(amount)),
Data: nil,
}
receiversLog[i] = receiver.StringLE()
}
txHash, vub, err := c.gasToken.MultiTransfer(transferParams)
if err != nil {
return err
}
c.logger.Debug(logs.ClientBatchGasTransferInvoke,
zap.Strings("to", receiversLog),
zap.Stringer("tx_hash", txHash.Reverse()),
zap.Uint32("vub", vub))
return nil
}
// Wait function blocks routing execution until there
// are `n` new blocks in the chain.
//
// Returns only connection errors.
func (c *Client) Wait(ctx context.Context, n uint32) error {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
if c.inactive {
return ErrConnectionLost
}
var (
err error
height, newHeight uint32
)
height, err = c.rpcActor.GetBlockCount()
if err != nil {
c.logger.Error(logs.ClientCantGetBlockchainHeight,
zap.String("error", err.Error()))
return nil
}
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
newHeight, err = c.rpcActor.GetBlockCount()
if err != nil {
c.logger.Error(logs.ClientCantGetBlockchainHeight243,
zap.String("error", err.Error()))
return nil
}
if newHeight >= height+n {
return nil
}
time.Sleep(c.cfg.waitInterval)
}
}
// GasBalance returns GAS amount in the client's wallet.
func (c *Client) GasBalance() (res int64, err error) {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
if c.inactive {
return 0, ErrConnectionLost
}
bal, err := c.gasToken.BalanceOf(c.accAddr)
if err != nil {
return 0, err
}
return bal.Int64(), nil
}
// Committee returns keys of chain committee from neo native contract.
func (c *Client) Committee() (res keys.PublicKeys, err error) {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
if c.inactive {
return nil, ErrConnectionLost
}
return c.client.GetCommittee()
}
// TxHalt returns true if transaction has been successfully executed and persisted.
func (c *Client) TxHalt(h util.Uint256) (res bool, err error) {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
if c.inactive {
return false, ErrConnectionLost
}
trig := trigger.Application
aer, err := c.client.GetApplicationLog(h, &trig)
if err != nil {
return false, err
}
return len(aer.Executions) > 0 && aer.Executions[0].VMState.HasFlag(vmstate.Halt), nil
}
// TxHeight returns true if transaction has been successfully executed and persisted.
func (c *Client) TxHeight(h util.Uint256) (res uint32, err error) {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
if c.inactive {
return 0, ErrConnectionLost
}
return c.client.GetTransactionHeight(h)
}
// NeoFSAlphabetList returns keys that stored in NeoFS Alphabet role. Main chain
// stores alphabet node keys of inner ring there, however the sidechain stores both
// alphabet and non alphabet node keys of inner ring.
func (c *Client) NeoFSAlphabetList() (res keys.PublicKeys, err error) {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
if c.inactive {
return nil, ErrConnectionLost
}
list, err := c.roleList(noderoles.NeoFSAlphabet)
if err != nil {
return nil, fmt.Errorf("can't get alphabet nodes role list: %w", err)
}
return list, nil
}
// GetDesignateHash returns hash of the native `RoleManagement` contract.
func (c *Client) GetDesignateHash() util.Uint160 {
return rolemgmt.Hash
}
func (c *Client) roleList(r noderoles.Role) (keys.PublicKeys, error) {
height, err := c.rpcActor.GetBlockCount()
if err != nil {
return nil, fmt.Errorf("can't get chain height: %w", err)
}
return c.rolemgmt.GetDesignatedByRole(r, height)
}
// MagicNumber returns the magic number of the network
// to which the underlying RPC node client is connected.
func (c *Client) MagicNumber() (uint64, error) {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
if c.inactive {
return 0, ErrConnectionLost
}
return uint64(c.rpcActor.GetNetwork()), nil
}
// BlockCount returns block count of the network
// to which the underlying RPC node client is connected.
func (c *Client) BlockCount() (res uint32, err error) {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
if c.inactive {
return 0, ErrConnectionLost
}
return c.rpcActor.GetBlockCount()
}
// MsPerBlock returns MillisecondsPerBlock network parameter.
func (c *Client) MsPerBlock() (res int64, err error) {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
if c.inactive {
return 0, ErrConnectionLost
}
v := c.rpcActor.GetVersion()
return int64(v.Protocol.MillisecondsPerBlock), nil
}
// IsValidScript returns true if invocation script executes with HALT state.
func (c *Client) IsValidScript(script []byte, signers []transaction.Signer) (valid bool, err error) {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
if c.inactive {
return false, ErrConnectionLost
}
res, err := c.client.InvokeScript(script, signers)
if err != nil {
return false, fmt.Errorf("invokeScript: %w", err)
}
return res.State == vmstate.Halt.String(), nil
}
func (c *Client) Metrics() morphmetrics.Register {
return c.metrics
}
func (c *Client) setActor(act *actor.Actor) {
c.rpcActor = act
c.gasToken = nep17.New(act, gas.Hash)
c.rolemgmt = rolemgmt.New(act)
}
func (c *Client) GetActor() *actor.Actor {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
return c.rpcActor
}