From 9c0274850a72635ca55cdf1d3cf7ace406bda5b2 Mon Sep 17 00:00:00 2001 From: Ekaterina Pavlova Date: Tue, 4 Mar 2025 16:19:30 +0800 Subject: [PATCH 1/2] cli: refactor NeoFS related flags and functions Signed-off-by: Ekaterina Pavlova --- cli/options/options.go | 49 ++++++++++++- cli/util/convert.go | 71 ++++++++---------- cli/util/upload_bin.go | 84 +++++++++------------- cli/util/util_test.go | 2 - pkg/services/blockfetcher/blockfetcher.go | 15 +--- pkg/services/helpers/neofs/blockstorage.go | 13 ++++ 6 files changed, 126 insertions(+), 108 deletions(-) diff --git a/cli/options/options.go b/cli/options/options.go index 12c5db701..b4e3d5267 100644 --- a/cli/options/options.go +++ b/cli/options/options.go @@ -25,8 +25,11 @@ import ( "github.com/nspcc-dev/neo-go/pkg/rpcclient" "github.com/nspcc-dev/neo-go/pkg/rpcclient/actor" "github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker" + "github.com/nspcc-dev/neo-go/pkg/services/helpers/neofs" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/wallet" + "github.com/nspcc-dev/neofs-sdk-go/pool" + "github.com/nspcc-dev/neofs-sdk-go/user" "github.com/urfave/cli/v2" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -43,9 +46,13 @@ const ( DefaultAwaitableTimeout = 3 * 15 * time.Second ) -// RPCEndpointFlag is a long flag name for an RPC endpoint. It can be used to -// check for flag presence in the context. -const RPCEndpointFlag = "rpc-endpoint" +const ( + // RPCEndpointFlag is a long flag name for an RPC endpoint. It can be used to + // check for flag presence in the context. + RPCEndpointFlag = "rpc-endpoint" + // NeoFSRPCEndpointFlag is a long flag name for a NeoFS RPC endpoint. + NeoFSRPCEndpointFlag = "fs-rpc-endpoint" +) // Wallet is a set of flags used for wallet operations. var Wallet = []cli.Flag{ @@ -101,6 +108,22 @@ var RPC = []cli.Flag{ }, } +// NeoFSRPC is a set of flags used for NeoFS RPC connections (endpoint). +var NeoFSRPC = []cli.Flag{&cli.StringSliceFlag{ + Name: NeoFSRPCEndpointFlag, + Aliases: []string{"fsr"}, + Usage: "List of NeoFS storage node RPC addresses (comma-separated or multiple --fs-rpc-endpoint flags)", + Required: true, + Action: func(ctx *cli.Context, fsRpcEndpoints []string) error { + for _, endpoint := range fsRpcEndpoints { + if endpoint == "" { + return cli.Exit("NeoFS RPC endpoint cannot contain empty values", 1) + } + } + return nil + }, +}} + // Historic is a flag for commands that can perform historic invocations. var Historic = &cli.StringFlag{ Name: "historic", @@ -180,6 +203,26 @@ func GetRPCClient(gctx context.Context, ctx *cli.Context) (*rpcclient.Client, cl return c, nil } +// GetNeoFSClientPool returns a NeoFS pool and a signer for the given Context. +func GetNeoFSClientPool(ctx *cli.Context, acc *wallet.Account) (user.Signer, neofs.PoolWrapper, error) { + rpcNeoFS := ctx.StringSlice(NeoFSRPCEndpointFlag) + signer := user.NewAutoIDSignerRFC6979(acc.PrivateKey().PrivateKey) + + params := pool.DefaultOptions() + params.SetHealthcheckTimeout(neofs.DefaultHealthcheckTimeout) + params.SetNodeDialTimeout(neofs.DefaultDialTimeout) + params.SetNodeStreamTimeout(neofs.DefaultStreamTimeout) + p, err := pool.New(pool.NewFlatNodeParams(rpcNeoFS), signer, params) + if err != nil { + return nil, neofs.PoolWrapper{}, fmt.Errorf("failed to create NeoFS pool: %w", err) + } + pWrapper := neofs.PoolWrapper{Pool: p} + if err = pWrapper.Dial(context.Background()); err != nil { + return nil, neofs.PoolWrapper{}, fmt.Errorf("failed to dial NeoFS pool: %w", err) + } + return signer, pWrapper, nil +} + // GetInvoker returns an invoker using the given RPC client, context and signers. // It parses "--historic" parameter to adjust it. func GetInvoker(c *rpcclient.Client, ctx *cli.Context, signers []transaction.Signer) (*invoker.Invoker, cli.ExitCoder) { diff --git a/cli/util/convert.go b/cli/util/convert.go index d2fe4d63c..763aaf9b3 100644 --- a/cli/util/convert.go +++ b/cli/util/convert.go @@ -16,6 +16,35 @@ import ( "github.com/urfave/cli/v2" ) +var neoFSFlags = append([]cli.Flag{ + &cli.StringFlag{ + Name: "container", + Aliases: []string{"cid"}, + Usage: "NeoFS container ID to upload objects to", + Required: true, + Action: cmdargs.EnsureNotEmpty("container"), + }, + &flags.AddressFlag{ + Name: "address", + Usage: "Address to use for signing the uploading and searching transactions in NeoFS", + }, + &cli.UintFlag{ + Name: "retries", + Usage: "Maximum number of NeoFS node request retries", + Value: neofs.MaxRetries, + Action: func(context *cli.Context, u uint) error { + if u < 1 { + return cli.Exit("retries should be greater than 0", 1) + } + return nil + }, + }, + &cli.UintFlag{ + Name: "searchers", + Usage: "Number of concurrent searches for objects", + Value: 100, + }}, options.NeoFSRPC...) + // NewCommands returns util commands for neo-go CLI. func NewCommands() []*cli.Command { // By default, RPC flag is required. sendtx and txdump may be called without provided rpc-endpoint. @@ -35,27 +64,6 @@ func NewCommands() []*cli.Command { }, options.RPC...) txCancelFlags = append(txCancelFlags, options.Wallet...) uploadBinFlags := append([]cli.Flag{ - &cli.StringSliceFlag{ - Name: "fs-rpc-endpoint", - Aliases: []string{"fsr"}, - Usage: "List of NeoFS storage node RPC addresses (comma-separated or multiple --fs-rpc-endpoint flags)", - Required: true, - Action: func(ctx *cli.Context, fsRpcEndpoints []string) error { - for _, endpoint := range fsRpcEndpoints { - if endpoint == "" { - return cli.Exit("NeoFS RPC endpoint cannot contain empty values", 1) - } - } - return nil - }, - }, - &cli.StringFlag{ - Name: "container", - Aliases: []string{"cid"}, - Usage: "NeoFS container ID to upload blocks to", - Required: true, - Action: cmdargs.EnsureNotEmpty("container"), - }, &cli.StringFlag{ Name: "block-attribute", Usage: "Attribute key of the block object", @@ -68,10 +76,6 @@ func NewCommands() []*cli.Command { Value: neofs.DefaultIndexFileAttribute, Action: cmdargs.EnsureNotEmpty("index-attribute"), }, - &flags.AddressFlag{ - Name: "address", - Usage: "Address to use for signing the uploading and searching transactions in NeoFS", - }, &cli.UintFlag{ Name: "index-file-size", Usage: "Size of index file", @@ -82,25 +86,10 @@ func NewCommands() []*cli.Command { Usage: "Number of workers to fetch and upload blocks concurrently", Value: 20, }, - &cli.UintFlag{ - Name: "searchers", - Usage: "Number of concurrent searches for blocks", - Value: 100, - }, - &cli.UintFlag{ - Name: "retries", - Usage: "Maximum number of Neo/NeoFS node request retries", - Value: neofs.MaxRetries, - Action: func(context *cli.Context, u uint) error { - if u < 1 { - return cli.Exit("retries should be greater than 0", 1) - } - return nil - }, - }, options.Debug, }, options.RPC...) uploadBinFlags = append(uploadBinFlags, options.Wallet...) + uploadBinFlags = append(uploadBinFlags, neoFSFlags...) return []*cli.Command{ { Name: "util", diff --git a/cli/util/upload_bin.go b/cli/util/upload_bin.go index 0e710ac8a..757609832 100644 --- a/cli/util/upload_bin.go +++ b/cli/util/upload_bin.go @@ -19,28 +19,14 @@ import ( cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" - "github.com/nspcc-dev/neofs-sdk-go/pool" "github.com/nspcc-dev/neofs-sdk-go/user" "github.com/urfave/cli/v2" ) -// poolWrapper wraps a NeoFS pool to adapt its Close method to return an error. -type poolWrapper struct { - *pool.Pool -} - -// Close closes the pool and returns nil. -func (p poolWrapper) Close() error { - p.Pool.Close() - return nil -} - func uploadBin(ctx *cli.Context) error { if err := cmdargs.EnsureNone(ctx); err != nil { return err } - rpcNeoFS := ctx.StringSlice("fs-rpc-endpoint") - containerIDStr := ctx.String("container") attr := ctx.String("block-attribute") numWorkers := ctx.Uint("workers") maxParallelSearches := ctx.Uint("searchers") @@ -52,12 +38,6 @@ func uploadBin(ctx *cli.Context) error { if err != nil { return cli.Exit(fmt.Sprintf("failed to load account: %v", err), 1) } - - var containerID cid.ID - if err = containerID.DecodeString(containerIDStr); err != nil { - return cli.Exit(fmt.Sprintf("failed to decode container ID: %v", err), 1) - } - gctx, cancel := options.GetTimeoutContext(ctx) defer cancel() rpc, err := options.GetRPCClient(gctx, ctx) @@ -65,39 +45,20 @@ func uploadBin(ctx *cli.Context) error { return cli.Exit(fmt.Sprintf("failed to create RPC client: %v", err), 1) } - signer := user.NewAutoIDSignerRFC6979(acc.PrivateKey().PrivateKey) - - params := pool.DefaultOptions() - params.SetHealthcheckTimeout(neofs.DefaultHealthcheckTimeout) - params.SetNodeDialTimeout(neofs.DefaultDialTimeout) - params.SetNodeStreamTimeout(neofs.DefaultStreamTimeout) - p, err := pool.New(pool.NewFlatNodeParams(rpcNeoFS), signer, params) + signer, pWrapper, err := options.GetNeoFSClientPool(ctx, acc) if err != nil { - return cli.Exit(fmt.Sprintf("failed to create NeoFS pool: %v", err), 1) + return cli.Exit(err, 1) } - pWrapper := poolWrapper{p} - if err = pWrapper.Dial(context.Background()); err != nil { - return cli.Exit(fmt.Sprintf("failed to dial NeoFS pool: %v", err), 1) - } - defer p.Close() - - var containerObj container.Container - err = retry(func() error { - containerObj, err = p.ContainerGet(ctx.Context, containerID, client.PrmContainerGet{}) - return err - }, maxRetries, debug) - if err != nil { - return cli.Exit(fmt.Errorf("failed to get container with ID %s: %w", containerID, err), 1) - } - containerMagic := containerObj.Attribute("Magic") + defer pWrapper.Close() v, err := rpc.GetVersion() if err != nil { return cli.Exit(fmt.Sprintf("failed to get version from RPC: %v", err), 1) } magic := strconv.Itoa(int(v.Protocol.Network)) - if containerMagic != magic { - return cli.Exit(fmt.Sprintf("container magic %s does not match the network magic %s", containerMagic, magic), 1) + containerID, err := getContainer(ctx, pWrapper, magic, maxRetries, debug) + if err != nil { + return cli.Exit(err, 1) } currentBlockHeight, err := rpc.GetBlockCount() @@ -138,7 +99,7 @@ func retry(action func() error, maxRetries uint, debug bool) error { } // uploadBlocksAndIndexFiles uploads the blocks and index files to the container using the pool. -func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.Client, signer user.Signer, containerID cid.ID, attr, indexAttributeKey string, buf []byte, currentIndexFileID, indexFileSize, currentBlockHeight uint, numWorkers, maxRetries uint, debug bool) error { +func uploadBlocksAndIndexFiles(ctx *cli.Context, p neofs.PoolWrapper, rpc *rpcclient.Client, signer user.Signer, containerID cid.ID, attr, indexAttributeKey string, buf []byte, currentIndexFileID, indexFileSize, currentBlockHeight uint, numWorkers, maxRetries uint, debug bool) error { if currentIndexFileID*indexFileSize >= currentBlockHeight { fmt.Fprintf(ctx.App.Writer, "No new blocks to upload. Need to upload starting from %d, current height %d\n", currentIndexFileID*indexFileSize, currentBlockHeight) return nil @@ -265,7 +226,7 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C } // searchIndexFile returns the ID and buffer for the next index file to be uploaded. -func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, privKeys *keys.PrivateKey, signer user.Signer, indexFileSize uint, blockAttributeKey, attributeKey string, maxParallelSearches, maxRetries uint, debug bool) (uint, []byte, error) { +func searchIndexFile(ctx *cli.Context, p neofs.PoolWrapper, containerID cid.ID, privKeys *keys.PrivateKey, signer user.Signer, indexFileSize uint, blockAttributeKey, attributeKey string, maxParallelSearches, maxRetries uint, debug bool) (uint, []byte, error) { var ( // buf is used to store OIDs of the uploaded blocks. buf = make([]byte, indexFileSize*oid.Size) @@ -357,7 +318,7 @@ func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, privKe // searchObjects searches in parallel for objects with attribute GE startIndex and LT // endIndex. It returns a buffered channel of resulting object IDs and closes it once // OID search is finished. Errors are sent to errCh in a non-blocking way. -func searchObjects(ctx context.Context, p poolWrapper, containerID cid.ID, privKeys *keys.PrivateKey, blockAttributeKey string, startIndex, endIndex, maxParallelSearches, maxRetries uint, debug bool, errCh chan error, additionalFilters ...object.SearchFilters) chan oid.ID { +func searchObjects(ctx context.Context, p neofs.PoolWrapper, containerID cid.ID, privKeys *keys.PrivateKey, blockAttributeKey string, startIndex, endIndex, maxParallelSearches, maxRetries uint, debug bool, errCh chan error, additionalFilters ...object.SearchFilters) chan oid.ID { var res = make(chan oid.ID, 2*neofs.DefaultSearchBatchSize) go func() { var wg sync.WaitGroup @@ -419,7 +380,7 @@ func searchObjects(ctx context.Context, p poolWrapper, containerID cid.ID, privK } // uploadObj uploads object to the container using provided settings. -func uploadObj(ctx context.Context, p poolWrapper, signer user.Signer, containerID cid.ID, objData []byte, attrs []object.Attribute) (oid.ID, error) { +func uploadObj(ctx context.Context, p neofs.PoolWrapper, signer user.Signer, containerID cid.ID, objData []byte, attrs []object.Attribute) (oid.ID, error) { var ( hdr object.Object prmObjectPutInit client.PrmObjectPutInit @@ -461,3 +422,28 @@ func getBlockIndex(header object.Object, attribute string) (int, error) { } return -1, fmt.Errorf("attribute %s not found", attribute) } + +// getContainer gets container by ID and checks its magic. +func getContainer(ctx *cli.Context, p neofs.PoolWrapper, expectedMagic string, maxRetries uint, debug bool) (cid.ID, error) { + var ( + containerObj container.Container + err error + containerIDStr = ctx.String("container") + ) + var containerID cid.ID + if err = containerID.DecodeString(containerIDStr); err != nil { + return containerID, fmt.Errorf("failed to decode container ID: %w", err) + } + err = retry(func() error { + containerObj, err = p.ContainerGet(ctx.Context, containerID, client.PrmContainerGet{}) + return err + }, maxRetries, debug) + if err != nil { + return containerID, fmt.Errorf("failed to get container: %w", err) + } + containerMagic := containerObj.Attribute("Magic") + if containerMagic != expectedMagic { + return containerID, fmt.Errorf("container magic mismatch: expected %s, got %s", expectedMagic, containerMagic) + } + return containerID, nil +} diff --git a/cli/util/util_test.go b/cli/util/util_test.go index 6e9b0f4bb..8771ec2a4 100644 --- a/cli/util/util_test.go +++ b/cli/util/util_test.go @@ -204,8 +204,6 @@ func TestUploadBin(t *testing.T) { e.In.WriteString("one\r") e.RunWithErrorCheckExit(t, "failed to load account", append(args, "--cid", "test", "--wallet", "./not-exist.json", "--rpc-endpoint", "https://test")...) e.In.WriteString("one\r") - e.RunWithErrorCheckExit(t, "failed to decode container ID", append(args, "--cid", "test", "--wallet", testcli.ValidatorWallet, "--rpc-endpoint", "https://test")...) - e.In.WriteString("one\r") e.RunWithErrorCheckExit(t, "failed to create RPC client", append(args, "--cid", "9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG", "--wallet", testcli.ValidatorWallet, "--rpc-endpoint", "https://test")...) e.In.WriteString("one\r") e.RunWithErrorCheckExit(t, "failed to dial NeoFS pool", append(args, "--cid", "9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG", "--wallet", testcli.ValidatorWallet, "--rpc-endpoint", "http://"+e.RPC.Addresses()[0])...) diff --git a/pkg/services/blockfetcher/blockfetcher.go b/pkg/services/blockfetcher/blockfetcher.go index 5b7bb7cd2..4eed1c6ba 100644 --- a/pkg/services/blockfetcher/blockfetcher.go +++ b/pkg/services/blockfetcher/blockfetcher.go @@ -51,17 +51,6 @@ type Ledger interface { HeaderHeight() uint32 } -// poolWrapper wraps a NeoFS pool to adapt its Close method to return an error. -type poolWrapper struct { - *pool.Pool -} - -// Close closes the pool and returns nil. -func (p poolWrapper) Close() error { - p.Pool.Close() - return nil -} - type indexedOID struct { Index int OID oid.ID @@ -81,7 +70,7 @@ type Service struct { headerSizeMap map[int]int chain Ledger - pool poolWrapper + pool neofs.PoolWrapper enqueue func(obj bqueue.Indexable) error account *wallet.Account @@ -166,7 +155,7 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, put fun } return &Service{ chain: chain, - pool: poolWrapper{Pool: p}, + pool: neofs.PoolWrapper{Pool: p}, log: logger, cfg: cfg, operationMode: opt, diff --git a/pkg/services/helpers/neofs/blockstorage.go b/pkg/services/helpers/neofs/blockstorage.go index f88a6daa9..a0220e870 100644 --- a/pkg/services/helpers/neofs/blockstorage.go +++ b/pkg/services/helpers/neofs/blockstorage.go @@ -2,6 +2,8 @@ package neofs import ( "time" + + "github.com/nspcc-dev/neofs-sdk-go/pool" ) // Constants related to NeoFS block storage. @@ -50,3 +52,14 @@ const ( // MaxBackoff is the maximum backoff duration. MaxBackoff = 20 * time.Second ) + +// PoolWrapper wraps a NeoFS pool to adapt its Close method to return an error. +type PoolWrapper struct { + *pool.Pool +} + +// Close closes the pool and returns nil. +func (p PoolWrapper) Close() error { + p.Pool.Close() + return nil +} From 5f80a142b0767e90930e5332e6c5ab4e1f153e02 Mon Sep 17 00:00:00 2001 From: Ekaterina Pavlova Date: Tue, 4 Mar 2025 16:48:17 +0800 Subject: [PATCH 2/2] cli: add upload-state command Close #3782 Signed-off-by: Ekaterina Pavlova --- cli/server/dump_bin.go | 2 +- cli/server/server.go | 19 +- cli/server/server_test.go | 4 +- cli/util/convert.go | 19 ++ cli/util/upload_state.go | 206 +++++++++++++++++++++ docs/neofs-blockstorage.md | 28 ++- pkg/core/blockchain.go | 7 +- pkg/services/helpers/neofs/blockstorage.go | 2 + 8 files changed, 269 insertions(+), 18 deletions(-) create mode 100644 cli/util/upload_state.go diff --git a/cli/server/dump_bin.go b/cli/server/dump_bin.go index 00aeb932e..dc3e9432a 100644 --- a/cli/server/dump_bin.go +++ b/cli/server/dump_bin.go @@ -30,7 +30,7 @@ func dumpBin(ctx *cli.Context) error { count := uint32(ctx.Uint("count")) start := uint32(ctx.Uint("start")) - chain, prometheus, pprof, err := initBCWithMetrics(cfg, log) + chain, _, prometheus, pprof, err := InitBCWithMetrics(cfg, log) if err != nil { return err } diff --git a/cli/server/server.go b/cli/server/server.go index 26a9cd686..82e8fdf6e 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -140,10 +140,11 @@ func newGraceContext() context.Context { return ctx } -func initBCWithMetrics(cfg config.Config, log *zap.Logger) (*core.Blockchain, *metrics.Service, *metrics.Service, error) { - chain, _, err := initBlockChain(cfg, log) +// InitBCWithMetrics initializes the blockchain with metrics with the given configuration. +func InitBCWithMetrics(cfg config.Config, log *zap.Logger) (*core.Blockchain, storage.Store, *metrics.Service, *metrics.Service, error) { + chain, store, err := initBlockChain(cfg, log) if err != nil { - return nil, nil, nil, cli.Exit(err, 1) + return nil, nil, nil, nil, cli.Exit(err, 1) } prometheus := metrics.NewPrometheusService(cfg.ApplicationConfiguration.Prometheus, log) pprof := metrics.NewPprofService(cfg.ApplicationConfiguration.Pprof, log) @@ -151,14 +152,14 @@ func initBCWithMetrics(cfg config.Config, log *zap.Logger) (*core.Blockchain, *m go chain.Run() err = prometheus.Start() if err != nil { - return nil, nil, nil, cli.Exit(fmt.Errorf("failed to start Prometheus service: %w", err), 1) + return nil, nil, nil, nil, cli.Exit(fmt.Errorf("failed to start Prometheus service: %w", err), 1) } err = pprof.Start() if err != nil { - return nil, nil, nil, cli.Exit(fmt.Errorf("failed to start Pprof service: %w", err), 1) + return nil, nil, nil, nil, cli.Exit(fmt.Errorf("failed to start Pprof service: %w", err), 1) } - return chain, prometheus, pprof, nil + return chain, store, prometheus, pprof, nil } func dumpDB(ctx *cli.Context) error { @@ -189,7 +190,7 @@ func dumpDB(ctx *cli.Context) error { defer outStream.Close() writer := io.NewBinWriterFromIO(outStream) - chain, prometheus, pprof, err := initBCWithMetrics(cfg, log) + chain, _, prometheus, pprof, err := InitBCWithMetrics(cfg, log) if err != nil { return err } @@ -249,7 +250,7 @@ func restoreDB(ctx *cli.Context) error { cfg.ApplicationConfiguration.SaveStorageBatch = true } - chain, prometheus, pprof, err := initBCWithMetrics(cfg, log) + chain, _, prometheus, pprof, err := InitBCWithMetrics(cfg, log) if err != nil { return err } @@ -470,7 +471,7 @@ func startServer(ctx *cli.Context) error { return cli.Exit(err, 1) } - chain, prometheus, pprof, err := initBCWithMetrics(cfg, log) + chain, _, prometheus, pprof, err := InitBCWithMetrics(cfg, log) if err != nil { return cli.Exit(err, 1) } diff --git a/cli/server/server_test.go b/cli/server/server_test.go index 8eb1de819..b988d46f7 100644 --- a/cli/server/server_test.go +++ b/cli/server/server_test.go @@ -185,11 +185,11 @@ func TestInitBCWithMetrics(t *testing.T) { }) t.Run("bad store", func(t *testing.T) { - _, _, _, err = initBCWithMetrics(config.Config{}, logger) + _, _, _, _, err = InitBCWithMetrics(config.Config{}, logger) require.Error(t, err) }) - chain, prometheus, pprof, err := initBCWithMetrics(cfg, logger) + chain, _, prometheus, pprof, err := InitBCWithMetrics(cfg, logger) require.NoError(t, err) t.Cleanup(func() { chain.Close() diff --git a/cli/util/convert.go b/cli/util/convert.go index 763aaf9b3..487ba5df4 100644 --- a/cli/util/convert.go +++ b/cli/util/convert.go @@ -90,6 +90,18 @@ func NewCommands() []*cli.Command { }, options.RPC...) uploadBinFlags = append(uploadBinFlags, options.Wallet...) uploadBinFlags = append(uploadBinFlags, neoFSFlags...) + + uploadStateFlags := append([]cli.Flag{ + &cli.StringFlag{ + Name: "state-attribute", + Usage: "Attribute key of the state object", + Value: neofs.DefaultStateAttribute, + Action: cmdargs.EnsureNotEmpty("state-attribute"), + }, + options.Debug, options.Config, options.ConfigFile, options.RelativePath, + }, options.Wallet...) + uploadStateFlags = append(uploadStateFlags, options.Network...) + uploadStateFlags = append(uploadStateFlags, neoFSFlags...) return []*cli.Command{ { Name: "util", @@ -174,6 +186,13 @@ func NewCommands() []*cli.Command { Action: uploadBin, Flags: uploadBinFlags, }, + { + Name: "upload-state", + Usage: "Start the node, traverse MPT and upload MPT nodes to the NeoFS container at every StateSyncInterval number of blocks", + UsageText: "neo-go util upload-state --fs-rpc-endpoint [,[...]] --container --state-attribute state --wallet [--wallet-config ] [--address
] [--searchers ] [--retries ] [--debug] [--config-path path] [-p/-m/-t] [--config-file file]", + Action: uploadState, + Flags: uploadStateFlags, + }, }, }, } diff --git a/cli/util/upload_state.go b/cli/util/upload_state.go new file mode 100644 index 000000000..8d14c9629 --- /dev/null +++ b/cli/util/upload_state.go @@ -0,0 +1,206 @@ +package util + +import ( + "fmt" + "strconv" + "time" + + "github.com/nspcc-dev/neo-go/cli/cmdargs" + "github.com/nspcc-dev/neo-go/cli/options" + "github.com/nspcc-dev/neo-go/cli/server" + "github.com/nspcc-dev/neo-go/pkg/core" + "github.com/nspcc-dev/neo-go/pkg/core/mpt" + "github.com/nspcc-dev/neo-go/pkg/core/storage" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + gio "github.com/nspcc-dev/neo-go/pkg/io" + "github.com/nspcc-dev/neo-go/pkg/services/helpers/neofs" + "github.com/nspcc-dev/neo-go/pkg/util" + "github.com/nspcc-dev/neofs-sdk-go/client" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "github.com/urfave/cli/v2" + "go.uber.org/zap" +) + +func uploadState(ctx *cli.Context) error { + if err := cmdargs.EnsureNone(ctx); err != nil { + return err + } + cfg, err := options.GetConfigFromContext(ctx) + if err != nil { + return cli.Exit(err, 1) + } + attr := ctx.String("state-attribute") + maxRetries := ctx.Uint("retries") + debug := ctx.Bool("debug") + + acc, _, err := options.GetAccFromContext(ctx) + if err != nil { + return cli.Exit(fmt.Sprintf("failed to load account: %v", err), 1) + } + + signer, p, err := options.GetNeoFSClientPool(ctx, acc) + if err != nil { + return cli.Exit(err, 1) + } + defer p.Close() + log, _, logCloser, err := options.HandleLoggingParams(debug, cfg.ApplicationConfiguration) + if err != nil { + return cli.Exit(err, 1) + } + if logCloser != nil { + defer func() { _ = logCloser() }() + } + + chain, store, prometheus, pprof, err := server.InitBCWithMetrics(cfg, log) + if err != nil { + return err + } + defer func() { + pprof.ShutDown() + prometheus.ShutDown() + chain.Close() + }() + + if chain.GetConfig().Ledger.KeepOnlyLatestState || chain.GetConfig().Ledger.RemoveUntraceableBlocks { + return cli.Exit("only full-state node is supported: disable KeepOnlyLatestState and RemoveUntraceableBlocks", 1) + } + syncInterval := cfg.ProtocolConfiguration.StateSyncInterval + if syncInterval == 0 { + syncInterval = core.DefaultStateSyncInterval + } + + containerID, err := getContainer(ctx, p, strconv.Itoa(int(chain.GetConfig().Magic)), maxRetries, debug) + if err != nil { + return cli.Exit(err, 1) + } + + stateObjCount, err := searchStateIndex(ctx, p, containerID, acc.PrivateKey(), attr, syncInterval, maxRetries, debug) + if err != nil { + return cli.Exit(fmt.Sprintf("failed searching existing states: %v", err), 1) + } + stateModule := chain.GetStateModule() + currentHeight := int(stateModule.CurrentLocalHeight()) + currentStateIndex := currentHeight / syncInterval + if currentStateIndex <= stateObjCount { + log.Info("no new states to upload", + zap.Int("number of uploaded state objects", stateObjCount), + zap.Int("latest state is uploaded for block", stateObjCount*syncInterval), + zap.Int("current height", currentHeight), + zap.Int("StateSyncInterval", syncInterval)) + return nil + } + log.Info("starting uploading", + zap.Int("number of uploaded state objects", stateObjCount), + zap.Int("next state to upload for block", stateObjCount*syncInterval), + zap.Int("current height", currentHeight), + zap.Int("StateSyncInterval", syncInterval), + zap.Int("number of states to upload", currentStateIndex-stateObjCount)) + for state := stateObjCount; state < currentStateIndex; state++ { + height := uint32(state * syncInterval) + stateRoot, err := stateModule.GetStateRoot(height) + if err != nil { + return cli.Exit(fmt.Sprintf("failed to get state root for height %d: %v", height, err), 1) + } + h, err := chain.GetHeader(chain.GetHeaderHash(height)) + if err != nil { + return cli.Exit(fmt.Sprintf("failed to get header %d: %v", height, err), 1) + } + + var ( + hdr object.Object + prmObjectPutInit client.PrmObjectPutInit + attrs = []object.Attribute{ + *object.NewAttribute(attr, strconv.Itoa(int(height))), + *object.NewAttribute("Timestamp", strconv.FormatInt(time.Now().Unix(), 10)), + *object.NewAttribute("StateRoot", stateRoot.Root.StringLE()), + *object.NewAttribute("StateSyncInterval", strconv.Itoa(syncInterval)), + *object.NewAttribute("BlockTime", strconv.FormatUint(h.Timestamp, 10)), + } + ) + hdr.SetContainerID(containerID) + hdr.SetOwner(signer.UserID()) + hdr.SetAttributes(attrs...) + err = retry(func() error { + writer, err := p.ObjectPutInit(ctx.Context, hdr, signer, prmObjectPutInit) + if err != nil { + return err + } + start := time.Now() + wrt := gio.NewBinWriterFromIO(writer) + wrt.WriteB(byte(0)) + wrt.WriteU32LE(uint32(chain.GetConfig().Magic)) + wrt.WriteU32LE(height) + wrt.WriteBytes(stateRoot.Root[:]) + err = traverseMPT(stateRoot.Root, store, wrt) + if err != nil { + _ = writer.Close() + return err + } + err = writer.Close() + if err != nil { + return err + } + duration := time.Since(start) + res := writer.GetResult() + log.Info("uploaded state object", + zap.String("object ID", res.StoredObjectID().String()), + zap.Uint32("height", height), + zap.Duration("time spent", duration)) + return nil + }, maxRetries, debug) + if err != nil { + return cli.Exit(fmt.Sprintf("failed to upload object at height %d: %v", height, err), 1) + } + } + return nil +} + +func searchStateIndex(ctx *cli.Context, p neofs.PoolWrapper, containerID cid.ID, privKeys *keys.PrivateKey, + attributeKey string, syncInterval int, maxRetries uint, debug bool, +) (int, error) { + var ( + doneCh = make(chan struct{}) + errCh = make(chan error) + objCount = 0 + ) + + go func() { + defer close(doneCh) + for i := 0; ; i++ { + indexIDs := searchObjects(ctx.Context, p, containerID, privKeys, + attributeKey, uint(i*syncInterval), uint(i*syncInterval)+1, 1, maxRetries, debug, errCh) + resOIDs := make([]oid.ID, 0, 1) + for id := range indexIDs { + resOIDs = append(resOIDs, id) + } + if len(resOIDs) == 0 { + break + } + if len(resOIDs) > 1 { + fmt.Fprintf(ctx.App.Writer, "WARN: %d duplicated state objects with %s: %d found: %s\n", len(resOIDs), attributeKey, i, resOIDs) + } + objCount++ + } + }() + select { + case err := <-errCh: + return objCount, err + case <-doneCh: + return objCount, nil + } +} + +func traverseMPT(root util.Uint256, store storage.Store, writer *gio.BinWriter) error { + cache := storage.NewMemCachedStore(store) + billet := mpt.NewBillet(root, mpt.ModeAll, mpt.DummySTTempStoragePrefix, cache) + err := billet.Traverse(func(pathToNode []byte, node mpt.Node, nodeBytes []byte) bool { + writer.WriteVarBytes(nodeBytes) + return writer.Err != nil + }, false) + if err != nil { + return fmt.Errorf("billet traversal error: %w", err) + } + return nil +} diff --git a/docs/neofs-blockstorage.md b/docs/neofs-blockstorage.md index aa84db0dc..6c048a120 100644 --- a/docs/neofs-blockstorage.md +++ b/docs/neofs-blockstorage.md @@ -73,8 +73,8 @@ parameter. Once all blocks available in the NeoFS container are processed, the service shuts down automatically. -### NeoFS Upload Command -The `upload-bin` command is designed to fetch blocks from the RPC node and upload +### NeoFS block uploading command +The `util upload-bin` command is designed to fetch blocks from the RPC node and upload them to the NeoFS container. It also creates and uploads index files. Below is an example usage of the command: @@ -101,4 +101,26 @@ files are needed (different `index-file-size` or `index-attribute`), `upload-bin will upload the entire block sequence starting from genesis since no migration is supported yet by this command. Please, add a comment to the [#3744](https://github.com/nspcc-dev/neo-go/issues/3744) issue if you need this -functionality. \ No newline at end of file +functionality. + +### NeoFS state uploading command +The `util upload-state` command is used to start a node, traverse the MPT over the +smart contract storage, and upload MPT nodes to a NeoFS container at every +`StateSyncInterval` number of blocks. Below is an example usage of the command: + +```shell +./bin/neo-go util upload-state --cid 9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG --wallet-config ./wallet-config.yml --state-attribute State -m -fsr st1.t5.fs.neo.org:8080 -fsr st2.t5.fs.neo.org:8080 -fsr st3.t5.fs.neo.org:8080 +``` + +Run `./bin/neo-go util upload-state --help` to see the full list of supported options. + +This command works as follows: +1. Searches for the state objects stored in NeoFS to find the latest uploaded object. +2. Checks if new state objects could be uploaded given the current local state height. +3. Traverses the MPT nodes (pre-order) starting from the stateroot at the height of the + latest uploaded state object down to its children. +4. Uploads the MPT nodes to the NeoFS container. +5. Repeats steps 3-4 with a step equal to the `StateSyncInterval` number of blocks. + +If the command is interrupted, it can be resumed. It starts the uploading process +from the last uploaded state object. \ No newline at end of file diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index e722d420c..b5271b4b4 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -61,7 +61,8 @@ const ( defaultTimePerBlock = 15 * time.Second // HeaderVerificationGasLimit is the maximum amount of GAS for block header verification. HeaderVerificationGasLimit = 3_00000000 // 3 GAS - defaultStateSyncInterval = 40000 + // DefaultStateSyncInterval is the default interval for state sync. + DefaultStateSyncInterval = 40000 // defaultBlockTimesCache should be sufficient for tryRunGC() to get in // sync with storeBlock(). Most of the time they differ by some thousands of @@ -310,7 +311,7 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl return nil, errors.New("P2PStateExchangeExtensions can be enabled either on MPT-complete node (KeepOnlyLatestState=false) or on light GC-enabled node (RemoveUntraceableBlocks=true)") } if cfg.StateSyncInterval <= 0 { - cfg.StateSyncInterval = defaultStateSyncInterval + cfg.StateSyncInterval = DefaultStateSyncInterval log.Info("StateSyncInterval is not set or wrong, using default value", zap.Int("StateSyncInterval", cfg.StateSyncInterval)) } @@ -320,7 +321,7 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl return nil, errors.New("NeoFSStateSyncExtensions are enabled, but NeoFSBlockFetcher is off") } if cfg.StateSyncInterval <= 0 { - cfg.StateSyncInterval = defaultStateSyncInterval + cfg.StateSyncInterval = DefaultStateSyncInterval log.Info("StateSyncInterval is not set or wrong, using default value", zap.Int("StateSyncInterval", cfg.StateSyncInterval)) } diff --git a/pkg/services/helpers/neofs/blockstorage.go b/pkg/services/helpers/neofs/blockstorage.go index a0220e870..6485d352f 100644 --- a/pkg/services/helpers/neofs/blockstorage.go +++ b/pkg/services/helpers/neofs/blockstorage.go @@ -18,6 +18,8 @@ const ( DefaultBlockAttribute = "Block" // DefaultIndexFileAttribute is the default attribute name for index file objects. DefaultIndexFileAttribute = "Index" + // DefaultStateAttribute is the default attribute name for state objects. + DefaultStateAttribute = "State" // DefaultSearchBatchSize is a number of objects to search in a batch. We need to // search with EQ filter to avoid partially-completed SEARCH responses. If EQ search