mirror of
https://github.com/nspcc-dev/neo-go.git
synced 2025-04-15 15:04:51 +00:00
cli: add upload-state command
Close #3782 Signed-off-by: Ekaterina Pavlova <ekt@morphbits.io>
This commit is contained in:
parent
9c0274850a
commit
5f80a142b0
8 changed files with 269 additions and 18 deletions
|
@ -30,7 +30,7 @@ func dumpBin(ctx *cli.Context) error {
|
|||
count := uint32(ctx.Uint("count"))
|
||||
start := uint32(ctx.Uint("start"))
|
||||
|
||||
chain, prometheus, pprof, err := initBCWithMetrics(cfg, log)
|
||||
chain, _, prometheus, pprof, err := InitBCWithMetrics(cfg, log)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -140,10 +140,11 @@ func newGraceContext() context.Context {
|
|||
return ctx
|
||||
}
|
||||
|
||||
func initBCWithMetrics(cfg config.Config, log *zap.Logger) (*core.Blockchain, *metrics.Service, *metrics.Service, error) {
|
||||
chain, _, err := initBlockChain(cfg, log)
|
||||
// InitBCWithMetrics initializes the blockchain with metrics with the given configuration.
|
||||
func InitBCWithMetrics(cfg config.Config, log *zap.Logger) (*core.Blockchain, storage.Store, *metrics.Service, *metrics.Service, error) {
|
||||
chain, store, err := initBlockChain(cfg, log)
|
||||
if err != nil {
|
||||
return nil, nil, nil, cli.Exit(err, 1)
|
||||
return nil, nil, nil, nil, cli.Exit(err, 1)
|
||||
}
|
||||
prometheus := metrics.NewPrometheusService(cfg.ApplicationConfiguration.Prometheus, log)
|
||||
pprof := metrics.NewPprofService(cfg.ApplicationConfiguration.Pprof, log)
|
||||
|
@ -151,14 +152,14 @@ func initBCWithMetrics(cfg config.Config, log *zap.Logger) (*core.Blockchain, *m
|
|||
go chain.Run()
|
||||
err = prometheus.Start()
|
||||
if err != nil {
|
||||
return nil, nil, nil, cli.Exit(fmt.Errorf("failed to start Prometheus service: %w", err), 1)
|
||||
return nil, nil, nil, nil, cli.Exit(fmt.Errorf("failed to start Prometheus service: %w", err), 1)
|
||||
}
|
||||
err = pprof.Start()
|
||||
if err != nil {
|
||||
return nil, nil, nil, cli.Exit(fmt.Errorf("failed to start Pprof service: %w", err), 1)
|
||||
return nil, nil, nil, nil, cli.Exit(fmt.Errorf("failed to start Pprof service: %w", err), 1)
|
||||
}
|
||||
|
||||
return chain, prometheus, pprof, nil
|
||||
return chain, store, prometheus, pprof, nil
|
||||
}
|
||||
|
||||
func dumpDB(ctx *cli.Context) error {
|
||||
|
@ -189,7 +190,7 @@ func dumpDB(ctx *cli.Context) error {
|
|||
defer outStream.Close()
|
||||
writer := io.NewBinWriterFromIO(outStream)
|
||||
|
||||
chain, prometheus, pprof, err := initBCWithMetrics(cfg, log)
|
||||
chain, _, prometheus, pprof, err := InitBCWithMetrics(cfg, log)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -249,7 +250,7 @@ func restoreDB(ctx *cli.Context) error {
|
|||
cfg.ApplicationConfiguration.SaveStorageBatch = true
|
||||
}
|
||||
|
||||
chain, prometheus, pprof, err := initBCWithMetrics(cfg, log)
|
||||
chain, _, prometheus, pprof, err := InitBCWithMetrics(cfg, log)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -470,7 +471,7 @@ func startServer(ctx *cli.Context) error {
|
|||
return cli.Exit(err, 1)
|
||||
}
|
||||
|
||||
chain, prometheus, pprof, err := initBCWithMetrics(cfg, log)
|
||||
chain, _, prometheus, pprof, err := InitBCWithMetrics(cfg, log)
|
||||
if err != nil {
|
||||
return cli.Exit(err, 1)
|
||||
}
|
||||
|
|
|
@ -185,11 +185,11 @@ func TestInitBCWithMetrics(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("bad store", func(t *testing.T) {
|
||||
_, _, _, err = initBCWithMetrics(config.Config{}, logger)
|
||||
_, _, _, _, err = InitBCWithMetrics(config.Config{}, logger)
|
||||
require.Error(t, err)
|
||||
})
|
||||
|
||||
chain, prometheus, pprof, err := initBCWithMetrics(cfg, logger)
|
||||
chain, _, prometheus, pprof, err := InitBCWithMetrics(cfg, logger)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
chain.Close()
|
||||
|
|
|
@ -90,6 +90,18 @@ func NewCommands() []*cli.Command {
|
|||
}, options.RPC...)
|
||||
uploadBinFlags = append(uploadBinFlags, options.Wallet...)
|
||||
uploadBinFlags = append(uploadBinFlags, neoFSFlags...)
|
||||
|
||||
uploadStateFlags := append([]cli.Flag{
|
||||
&cli.StringFlag{
|
||||
Name: "state-attribute",
|
||||
Usage: "Attribute key of the state object",
|
||||
Value: neofs.DefaultStateAttribute,
|
||||
Action: cmdargs.EnsureNotEmpty("state-attribute"),
|
||||
},
|
||||
options.Debug, options.Config, options.ConfigFile, options.RelativePath,
|
||||
}, options.Wallet...)
|
||||
uploadStateFlags = append(uploadStateFlags, options.Network...)
|
||||
uploadStateFlags = append(uploadStateFlags, neoFSFlags...)
|
||||
return []*cli.Command{
|
||||
{
|
||||
Name: "util",
|
||||
|
@ -174,6 +186,13 @@ func NewCommands() []*cli.Command {
|
|||
Action: uploadBin,
|
||||
Flags: uploadBinFlags,
|
||||
},
|
||||
{
|
||||
Name: "upload-state",
|
||||
Usage: "Start the node, traverse MPT and upload MPT nodes to the NeoFS container at every StateSyncInterval number of blocks",
|
||||
UsageText: "neo-go util upload-state --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --state-attribute state --wallet <wallet> [--wallet-config <config>] [--address <address>] [--searchers <num>] [--retries <num>] [--debug] [--config-path path] [-p/-m/-t] [--config-file file]",
|
||||
Action: uploadState,
|
||||
Flags: uploadStateFlags,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
206
cli/util/upload_state.go
Normal file
206
cli/util/upload_state.go
Normal file
|
@ -0,0 +1,206 @@
|
|||
package util
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/nspcc-dev/neo-go/cli/cmdargs"
|
||||
"github.com/nspcc-dev/neo-go/cli/options"
|
||||
"github.com/nspcc-dev/neo-go/cli/server"
|
||||
"github.com/nspcc-dev/neo-go/pkg/core"
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/mpt"
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/storage"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
gio "github.com/nspcc-dev/neo-go/pkg/io"
|
||||
"github.com/nspcc-dev/neo-go/pkg/services/helpers/neofs"
|
||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/client"
|
||||
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
"github.com/urfave/cli/v2"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func uploadState(ctx *cli.Context) error {
|
||||
if err := cmdargs.EnsureNone(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
cfg, err := options.GetConfigFromContext(ctx)
|
||||
if err != nil {
|
||||
return cli.Exit(err, 1)
|
||||
}
|
||||
attr := ctx.String("state-attribute")
|
||||
maxRetries := ctx.Uint("retries")
|
||||
debug := ctx.Bool("debug")
|
||||
|
||||
acc, _, err := options.GetAccFromContext(ctx)
|
||||
if err != nil {
|
||||
return cli.Exit(fmt.Sprintf("failed to load account: %v", err), 1)
|
||||
}
|
||||
|
||||
signer, p, err := options.GetNeoFSClientPool(ctx, acc)
|
||||
if err != nil {
|
||||
return cli.Exit(err, 1)
|
||||
}
|
||||
defer p.Close()
|
||||
log, _, logCloser, err := options.HandleLoggingParams(debug, cfg.ApplicationConfiguration)
|
||||
if err != nil {
|
||||
return cli.Exit(err, 1)
|
||||
}
|
||||
if logCloser != nil {
|
||||
defer func() { _ = logCloser() }()
|
||||
}
|
||||
|
||||
chain, store, prometheus, pprof, err := server.InitBCWithMetrics(cfg, log)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
pprof.ShutDown()
|
||||
prometheus.ShutDown()
|
||||
chain.Close()
|
||||
}()
|
||||
|
||||
if chain.GetConfig().Ledger.KeepOnlyLatestState || chain.GetConfig().Ledger.RemoveUntraceableBlocks {
|
||||
return cli.Exit("only full-state node is supported: disable KeepOnlyLatestState and RemoveUntraceableBlocks", 1)
|
||||
}
|
||||
syncInterval := cfg.ProtocolConfiguration.StateSyncInterval
|
||||
if syncInterval == 0 {
|
||||
syncInterval = core.DefaultStateSyncInterval
|
||||
}
|
||||
|
||||
containerID, err := getContainer(ctx, p, strconv.Itoa(int(chain.GetConfig().Magic)), maxRetries, debug)
|
||||
if err != nil {
|
||||
return cli.Exit(err, 1)
|
||||
}
|
||||
|
||||
stateObjCount, err := searchStateIndex(ctx, p, containerID, acc.PrivateKey(), attr, syncInterval, maxRetries, debug)
|
||||
if err != nil {
|
||||
return cli.Exit(fmt.Sprintf("failed searching existing states: %v", err), 1)
|
||||
}
|
||||
stateModule := chain.GetStateModule()
|
||||
currentHeight := int(stateModule.CurrentLocalHeight())
|
||||
currentStateIndex := currentHeight / syncInterval
|
||||
if currentStateIndex <= stateObjCount {
|
||||
log.Info("no new states to upload",
|
||||
zap.Int("number of uploaded state objects", stateObjCount),
|
||||
zap.Int("latest state is uploaded for block", stateObjCount*syncInterval),
|
||||
zap.Int("current height", currentHeight),
|
||||
zap.Int("StateSyncInterval", syncInterval))
|
||||
return nil
|
||||
}
|
||||
log.Info("starting uploading",
|
||||
zap.Int("number of uploaded state objects", stateObjCount),
|
||||
zap.Int("next state to upload for block", stateObjCount*syncInterval),
|
||||
zap.Int("current height", currentHeight),
|
||||
zap.Int("StateSyncInterval", syncInterval),
|
||||
zap.Int("number of states to upload", currentStateIndex-stateObjCount))
|
||||
for state := stateObjCount; state < currentStateIndex; state++ {
|
||||
height := uint32(state * syncInterval)
|
||||
stateRoot, err := stateModule.GetStateRoot(height)
|
||||
if err != nil {
|
||||
return cli.Exit(fmt.Sprintf("failed to get state root for height %d: %v", height, err), 1)
|
||||
}
|
||||
h, err := chain.GetHeader(chain.GetHeaderHash(height))
|
||||
if err != nil {
|
||||
return cli.Exit(fmt.Sprintf("failed to get header %d: %v", height, err), 1)
|
||||
}
|
||||
|
||||
var (
|
||||
hdr object.Object
|
||||
prmObjectPutInit client.PrmObjectPutInit
|
||||
attrs = []object.Attribute{
|
||||
*object.NewAttribute(attr, strconv.Itoa(int(height))),
|
||||
*object.NewAttribute("Timestamp", strconv.FormatInt(time.Now().Unix(), 10)),
|
||||
*object.NewAttribute("StateRoot", stateRoot.Root.StringLE()),
|
||||
*object.NewAttribute("StateSyncInterval", strconv.Itoa(syncInterval)),
|
||||
*object.NewAttribute("BlockTime", strconv.FormatUint(h.Timestamp, 10)),
|
||||
}
|
||||
)
|
||||
hdr.SetContainerID(containerID)
|
||||
hdr.SetOwner(signer.UserID())
|
||||
hdr.SetAttributes(attrs...)
|
||||
err = retry(func() error {
|
||||
writer, err := p.ObjectPutInit(ctx.Context, hdr, signer, prmObjectPutInit)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
start := time.Now()
|
||||
wrt := gio.NewBinWriterFromIO(writer)
|
||||
wrt.WriteB(byte(0))
|
||||
wrt.WriteU32LE(uint32(chain.GetConfig().Magic))
|
||||
wrt.WriteU32LE(height)
|
||||
wrt.WriteBytes(stateRoot.Root[:])
|
||||
err = traverseMPT(stateRoot.Root, store, wrt)
|
||||
if err != nil {
|
||||
_ = writer.Close()
|
||||
return err
|
||||
}
|
||||
err = writer.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
duration := time.Since(start)
|
||||
res := writer.GetResult()
|
||||
log.Info("uploaded state object",
|
||||
zap.String("object ID", res.StoredObjectID().String()),
|
||||
zap.Uint32("height", height),
|
||||
zap.Duration("time spent", duration))
|
||||
return nil
|
||||
}, maxRetries, debug)
|
||||
if err != nil {
|
||||
return cli.Exit(fmt.Sprintf("failed to upload object at height %d: %v", height, err), 1)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func searchStateIndex(ctx *cli.Context, p neofs.PoolWrapper, containerID cid.ID, privKeys *keys.PrivateKey,
|
||||
attributeKey string, syncInterval int, maxRetries uint, debug bool,
|
||||
) (int, error) {
|
||||
var (
|
||||
doneCh = make(chan struct{})
|
||||
errCh = make(chan error)
|
||||
objCount = 0
|
||||
)
|
||||
|
||||
go func() {
|
||||
defer close(doneCh)
|
||||
for i := 0; ; i++ {
|
||||
indexIDs := searchObjects(ctx.Context, p, containerID, privKeys,
|
||||
attributeKey, uint(i*syncInterval), uint(i*syncInterval)+1, 1, maxRetries, debug, errCh)
|
||||
resOIDs := make([]oid.ID, 0, 1)
|
||||
for id := range indexIDs {
|
||||
resOIDs = append(resOIDs, id)
|
||||
}
|
||||
if len(resOIDs) == 0 {
|
||||
break
|
||||
}
|
||||
if len(resOIDs) > 1 {
|
||||
fmt.Fprintf(ctx.App.Writer, "WARN: %d duplicated state objects with %s: %d found: %s\n", len(resOIDs), attributeKey, i, resOIDs)
|
||||
}
|
||||
objCount++
|
||||
}
|
||||
}()
|
||||
select {
|
||||
case err := <-errCh:
|
||||
return objCount, err
|
||||
case <-doneCh:
|
||||
return objCount, nil
|
||||
}
|
||||
}
|
||||
|
||||
func traverseMPT(root util.Uint256, store storage.Store, writer *gio.BinWriter) error {
|
||||
cache := storage.NewMemCachedStore(store)
|
||||
billet := mpt.NewBillet(root, mpt.ModeAll, mpt.DummySTTempStoragePrefix, cache)
|
||||
err := billet.Traverse(func(pathToNode []byte, node mpt.Node, nodeBytes []byte) bool {
|
||||
writer.WriteVarBytes(nodeBytes)
|
||||
return writer.Err != nil
|
||||
}, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("billet traversal error: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -73,8 +73,8 @@ parameter.
|
|||
Once all blocks available in the NeoFS container are processed, the service
|
||||
shuts down automatically.
|
||||
|
||||
### NeoFS Upload Command
|
||||
The `upload-bin` command is designed to fetch blocks from the RPC node and upload
|
||||
### NeoFS block uploading command
|
||||
The `util upload-bin` command is designed to fetch blocks from the RPC node and upload
|
||||
them to the NeoFS container. It also creates and uploads index files. Below is an
|
||||
example usage of the command:
|
||||
|
||||
|
@ -101,4 +101,26 @@ files are needed (different `index-file-size` or `index-attribute`), `upload-bin
|
|||
will upload the entire block sequence starting from genesis since no migration is
|
||||
supported yet by this command. Please, add a comment to the
|
||||
[#3744](https://github.com/nspcc-dev/neo-go/issues/3744) issue if you need this
|
||||
functionality.
|
||||
functionality.
|
||||
|
||||
### NeoFS state uploading command
|
||||
The `util upload-state` command is used to start a node, traverse the MPT over the
|
||||
smart contract storage, and upload MPT nodes to a NeoFS container at every
|
||||
`StateSyncInterval` number of blocks. Below is an example usage of the command:
|
||||
|
||||
```shell
|
||||
./bin/neo-go util upload-state --cid 9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG --wallet-config ./wallet-config.yml --state-attribute State -m -fsr st1.t5.fs.neo.org:8080 -fsr st2.t5.fs.neo.org:8080 -fsr st3.t5.fs.neo.org:8080
|
||||
```
|
||||
|
||||
Run `./bin/neo-go util upload-state --help` to see the full list of supported options.
|
||||
|
||||
This command works as follows:
|
||||
1. Searches for the state objects stored in NeoFS to find the latest uploaded object.
|
||||
2. Checks if new state objects could be uploaded given the current local state height.
|
||||
3. Traverses the MPT nodes (pre-order) starting from the stateroot at the height of the
|
||||
latest uploaded state object down to its children.
|
||||
4. Uploads the MPT nodes to the NeoFS container.
|
||||
5. Repeats steps 3-4 with a step equal to the `StateSyncInterval` number of blocks.
|
||||
|
||||
If the command is interrupted, it can be resumed. It starts the uploading process
|
||||
from the last uploaded state object.
|
|
@ -61,7 +61,8 @@ const (
|
|||
defaultTimePerBlock = 15 * time.Second
|
||||
// HeaderVerificationGasLimit is the maximum amount of GAS for block header verification.
|
||||
HeaderVerificationGasLimit = 3_00000000 // 3 GAS
|
||||
defaultStateSyncInterval = 40000
|
||||
// DefaultStateSyncInterval is the default interval for state sync.
|
||||
DefaultStateSyncInterval = 40000
|
||||
|
||||
// defaultBlockTimesCache should be sufficient for tryRunGC() to get in
|
||||
// sync with storeBlock(). Most of the time they differ by some thousands of
|
||||
|
@ -310,7 +311,7 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl
|
|||
return nil, errors.New("P2PStateExchangeExtensions can be enabled either on MPT-complete node (KeepOnlyLatestState=false) or on light GC-enabled node (RemoveUntraceableBlocks=true)")
|
||||
}
|
||||
if cfg.StateSyncInterval <= 0 {
|
||||
cfg.StateSyncInterval = defaultStateSyncInterval
|
||||
cfg.StateSyncInterval = DefaultStateSyncInterval
|
||||
log.Info("StateSyncInterval is not set or wrong, using default value",
|
||||
zap.Int("StateSyncInterval", cfg.StateSyncInterval))
|
||||
}
|
||||
|
@ -320,7 +321,7 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl
|
|||
return nil, errors.New("NeoFSStateSyncExtensions are enabled, but NeoFSBlockFetcher is off")
|
||||
}
|
||||
if cfg.StateSyncInterval <= 0 {
|
||||
cfg.StateSyncInterval = defaultStateSyncInterval
|
||||
cfg.StateSyncInterval = DefaultStateSyncInterval
|
||||
log.Info("StateSyncInterval is not set or wrong, using default value",
|
||||
zap.Int("StateSyncInterval", cfg.StateSyncInterval))
|
||||
}
|
||||
|
|
|
@ -18,6 +18,8 @@ const (
|
|||
DefaultBlockAttribute = "Block"
|
||||
// DefaultIndexFileAttribute is the default attribute name for index file objects.
|
||||
DefaultIndexFileAttribute = "Index"
|
||||
// DefaultStateAttribute is the default attribute name for state objects.
|
||||
DefaultStateAttribute = "State"
|
||||
|
||||
// DefaultSearchBatchSize is a number of objects to search in a batch. We need to
|
||||
// search with EQ filter to avoid partially-completed SEARCH responses. If EQ search
|
||||
|
|
Loading…
Add table
Reference in a new issue