frostfs-node/pkg/morph/client/client.go
Evgenii Stratonikov df055fead5 [#931] morph: Provide batch size for container listing explicitly
Besides VM stack item limit we also have restrictions on the size of
JSON for a stackitem, which is 128k. This limit is much harder to
calculate, because JSON representation includes type and the encoding is
different for different items. Thus is makes no sense to invent our own
default, so use the one provided by neo-go. But for container listing we
know exactly what we process, so use big enough value, which is tested.

Introduced in be8607a1f6.

Refs #902
Refs https://github.com/nspcc-dev/neo-go/blob/v0.105.0/pkg/vm/stackitem/json.go#L353

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-01-29 14:00:11 +00:00

576 lines
14 KiB
Go

package client
import (
"context"
"errors"
"fmt"
"math/big"
"sync"
"sync/atomic"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics"
morphmetrics "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"github.com/google/uuid"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/nspcc-dev/neo-go/pkg/core/native/noderoles"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/gas"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/nep17"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/rolemgmt"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/unwrap"
"github.com/nspcc-dev/neo-go/pkg/smartcontract"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
"github.com/nspcc-dev/neo-go/pkg/vm/vmstate"
"github.com/nspcc-dev/neo-go/pkg/wallet"
"go.uber.org/zap"
)
// Client is a wrapper over web socket neo-go client
// that provides smart-contract invocation interface
// and notification subscription functionality.
//
// On connection lost tries establishing new connection
// to the next RPC (if any). If no RPC node available,
// switches to inactive mode: any RPC call leads to immediate
// return with ErrConnectionLost error, notification channel
// returned from Client.NotificationChannel is closed.
//
// Working client must be created via constructor New.
// Using the Client that has been created with new(Client)
// expression (or just declaring a Client variable) is unsafe
// and can lead to panic.
type Client struct {
cache cache
logger *logger.Logger // logging component
metrics morphmetrics.Register
client *rpcclient.WSClient // neo-go websocket client
rpcActor *actor.Actor // neo-go RPC actor
gasToken *nep17.Token // neo-go GAS token wrapper
rolemgmt *rolemgmt.Contract // neo-go Designation contract wrapper
acc *wallet.Account // neo account
accAddr util.Uint160 // account's address
notary *notaryInfo
cfg cfg
endpoints endpoints
// switchLock protects endpoints, inactive, and subscription-related fields.
// It is taken exclusively during endpoint switch and locked in shared mode
// on every normal call.
switchLock sync.RWMutex
// channel for internal stop
closeChan chan struct{}
closed atomic.Bool
wg sync.WaitGroup
// indicates that Client is not able to
// establish connection to any of the
// provided RPC endpoints
inactive bool
// indicates that Client has already started
// goroutine that tries to switch to the higher
// priority RPC node
switchIsActive atomic.Bool
}
type cache struct {
m sync.RWMutex
nnsHash *util.Uint160
gKey *keys.PublicKey
txHeights *lru.Cache[util.Uint256, uint32]
metrics metrics.MorphCacheMetrics
}
func (c *cache) nns() *util.Uint160 {
c.m.RLock()
defer c.m.RUnlock()
return c.nnsHash
}
func (c *cache) setNNSHash(nnsHash util.Uint160) {
c.m.Lock()
defer c.m.Unlock()
c.nnsHash = &nnsHash
}
func (c *cache) groupKey() *keys.PublicKey {
c.m.RLock()
defer c.m.RUnlock()
return c.gKey
}
func (c *cache) setGroupKey(groupKey *keys.PublicKey) {
c.m.Lock()
defer c.m.Unlock()
c.gKey = groupKey
}
func (c *cache) invalidate() {
c.m.Lock()
defer c.m.Unlock()
c.nnsHash = nil
c.gKey = nil
c.txHeights.Purge()
}
var (
// ErrNilClient is returned by functions that expect
// a non-nil Client pointer, but received nil.
ErrNilClient = errors.New("client is nil")
// ErrConnectionLost is returned when client lost web socket connection
// to the RPC node and has not been able to establish a new one since.
ErrConnectionLost = errors.New("connection to the RPC node has been lost")
)
// HaltState returned if TestInvoke function processed without panic.
const HaltState = "HALT"
type notHaltStateError struct {
state, exception string
}
func (e *notHaltStateError) Error() string {
return fmt.Sprintf(
"chain/client: contract execution finished with state %s; exception: %s",
e.state,
e.exception,
)
}
// implementation of error interface for FrostFS-specific errors.
type frostfsError struct {
err error
}
func (e frostfsError) Error() string {
return fmt.Sprintf("frostfs error: %v", e.err)
}
// wraps FrostFS-specific error into frostfsError. Arg must not be nil.
func wrapFrostFSError(err error) error {
return frostfsError{err}
}
// Invoke invokes contract method by sending transaction into blockchain.
// Returns valid until block value.
// Supported args types: int64, string, util.Uint160, []byte and bool.
func (c *Client) Invoke(contract util.Uint160, fee fixedn.Fixed8, method string, args ...any) (uint32, error) {
start := time.Now()
success := false
defer func() {
c.metrics.ObserveInvoke("Invoke", contract.String(), method, success, time.Since(start))
}()
c.switchLock.RLock()
defer c.switchLock.RUnlock()
if c.inactive {
return 0, ErrConnectionLost
}
txHash, vub, err := c.rpcActor.SendTunedCall(contract, method, nil, addFeeCheckerModifier(int64(fee)), args...)
if err != nil {
return 0, fmt.Errorf("could not invoke %s: %w", method, err)
}
c.logger.Debug(logs.ClientNeoClientInvoke,
zap.String("method", method),
zap.Uint32("vub", vub),
zap.Stringer("tx_hash", txHash.Reverse()))
success = true
return vub, nil
}
// TestInvokeIterator invokes contract method returning an iterator and executes cb on each element.
// If cb returns an error, the session is closed and this error is returned as-is.
// If the remove neo-go node does not support sessions, `unwrap.ErrNoSessionID` is returned.
// batchSize is the number of items to prefetch: if the number of items in the iterator is less than batchSize, no session will be created.
// The default batchSize is 100, the default limit from neo-go.
func (c *Client) TestInvokeIterator(cb func(stackitem.Item) error, batchSize int, contract util.Uint160, method string, args ...interface{}) error {
start := time.Now()
success := false
defer func() {
c.metrics.ObserveInvoke("TestInvokeIterator", contract.String(), method, success, time.Since(start))
}()
if batchSize <= 0 {
batchSize = invoker.DefaultIteratorResultItems
}
c.switchLock.RLock()
defer c.switchLock.RUnlock()
if c.inactive {
return ErrConnectionLost
}
script, err := smartcontract.CreateCallAndPrefetchIteratorScript(contract, method, batchSize, args...)
if err != nil {
return err
}
val, err := c.rpcActor.Run(script)
if err != nil {
return err
} else if val.State != HaltState {
return wrapFrostFSError(&notHaltStateError{state: val.State, exception: val.FaultException})
}
arr, sid, r, err := unwrap.ArrayAndSessionIterator(val, err)
if err != nil {
return err
}
for i := range arr {
if err := cb(arr[i]); err != nil {
return err
}
}
if (sid == uuid.UUID{}) {
success = true
return nil
}
defer func() {
_ = c.rpcActor.TerminateSession(sid)
}()
for {
items, err := c.rpcActor.TraverseIterator(sid, &r, batchSize)
if err != nil {
return err
}
for i := range items {
if err := cb(items[i]); err != nil {
return err
}
}
if len(items) < batchSize {
break
}
}
success = true
return nil
}
// TestInvoke invokes contract method locally in neo-go node. This method should
// be used to read data from smart-contract.
func (c *Client) TestInvoke(contract util.Uint160, method string, args ...any) (res []stackitem.Item, err error) {
start := time.Now()
success := false
defer func() {
c.metrics.ObserveInvoke("TestInvoke", contract.String(), method, success, time.Since(start))
}()
c.switchLock.RLock()
defer c.switchLock.RUnlock()
if c.inactive {
return nil, ErrConnectionLost
}
val, err := c.rpcActor.Call(contract, method, args...)
if err != nil {
return nil, err
}
if val.State != HaltState {
return nil, wrapFrostFSError(&notHaltStateError{state: val.State, exception: val.FaultException})
}
success = true
return val.Stack, nil
}
// TransferGas to the receiver from local wallet.
func (c *Client) TransferGas(receiver util.Uint160, amount fixedn.Fixed8) error {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
if c.inactive {
return ErrConnectionLost
}
txHash, vub, err := c.gasToken.Transfer(c.accAddr, receiver, big.NewInt(int64(amount)), nil)
if err != nil {
return err
}
c.logger.Debug(logs.ClientNativeGasTransferInvoke,
zap.String("to", receiver.StringLE()),
zap.Stringer("tx_hash", txHash.Reverse()),
zap.Uint32("vub", vub))
return nil
}
func (c *Client) BatchTransferGas(receivers []util.Uint160, amount fixedn.Fixed8) error {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
if c.inactive {
return ErrConnectionLost
}
transferParams := make([]nep17.TransferParameters, len(receivers))
receiversLog := make([]string, len(receivers))
for i, receiver := range receivers {
transferParams[i] = nep17.TransferParameters{
From: c.accAddr,
To: receiver,
Amount: big.NewInt(int64(amount)),
Data: nil,
}
receiversLog[i] = receiver.StringLE()
}
txHash, vub, err := c.gasToken.MultiTransfer(transferParams)
if err != nil {
return err
}
c.logger.Debug(logs.ClientBatchGasTransferInvoke,
zap.Strings("to", receiversLog),
zap.Stringer("tx_hash", txHash.Reverse()),
zap.Uint32("vub", vub))
return nil
}
// Wait function blocks routing execution until there
// are `n` new blocks in the chain.
//
// Returns only connection errors.
func (c *Client) Wait(ctx context.Context, n uint32) error {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
if c.inactive {
return ErrConnectionLost
}
var (
err error
height, newHeight uint32
)
height, err = c.rpcActor.GetBlockCount()
if err != nil {
c.logger.Error(logs.ClientCantGetBlockchainHeight,
zap.String("error", err.Error()))
return nil
}
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
newHeight, err = c.rpcActor.GetBlockCount()
if err != nil {
c.logger.Error(logs.ClientCantGetBlockchainHeight243,
zap.String("error", err.Error()))
return nil
}
if newHeight >= height+n {
return nil
}
time.Sleep(c.cfg.waitInterval)
}
}
// GasBalance returns GAS amount in the client's wallet.
func (c *Client) GasBalance() (res int64, err error) {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
if c.inactive {
return 0, ErrConnectionLost
}
bal, err := c.gasToken.BalanceOf(c.accAddr)
if err != nil {
return 0, err
}
return bal.Int64(), nil
}
// Committee returns keys of chain committee from neo native contract.
func (c *Client) Committee() (res keys.PublicKeys, err error) {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
if c.inactive {
return nil, ErrConnectionLost
}
return c.client.GetCommittee()
}
// TxHalt returns true if transaction has been successfully executed and persisted.
func (c *Client) TxHalt(h util.Uint256) (res bool, err error) {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
if c.inactive {
return false, ErrConnectionLost
}
trig := trigger.Application
aer, err := c.client.GetApplicationLog(h, &trig)
if err != nil {
return false, err
}
return len(aer.Executions) > 0 && aer.Executions[0].VMState.HasFlag(vmstate.Halt), nil
}
// TxHeight returns true if transaction has been successfully executed and persisted.
func (c *Client) TxHeight(h util.Uint256) (res uint32, err error) {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
if c.inactive {
return 0, ErrConnectionLost
}
return c.client.GetTransactionHeight(h)
}
// NeoFSAlphabetList returns keys that stored in NeoFS Alphabet role. Main chain
// stores alphabet node keys of inner ring there, however the sidechain stores both
// alphabet and non alphabet node keys of inner ring.
func (c *Client) NeoFSAlphabetList() (res keys.PublicKeys, err error) {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
if c.inactive {
return nil, ErrConnectionLost
}
list, err := c.roleList(noderoles.NeoFSAlphabet)
if err != nil {
return nil, fmt.Errorf("can't get alphabet nodes role list: %w", err)
}
return list, nil
}
// GetDesignateHash returns hash of the native `RoleManagement` contract.
func (c *Client) GetDesignateHash() util.Uint160 {
return rolemgmt.Hash
}
func (c *Client) roleList(r noderoles.Role) (keys.PublicKeys, error) {
height, err := c.rpcActor.GetBlockCount()
if err != nil {
return nil, fmt.Errorf("can't get chain height: %w", err)
}
return c.rolemgmt.GetDesignatedByRole(r, height)
}
// MagicNumber returns the magic number of the network
// to which the underlying RPC node client is connected.
func (c *Client) MagicNumber() (uint64, error) {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
if c.inactive {
return 0, ErrConnectionLost
}
return uint64(c.rpcActor.GetNetwork()), nil
}
// BlockCount returns block count of the network
// to which the underlying RPC node client is connected.
func (c *Client) BlockCount() (res uint32, err error) {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
if c.inactive {
return 0, ErrConnectionLost
}
return c.rpcActor.GetBlockCount()
}
// MsPerBlock returns MillisecondsPerBlock network parameter.
func (c *Client) MsPerBlock() (res int64, err error) {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
if c.inactive {
return 0, ErrConnectionLost
}
v := c.rpcActor.GetVersion()
return int64(v.Protocol.MillisecondsPerBlock), nil
}
// IsValidScript returns true if invocation script executes with HALT state.
func (c *Client) IsValidScript(script []byte, signers []transaction.Signer) (valid bool, err error) {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
if c.inactive {
return false, ErrConnectionLost
}
res, err := c.client.InvokeScript(script, signers)
if err != nil {
return false, fmt.Errorf("invokeScript: %w", err)
}
return res.State == vmstate.Halt.String(), nil
}
func (c *Client) Metrics() morphmetrics.Register {
return c.metrics
}
func (c *Client) setActor(act *actor.Actor) {
c.rpcActor = act
c.gasToken = nep17.New(act, gas.Hash)
c.rolemgmt = rolemgmt.New(act)
}
func (c *Client) GetActor() *actor.Actor {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
return c.rpcActor
}