mirror of
https://github.com/nspcc-dev/neo-go.git
synced 2025-03-14 13:28:39 +00:00
Compare commits
26 commits
Author | SHA1 | Date | |
---|---|---|---|
|
24a6d842b5 | ||
|
ee56f73606 | ||
|
210059fff8 | ||
|
4a510637be | ||
|
2fd51cf6b1 | ||
|
7d72b6538a | ||
|
70b4d005e3 | ||
|
2881961421 | ||
|
5f80a142b0 | ||
|
d411337648 | ||
|
2c8bd056fa | ||
|
9c0274850a | ||
|
94660f6333 | ||
|
c3916ef2c3 | ||
|
22a5bbcef3 | ||
|
e63cbe7c82 | ||
|
13b75c9d1a | ||
|
3ec14d2e8f | ||
|
e9f496a19f | ||
|
68d7e8e01c | ||
|
a227572d01 | ||
|
64e3b6aa48 | ||
|
a7c66dcb0b | ||
|
ff8ea5d4e8 | ||
|
0d8c751e50 | ||
|
d0e47c739a |
39 changed files with 721 additions and 198 deletions
|
@ -25,8 +25,11 @@ import (
|
|||
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
|
||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
|
||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker"
|
||||
"github.com/nspcc-dev/neo-go/pkg/services/helpers/neofs"
|
||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||
"github.com/nspcc-dev/neo-go/pkg/wallet"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/pool"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/user"
|
||||
"github.com/urfave/cli/v2"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
|
@ -43,9 +46,13 @@ const (
|
|||
DefaultAwaitableTimeout = 3 * 15 * time.Second
|
||||
)
|
||||
|
||||
// RPCEndpointFlag is a long flag name for an RPC endpoint. It can be used to
|
||||
// check for flag presence in the context.
|
||||
const RPCEndpointFlag = "rpc-endpoint"
|
||||
const (
|
||||
// RPCEndpointFlag is a long flag name for an RPC endpoint. It can be used to
|
||||
// check for flag presence in the context.
|
||||
RPCEndpointFlag = "rpc-endpoint"
|
||||
// NeoFSRPCEndpointFlag is a long flag name for a NeoFS RPC endpoint.
|
||||
NeoFSRPCEndpointFlag = "fs-rpc-endpoint"
|
||||
)
|
||||
|
||||
// Wallet is a set of flags used for wallet operations.
|
||||
var Wallet = []cli.Flag{
|
||||
|
@ -101,6 +108,22 @@ var RPC = []cli.Flag{
|
|||
},
|
||||
}
|
||||
|
||||
// NeoFSRPC is a set of flags used for NeoFS RPC connections (endpoint).
|
||||
var NeoFSRPC = []cli.Flag{&cli.StringSliceFlag{
|
||||
Name: NeoFSRPCEndpointFlag,
|
||||
Aliases: []string{"fsr"},
|
||||
Usage: "List of NeoFS storage node RPC addresses (comma-separated or multiple --fs-rpc-endpoint flags)",
|
||||
Required: true,
|
||||
Action: func(ctx *cli.Context, fsRpcEndpoints []string) error {
|
||||
for _, endpoint := range fsRpcEndpoints {
|
||||
if endpoint == "" {
|
||||
return cli.Exit("NeoFS RPC endpoint cannot contain empty values", 1)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
},
|
||||
}}
|
||||
|
||||
// Historic is a flag for commands that can perform historic invocations.
|
||||
var Historic = &cli.StringFlag{
|
||||
Name: "historic",
|
||||
|
@ -180,6 +203,26 @@ func GetRPCClient(gctx context.Context, ctx *cli.Context) (*rpcclient.Client, cl
|
|||
return c, nil
|
||||
}
|
||||
|
||||
// GetNeoFSClientPool returns a NeoFS pool and a signer for the given Context.
|
||||
func GetNeoFSClientPool(ctx *cli.Context, acc *wallet.Account) (user.Signer, neofs.PoolWrapper, error) {
|
||||
rpcNeoFS := ctx.StringSlice(NeoFSRPCEndpointFlag)
|
||||
signer := user.NewAutoIDSignerRFC6979(acc.PrivateKey().PrivateKey)
|
||||
|
||||
params := pool.DefaultOptions()
|
||||
params.SetHealthcheckTimeout(neofs.DefaultHealthcheckTimeout)
|
||||
params.SetNodeDialTimeout(neofs.DefaultDialTimeout)
|
||||
params.SetNodeStreamTimeout(neofs.DefaultStreamTimeout)
|
||||
p, err := pool.New(pool.NewFlatNodeParams(rpcNeoFS), signer, params)
|
||||
if err != nil {
|
||||
return nil, neofs.PoolWrapper{}, fmt.Errorf("failed to create NeoFS pool: %w", err)
|
||||
}
|
||||
pWrapper := neofs.PoolWrapper{Pool: p}
|
||||
if err = pWrapper.Dial(context.Background()); err != nil {
|
||||
return nil, neofs.PoolWrapper{}, fmt.Errorf("failed to dial NeoFS pool: %w", err)
|
||||
}
|
||||
return signer, pWrapper, nil
|
||||
}
|
||||
|
||||
// GetInvoker returns an invoker using the given RPC client, context and signers.
|
||||
// It parses "--historic" parameter to adjust it.
|
||||
func GetInvoker(c *rpcclient.Client, ctx *cli.Context, signers []transaction.Signer) (*invoker.Invoker, cli.ExitCoder) {
|
||||
|
|
|
@ -30,7 +30,7 @@ func dumpBin(ctx *cli.Context) error {
|
|||
count := uint32(ctx.Uint("count"))
|
||||
start := uint32(ctx.Uint("start"))
|
||||
|
||||
chain, prometheus, pprof, err := initBCWithMetrics(cfg, log)
|
||||
chain, _, prometheus, pprof, err := InitBCWithMetrics(cfg, log)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -140,10 +140,11 @@ func newGraceContext() context.Context {
|
|||
return ctx
|
||||
}
|
||||
|
||||
func initBCWithMetrics(cfg config.Config, log *zap.Logger) (*core.Blockchain, *metrics.Service, *metrics.Service, error) {
|
||||
chain, _, err := initBlockChain(cfg, log)
|
||||
// InitBCWithMetrics initializes the blockchain with metrics with the given configuration.
|
||||
func InitBCWithMetrics(cfg config.Config, log *zap.Logger) (*core.Blockchain, storage.Store, *metrics.Service, *metrics.Service, error) {
|
||||
chain, store, err := initBlockChain(cfg, log)
|
||||
if err != nil {
|
||||
return nil, nil, nil, cli.Exit(err, 1)
|
||||
return nil, nil, nil, nil, cli.Exit(err, 1)
|
||||
}
|
||||
prometheus := metrics.NewPrometheusService(cfg.ApplicationConfiguration.Prometheus, log)
|
||||
pprof := metrics.NewPprofService(cfg.ApplicationConfiguration.Pprof, log)
|
||||
|
@ -151,14 +152,14 @@ func initBCWithMetrics(cfg config.Config, log *zap.Logger) (*core.Blockchain, *m
|
|||
go chain.Run()
|
||||
err = prometheus.Start()
|
||||
if err != nil {
|
||||
return nil, nil, nil, cli.Exit(fmt.Errorf("failed to start Prometheus service: %w", err), 1)
|
||||
return nil, nil, nil, nil, cli.Exit(fmt.Errorf("failed to start Prometheus service: %w", err), 1)
|
||||
}
|
||||
err = pprof.Start()
|
||||
if err != nil {
|
||||
return nil, nil, nil, cli.Exit(fmt.Errorf("failed to start Pprof service: %w", err), 1)
|
||||
return nil, nil, nil, nil, cli.Exit(fmt.Errorf("failed to start Pprof service: %w", err), 1)
|
||||
}
|
||||
|
||||
return chain, prometheus, pprof, nil
|
||||
return chain, store, prometheus, pprof, nil
|
||||
}
|
||||
|
||||
func dumpDB(ctx *cli.Context) error {
|
||||
|
@ -189,7 +190,7 @@ func dumpDB(ctx *cli.Context) error {
|
|||
defer outStream.Close()
|
||||
writer := io.NewBinWriterFromIO(outStream)
|
||||
|
||||
chain, prometheus, pprof, err := initBCWithMetrics(cfg, log)
|
||||
chain, _, prometheus, pprof, err := InitBCWithMetrics(cfg, log)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -249,7 +250,7 @@ func restoreDB(ctx *cli.Context) error {
|
|||
cfg.ApplicationConfiguration.SaveStorageBatch = true
|
||||
}
|
||||
|
||||
chain, prometheus, pprof, err := initBCWithMetrics(cfg, log)
|
||||
chain, _, prometheus, pprof, err := InitBCWithMetrics(cfg, log)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -470,7 +471,7 @@ func startServer(ctx *cli.Context) error {
|
|||
return cli.Exit(err, 1)
|
||||
}
|
||||
|
||||
chain, prometheus, pprof, err := initBCWithMetrics(cfg, log)
|
||||
chain, _, prometheus, pprof, err := InitBCWithMetrics(cfg, log)
|
||||
if err != nil {
|
||||
return cli.Exit(err, 1)
|
||||
}
|
||||
|
|
|
@ -185,11 +185,11 @@ func TestInitBCWithMetrics(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("bad store", func(t *testing.T) {
|
||||
_, _, _, err = initBCWithMetrics(config.Config{}, logger)
|
||||
_, _, _, _, err = InitBCWithMetrics(config.Config{}, logger)
|
||||
require.Error(t, err)
|
||||
})
|
||||
|
||||
chain, prometheus, pprof, err := initBCWithMetrics(cfg, logger)
|
||||
chain, _, prometheus, pprof, err := InitBCWithMetrics(cfg, logger)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
chain.Close()
|
||||
|
|
|
@ -16,6 +16,35 @@ import (
|
|||
"github.com/urfave/cli/v2"
|
||||
)
|
||||
|
||||
var neoFSFlags = append([]cli.Flag{
|
||||
&cli.StringFlag{
|
||||
Name: "container",
|
||||
Aliases: []string{"cid"},
|
||||
Usage: "NeoFS container ID to upload objects to",
|
||||
Required: true,
|
||||
Action: cmdargs.EnsureNotEmpty("container"),
|
||||
},
|
||||
&flags.AddressFlag{
|
||||
Name: "address",
|
||||
Usage: "Address to use for signing the uploading and searching transactions in NeoFS",
|
||||
},
|
||||
&cli.UintFlag{
|
||||
Name: "retries",
|
||||
Usage: "Maximum number of NeoFS node request retries",
|
||||
Value: neofs.MaxRetries,
|
||||
Action: func(context *cli.Context, u uint) error {
|
||||
if u < 1 {
|
||||
return cli.Exit("retries should be greater than 0", 1)
|
||||
}
|
||||
return nil
|
||||
},
|
||||
},
|
||||
&cli.UintFlag{
|
||||
Name: "searchers",
|
||||
Usage: "Number of concurrent searches for objects",
|
||||
Value: 100,
|
||||
}}, options.NeoFSRPC...)
|
||||
|
||||
// NewCommands returns util commands for neo-go CLI.
|
||||
func NewCommands() []*cli.Command {
|
||||
// By default, RPC flag is required. sendtx and txdump may be called without provided rpc-endpoint.
|
||||
|
@ -35,27 +64,6 @@ func NewCommands() []*cli.Command {
|
|||
}, options.RPC...)
|
||||
txCancelFlags = append(txCancelFlags, options.Wallet...)
|
||||
uploadBinFlags := append([]cli.Flag{
|
||||
&cli.StringSliceFlag{
|
||||
Name: "fs-rpc-endpoint",
|
||||
Aliases: []string{"fsr"},
|
||||
Usage: "List of NeoFS storage node RPC addresses (comma-separated or multiple --fs-rpc-endpoint flags)",
|
||||
Required: true,
|
||||
Action: func(ctx *cli.Context, fsRpcEndpoints []string) error {
|
||||
for _, endpoint := range fsRpcEndpoints {
|
||||
if endpoint == "" {
|
||||
return cli.Exit("NeoFS RPC endpoint cannot contain empty values", 1)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
},
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "container",
|
||||
Aliases: []string{"cid"},
|
||||
Usage: "NeoFS container ID to upload blocks to",
|
||||
Required: true,
|
||||
Action: cmdargs.EnsureNotEmpty("container"),
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "block-attribute",
|
||||
Usage: "Attribute key of the block object",
|
||||
|
@ -68,10 +76,6 @@ func NewCommands() []*cli.Command {
|
|||
Value: neofs.DefaultIndexFileAttribute,
|
||||
Action: cmdargs.EnsureNotEmpty("index-attribute"),
|
||||
},
|
||||
&flags.AddressFlag{
|
||||
Name: "address",
|
||||
Usage: "Address to use for signing the uploading and searching transactions in NeoFS",
|
||||
},
|
||||
&cli.UintFlag{
|
||||
Name: "index-file-size",
|
||||
Usage: "Size of index file",
|
||||
|
@ -82,25 +86,22 @@ func NewCommands() []*cli.Command {
|
|||
Usage: "Number of workers to fetch and upload blocks concurrently",
|
||||
Value: 20,
|
||||
},
|
||||
&cli.UintFlag{
|
||||
Name: "searchers",
|
||||
Usage: "Number of concurrent searches for blocks",
|
||||
Value: 100,
|
||||
},
|
||||
&cli.UintFlag{
|
||||
Name: "retries",
|
||||
Usage: "Maximum number of Neo/NeoFS node request retries",
|
||||
Value: neofs.MaxRetries,
|
||||
Action: func(context *cli.Context, u uint) error {
|
||||
if u < 1 {
|
||||
return cli.Exit("retries should be greater than 0", 1)
|
||||
}
|
||||
return nil
|
||||
},
|
||||
},
|
||||
options.Debug,
|
||||
}, options.RPC...)
|
||||
uploadBinFlags = append(uploadBinFlags, options.Wallet...)
|
||||
uploadBinFlags = append(uploadBinFlags, neoFSFlags...)
|
||||
|
||||
uploadStateFlags := append([]cli.Flag{
|
||||
&cli.StringFlag{
|
||||
Name: "state-attribute",
|
||||
Usage: "Attribute key of the state object",
|
||||
Value: neofs.DefaultStateAttribute,
|
||||
Action: cmdargs.EnsureNotEmpty("state-attribute"),
|
||||
},
|
||||
options.Debug, options.Config, options.ConfigFile, options.RelativePath,
|
||||
}, options.Wallet...)
|
||||
uploadStateFlags = append(uploadStateFlags, options.Network...)
|
||||
uploadStateFlags = append(uploadStateFlags, neoFSFlags...)
|
||||
return []*cli.Command{
|
||||
{
|
||||
Name: "util",
|
||||
|
@ -185,6 +186,13 @@ func NewCommands() []*cli.Command {
|
|||
Action: uploadBin,
|
||||
Flags: uploadBinFlags,
|
||||
},
|
||||
{
|
||||
Name: "upload-state",
|
||||
Usage: "Start the node, traverse MPT and upload MPT nodes to the NeoFS container at every StateSyncInterval number of blocks",
|
||||
UsageText: "neo-go util upload-state --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --state-attribute state --wallet <wallet> [--wallet-config <config>] [--address <address>] [--searchers <num>] [--retries <num>] [--debug] [--config-path path] [-p/-m/-t] [--config-file file]",
|
||||
Action: uploadState,
|
||||
Flags: uploadStateFlags,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
|
@ -19,28 +19,14 @@ import (
|
|||
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/pool"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/user"
|
||||
"github.com/urfave/cli/v2"
|
||||
)
|
||||
|
||||
// poolWrapper wraps a NeoFS pool to adapt its Close method to return an error.
|
||||
type poolWrapper struct {
|
||||
*pool.Pool
|
||||
}
|
||||
|
||||
// Close closes the pool and returns nil.
|
||||
func (p poolWrapper) Close() error {
|
||||
p.Pool.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
func uploadBin(ctx *cli.Context) error {
|
||||
if err := cmdargs.EnsureNone(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
rpcNeoFS := ctx.StringSlice("fs-rpc-endpoint")
|
||||
containerIDStr := ctx.String("container")
|
||||
attr := ctx.String("block-attribute")
|
||||
numWorkers := ctx.Uint("workers")
|
||||
maxParallelSearches := ctx.Uint("searchers")
|
||||
|
@ -52,12 +38,6 @@ func uploadBin(ctx *cli.Context) error {
|
|||
if err != nil {
|
||||
return cli.Exit(fmt.Sprintf("failed to load account: %v", err), 1)
|
||||
}
|
||||
|
||||
var containerID cid.ID
|
||||
if err = containerID.DecodeString(containerIDStr); err != nil {
|
||||
return cli.Exit(fmt.Sprintf("failed to decode container ID: %v", err), 1)
|
||||
}
|
||||
|
||||
gctx, cancel := options.GetTimeoutContext(ctx)
|
||||
defer cancel()
|
||||
rpc, err := options.GetRPCClient(gctx, ctx)
|
||||
|
@ -65,39 +45,20 @@ func uploadBin(ctx *cli.Context) error {
|
|||
return cli.Exit(fmt.Sprintf("failed to create RPC client: %v", err), 1)
|
||||
}
|
||||
|
||||
signer := user.NewAutoIDSignerRFC6979(acc.PrivateKey().PrivateKey)
|
||||
|
||||
params := pool.DefaultOptions()
|
||||
params.SetHealthcheckTimeout(neofs.DefaultHealthcheckTimeout)
|
||||
params.SetNodeDialTimeout(neofs.DefaultDialTimeout)
|
||||
params.SetNodeStreamTimeout(neofs.DefaultStreamTimeout)
|
||||
p, err := pool.New(pool.NewFlatNodeParams(rpcNeoFS), signer, params)
|
||||
signer, pWrapper, err := options.GetNeoFSClientPool(ctx, acc)
|
||||
if err != nil {
|
||||
return cli.Exit(fmt.Sprintf("failed to create NeoFS pool: %v", err), 1)
|
||||
return cli.Exit(err, 1)
|
||||
}
|
||||
pWrapper := poolWrapper{p}
|
||||
if err = pWrapper.Dial(context.Background()); err != nil {
|
||||
return cli.Exit(fmt.Sprintf("failed to dial NeoFS pool: %v", err), 1)
|
||||
}
|
||||
defer p.Close()
|
||||
|
||||
var containerObj container.Container
|
||||
err = retry(func() error {
|
||||
containerObj, err = p.ContainerGet(ctx.Context, containerID, client.PrmContainerGet{})
|
||||
return err
|
||||
}, maxRetries, debug)
|
||||
if err != nil {
|
||||
return cli.Exit(fmt.Errorf("failed to get container with ID %s: %w", containerID, err), 1)
|
||||
}
|
||||
containerMagic := containerObj.Attribute("Magic")
|
||||
defer pWrapper.Close()
|
||||
|
||||
v, err := rpc.GetVersion()
|
||||
if err != nil {
|
||||
return cli.Exit(fmt.Sprintf("failed to get version from RPC: %v", err), 1)
|
||||
}
|
||||
magic := strconv.Itoa(int(v.Protocol.Network))
|
||||
if containerMagic != magic {
|
||||
return cli.Exit(fmt.Sprintf("container magic %s does not match the network magic %s", containerMagic, magic), 1)
|
||||
containerID, err := getContainer(ctx, pWrapper, magic, maxRetries, debug)
|
||||
if err != nil {
|
||||
return cli.Exit(err, 1)
|
||||
}
|
||||
|
||||
currentBlockHeight, err := rpc.GetBlockCount()
|
||||
|
@ -138,7 +99,7 @@ func retry(action func() error, maxRetries uint, debug bool) error {
|
|||
}
|
||||
|
||||
// uploadBlocksAndIndexFiles uploads the blocks and index files to the container using the pool.
|
||||
func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.Client, signer user.Signer, containerID cid.ID, attr, indexAttributeKey string, buf []byte, currentIndexFileID, indexFileSize, currentBlockHeight uint, numWorkers, maxRetries uint, debug bool) error {
|
||||
func uploadBlocksAndIndexFiles(ctx *cli.Context, p neofs.PoolWrapper, rpc *rpcclient.Client, signer user.Signer, containerID cid.ID, attr, indexAttributeKey string, buf []byte, currentIndexFileID, indexFileSize, currentBlockHeight uint, numWorkers, maxRetries uint, debug bool) error {
|
||||
if currentIndexFileID*indexFileSize >= currentBlockHeight {
|
||||
fmt.Fprintf(ctx.App.Writer, "No new blocks to upload. Need to upload starting from %d, current height %d\n", currentIndexFileID*indexFileSize, currentBlockHeight)
|
||||
return nil
|
||||
|
@ -257,7 +218,7 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C
|
|||
if err != nil {
|
||||
return fmt.Errorf("failed to upload index file: %w", err)
|
||||
}
|
||||
fmt.Println("Successfully uploaded index file ", indexFileStart/indexFileSize)
|
||||
fmt.Fprintln(ctx.App.Writer, "Successfully uploaded index file ", indexFileStart/indexFileSize)
|
||||
}
|
||||
clear(buf)
|
||||
}
|
||||
|
@ -265,7 +226,7 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C
|
|||
}
|
||||
|
||||
// searchIndexFile returns the ID and buffer for the next index file to be uploaded.
|
||||
func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, privKeys *keys.PrivateKey, signer user.Signer, indexFileSize uint, blockAttributeKey, attributeKey string, maxParallelSearches, maxRetries uint, debug bool) (uint, []byte, error) {
|
||||
func searchIndexFile(ctx *cli.Context, p neofs.PoolWrapper, containerID cid.ID, privKeys *keys.PrivateKey, signer user.Signer, indexFileSize uint, blockAttributeKey, attributeKey string, maxParallelSearches, maxRetries uint, debug bool) (uint, []byte, error) {
|
||||
var (
|
||||
// buf is used to store OIDs of the uploaded blocks.
|
||||
buf = make([]byte, indexFileSize*oid.Size)
|
||||
|
@ -357,7 +318,7 @@ func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, privKe
|
|||
// searchObjects searches in parallel for objects with attribute GE startIndex and LT
|
||||
// endIndex. It returns a buffered channel of resulting object IDs and closes it once
|
||||
// OID search is finished. Errors are sent to errCh in a non-blocking way.
|
||||
func searchObjects(ctx context.Context, p poolWrapper, containerID cid.ID, privKeys *keys.PrivateKey, blockAttributeKey string, startIndex, endIndex, maxParallelSearches, maxRetries uint, debug bool, errCh chan error, additionalFilters ...object.SearchFilters) chan oid.ID {
|
||||
func searchObjects(ctx context.Context, p neofs.PoolWrapper, containerID cid.ID, privKeys *keys.PrivateKey, blockAttributeKey string, startIndex, endIndex, maxParallelSearches, maxRetries uint, debug bool, errCh chan error, additionalFilters ...object.SearchFilters) chan oid.ID {
|
||||
var res = make(chan oid.ID, 2*neofs.DefaultSearchBatchSize)
|
||||
go func() {
|
||||
var wg sync.WaitGroup
|
||||
|
@ -419,7 +380,7 @@ func searchObjects(ctx context.Context, p poolWrapper, containerID cid.ID, privK
|
|||
}
|
||||
|
||||
// uploadObj uploads object to the container using provided settings.
|
||||
func uploadObj(ctx context.Context, p poolWrapper, signer user.Signer, containerID cid.ID, objData []byte, attrs []object.Attribute) (oid.ID, error) {
|
||||
func uploadObj(ctx context.Context, p neofs.PoolWrapper, signer user.Signer, containerID cid.ID, objData []byte, attrs []object.Attribute) (oid.ID, error) {
|
||||
var (
|
||||
hdr object.Object
|
||||
prmObjectPutInit client.PrmObjectPutInit
|
||||
|
@ -461,3 +422,28 @@ func getBlockIndex(header object.Object, attribute string) (int, error) {
|
|||
}
|
||||
return -1, fmt.Errorf("attribute %s not found", attribute)
|
||||
}
|
||||
|
||||
// getContainer gets container by ID and checks its magic.
|
||||
func getContainer(ctx *cli.Context, p neofs.PoolWrapper, expectedMagic string, maxRetries uint, debug bool) (cid.ID, error) {
|
||||
var (
|
||||
containerObj container.Container
|
||||
err error
|
||||
containerIDStr = ctx.String("container")
|
||||
)
|
||||
var containerID cid.ID
|
||||
if err = containerID.DecodeString(containerIDStr); err != nil {
|
||||
return containerID, fmt.Errorf("failed to decode container ID: %w", err)
|
||||
}
|
||||
err = retry(func() error {
|
||||
containerObj, err = p.ContainerGet(ctx.Context, containerID, client.PrmContainerGet{})
|
||||
return err
|
||||
}, maxRetries, debug)
|
||||
if err != nil {
|
||||
return containerID, fmt.Errorf("failed to get container: %w", err)
|
||||
}
|
||||
containerMagic := containerObj.Attribute("Magic")
|
||||
if containerMagic != expectedMagic {
|
||||
return containerID, fmt.Errorf("container magic mismatch: expected %s, got %s", expectedMagic, containerMagic)
|
||||
}
|
||||
return containerID, nil
|
||||
}
|
||||
|
|
206
cli/util/upload_state.go
Normal file
206
cli/util/upload_state.go
Normal file
|
@ -0,0 +1,206 @@
|
|||
package util
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/nspcc-dev/neo-go/cli/cmdargs"
|
||||
"github.com/nspcc-dev/neo-go/cli/options"
|
||||
"github.com/nspcc-dev/neo-go/cli/server"
|
||||
"github.com/nspcc-dev/neo-go/pkg/core"
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/mpt"
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/storage"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
gio "github.com/nspcc-dev/neo-go/pkg/io"
|
||||
"github.com/nspcc-dev/neo-go/pkg/services/helpers/neofs"
|
||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/client"
|
||||
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
"github.com/urfave/cli/v2"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func uploadState(ctx *cli.Context) error {
|
||||
if err := cmdargs.EnsureNone(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
cfg, err := options.GetConfigFromContext(ctx)
|
||||
if err != nil {
|
||||
return cli.Exit(err, 1)
|
||||
}
|
||||
attr := ctx.String("state-attribute")
|
||||
maxRetries := ctx.Uint("retries")
|
||||
debug := ctx.Bool("debug")
|
||||
|
||||
acc, _, err := options.GetAccFromContext(ctx)
|
||||
if err != nil {
|
||||
return cli.Exit(fmt.Sprintf("failed to load account: %v", err), 1)
|
||||
}
|
||||
|
||||
signer, p, err := options.GetNeoFSClientPool(ctx, acc)
|
||||
if err != nil {
|
||||
return cli.Exit(err, 1)
|
||||
}
|
||||
defer p.Close()
|
||||
log, _, logCloser, err := options.HandleLoggingParams(debug, cfg.ApplicationConfiguration)
|
||||
if err != nil {
|
||||
return cli.Exit(err, 1)
|
||||
}
|
||||
if logCloser != nil {
|
||||
defer func() { _ = logCloser() }()
|
||||
}
|
||||
|
||||
chain, store, prometheus, pprof, err := server.InitBCWithMetrics(cfg, log)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
pprof.ShutDown()
|
||||
prometheus.ShutDown()
|
||||
chain.Close()
|
||||
}()
|
||||
|
||||
if chain.GetConfig().Ledger.KeepOnlyLatestState || chain.GetConfig().Ledger.RemoveUntraceableBlocks {
|
||||
return cli.Exit("only full-state node is supported: disable KeepOnlyLatestState and RemoveUntraceableBlocks", 1)
|
||||
}
|
||||
syncInterval := cfg.ProtocolConfiguration.StateSyncInterval
|
||||
if syncInterval == 0 {
|
||||
syncInterval = core.DefaultStateSyncInterval
|
||||
}
|
||||
|
||||
containerID, err := getContainer(ctx, p, strconv.Itoa(int(chain.GetConfig().Magic)), maxRetries, debug)
|
||||
if err != nil {
|
||||
return cli.Exit(err, 1)
|
||||
}
|
||||
|
||||
stateObjCount, err := searchStateIndex(ctx, p, containerID, acc.PrivateKey(), attr, syncInterval, maxRetries, debug)
|
||||
if err != nil {
|
||||
return cli.Exit(fmt.Sprintf("failed searching existing states: %v", err), 1)
|
||||
}
|
||||
stateModule := chain.GetStateModule()
|
||||
currentHeight := int(stateModule.CurrentLocalHeight())
|
||||
currentStateIndex := currentHeight / syncInterval
|
||||
if currentStateIndex <= stateObjCount {
|
||||
log.Info("no new states to upload",
|
||||
zap.Int("number of uploaded state objects", stateObjCount),
|
||||
zap.Int("latest state is uploaded for block", stateObjCount*syncInterval),
|
||||
zap.Int("current height", currentHeight),
|
||||
zap.Int("StateSyncInterval", syncInterval))
|
||||
return nil
|
||||
}
|
||||
log.Info("starting uploading",
|
||||
zap.Int("number of uploaded state objects", stateObjCount),
|
||||
zap.Int("next state to upload for block", stateObjCount*syncInterval),
|
||||
zap.Int("current height", currentHeight),
|
||||
zap.Int("StateSyncInterval", syncInterval),
|
||||
zap.Int("number of states to upload", currentStateIndex-stateObjCount))
|
||||
for state := stateObjCount; state < currentStateIndex; state++ {
|
||||
height := uint32(state * syncInterval)
|
||||
stateRoot, err := stateModule.GetStateRoot(height)
|
||||
if err != nil {
|
||||
return cli.Exit(fmt.Sprintf("failed to get state root for height %d: %v", height, err), 1)
|
||||
}
|
||||
h, err := chain.GetHeader(chain.GetHeaderHash(height))
|
||||
if err != nil {
|
||||
return cli.Exit(fmt.Sprintf("failed to get header %d: %v", height, err), 1)
|
||||
}
|
||||
|
||||
var (
|
||||
hdr object.Object
|
||||
prmObjectPutInit client.PrmObjectPutInit
|
||||
attrs = []object.Attribute{
|
||||
*object.NewAttribute(attr, strconv.Itoa(int(height))),
|
||||
*object.NewAttribute("Timestamp", strconv.FormatInt(time.Now().Unix(), 10)),
|
||||
*object.NewAttribute("StateRoot", stateRoot.Root.StringLE()),
|
||||
*object.NewAttribute("StateSyncInterval", strconv.Itoa(syncInterval)),
|
||||
*object.NewAttribute("BlockTime", strconv.FormatUint(h.Timestamp, 10)),
|
||||
}
|
||||
)
|
||||
hdr.SetContainerID(containerID)
|
||||
hdr.SetOwner(signer.UserID())
|
||||
hdr.SetAttributes(attrs...)
|
||||
err = retry(func() error {
|
||||
writer, err := p.ObjectPutInit(ctx.Context, hdr, signer, prmObjectPutInit)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
start := time.Now()
|
||||
wrt := gio.NewBinWriterFromIO(writer)
|
||||
wrt.WriteB(byte(0))
|
||||
wrt.WriteU32LE(uint32(chain.GetConfig().Magic))
|
||||
wrt.WriteU32LE(height)
|
||||
wrt.WriteBytes(stateRoot.Root[:])
|
||||
err = traverseMPT(stateRoot.Root, store, wrt)
|
||||
if err != nil {
|
||||
_ = writer.Close()
|
||||
return err
|
||||
}
|
||||
err = writer.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
duration := time.Since(start)
|
||||
res := writer.GetResult()
|
||||
log.Info("uploaded state object",
|
||||
zap.String("object ID", res.StoredObjectID().String()),
|
||||
zap.Uint32("height", height),
|
||||
zap.Duration("time spent", duration))
|
||||
return nil
|
||||
}, maxRetries, debug)
|
||||
if err != nil {
|
||||
return cli.Exit(fmt.Sprintf("failed to upload object at height %d: %v", height, err), 1)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func searchStateIndex(ctx *cli.Context, p neofs.PoolWrapper, containerID cid.ID, privKeys *keys.PrivateKey,
|
||||
attributeKey string, syncInterval int, maxRetries uint, debug bool,
|
||||
) (int, error) {
|
||||
var (
|
||||
doneCh = make(chan struct{})
|
||||
errCh = make(chan error)
|
||||
objCount = 0
|
||||
)
|
||||
|
||||
go func() {
|
||||
defer close(doneCh)
|
||||
for i := 0; ; i++ {
|
||||
indexIDs := searchObjects(ctx.Context, p, containerID, privKeys,
|
||||
attributeKey, uint(i*syncInterval), uint(i*syncInterval)+1, 1, maxRetries, debug, errCh)
|
||||
resOIDs := make([]oid.ID, 0, 1)
|
||||
for id := range indexIDs {
|
||||
resOIDs = append(resOIDs, id)
|
||||
}
|
||||
if len(resOIDs) == 0 {
|
||||
break
|
||||
}
|
||||
if len(resOIDs) > 1 {
|
||||
fmt.Fprintf(ctx.App.Writer, "WARN: %d duplicated state objects with %s: %d found: %s\n", len(resOIDs), attributeKey, i, resOIDs)
|
||||
}
|
||||
objCount++
|
||||
}
|
||||
}()
|
||||
select {
|
||||
case err := <-errCh:
|
||||
return objCount, err
|
||||
case <-doneCh:
|
||||
return objCount, nil
|
||||
}
|
||||
}
|
||||
|
||||
func traverseMPT(root util.Uint256, store storage.Store, writer *gio.BinWriter) error {
|
||||
cache := storage.NewMemCachedStore(store)
|
||||
billet := mpt.NewBillet(root, mpt.ModeAll, mpt.DummySTTempStoragePrefix, cache)
|
||||
err := billet.Traverse(func(pathToNode []byte, node mpt.Node, nodeBytes []byte) bool {
|
||||
writer.WriteVarBytes(nodeBytes)
|
||||
return writer.Err != nil
|
||||
}, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("billet traversal error: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -204,8 +204,6 @@ func TestUploadBin(t *testing.T) {
|
|||
e.In.WriteString("one\r")
|
||||
e.RunWithErrorCheckExit(t, "failed to load account", append(args, "--cid", "test", "--wallet", "./not-exist.json", "--rpc-endpoint", "https://test")...)
|
||||
e.In.WriteString("one\r")
|
||||
e.RunWithErrorCheckExit(t, "failed to decode container ID", append(args, "--cid", "test", "--wallet", testcli.ValidatorWallet, "--rpc-endpoint", "https://test")...)
|
||||
e.In.WriteString("one\r")
|
||||
e.RunWithErrorCheckExit(t, "failed to create RPC client", append(args, "--cid", "9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG", "--wallet", testcli.ValidatorWallet, "--rpc-endpoint", "https://test")...)
|
||||
e.In.WriteString("one\r")
|
||||
e.RunWithErrorCheckExit(t, "failed to dial NeoFS pool", append(args, "--cid", "9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG", "--wallet", testcli.ValidatorWallet, "--rpc-endpoint", "http://"+e.RPC.Addresses()[0])...)
|
||||
|
|
|
@ -73,8 +73,8 @@ parameter.
|
|||
Once all blocks available in the NeoFS container are processed, the service
|
||||
shuts down automatically.
|
||||
|
||||
### NeoFS Upload Command
|
||||
The `upload-bin` command is designed to fetch blocks from the RPC node and upload
|
||||
### NeoFS block uploading command
|
||||
The `util upload-bin` command is designed to fetch blocks from the RPC node and upload
|
||||
them to the NeoFS container. It also creates and uploads index files. Below is an
|
||||
example usage of the command:
|
||||
|
||||
|
@ -101,4 +101,26 @@ files are needed (different `index-file-size` or `index-attribute`), `upload-bin
|
|||
will upload the entire block sequence starting from genesis since no migration is
|
||||
supported yet by this command. Please, add a comment to the
|
||||
[#3744](https://github.com/nspcc-dev/neo-go/issues/3744) issue if you need this
|
||||
functionality.
|
||||
functionality.
|
||||
|
||||
### NeoFS state uploading command
|
||||
The `util upload-state` command is used to start a node, traverse the MPT over the
|
||||
smart contract storage, and upload MPT nodes to a NeoFS container at every
|
||||
`StateSyncInterval` number of blocks. Below is an example usage of the command:
|
||||
|
||||
```shell
|
||||
./bin/neo-go util upload-state --cid 9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG --wallet-config ./wallet-config.yml --state-attribute State -m -fsr st1.t5.fs.neo.org:8080 -fsr st2.t5.fs.neo.org:8080 -fsr st3.t5.fs.neo.org:8080
|
||||
```
|
||||
|
||||
Run `./bin/neo-go util upload-state --help` to see the full list of supported options.
|
||||
|
||||
This command works as follows:
|
||||
1. Searches for the state objects stored in NeoFS to find the latest uploaded object.
|
||||
2. Checks if new state objects could be uploaded given the current local state height.
|
||||
3. Traverses the MPT nodes (pre-order) starting from the stateroot at the height of the
|
||||
latest uploaded state object down to its children.
|
||||
4. Uploads the MPT nodes to the NeoFS container.
|
||||
5. Repeats steps 3-4 with a step equal to the `StateSyncInterval` number of blocks.
|
||||
|
||||
If the command is interrupted, it can be resumed. It starts the uploading process
|
||||
from the last uploaded state object.
|
|
@ -251,6 +251,7 @@ RPC:
|
|||
MaxRequestBodyBytes: 5242880
|
||||
MaxRequestHeaderBytes: 1048576
|
||||
MaxWebSocketClients: 64
|
||||
MaxWebSocketFeeds: 16
|
||||
SessionEnabled: false
|
||||
SessionExpirationTime: 15
|
||||
SessionBackedByMPT: false
|
||||
|
@ -296,6 +297,9 @@ where:
|
|||
number (64 by default). Attempts to establish additional connections will
|
||||
lead to websocket handshake failures. Use "-1" to disable websocket
|
||||
connections (0 will lead to using the default value).
|
||||
- `MaxWebSocketFeeds` -- the maximum simultaneous event subscriptions number
|
||||
for a single client (16 by default). Attemps to create additional subscriptions
|
||||
will lead to error.
|
||||
- `SessionEnabled` denotes whether session-based iterator JSON-RPC API is enabled.
|
||||
If true, then all iterators got from `invoke*` calls will be stored as sessions
|
||||
on the server side available for further traverse. `traverseiterator` and
|
||||
|
@ -476,7 +480,7 @@ in development and can change in an incompatible way.
|
|||
| `Basilisk` | Enables strict smart contract script check against a set of JMP instructions and against method boundaries enabled on contract deploy or update. Increases `stackitem.Integer` JSON parsing precision up to the maximum value supported by the NeoVM. Enables strict check for notifications emitted by a contract to precisely match the events specified in the contract manifest. | https://github.com/nspcc-dev/neo-go/pull/3056 <br> https://github.com/neo-project/neo/pull/2881 <br> https://github.com/nspcc-dev/neo-go/pull/3080 <br> https://github.com/neo-project/neo/pull/2883 <br> https://github.com/nspcc-dev/neo-go/pull/3085 <br> https://github.com/neo-project/neo/pull/2810 |
|
||||
| `Cockatrice` | Introduces the ability to update native contracts. Includes a couple of new native smart contract APIs: `keccak256` of native CryptoLib contract and `getCommitteeAddress` of native NeoToken contract. | https://github.com/nspcc-dev/neo-go/pull/3402 <br> https://github.com/neo-project/neo/pull/2942 <br> https://github.com/nspcc-dev/neo-go/pull/3301 <br> https://github.com/neo-project/neo/pull/2925 <br> https://github.com/nspcc-dev/neo-go/pull/3362 <br> https://github.com/neo-project/neo/pull/3154 |
|
||||
| `Domovoi` | Makes node use executing contract state for the contract call permissions check instead of the state stored in the native Management contract. In C# also makes System.Runtime.GetNotifications interop properly count stack references of notification parameters which prevents users from creating objects that exceed MaxStackSize constraint, but NeoGo has never had this bug, thus proper behaviour is preserved even before HFDomovoi. It results in the fact that some T5 testnet transactions have different ApplicationLogs compared to the C# node, but the node states match. | https://github.com/nspcc-dev/neo-go/pull/3476 <br> https://github.com/neo-project/neo/pull/3290 <br> https://github.com/nspcc-dev/neo-go/pull/3473 <br> https://github.com/neo-project/neo/pull/3290 <br> https://github.com/neo-project/neo/pull/3301 <br> https://github.com/nspcc-dev/neo-go/pull/3485 |
|
||||
| `Echidna` | Introduces `Designation` event extension with `Old` and `New` roles data to native RoleManagement contract. Adds support for `base64UrlEncode` and `base64UrlDecode` methods to native StdLib contract. Extends the list of required call flags for `registerCandidate`, `unregisterCandidate`and `vote` methods of native NeoToken contract with AllowNotify flag. Enables `onNEP17Payment` method of NEO contract for candidate registration. | https://github.com/nspcc-dev/neo-go/pull/3554 <br> https://github.com/nspcc-dev/neo-go/pull/3761 <br> https://github.com/nspcc-dev/neo-go/pull/3554 <br> https://github.com/neo-project/neo/pull/3597 <br> https://github.com/nspcc-dev/neo-go/pull/3700 |
|
||||
| `Echidna` | Introduces `Designation` event extension with `Old` and `New` roles data to native RoleManagement contract. Adds support for `base64UrlEncode` and `base64UrlDecode` methods to native StdLib contract. Extends the list of required call flags for `registerCandidate`, `unregisterCandidate`and `vote` methods of native NeoToken contract with AllowNotify flag. Enables `onNEP17Payment` method of NEO contract for candidate registration. Introduces constraint for maximum number of execution notifications. | https://github.com/nspcc-dev/neo-go/pull/3554 <br> https://github.com/nspcc-dev/neo-go/pull/3761 <br> https://github.com/nspcc-dev/neo-go/pull/3554 <br> https://github.com/neo-project/neo/pull/3597 <br> https://github.com/nspcc-dev/neo-go/pull/3700 <br> https://github.com/nspcc-dev/neo-go/pull/3640 <br> https://github.com/neo-project/neo/pull/3548 |
|
||||
|
||||
|
||||
## DB compatibility
|
||||
|
|
|
@ -58,6 +58,9 @@ method. Upon successful subscription, clients receive subscription ID for
|
|||
subsequent management of this subscription. Subscription is only valid for
|
||||
connection lifetime, no long-term client identification is being made.
|
||||
|
||||
The maximum number of simultaneous subscriptions can be set server-side
|
||||
via `MaxWebSocketFeeds` setting.
|
||||
|
||||
Errors are not described down below, but they can be returned as standard
|
||||
JSON-RPC errors (most often caused by invalid parameters).
|
||||
|
||||
|
|
|
@ -44,7 +44,8 @@ const (
|
|||
// match. See #3485 for details.
|
||||
HFDomovoi // Domovoi
|
||||
// HFEchidna represents hard-fork introduced in #3554 (ported from
|
||||
// https://github.com/neo-project/neo/pull/3454).
|
||||
// https://github.com/neo-project/neo/pull/3454), #3640 (ported from
|
||||
// https://github.com/neo-project/neo/pull/3548).
|
||||
HFEchidna // Echidna
|
||||
// hfLast denotes the end of hardforks enum. Consider adding new hardforks
|
||||
// before hfLast.
|
||||
|
|
|
@ -19,6 +19,7 @@ type (
|
|||
MaxRequestBodyBytes int `yaml:"MaxRequestBodyBytes"`
|
||||
MaxRequestHeaderBytes int `yaml:"MaxRequestHeaderBytes"`
|
||||
MaxWebSocketClients int `yaml:"MaxWebSocketClients"`
|
||||
MaxWebSocketFeeds int `yaml:"MaxWebSocketFeeds"`
|
||||
SessionEnabled bool `yaml:"SessionEnabled"`
|
||||
SessionExpirationTime int `yaml:"SessionExpirationTime"`
|
||||
SessionBackedByMPT bool `yaml:"SessionBackedByMPT"`
|
||||
|
|
|
@ -238,18 +238,26 @@ func (b *Header) UnmarshalJSON(data []byte) error {
|
|||
}
|
||||
|
||||
// GetExpectedHeaderSize returns the expected Header size with the given number of validators.
|
||||
// To calculate genesis block Header size numOfValidators should be 0.
|
||||
func GetExpectedHeaderSize(stateRootInHeader bool, numOfValidators int) int {
|
||||
m := smartcontract.GetDefaultHonestNodeCount(numOfValidators)
|
||||
// expectedHeaderSizeWithEmptyWitness contains 2 bytes for zero-length (new(Header)).Script.Invocation/Verification
|
||||
// InvocationScript:
|
||||
// 64 is the size of the default signature length + 2 bytes length and opcode
|
||||
// 2 = 1 push opcode + 1 length
|
||||
// VerifcationScript:
|
||||
// m = 1 bytes
|
||||
// 33 = 1 push opcode + 1 length + 33 bytes for public key
|
||||
// n = 1 bytes
|
||||
// 5 for SYSCALL
|
||||
size := expectedHeaderSizeWithEmptyWitness + (1+1+64)*m + 2 + numOfValidators*(1+1+33) + 2 + 5
|
||||
var size int
|
||||
// Genesis block case.
|
||||
if numOfValidators == 0 {
|
||||
// 1 byte for the PUSH1 opcode.
|
||||
size = expectedHeaderSizeWithEmptyWitness + 1
|
||||
} else {
|
||||
m := smartcontract.GetDefaultHonestNodeCount(numOfValidators)
|
||||
// expectedHeaderSizeWithEmptyWitness contains 2 bytes for zero-length (new(Header)).Script.Invocation/Verification
|
||||
// InvocationScript:
|
||||
// 64 is the size of the default signature length + 2 bytes length and opcode
|
||||
// 2 = 1 push opcode + 1 length
|
||||
// VerifcationScript:
|
||||
// m = 1 bytes
|
||||
// 33 = 1 push opcode + 1 length + 33 bytes for public key
|
||||
// n = 1 bytes
|
||||
// 5 for SYSCALL
|
||||
size = expectedHeaderSizeWithEmptyWitness + (1+1+64)*m + 2 + numOfValidators*(1+1+33) + 2 + 5
|
||||
}
|
||||
|
||||
if stateRootInHeader {
|
||||
size += util.Uint256Size
|
||||
|
|
|
@ -61,7 +61,8 @@ const (
|
|||
defaultTimePerBlock = 15 * time.Second
|
||||
// HeaderVerificationGasLimit is the maximum amount of GAS for block header verification.
|
||||
HeaderVerificationGasLimit = 3_00000000 // 3 GAS
|
||||
defaultStateSyncInterval = 40000
|
||||
// DefaultStateSyncInterval is the default interval for state sync.
|
||||
DefaultStateSyncInterval = 40000
|
||||
|
||||
// defaultBlockTimesCache should be sufficient for tryRunGC() to get in
|
||||
// sync with storeBlock(). Most of the time they differ by some thousands of
|
||||
|
@ -310,7 +311,7 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl
|
|||
return nil, errors.New("P2PStateExchangeExtensions can be enabled either on MPT-complete node (KeepOnlyLatestState=false) or on light GC-enabled node (RemoveUntraceableBlocks=true)")
|
||||
}
|
||||
if cfg.StateSyncInterval <= 0 {
|
||||
cfg.StateSyncInterval = defaultStateSyncInterval
|
||||
cfg.StateSyncInterval = DefaultStateSyncInterval
|
||||
log.Info("StateSyncInterval is not set or wrong, using default value",
|
||||
zap.Int("StateSyncInterval", cfg.StateSyncInterval))
|
||||
}
|
||||
|
@ -320,7 +321,7 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl
|
|||
return nil, errors.New("NeoFSStateSyncExtensions are enabled, but NeoFSBlockFetcher is off")
|
||||
}
|
||||
if cfg.StateSyncInterval <= 0 {
|
||||
cfg.StateSyncInterval = defaultStateSyncInterval
|
||||
cfg.StateSyncInterval = DefaultStateSyncInterval
|
||||
log.Info("StateSyncInterval is not set or wrong, using default value",
|
||||
zap.Int("StateSyncInterval", cfg.StateSyncInterval))
|
||||
}
|
||||
|
@ -1122,7 +1123,7 @@ func (bc *Blockchain) isHardforkEnabled(hf *config.Hardfork, blockHeight uint32)
|
|||
hfs := bc.config.Hardforks
|
||||
if hf != nil {
|
||||
start, ok := hfs[hf.String()]
|
||||
if !ok || start < blockHeight {
|
||||
if !ok || start > blockHeight {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
@ -2849,7 +2850,7 @@ func (bc *Blockchain) PoolTxWithData(t *transaction.Transaction, data any, mp *m
|
|||
if verificationFunction != nil {
|
||||
err := verificationFunction(t, data)
|
||||
if err != nil {
|
||||
return err
|
||||
return fmt.Errorf("data verification failed: %w", err)
|
||||
}
|
||||
}
|
||||
return bc.verifyAndPoolTx(t, mp, feer, data)
|
||||
|
|
|
@ -35,6 +35,9 @@ const (
|
|||
DefaultBaseExecFee = 30
|
||||
// ContextNonceDataLen is a length of [Context.NonceData] in bytes.
|
||||
ContextNonceDataLen = 16
|
||||
// MaxNotificationCount is the maximum number of notifications per single
|
||||
// application execution.
|
||||
MaxNotificationCount = 512
|
||||
)
|
||||
|
||||
// Ledger is the interface to Blockchain required for Context functionality.
|
||||
|
@ -253,10 +256,10 @@ func (c *ContractMD) HFSpecificContractMD(hf *config.Hardfork) *HFSpecificContra
|
|||
}
|
||||
md, ok := c.mdCache[key]
|
||||
if !ok {
|
||||
panic(fmt.Errorf("native contract descriptor cache is not initialized: contract %s, hardfork %s", c.Hash.StringLE(), key))
|
||||
panic(fmt.Errorf("native contract descriptor cache is not initialized: contract %s, hardfork %s", c.Name, key))
|
||||
}
|
||||
if md == nil {
|
||||
panic(fmt.Errorf("native contract descriptor cache is nil: contract %s, hardfork %s", c.Hash.StringLE(), key))
|
||||
panic(fmt.Errorf("native contract descriptor cache is nil: contract %s, hardfork %s", c.Name, key))
|
||||
}
|
||||
return md
|
||||
}
|
||||
|
@ -565,10 +568,18 @@ func (ic *Context) IsHardforkActivation(hf config.Hardfork) bool {
|
|||
}
|
||||
|
||||
// AddNotification creates notification event and appends it to the notification list.
|
||||
func (ic *Context) AddNotification(hash util.Uint160, name string, item *stackitem.Array) {
|
||||
func (ic *Context) AddNotification(hash util.Uint160, name string, item *stackitem.Array) error {
|
||||
if ic.IsHardforkEnabled(config.HFEchidna) {
|
||||
// Do not check persisting triggers to avoid native persist failure. Do not check
|
||||
// verification trigger since verification context is loaded with ReadOnly flag.
|
||||
if ic.Trigger == trigger.Application && len(ic.Notifications) == MaxNotificationCount {
|
||||
return fmt.Errorf("notification count shouldn't exceed %d", MaxNotificationCount)
|
||||
}
|
||||
}
|
||||
ic.Notifications = append(ic.Notifications, state.NotificationEvent{
|
||||
ScriptHash: hash,
|
||||
Name: name,
|
||||
Item: item,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -114,8 +114,7 @@ func Notify(ic *interop.Context) error {
|
|||
if len(bytes) > MaxNotificationSize {
|
||||
return fmt.Errorf("notification size shouldn't exceed %d", MaxNotificationSize)
|
||||
}
|
||||
ic.AddNotification(curHash, name, stackitem.DeepCopy(stackitem.NewArray(args), true).(*stackitem.Array))
|
||||
return nil
|
||||
return ic.AddNotification(curHash, name, stackitem.DeepCopy(stackitem.NewArray(args), true).(*stackitem.Array))
|
||||
}
|
||||
|
||||
// LoadScript takes a script and arguments from the stack and loads it into the VM.
|
||||
|
|
|
@ -11,6 +11,14 @@ import (
|
|||
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||
)
|
||||
|
||||
// DummySTTempStoragePrefix is a dummy contract storage item prefix that may be
|
||||
// passed to Billet constructor when Billet's restoring functionality is not
|
||||
// used, i.e. for those situations when only traversal functionality is used.
|
||||
// Note that using this prefix for MPT restoring is an error which will lead to
|
||||
// a panic since Billet must have the ability to save contract storage items to
|
||||
// the underlying DB during MPT restore process.
|
||||
const DummySTTempStoragePrefix = 0x00
|
||||
|
||||
var (
|
||||
// ErrRestoreFailed is returned when replacing HashNode by its "unhashed"
|
||||
// candidate fails.
|
||||
|
@ -37,7 +45,10 @@ type Billet struct {
|
|||
// NewBillet returns a new billet for MPT trie restoring. It accepts a MemCachedStore
|
||||
// to decouple storage errors from logic errors so that all storage errors are
|
||||
// processed during `store.Persist()` at the caller. Another benefit is
|
||||
// that every `Put` can be considered an atomic operation.
|
||||
// that every `Put` can be considered an atomic operation. Note that mode
|
||||
// parameter must match precisely the Trie mode that is used in the underlying
|
||||
// DB to store the MPT nodes. Using wrong mode will lead to improper MPT nodes
|
||||
// decoding and even runtime panic.
|
||||
func NewBillet(rootHash util.Uint256, mode TrieMode, prefix storage.KeyPrefix, store *storage.MemCachedStore) *Billet {
|
||||
return &Billet{
|
||||
TempStoragePrefix: prefix,
|
||||
|
@ -65,7 +76,7 @@ func (b *Billet) RestoreHashNode(path []byte, node Node) error {
|
|||
|
||||
// If it's a leaf, then put into temporary contract storage.
|
||||
if leaf, ok := node.(*LeafNode); ok {
|
||||
if b.TempStoragePrefix == 0 {
|
||||
if b.TempStoragePrefix == DummySTTempStoragePrefix {
|
||||
panic("invalid storage prefix")
|
||||
}
|
||||
k := append([]byte{byte(b.TempStoragePrefix)}, fromNibbles(path)...)
|
||||
|
|
|
@ -612,7 +612,7 @@ func (t *Trie) Find(prefix, from []byte, maxNum int) ([]storage.KeyValue, error)
|
|||
res []storage.KeyValue
|
||||
count int
|
||||
)
|
||||
b := NewBillet(t.root.Hash(), t.mode, 0, t.Store)
|
||||
b := NewBillet(t.root.Hash(), t.mode, DummySTTempStoragePrefix, t.Store)
|
||||
process := func(pathToNode []byte, node Node, _ []byte) bool {
|
||||
if leaf, ok := node.(*LeafNode); ok {
|
||||
if from == nil || !bytes.Equal(pathToNode, from) { // (*Billet).traverse includes `from` path into result if so. Need to filter out manually.
|
||||
|
|
|
@ -94,7 +94,7 @@ func (m *TrieStore) Seek(rng storage.SeekRange, f func(k, v []byte) bool) {
|
|||
}
|
||||
}
|
||||
|
||||
b := NewBillet(m.trie.root.Hash(), m.trie.mode, 0, m.trie.Store)
|
||||
b := NewBillet(m.trie.root.Hash(), m.trie.mode, DummySTTempStoragePrefix, m.trie.Store)
|
||||
process := func(pathToNode []byte, node Node, _ []byte) bool {
|
||||
if leaf, ok := node.(*LeafNode); ok {
|
||||
// (*Billet).traverse includes `from` path into the result if so. It's OK for Seek, so shouldn't be filtered out.
|
||||
|
|
|
@ -432,8 +432,8 @@ func (s *Designate) DesignateAsRole(ic *interop.Context, r noderoles.Role, pubs
|
|||
ntf.Append(pubsToArray(old))
|
||||
ntf.Append(pubsToArray(pubs))
|
||||
}
|
||||
ic.AddNotification(s.Hash, DesignationEventName, ntf)
|
||||
return nil
|
||||
|
||||
return ic.AddNotification(s.Hash, DesignationEventName, ntf)
|
||||
}
|
||||
|
||||
func (s *Designate) getRole(item stackitem.Item) (noderoles.Role, bool) {
|
||||
|
|
|
@ -372,7 +372,10 @@ func (m *Management) deployWithData(ic *interop.Context, args []stackitem.Item)
|
|||
panic(err)
|
||||
}
|
||||
m.callDeploy(ic, newcontract, args[2], false)
|
||||
m.emitNotification(ic, contractDeployNotificationName, newcontract.Hash)
|
||||
err = m.emitNotification(ic, contractDeployNotificationName, newcontract.Hash)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return contractToStack(newcontract)
|
||||
}
|
||||
|
||||
|
@ -444,7 +447,10 @@ func (m *Management) updateWithData(ic *interop.Context, args []stackitem.Item)
|
|||
panic(err)
|
||||
}
|
||||
m.callDeploy(ic, contract, args[2], true)
|
||||
m.emitNotification(ic, contractUpdateNotificationName, contract.Hash)
|
||||
err = m.emitNotification(ic, contractUpdateNotificationName, contract.Hash)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return stackitem.Null{}
|
||||
}
|
||||
|
||||
|
@ -497,7 +503,10 @@ func (m *Management) destroy(ic *interop.Context, sis []stackitem.Item) stackite
|
|||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
m.emitNotification(ic, contractDestroyNotificationName, hash)
|
||||
err = m.emitNotification(ic, contractDestroyNotificationName, hash)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return stackitem.Null{}
|
||||
}
|
||||
|
||||
|
@ -610,8 +619,6 @@ func (m *Management) OnPersist(ic *interop.Context) error {
|
|||
for _, hf := range config.Hardforks {
|
||||
if _, ok := activeHFs[hf]; ok && ic.IsHardforkActivation(hf) {
|
||||
isUpdate = true
|
||||
activation := hf // avoid loop variable pointer exporting.
|
||||
activeIn = &activation // reuse ActiveIn variable for the initialization hardfork.
|
||||
// Break immediately since native Initialize should be called starting from the first hardfork in a raw
|
||||
// (if there are multiple hardforks with the same enabling height).
|
||||
break
|
||||
|
@ -626,6 +633,10 @@ func (m *Management) OnPersist(ic *interop.Context) error {
|
|||
currentActiveHFs = append(currentActiveHFs, hf)
|
||||
}
|
||||
}
|
||||
// activeIn is not included into the activeHFs list.
|
||||
if activeIn != nil && activeIn.Cmp(latestHF) > 0 {
|
||||
latestHF = *activeIn
|
||||
}
|
||||
if !(isDeploy || isUpdate) {
|
||||
continue
|
||||
}
|
||||
|
@ -668,7 +679,7 @@ func (m *Management) OnPersist(ic *interop.Context) error {
|
|||
// The rest of activating hardforks also require initialization.
|
||||
for _, hf := range currentActiveHFs {
|
||||
if err := native.Initialize(ic, &hf, hfSpecificMD); err != nil {
|
||||
return fmt.Errorf("initializing %s native contract at HF %d: %w", md.Name, activeIn, err)
|
||||
return fmt.Errorf("initializing %s native contract at HF %s: %w", md.Name, hf, err)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -681,7 +692,10 @@ func (m *Management) OnPersist(ic *interop.Context) error {
|
|||
if isUpdate {
|
||||
ntfName = contractUpdateNotificationName
|
||||
}
|
||||
m.emitNotification(ic, ntfName, cs.Hash)
|
||||
err = m.emitNotification(ic, ntfName, cs.Hash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -806,8 +820,8 @@ func (m *Management) getNextContractID(d *dao.Simple) (int32, error) {
|
|||
return ret, nil
|
||||
}
|
||||
|
||||
func (m *Management) emitNotification(ic *interop.Context, name string, hash util.Uint160) {
|
||||
ic.AddNotification(m.Hash, name, stackitem.NewArray([]stackitem.Item{addrToStackItem(&hash)}))
|
||||
func (m *Management) emitNotification(ic *interop.Context, name string, hash util.Uint160) error {
|
||||
return ic.AddNotification(m.Hash, name, stackitem.NewArray([]stackitem.Item{addrToStackItem(&hash)}))
|
||||
}
|
||||
|
||||
func checkScriptAndMethods(ic *interop.Context, script []byte, methods []manifest.Method) error {
|
||||
|
|
|
@ -479,9 +479,12 @@ func (n *NEO) OnPersist(ic *interop.Context) error {
|
|||
ic.DAO.PutStorageItem(n.ID, prefixCommittee, cache.committee.Bytes(ic.DAO.GetItemCtx()))
|
||||
|
||||
if oldCommittee != nil {
|
||||
ic.AddNotification(n.Hash, "CommitteeChanged", stackitem.NewArray([]stackitem.Item{
|
||||
err := ic.AddNotification(n.Hash, "CommitteeChanged", stackitem.NewArray([]stackitem.Item{
|
||||
oldCommittee, newCommittee,
|
||||
}))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
@ -883,7 +886,8 @@ func (n *NEO) onNEP17Payment(ic *interop.Context, args []stackitem.Item) stackit
|
|||
return stackitem.Null{}
|
||||
}
|
||||
|
||||
// RegisterCandidateInternal registers pub as a new candidate.
|
||||
// RegisterCandidateInternal registers pub as a new candidate. This method must not be
|
||||
// called outside of VM since it panics on critical errors.
|
||||
func (n *NEO) RegisterCandidateInternal(ic *interop.Context, pub *keys.PublicKey) error {
|
||||
var emitEvent = true
|
||||
|
||||
|
@ -898,16 +902,23 @@ func (n *NEO) RegisterCandidateInternal(ic *interop.Context, pub *keys.PublicKey
|
|||
c.Registered = true
|
||||
}
|
||||
err := putConvertibleToDAO(n.ID, ic.DAO, key, c)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if emitEvent {
|
||||
cache := ic.DAO.GetRWCache(n.ID).(*NeoCache)
|
||||
cache.votesChanged = true
|
||||
ic.AddNotification(n.Hash, "CandidateStateChanged", stackitem.NewArray([]stackitem.Item{
|
||||
err = ic.AddNotification(n.Hash, "CandidateStateChanged", stackitem.NewArray([]stackitem.Item{
|
||||
stackitem.NewByteArray(pub.Bytes()),
|
||||
stackitem.NewBool(c.Registered),
|
||||
stackitem.NewBigInteger(&c.Votes),
|
||||
}))
|
||||
if err != nil {
|
||||
// Panic since it's a critical error that must abort execution.
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
return err
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *NEO) unregisterCandidate(ic *interop.Context, args []stackitem.Item) stackitem.Item {
|
||||
|
@ -922,7 +933,8 @@ func (n *NEO) unregisterCandidate(ic *interop.Context, args []stackitem.Item) st
|
|||
return stackitem.NewBool(err == nil)
|
||||
}
|
||||
|
||||
// UnregisterCandidateInternal unregisters pub as a candidate.
|
||||
// UnregisterCandidateInternal unregisters pub as a candidate. This method must not be
|
||||
// called outside of VM since it panics on critical errors.
|
||||
func (n *NEO) UnregisterCandidateInternal(ic *interop.Context, pub *keys.PublicKey) error {
|
||||
var err error
|
||||
|
||||
|
@ -942,14 +954,21 @@ func (n *NEO) UnregisterCandidateInternal(ic *interop.Context, pub *keys.PublicK
|
|||
if !ok {
|
||||
err = putConvertibleToDAO(n.ID, ic.DAO, key, c)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if emitEvent {
|
||||
ic.AddNotification(n.Hash, "CandidateStateChanged", stackitem.NewArray([]stackitem.Item{
|
||||
err := ic.AddNotification(n.Hash, "CandidateStateChanged", stackitem.NewArray([]stackitem.Item{
|
||||
stackitem.NewByteArray(pub.Bytes()),
|
||||
stackitem.NewBool(c.Registered),
|
||||
stackitem.NewBigInteger(&c.Votes),
|
||||
}))
|
||||
if err != nil {
|
||||
// Panic since it's a critical error that must abort execution.
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
return err
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *NEO) vote(ic *interop.Context, args []stackitem.Item) stackitem.Item {
|
||||
|
@ -962,7 +981,8 @@ func (n *NEO) vote(ic *interop.Context, args []stackitem.Item) stackitem.Item {
|
|||
return stackitem.NewBool(err == nil)
|
||||
}
|
||||
|
||||
// VoteInternal votes from account h for validarors specified in pubs.
|
||||
// VoteInternal votes from account h for validarors specified in pubs. This method
|
||||
// must not be called outside of VM since it panics on critical errors.
|
||||
func (n *NEO) VoteInternal(ic *interop.Context, h util.Uint160, pub *keys.PublicKey) error {
|
||||
ok, err := runtime.CheckHashedWitness(ic, h)
|
||||
if err != nil {
|
||||
|
@ -1024,12 +1044,16 @@ func (n *NEO) VoteInternal(ic *interop.Context, h util.Uint160, pub *keys.Public
|
|||
}
|
||||
ic.DAO.PutStorageItem(n.ID, key, acc.Bytes(ic.DAO.GetItemCtx()))
|
||||
|
||||
ic.AddNotification(n.Hash, "Vote", stackitem.NewArray([]stackitem.Item{
|
||||
err = ic.AddNotification(n.Hash, "Vote", stackitem.NewArray([]stackitem.Item{
|
||||
stackitem.NewByteArray(h.BytesBE()),
|
||||
keyToStackItem(oldVote),
|
||||
keyToStackItem(pub),
|
||||
stackitem.NewBigInteger(&acc.Balance),
|
||||
}))
|
||||
if err != nil {
|
||||
// Panic since it's a critical error that must abort execution.
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if newGas != nil { // Can be if it was already distributed in the same block.
|
||||
n.GAS.mint(ic, h, newGas, true)
|
||||
|
|
|
@ -146,7 +146,11 @@ func (c *nep17TokenNative) postTransfer(ic *interop.Context, from, to *util.Uint
|
|||
}
|
||||
}
|
||||
}()
|
||||
c.emitTransfer(ic, from, to, amount)
|
||||
err := c.emitTransfer(ic, from, to, amount)
|
||||
if err != nil {
|
||||
skipPostCalls = true
|
||||
panic(err)
|
||||
}
|
||||
if to == nil || !callOnPayment {
|
||||
return
|
||||
}
|
||||
|
@ -170,8 +174,8 @@ func (c *nep17TokenNative) postTransfer(ic *interop.Context, from, to *util.Uint
|
|||
}
|
||||
}
|
||||
|
||||
func (c *nep17TokenNative) emitTransfer(ic *interop.Context, from, to *util.Uint160, amount *big.Int) {
|
||||
ic.AddNotification(c.Hash, "Transfer", stackitem.NewArray([]stackitem.Item{
|
||||
func (c *nep17TokenNative) emitTransfer(ic *interop.Context, from, to *util.Uint160, amount *big.Int) error {
|
||||
return ic.AddNotification(c.Hash, "Transfer", stackitem.NewArray([]stackitem.Item{
|
||||
addrToStackItem(from),
|
||||
addrToStackItem(to),
|
||||
stackitem.NewBigInteger(amount),
|
||||
|
|
|
@ -316,10 +316,13 @@ func (o *Oracle) FinishInternal(ic *interop.Context) error {
|
|||
if err != nil {
|
||||
return ErrRequestNotFound
|
||||
}
|
||||
ic.AddNotification(o.Hash, "OracleResponse", stackitem.NewArray([]stackitem.Item{
|
||||
err = ic.AddNotification(o.Hash, "OracleResponse", stackitem.NewArray([]stackitem.Item{
|
||||
stackitem.Make(resp.ID),
|
||||
stackitem.Make(req.OriginalTxID.BytesBE()),
|
||||
}))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
origTx, _, err := ic.DAO.GetTransaction(req.OriginalTxID)
|
||||
if err != nil {
|
||||
|
@ -422,12 +425,15 @@ func (o *Oracle) RequestInternal(ic *interop.Context, url string, filter *string
|
|||
} else {
|
||||
filterNotif = stackitem.Null{}
|
||||
}
|
||||
ic.AddNotification(o.Hash, "OracleRequest", stackitem.NewArray([]stackitem.Item{
|
||||
err = ic.AddNotification(o.Hash, "OracleRequest", stackitem.NewArray([]stackitem.Item{
|
||||
stackitem.Make(id),
|
||||
stackitem.Make(ic.VM.GetCallingScriptHash().BytesBE()),
|
||||
stackitem.Make(url),
|
||||
filterNotif,
|
||||
}))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req := &state.OracleRequest{
|
||||
OriginalTxID: o.getOriginalTxID(ic.DAO, ic.Tx),
|
||||
GasForResponse: gas.Uint64(),
|
||||
|
|
|
@ -551,7 +551,7 @@ func (s *Module) Traverse(root util.Uint256, process func(node mpt.Node, nodeByt
|
|||
if s.bc.GetConfig().Ledger.KeepOnlyLatestState || s.bc.GetConfig().Ledger.RemoveUntraceableBlocks {
|
||||
mode |= mpt.ModeLatest
|
||||
}
|
||||
b := mpt.NewBillet(root, mode, 0, storage.NewMemCachedStore(s.dao.Store))
|
||||
b := mpt.NewBillet(root, mode, mpt.DummySTTempStoragePrefix, storage.NewMemCachedStore(s.dao.Store))
|
||||
return b.Traverse(func(pathToNode []byte, node mpt.Node, nodeBytes []byte) bool {
|
||||
return process(node, nodeBytes)
|
||||
}, false)
|
||||
|
|
|
@ -37,7 +37,7 @@ const (
|
|||
WitnessCalledByGroup WitnessConditionType = 0x29 // CalledByGroup
|
||||
|
||||
// MaxConditionNesting limits the maximum allowed level of condition nesting.
|
||||
MaxConditionNesting = 2
|
||||
MaxConditionNesting = 3
|
||||
)
|
||||
|
||||
// WitnessCondition is a condition of WitnessRule.
|
||||
|
@ -564,7 +564,7 @@ func DecodeBinaryCondition(r *io.BinReader) WitnessCondition {
|
|||
}
|
||||
|
||||
func decodeBinaryCondition(r *io.BinReader, maxDepth int) WitnessCondition {
|
||||
if maxDepth < 0 {
|
||||
if maxDepth <= 0 {
|
||||
r.Err = errors.New("too many nesting levels")
|
||||
return nil
|
||||
}
|
||||
|
@ -629,7 +629,7 @@ func UnmarshalConditionJSON(data []byte) (WitnessCondition, error) {
|
|||
}
|
||||
|
||||
func unmarshalConditionJSON(data []byte, maxDepth int) (WitnessCondition, error) {
|
||||
if maxDepth < 0 {
|
||||
if maxDepth <= 0 {
|
||||
return nil, errors.New("too many nesting levels")
|
||||
}
|
||||
aux := &conditionAux{}
|
||||
|
|
|
@ -5,7 +5,9 @@ import (
|
|||
|
||||
"github.com/nspcc-dev/neo-go/pkg/config"
|
||||
"github.com/nspcc-dev/neo-go/pkg/config/netmode"
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
||||
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
|
||||
"github.com/nspcc-dev/neo-go/pkg/io"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
@ -39,3 +41,14 @@ func TestGetConsensusAddressMainNet(t *testing.T) {
|
|||
assert.Equal(t, consensusScript, script.String())
|
||||
assert.Equal(t, consensusAddr, address.Uint160ToString(script))
|
||||
}
|
||||
|
||||
func TestGetExpectedHeaderSize(t *testing.T) {
|
||||
cfg, err := config.Load("../../config", netmode.MainNet)
|
||||
require.NoError(t, err)
|
||||
blk, err := CreateGenesisBlock(cfg.ProtocolConfiguration)
|
||||
require.NoError(t, err)
|
||||
w := io.NewBufBinWriter()
|
||||
blk.Header.EncodeBinary(w.BinWriter)
|
||||
require.NoError(t, w.Err)
|
||||
require.Equal(t, block.GetExpectedHeaderSize(false, 0), w.Len())
|
||||
}
|
||||
|
|
|
@ -1325,7 +1325,11 @@ func (s *Server) RelayP2PNotaryRequest(r *payload.P2PNotaryRequest) error {
|
|||
|
||||
// verifyAndPoolNotaryRequest verifies NotaryRequest payload and adds it to the payload mempool.
|
||||
func (s *Server) verifyAndPoolNotaryRequest(r *payload.P2PNotaryRequest) error {
|
||||
return s.chain.PoolTxWithData(r.FallbackTransaction, r, s.notaryRequestPool, s.notaryFeer, s.verifyNotaryRequest)
|
||||
err := s.chain.PoolTxWithData(r.FallbackTransaction, r, s.notaryRequestPool, s.notaryFeer, s.verifyNotaryRequest)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to add fallback transaction to the notary pool: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// verifyNotaryRequest is a function for state-dependant P2PNotaryRequest payload verification which is executed before ordinary blockchain's verification.
|
||||
|
|
|
@ -974,9 +974,15 @@ func (c *Client) GetRawNotaryPool() (*result.RawNotaryPool, error) {
|
|||
}
|
||||
|
||||
// GetBlockNotifications returns notifications from a block organized by trigger type.
|
||||
func (c *Client) GetBlockNotifications(blockHash util.Uint256, filters ...*neorpc.NotificationFilter) (*result.BlockNotifications, error) {
|
||||
var resp = &result.BlockNotifications{}
|
||||
if err := c.performRequest("getblocknotifications", []any{blockHash.StringLE(), filters}, resp); err != nil {
|
||||
func (c *Client) GetBlockNotifications(blockHash util.Uint256, filter *neorpc.NotificationFilter) (*result.BlockNotifications, error) {
|
||||
var (
|
||||
resp = &result.BlockNotifications{}
|
||||
params = []any{blockHash.StringLE()}
|
||||
)
|
||||
if filter != nil {
|
||||
params = append(params, *filter)
|
||||
}
|
||||
if err := c.performRequest("getblocknotifications", params, resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resp, nil
|
||||
|
|
|
@ -1335,6 +1335,72 @@ var rpcClientTestCases = map[string][]rpcClientTestCase{
|
|||
},
|
||||
},
|
||||
},
|
||||
"getblocknotifications": {
|
||||
{
|
||||
name: "positive, nil filter",
|
||||
invoke: func(c *Client) (any, error) {
|
||||
hash, err := util.Uint256DecodeStringLE("0f8fb4e17d2ab9f3097af75ca7fd16064160fb8043db94909e00dd4e257b9dc4")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return c.GetBlockNotifications(hash, nil)
|
||||
},
|
||||
serverResponse: `{
|
||||
"jsonrpc": "2.0",
|
||||
"id": 1,
|
||||
"result": {
|
||||
"onpersist": [],
|
||||
"application": [
|
||||
{
|
||||
"container": "0x0f8fb4e17d2ab9f3097af75ca7fd16064160fb8043db94909e00dd4e257b9dc4",
|
||||
"contract": "0xfffdc93764dbaddd97c48f252a53ea4643faa3fd",
|
||||
"eventname": "Deploy",
|
||||
"state": {
|
||||
"type": "Array",
|
||||
"value": [
|
||||
{
|
||||
"type": "ByteString",
|
||||
"value": "/aP6Q0bqUyolj8SX3a3bZDfJ/f8="
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
],
|
||||
"postpersist": []
|
||||
}
|
||||
}`,
|
||||
result: func(c *Client) any {
|
||||
blockHash, _ := util.Uint256DecodeStringLE("0f8fb4e17d2ab9f3097af75ca7fd16064160fb8043db94909e00dd4e257b9dc4")
|
||||
contract, _ := util.Uint160DecodeStringBE("fffdc93764dbaddd97c48f252a53ea4643faa3fd")
|
||||
expectedBytes, _ := base64.StdEncoding.DecodeString("/aP6Q0bqUyolj8SX3a3bZDfJ/f8=")
|
||||
return &result.BlockNotifications{
|
||||
OnPersist: []state.ContainedNotificationEvent{},
|
||||
Application: []state.ContainedNotificationEvent{
|
||||
{
|
||||
Container: blockHash,
|
||||
NotificationEvent: state.NotificationEvent{
|
||||
ScriptHash: contract,
|
||||
Name: "Deploy",
|
||||
Item: stackitem.NewArray([]stackitem.Item{stackitem.NewByteArray(expectedBytes)}),
|
||||
},
|
||||
},
|
||||
},
|
||||
PostPersist: []state.ContainedNotificationEvent{},
|
||||
}
|
||||
},
|
||||
check: func(t *testing.T, c *Client, res any) {
|
||||
bn, ok := res.(*result.BlockNotifications)
|
||||
require.True(t, ok)
|
||||
require.NotEmpty(t, bn)
|
||||
require.Len(t, bn.Application, 1)
|
||||
n := bn.Application[0]
|
||||
require.Equal(t, "Deploy", n.Name)
|
||||
require.Equal(t, "0f8fb4e17d2ab9f3097af75ca7fd16064160fb8043db94909e00dd4e257b9dc4", n.Container.StringLE())
|
||||
require.Equal(t, "fffdc93764dbaddd97c48f252a53ea4643faa3fd", n.ScriptHash.StringLE())
|
||||
require.Len(t, n.Item.Value().([]stackitem.Item), 1)
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
type rpcClientErrorCase struct {
|
||||
|
@ -1859,6 +1925,12 @@ var rpcClientErrorCases = map[string][]rpcClientErrorCase{
|
|||
return nil, c.ValidateAddress("")
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "getblocknotifications_unmarshalling_error",
|
||||
invoke: func(c *Client) (any, error) {
|
||||
return c.GetBlockNotifications(util.Uint256{}, nil)
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
|
|
@ -51,17 +51,6 @@ type Ledger interface {
|
|||
HeaderHeight() uint32
|
||||
}
|
||||
|
||||
// poolWrapper wraps a NeoFS pool to adapt its Close method to return an error.
|
||||
type poolWrapper struct {
|
||||
*pool.Pool
|
||||
}
|
||||
|
||||
// Close closes the pool and returns nil.
|
||||
func (p poolWrapper) Close() error {
|
||||
p.Pool.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
type indexedOID struct {
|
||||
Index int
|
||||
OID oid.ID
|
||||
|
@ -81,7 +70,7 @@ type Service struct {
|
|||
headerSizeMap map[int]int
|
||||
|
||||
chain Ledger
|
||||
pool poolWrapper
|
||||
pool neofs.PoolWrapper
|
||||
enqueue func(obj bqueue.Indexable) error
|
||||
account *wallet.Account
|
||||
|
||||
|
@ -166,7 +155,7 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, put fun
|
|||
}
|
||||
return &Service{
|
||||
chain: chain,
|
||||
pool: poolWrapper{Pool: p},
|
||||
pool: neofs.PoolWrapper{Pool: p},
|
||||
log: logger,
|
||||
cfg: cfg,
|
||||
operationMode: opt,
|
||||
|
@ -192,7 +181,8 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, put fun
|
|||
|
||||
func getHeaderSizeMap(chain config.Blockchain) map[int]int {
|
||||
headerSizeMap := make(map[int]int)
|
||||
headerSizeMap[0] = block.GetExpectedHeaderSize(chain.StateRootInHeader, chain.GetNumOfCNs(0))
|
||||
headerSizeMap[0] = block.GetExpectedHeaderSize(chain.StateRootInHeader, 0) // genesis header size.
|
||||
headerSizeMap[1] = block.GetExpectedHeaderSize(chain.StateRootInHeader, chain.GetNumOfCNs(0))
|
||||
for height := range chain.CommitteeHistory {
|
||||
headerSizeMap[int(height)] = block.GetExpectedHeaderSize(chain.StateRootInHeader, chain.GetNumOfCNs(height))
|
||||
}
|
||||
|
|
|
@ -2,6 +2,8 @@ package neofs
|
|||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/nspcc-dev/neofs-sdk-go/pool"
|
||||
)
|
||||
|
||||
// Constants related to NeoFS block storage.
|
||||
|
@ -16,6 +18,8 @@ const (
|
|||
DefaultBlockAttribute = "Block"
|
||||
// DefaultIndexFileAttribute is the default attribute name for index file objects.
|
||||
DefaultIndexFileAttribute = "Index"
|
||||
// DefaultStateAttribute is the default attribute name for state objects.
|
||||
DefaultStateAttribute = "State"
|
||||
|
||||
// DefaultSearchBatchSize is a number of objects to search in a batch. We need to
|
||||
// search with EQ filter to avoid partially-completed SEARCH responses. If EQ search
|
||||
|
@ -50,3 +54,14 @@ const (
|
|||
// MaxBackoff is the maximum backoff duration.
|
||||
MaxBackoff = 20 * time.Second
|
||||
)
|
||||
|
||||
// PoolWrapper wraps a NeoFS pool to adapt its Close method to return an error.
|
||||
type PoolWrapper struct {
|
||||
*pool.Pool
|
||||
}
|
||||
|
||||
// Close closes the pool and returns nil.
|
||||
func (p PoolWrapper) Close() error {
|
||||
p.Pool.Close()
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -333,7 +333,7 @@ func (n *Notary) OnNewRequest(payload *payload.P2PNotaryRequest) {
|
|||
}
|
||||
if r.isMainCompleted() && r.minNotValidBefore > n.Config.Chain.BlockHeight() {
|
||||
if err := n.finalize(acc, r.main, payload.MainTransaction.Hash()); err != nil {
|
||||
n.Config.Log.Error("failed to finalize main transaction",
|
||||
n.Config.Log.Error("failed to finalize main transaction, waiting for the next block to retry",
|
||||
zap.String("hash", r.main.Hash().StringLE()),
|
||||
zap.Error(err))
|
||||
}
|
||||
|
@ -381,7 +381,9 @@ func (n *Notary) PostPersist() {
|
|||
for h, r := range n.requests {
|
||||
if !r.isSent && r.isMainCompleted() && r.minNotValidBefore > currHeight {
|
||||
if err := n.finalize(acc, r.main, h); err != nil {
|
||||
n.Config.Log.Error("failed to finalize main transaction", zap.Error(err))
|
||||
n.Config.Log.Error("failed to finalize main transaction after PostPersist, waiting for the next block to retry",
|
||||
zap.String("hash", r.main.Hash().StringLE()),
|
||||
zap.Error(err))
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
@ -389,7 +391,10 @@ func (n *Notary) PostPersist() {
|
|||
for _, fb := range r.fallbacks {
|
||||
if nvb := fb.GetAttributes(transaction.NotValidBeforeT)[0].Value.(*transaction.NotValidBefore).Height; nvb <= currHeight {
|
||||
// Ignore the error, wait for the next block to resend them
|
||||
_ = n.finalize(acc, fb, h)
|
||||
err := n.finalize(acc, fb, h)
|
||||
n.Config.Log.Error("failed to finalize fallback transaction, waiting for the next block to retry",
|
||||
zap.String("hash", fb.Hash().StringLE()),
|
||||
zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -413,7 +418,10 @@ func (n *Notary) finalize(acc *wallet.Account, tx *transaction.Transaction, h ut
|
|||
return fmt.Errorf("failed to update completed transaction's size: %w", err)
|
||||
}
|
||||
|
||||
n.pushNewTx(newTx, h)
|
||||
err = n.pushNewTx(newTx, h)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to enqueue completed transaction: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -423,11 +431,13 @@ type txHashPair struct {
|
|||
mainHash util.Uint256
|
||||
}
|
||||
|
||||
func (n *Notary) pushNewTx(tx *transaction.Transaction, h util.Uint256) {
|
||||
func (n *Notary) pushNewTx(tx *transaction.Transaction, h util.Uint256) error {
|
||||
select {
|
||||
case n.newTxs <- txHashPair{tx, h}:
|
||||
default:
|
||||
return errors.New("transaction queue is full")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *Notary) newTxCallbackLoop() {
|
||||
|
|
|
@ -2488,3 +2488,38 @@ func TestClient_NEP24(t *testing.T) {
|
|||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestGetBlockNotifications(t *testing.T) {
|
||||
chain, _, httpSrv := initServerWithInMemoryChain(t)
|
||||
c, err := rpcclient.New(context.Background(), httpSrv.URL, rpcclient.Options{})
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(c.Close)
|
||||
require.NoError(t, c.Init())
|
||||
|
||||
t.Run("nil filter", func(t *testing.T) {
|
||||
bn, err := c.GetBlockNotifications(chain.GetHeaderHash(1), nil)
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, bn)
|
||||
})
|
||||
t.Run("empty filter", func(t *testing.T) {
|
||||
bn, err := c.GetBlockNotifications(chain.GetHeaderHash(1), &neorpc.NotificationFilter{})
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, bn)
|
||||
})
|
||||
t.Run("bad filter", func(t *testing.T) {
|
||||
badName := "Transfer1"
|
||||
bn, err := c.GetBlockNotifications(chain.GetHeaderHash(1), &neorpc.NotificationFilter{
|
||||
Name: &badName,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Empty(t, bn)
|
||||
})
|
||||
t.Run("good", func(t *testing.T) {
|
||||
name := "Transfer"
|
||||
bn, err := c.GetBlockNotifications(chain.GetHeaderHash(1), &neorpc.NotificationFilter{
|
||||
Name: &name,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, bn)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -311,6 +311,10 @@ func New(chain Ledger, conf config.RPC, coreServer *network.Server,
|
|||
conf.MaxWebSocketClients = defaultMaxWebSocketClients
|
||||
log.Info("MaxWebSocketClients is not set or wrong, setting default value", zap.Int("MaxWebSocketClients", defaultMaxWebSocketClients))
|
||||
}
|
||||
if conf.MaxWebSocketFeeds == 0 {
|
||||
conf.MaxWebSocketFeeds = defaultMaxFeeds
|
||||
log.Info("MaxWebSocketFeeds is not set or wrong, setting default value", zap.Int("MaxWebSocketFeeds", defaultMaxFeeds))
|
||||
}
|
||||
var oracleWrapped = new(atomic.Value)
|
||||
if orc != nil {
|
||||
oracleWrapped.Store(orc)
|
||||
|
@ -522,7 +526,7 @@ func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Requ
|
|||
}
|
||||
resChan := make(chan abstractResult) // response.abstract or response.abstractBatch
|
||||
subChan := make(chan intEvent, notificationBufSize)
|
||||
subscr := &subscriber{writer: subChan}
|
||||
subscr := &subscriber{writer: subChan, feeds: make([]feed, s.config.MaxWebSocketFeeds)}
|
||||
s.subsLock.Lock()
|
||||
s.subscribers[subscr] = true
|
||||
s.subsLock.Unlock()
|
||||
|
@ -560,7 +564,7 @@ func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Requ
|
|||
// RegisterLocal performs local client registration.
|
||||
func (s *Server) RegisterLocal(ctx context.Context, events chan<- neorpc.Notification) func(*neorpc.Request) (*neorpc.Response, error) {
|
||||
subChan := make(chan intEvent, notificationBufSize)
|
||||
subscr := &subscriber{writer: subChan}
|
||||
subscr := &subscriber{writer: subChan, feeds: make([]feed, s.config.MaxWebSocketFeeds)}
|
||||
s.subsLock.Lock()
|
||||
s.subscribers[subscr] = true
|
||||
s.subsLock.Unlock()
|
||||
|
|
|
@ -2324,6 +2324,12 @@ var rpcTestCases = map[string][]rpcTestCase{
|
|||
fail: true,
|
||||
errCode: neorpc.InvalidParamsCode,
|
||||
},
|
||||
{
|
||||
name: "empty filter",
|
||||
params: `["` + genesisBlockHash + `", []]`,
|
||||
fail: true,
|
||||
errCode: neorpc.InvalidParamsCode,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
|
|
@ -23,7 +23,7 @@ type (
|
|||
// cheaper doing it this way rather than creating a map),
|
||||
// pointing to an EventID is an obvious overkill at the moment, but
|
||||
// that's not for long.
|
||||
feeds [maxFeeds]feed
|
||||
feeds []feed
|
||||
}
|
||||
// feed stores subscriber's desired event ID with filter.
|
||||
feed struct {
|
||||
|
@ -43,8 +43,8 @@ func (f feed) Filter() neorpc.SubscriptionFilter {
|
|||
}
|
||||
|
||||
const (
|
||||
// Maximum number of subscriptions per one client.
|
||||
maxFeeds = 16
|
||||
// The default maximum number of subscriptions per one client.
|
||||
defaultMaxFeeds = 16
|
||||
|
||||
// This sets notification messages buffer depth. It may seem to be quite
|
||||
// big, but there is a big gap in speed between internal event processing
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
|
@ -76,7 +77,11 @@ func getNotification(t *testing.T, respCh <-chan []byte) *neorpc.Notification {
|
|||
|
||||
func initCleanServerAndWSClient(t *testing.T, startNetworkServer ...bool) (*core.Blockchain, *Server, *websocket.Conn, chan []byte) {
|
||||
chain, rpcSrv, httpSrv := initClearServerWithInMemoryChain(t)
|
||||
ws, respMsgs := initWSClient(t, httpSrv, rpcSrv, startNetworkServer...)
|
||||
return chain, rpcSrv, ws, respMsgs
|
||||
}
|
||||
|
||||
func initWSClient(t *testing.T, httpSrv *httptest.Server, rpcSrv *Server, startNetworkServer ...bool) (*websocket.Conn, chan []byte) {
|
||||
dialer := websocket.Dialer{HandshakeTimeout: 5 * time.Second}
|
||||
url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws"
|
||||
ws, r, err := dialer.Dial(url, nil)
|
||||
|
@ -108,7 +113,7 @@ func initCleanServerAndWSClient(t *testing.T, startNetworkServer ...bool) (*core
|
|||
rpcSrv.coreServer.Shutdown()
|
||||
}
|
||||
})
|
||||
return chain, rpcSrv, ws, respMsgs
|
||||
return ws, respMsgs
|
||||
}
|
||||
|
||||
func callSubscribe(t *testing.T, ws *websocket.Conn, msgs <-chan []byte, params string) string {
|
||||
|
@ -577,9 +582,10 @@ func TestHeaderOfAddedBlockSubscriptions(t *testing.T) {
|
|||
callUnsubscribe(t, c, respMsgs, headerSubID)
|
||||
}
|
||||
|
||||
func TestMaxSubscriptions(t *testing.T) {
|
||||
func testMaxSubscriptions(t *testing.T, f func(*config.Config), maxFeeds int) {
|
||||
var subIDs = make([]string, 0)
|
||||
_, _, c, respMsgs := initCleanServerAndWSClient(t)
|
||||
_, rpcSrv, httpSrv := initClearServerWithCustomConfig(t, f)
|
||||
c, respMsgs := initWSClient(t, httpSrv, rpcSrv)
|
||||
|
||||
for i := range maxFeeds + 1 {
|
||||
var s string
|
||||
|
@ -600,6 +606,17 @@ func TestMaxSubscriptions(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestMaxSubscriptions(t *testing.T) {
|
||||
t.Run("default", func(t *testing.T) {
|
||||
testMaxSubscriptions(t, nil, defaultMaxFeeds)
|
||||
})
|
||||
t.Run("maxfeeds=x2", func(t *testing.T) {
|
||||
testMaxSubscriptions(t, func(c *config.Config) {
|
||||
c.ApplicationConfiguration.RPC.MaxWebSocketFeeds = defaultMaxFeeds * 2
|
||||
}, defaultMaxFeeds*2)
|
||||
})
|
||||
}
|
||||
|
||||
func TestBadSubUnsub(t *testing.T) {
|
||||
var subCases = map[string]string{
|
||||
"no params": `{"jsonrpc": "2.0", "method": "subscribe", "params": [], "id": 1}`,
|
||||
|
|
Loading…
Add table
Reference in a new issue