Merge pull request #3808 from nspcc-dev/upload-state

cli: add `upload-state ` command
This commit is contained in:
Anna Shaleva 2025-03-07 14:05:23 +03:00 committed by GitHub
commit 2881961421
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 395 additions and 126 deletions

View file

@ -25,8 +25,11 @@ import (
"github.com/nspcc-dev/neo-go/pkg/rpcclient" "github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor" "github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker" "github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker"
"github.com/nspcc-dev/neo-go/pkg/services/helpers/neofs"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/nspcc-dev/neo-go/pkg/wallet"
"github.com/nspcc-dev/neofs-sdk-go/pool"
"github.com/nspcc-dev/neofs-sdk-go/user"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
"go.uber.org/zap" "go.uber.org/zap"
"go.uber.org/zap/zapcore" "go.uber.org/zap/zapcore"
@ -43,9 +46,13 @@ const (
DefaultAwaitableTimeout = 3 * 15 * time.Second DefaultAwaitableTimeout = 3 * 15 * time.Second
) )
// RPCEndpointFlag is a long flag name for an RPC endpoint. It can be used to const (
// check for flag presence in the context. // RPCEndpointFlag is a long flag name for an RPC endpoint. It can be used to
const RPCEndpointFlag = "rpc-endpoint" // check for flag presence in the context.
RPCEndpointFlag = "rpc-endpoint"
// NeoFSRPCEndpointFlag is a long flag name for a NeoFS RPC endpoint.
NeoFSRPCEndpointFlag = "fs-rpc-endpoint"
)
// Wallet is a set of flags used for wallet operations. // Wallet is a set of flags used for wallet operations.
var Wallet = []cli.Flag{ var Wallet = []cli.Flag{
@ -101,6 +108,22 @@ var RPC = []cli.Flag{
}, },
} }
// NeoFSRPC is a set of flags used for NeoFS RPC connections (endpoint).
var NeoFSRPC = []cli.Flag{&cli.StringSliceFlag{
Name: NeoFSRPCEndpointFlag,
Aliases: []string{"fsr"},
Usage: "List of NeoFS storage node RPC addresses (comma-separated or multiple --fs-rpc-endpoint flags)",
Required: true,
Action: func(ctx *cli.Context, fsRpcEndpoints []string) error {
for _, endpoint := range fsRpcEndpoints {
if endpoint == "" {
return cli.Exit("NeoFS RPC endpoint cannot contain empty values", 1)
}
}
return nil
},
}}
// Historic is a flag for commands that can perform historic invocations. // Historic is a flag for commands that can perform historic invocations.
var Historic = &cli.StringFlag{ var Historic = &cli.StringFlag{
Name: "historic", Name: "historic",
@ -180,6 +203,26 @@ func GetRPCClient(gctx context.Context, ctx *cli.Context) (*rpcclient.Client, cl
return c, nil return c, nil
} }
// GetNeoFSClientPool returns a NeoFS pool and a signer for the given Context.
func GetNeoFSClientPool(ctx *cli.Context, acc *wallet.Account) (user.Signer, neofs.PoolWrapper, error) {
rpcNeoFS := ctx.StringSlice(NeoFSRPCEndpointFlag)
signer := user.NewAutoIDSignerRFC6979(acc.PrivateKey().PrivateKey)
params := pool.DefaultOptions()
params.SetHealthcheckTimeout(neofs.DefaultHealthcheckTimeout)
params.SetNodeDialTimeout(neofs.DefaultDialTimeout)
params.SetNodeStreamTimeout(neofs.DefaultStreamTimeout)
p, err := pool.New(pool.NewFlatNodeParams(rpcNeoFS), signer, params)
if err != nil {
return nil, neofs.PoolWrapper{}, fmt.Errorf("failed to create NeoFS pool: %w", err)
}
pWrapper := neofs.PoolWrapper{Pool: p}
if err = pWrapper.Dial(context.Background()); err != nil {
return nil, neofs.PoolWrapper{}, fmt.Errorf("failed to dial NeoFS pool: %w", err)
}
return signer, pWrapper, nil
}
// GetInvoker returns an invoker using the given RPC client, context and signers. // GetInvoker returns an invoker using the given RPC client, context and signers.
// It parses "--historic" parameter to adjust it. // It parses "--historic" parameter to adjust it.
func GetInvoker(c *rpcclient.Client, ctx *cli.Context, signers []transaction.Signer) (*invoker.Invoker, cli.ExitCoder) { func GetInvoker(c *rpcclient.Client, ctx *cli.Context, signers []transaction.Signer) (*invoker.Invoker, cli.ExitCoder) {

View file

@ -30,7 +30,7 @@ func dumpBin(ctx *cli.Context) error {
count := uint32(ctx.Uint("count")) count := uint32(ctx.Uint("count"))
start := uint32(ctx.Uint("start")) start := uint32(ctx.Uint("start"))
chain, prometheus, pprof, err := initBCWithMetrics(cfg, log) chain, _, prometheus, pprof, err := InitBCWithMetrics(cfg, log)
if err != nil { if err != nil {
return err return err
} }

View file

@ -140,10 +140,11 @@ func newGraceContext() context.Context {
return ctx return ctx
} }
func initBCWithMetrics(cfg config.Config, log *zap.Logger) (*core.Blockchain, *metrics.Service, *metrics.Service, error) { // InitBCWithMetrics initializes the blockchain with metrics with the given configuration.
chain, _, err := initBlockChain(cfg, log) func InitBCWithMetrics(cfg config.Config, log *zap.Logger) (*core.Blockchain, storage.Store, *metrics.Service, *metrics.Service, error) {
chain, store, err := initBlockChain(cfg, log)
if err != nil { if err != nil {
return nil, nil, nil, cli.Exit(err, 1) return nil, nil, nil, nil, cli.Exit(err, 1)
} }
prometheus := metrics.NewPrometheusService(cfg.ApplicationConfiguration.Prometheus, log) prometheus := metrics.NewPrometheusService(cfg.ApplicationConfiguration.Prometheus, log)
pprof := metrics.NewPprofService(cfg.ApplicationConfiguration.Pprof, log) pprof := metrics.NewPprofService(cfg.ApplicationConfiguration.Pprof, log)
@ -151,14 +152,14 @@ func initBCWithMetrics(cfg config.Config, log *zap.Logger) (*core.Blockchain, *m
go chain.Run() go chain.Run()
err = prometheus.Start() err = prometheus.Start()
if err != nil { if err != nil {
return nil, nil, nil, cli.Exit(fmt.Errorf("failed to start Prometheus service: %w", err), 1) return nil, nil, nil, nil, cli.Exit(fmt.Errorf("failed to start Prometheus service: %w", err), 1)
} }
err = pprof.Start() err = pprof.Start()
if err != nil { if err != nil {
return nil, nil, nil, cli.Exit(fmt.Errorf("failed to start Pprof service: %w", err), 1) return nil, nil, nil, nil, cli.Exit(fmt.Errorf("failed to start Pprof service: %w", err), 1)
} }
return chain, prometheus, pprof, nil return chain, store, prometheus, pprof, nil
} }
func dumpDB(ctx *cli.Context) error { func dumpDB(ctx *cli.Context) error {
@ -189,7 +190,7 @@ func dumpDB(ctx *cli.Context) error {
defer outStream.Close() defer outStream.Close()
writer := io.NewBinWriterFromIO(outStream) writer := io.NewBinWriterFromIO(outStream)
chain, prometheus, pprof, err := initBCWithMetrics(cfg, log) chain, _, prometheus, pprof, err := InitBCWithMetrics(cfg, log)
if err != nil { if err != nil {
return err return err
} }
@ -249,7 +250,7 @@ func restoreDB(ctx *cli.Context) error {
cfg.ApplicationConfiguration.SaveStorageBatch = true cfg.ApplicationConfiguration.SaveStorageBatch = true
} }
chain, prometheus, pprof, err := initBCWithMetrics(cfg, log) chain, _, prometheus, pprof, err := InitBCWithMetrics(cfg, log)
if err != nil { if err != nil {
return err return err
} }
@ -470,7 +471,7 @@ func startServer(ctx *cli.Context) error {
return cli.Exit(err, 1) return cli.Exit(err, 1)
} }
chain, prometheus, pprof, err := initBCWithMetrics(cfg, log) chain, _, prometheus, pprof, err := InitBCWithMetrics(cfg, log)
if err != nil { if err != nil {
return cli.Exit(err, 1) return cli.Exit(err, 1)
} }

View file

@ -185,11 +185,11 @@ func TestInitBCWithMetrics(t *testing.T) {
}) })
t.Run("bad store", func(t *testing.T) { t.Run("bad store", func(t *testing.T) {
_, _, _, err = initBCWithMetrics(config.Config{}, logger) _, _, _, _, err = InitBCWithMetrics(config.Config{}, logger)
require.Error(t, err) require.Error(t, err)
}) })
chain, prometheus, pprof, err := initBCWithMetrics(cfg, logger) chain, _, prometheus, pprof, err := InitBCWithMetrics(cfg, logger)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { t.Cleanup(func() {
chain.Close() chain.Close()

View file

@ -16,6 +16,35 @@ import (
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
) )
var neoFSFlags = append([]cli.Flag{
&cli.StringFlag{
Name: "container",
Aliases: []string{"cid"},
Usage: "NeoFS container ID to upload objects to",
Required: true,
Action: cmdargs.EnsureNotEmpty("container"),
},
&flags.AddressFlag{
Name: "address",
Usage: "Address to use for signing the uploading and searching transactions in NeoFS",
},
&cli.UintFlag{
Name: "retries",
Usage: "Maximum number of NeoFS node request retries",
Value: neofs.MaxRetries,
Action: func(context *cli.Context, u uint) error {
if u < 1 {
return cli.Exit("retries should be greater than 0", 1)
}
return nil
},
},
&cli.UintFlag{
Name: "searchers",
Usage: "Number of concurrent searches for objects",
Value: 100,
}}, options.NeoFSRPC...)
// NewCommands returns util commands for neo-go CLI. // NewCommands returns util commands for neo-go CLI.
func NewCommands() []*cli.Command { func NewCommands() []*cli.Command {
// By default, RPC flag is required. sendtx and txdump may be called without provided rpc-endpoint. // By default, RPC flag is required. sendtx and txdump may be called without provided rpc-endpoint.
@ -35,27 +64,6 @@ func NewCommands() []*cli.Command {
}, options.RPC...) }, options.RPC...)
txCancelFlags = append(txCancelFlags, options.Wallet...) txCancelFlags = append(txCancelFlags, options.Wallet...)
uploadBinFlags := append([]cli.Flag{ uploadBinFlags := append([]cli.Flag{
&cli.StringSliceFlag{
Name: "fs-rpc-endpoint",
Aliases: []string{"fsr"},
Usage: "List of NeoFS storage node RPC addresses (comma-separated or multiple --fs-rpc-endpoint flags)",
Required: true,
Action: func(ctx *cli.Context, fsRpcEndpoints []string) error {
for _, endpoint := range fsRpcEndpoints {
if endpoint == "" {
return cli.Exit("NeoFS RPC endpoint cannot contain empty values", 1)
}
}
return nil
},
},
&cli.StringFlag{
Name: "container",
Aliases: []string{"cid"},
Usage: "NeoFS container ID to upload blocks to",
Required: true,
Action: cmdargs.EnsureNotEmpty("container"),
},
&cli.StringFlag{ &cli.StringFlag{
Name: "block-attribute", Name: "block-attribute",
Usage: "Attribute key of the block object", Usage: "Attribute key of the block object",
@ -68,10 +76,6 @@ func NewCommands() []*cli.Command {
Value: neofs.DefaultIndexFileAttribute, Value: neofs.DefaultIndexFileAttribute,
Action: cmdargs.EnsureNotEmpty("index-attribute"), Action: cmdargs.EnsureNotEmpty("index-attribute"),
}, },
&flags.AddressFlag{
Name: "address",
Usage: "Address to use for signing the uploading and searching transactions in NeoFS",
},
&cli.UintFlag{ &cli.UintFlag{
Name: "index-file-size", Name: "index-file-size",
Usage: "Size of index file", Usage: "Size of index file",
@ -82,25 +86,22 @@ func NewCommands() []*cli.Command {
Usage: "Number of workers to fetch and upload blocks concurrently", Usage: "Number of workers to fetch and upload blocks concurrently",
Value: 20, Value: 20,
}, },
&cli.UintFlag{
Name: "searchers",
Usage: "Number of concurrent searches for blocks",
Value: 100,
},
&cli.UintFlag{
Name: "retries",
Usage: "Maximum number of Neo/NeoFS node request retries",
Value: neofs.MaxRetries,
Action: func(context *cli.Context, u uint) error {
if u < 1 {
return cli.Exit("retries should be greater than 0", 1)
}
return nil
},
},
options.Debug, options.Debug,
}, options.RPC...) }, options.RPC...)
uploadBinFlags = append(uploadBinFlags, options.Wallet...) uploadBinFlags = append(uploadBinFlags, options.Wallet...)
uploadBinFlags = append(uploadBinFlags, neoFSFlags...)
uploadStateFlags := append([]cli.Flag{
&cli.StringFlag{
Name: "state-attribute",
Usage: "Attribute key of the state object",
Value: neofs.DefaultStateAttribute,
Action: cmdargs.EnsureNotEmpty("state-attribute"),
},
options.Debug, options.Config, options.ConfigFile, options.RelativePath,
}, options.Wallet...)
uploadStateFlags = append(uploadStateFlags, options.Network...)
uploadStateFlags = append(uploadStateFlags, neoFSFlags...)
return []*cli.Command{ return []*cli.Command{
{ {
Name: "util", Name: "util",
@ -185,6 +186,13 @@ func NewCommands() []*cli.Command {
Action: uploadBin, Action: uploadBin,
Flags: uploadBinFlags, Flags: uploadBinFlags,
}, },
{
Name: "upload-state",
Usage: "Start the node, traverse MPT and upload MPT nodes to the NeoFS container at every StateSyncInterval number of blocks",
UsageText: "neo-go util upload-state --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --state-attribute state --wallet <wallet> [--wallet-config <config>] [--address <address>] [--searchers <num>] [--retries <num>] [--debug] [--config-path path] [-p/-m/-t] [--config-file file]",
Action: uploadState,
Flags: uploadStateFlags,
},
}, },
}, },
} }

View file

@ -19,28 +19,14 @@ import (
cid "github.com/nspcc-dev/neofs-sdk-go/container/id" cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/object" "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id" oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/pool"
"github.com/nspcc-dev/neofs-sdk-go/user" "github.com/nspcc-dev/neofs-sdk-go/user"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
) )
// poolWrapper wraps a NeoFS pool to adapt its Close method to return an error.
type poolWrapper struct {
*pool.Pool
}
// Close closes the pool and returns nil.
func (p poolWrapper) Close() error {
p.Pool.Close()
return nil
}
func uploadBin(ctx *cli.Context) error { func uploadBin(ctx *cli.Context) error {
if err := cmdargs.EnsureNone(ctx); err != nil { if err := cmdargs.EnsureNone(ctx); err != nil {
return err return err
} }
rpcNeoFS := ctx.StringSlice("fs-rpc-endpoint")
containerIDStr := ctx.String("container")
attr := ctx.String("block-attribute") attr := ctx.String("block-attribute")
numWorkers := ctx.Uint("workers") numWorkers := ctx.Uint("workers")
maxParallelSearches := ctx.Uint("searchers") maxParallelSearches := ctx.Uint("searchers")
@ -52,12 +38,6 @@ func uploadBin(ctx *cli.Context) error {
if err != nil { if err != nil {
return cli.Exit(fmt.Sprintf("failed to load account: %v", err), 1) return cli.Exit(fmt.Sprintf("failed to load account: %v", err), 1)
} }
var containerID cid.ID
if err = containerID.DecodeString(containerIDStr); err != nil {
return cli.Exit(fmt.Sprintf("failed to decode container ID: %v", err), 1)
}
gctx, cancel := options.GetTimeoutContext(ctx) gctx, cancel := options.GetTimeoutContext(ctx)
defer cancel() defer cancel()
rpc, err := options.GetRPCClient(gctx, ctx) rpc, err := options.GetRPCClient(gctx, ctx)
@ -65,39 +45,20 @@ func uploadBin(ctx *cli.Context) error {
return cli.Exit(fmt.Sprintf("failed to create RPC client: %v", err), 1) return cli.Exit(fmt.Sprintf("failed to create RPC client: %v", err), 1)
} }
signer := user.NewAutoIDSignerRFC6979(acc.PrivateKey().PrivateKey) signer, pWrapper, err := options.GetNeoFSClientPool(ctx, acc)
params := pool.DefaultOptions()
params.SetHealthcheckTimeout(neofs.DefaultHealthcheckTimeout)
params.SetNodeDialTimeout(neofs.DefaultDialTimeout)
params.SetNodeStreamTimeout(neofs.DefaultStreamTimeout)
p, err := pool.New(pool.NewFlatNodeParams(rpcNeoFS), signer, params)
if err != nil { if err != nil {
return cli.Exit(fmt.Sprintf("failed to create NeoFS pool: %v", err), 1) return cli.Exit(err, 1)
} }
pWrapper := poolWrapper{p} defer pWrapper.Close()
if err = pWrapper.Dial(context.Background()); err != nil {
return cli.Exit(fmt.Sprintf("failed to dial NeoFS pool: %v", err), 1)
}
defer p.Close()
var containerObj container.Container
err = retry(func() error {
containerObj, err = p.ContainerGet(ctx.Context, containerID, client.PrmContainerGet{})
return err
}, maxRetries, debug)
if err != nil {
return cli.Exit(fmt.Errorf("failed to get container with ID %s: %w", containerID, err), 1)
}
containerMagic := containerObj.Attribute("Magic")
v, err := rpc.GetVersion() v, err := rpc.GetVersion()
if err != nil { if err != nil {
return cli.Exit(fmt.Sprintf("failed to get version from RPC: %v", err), 1) return cli.Exit(fmt.Sprintf("failed to get version from RPC: %v", err), 1)
} }
magic := strconv.Itoa(int(v.Protocol.Network)) magic := strconv.Itoa(int(v.Protocol.Network))
if containerMagic != magic { containerID, err := getContainer(ctx, pWrapper, magic, maxRetries, debug)
return cli.Exit(fmt.Sprintf("container magic %s does not match the network magic %s", containerMagic, magic), 1) if err != nil {
return cli.Exit(err, 1)
} }
currentBlockHeight, err := rpc.GetBlockCount() currentBlockHeight, err := rpc.GetBlockCount()
@ -138,7 +99,7 @@ func retry(action func() error, maxRetries uint, debug bool) error {
} }
// uploadBlocksAndIndexFiles uploads the blocks and index files to the container using the pool. // uploadBlocksAndIndexFiles uploads the blocks and index files to the container using the pool.
func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.Client, signer user.Signer, containerID cid.ID, attr, indexAttributeKey string, buf []byte, currentIndexFileID, indexFileSize, currentBlockHeight uint, numWorkers, maxRetries uint, debug bool) error { func uploadBlocksAndIndexFiles(ctx *cli.Context, p neofs.PoolWrapper, rpc *rpcclient.Client, signer user.Signer, containerID cid.ID, attr, indexAttributeKey string, buf []byte, currentIndexFileID, indexFileSize, currentBlockHeight uint, numWorkers, maxRetries uint, debug bool) error {
if currentIndexFileID*indexFileSize >= currentBlockHeight { if currentIndexFileID*indexFileSize >= currentBlockHeight {
fmt.Fprintf(ctx.App.Writer, "No new blocks to upload. Need to upload starting from %d, current height %d\n", currentIndexFileID*indexFileSize, currentBlockHeight) fmt.Fprintf(ctx.App.Writer, "No new blocks to upload. Need to upload starting from %d, current height %d\n", currentIndexFileID*indexFileSize, currentBlockHeight)
return nil return nil
@ -265,7 +226,7 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C
} }
// searchIndexFile returns the ID and buffer for the next index file to be uploaded. // searchIndexFile returns the ID and buffer for the next index file to be uploaded.
func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, privKeys *keys.PrivateKey, signer user.Signer, indexFileSize uint, blockAttributeKey, attributeKey string, maxParallelSearches, maxRetries uint, debug bool) (uint, []byte, error) { func searchIndexFile(ctx *cli.Context, p neofs.PoolWrapper, containerID cid.ID, privKeys *keys.PrivateKey, signer user.Signer, indexFileSize uint, blockAttributeKey, attributeKey string, maxParallelSearches, maxRetries uint, debug bool) (uint, []byte, error) {
var ( var (
// buf is used to store OIDs of the uploaded blocks. // buf is used to store OIDs of the uploaded blocks.
buf = make([]byte, indexFileSize*oid.Size) buf = make([]byte, indexFileSize*oid.Size)
@ -357,7 +318,7 @@ func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, privKe
// searchObjects searches in parallel for objects with attribute GE startIndex and LT // searchObjects searches in parallel for objects with attribute GE startIndex and LT
// endIndex. It returns a buffered channel of resulting object IDs and closes it once // endIndex. It returns a buffered channel of resulting object IDs and closes it once
// OID search is finished. Errors are sent to errCh in a non-blocking way. // OID search is finished. Errors are sent to errCh in a non-blocking way.
func searchObjects(ctx context.Context, p poolWrapper, containerID cid.ID, privKeys *keys.PrivateKey, blockAttributeKey string, startIndex, endIndex, maxParallelSearches, maxRetries uint, debug bool, errCh chan error, additionalFilters ...object.SearchFilters) chan oid.ID { func searchObjects(ctx context.Context, p neofs.PoolWrapper, containerID cid.ID, privKeys *keys.PrivateKey, blockAttributeKey string, startIndex, endIndex, maxParallelSearches, maxRetries uint, debug bool, errCh chan error, additionalFilters ...object.SearchFilters) chan oid.ID {
var res = make(chan oid.ID, 2*neofs.DefaultSearchBatchSize) var res = make(chan oid.ID, 2*neofs.DefaultSearchBatchSize)
go func() { go func() {
var wg sync.WaitGroup var wg sync.WaitGroup
@ -419,7 +380,7 @@ func searchObjects(ctx context.Context, p poolWrapper, containerID cid.ID, privK
} }
// uploadObj uploads object to the container using provided settings. // uploadObj uploads object to the container using provided settings.
func uploadObj(ctx context.Context, p poolWrapper, signer user.Signer, containerID cid.ID, objData []byte, attrs []object.Attribute) (oid.ID, error) { func uploadObj(ctx context.Context, p neofs.PoolWrapper, signer user.Signer, containerID cid.ID, objData []byte, attrs []object.Attribute) (oid.ID, error) {
var ( var (
hdr object.Object hdr object.Object
prmObjectPutInit client.PrmObjectPutInit prmObjectPutInit client.PrmObjectPutInit
@ -461,3 +422,28 @@ func getBlockIndex(header object.Object, attribute string) (int, error) {
} }
return -1, fmt.Errorf("attribute %s not found", attribute) return -1, fmt.Errorf("attribute %s not found", attribute)
} }
// getContainer gets container by ID and checks its magic.
func getContainer(ctx *cli.Context, p neofs.PoolWrapper, expectedMagic string, maxRetries uint, debug bool) (cid.ID, error) {
var (
containerObj container.Container
err error
containerIDStr = ctx.String("container")
)
var containerID cid.ID
if err = containerID.DecodeString(containerIDStr); err != nil {
return containerID, fmt.Errorf("failed to decode container ID: %w", err)
}
err = retry(func() error {
containerObj, err = p.ContainerGet(ctx.Context, containerID, client.PrmContainerGet{})
return err
}, maxRetries, debug)
if err != nil {
return containerID, fmt.Errorf("failed to get container: %w", err)
}
containerMagic := containerObj.Attribute("Magic")
if containerMagic != expectedMagic {
return containerID, fmt.Errorf("container magic mismatch: expected %s, got %s", expectedMagic, containerMagic)
}
return containerID, nil
}

206
cli/util/upload_state.go Normal file
View file

@ -0,0 +1,206 @@
package util
import (
"fmt"
"strconv"
"time"
"github.com/nspcc-dev/neo-go/cli/cmdargs"
"github.com/nspcc-dev/neo-go/cli/options"
"github.com/nspcc-dev/neo-go/cli/server"
"github.com/nspcc-dev/neo-go/pkg/core"
"github.com/nspcc-dev/neo-go/pkg/core/mpt"
"github.com/nspcc-dev/neo-go/pkg/core/storage"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
gio "github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/services/helpers/neofs"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neofs-sdk-go/client"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/urfave/cli/v2"
"go.uber.org/zap"
)
func uploadState(ctx *cli.Context) error {
if err := cmdargs.EnsureNone(ctx); err != nil {
return err
}
cfg, err := options.GetConfigFromContext(ctx)
if err != nil {
return cli.Exit(err, 1)
}
attr := ctx.String("state-attribute")
maxRetries := ctx.Uint("retries")
debug := ctx.Bool("debug")
acc, _, err := options.GetAccFromContext(ctx)
if err != nil {
return cli.Exit(fmt.Sprintf("failed to load account: %v", err), 1)
}
signer, p, err := options.GetNeoFSClientPool(ctx, acc)
if err != nil {
return cli.Exit(err, 1)
}
defer p.Close()
log, _, logCloser, err := options.HandleLoggingParams(debug, cfg.ApplicationConfiguration)
if err != nil {
return cli.Exit(err, 1)
}
if logCloser != nil {
defer func() { _ = logCloser() }()
}
chain, store, prometheus, pprof, err := server.InitBCWithMetrics(cfg, log)
if err != nil {
return err
}
defer func() {
pprof.ShutDown()
prometheus.ShutDown()
chain.Close()
}()
if chain.GetConfig().Ledger.KeepOnlyLatestState || chain.GetConfig().Ledger.RemoveUntraceableBlocks {
return cli.Exit("only full-state node is supported: disable KeepOnlyLatestState and RemoveUntraceableBlocks", 1)
}
syncInterval := cfg.ProtocolConfiguration.StateSyncInterval
if syncInterval == 0 {
syncInterval = core.DefaultStateSyncInterval
}
containerID, err := getContainer(ctx, p, strconv.Itoa(int(chain.GetConfig().Magic)), maxRetries, debug)
if err != nil {
return cli.Exit(err, 1)
}
stateObjCount, err := searchStateIndex(ctx, p, containerID, acc.PrivateKey(), attr, syncInterval, maxRetries, debug)
if err != nil {
return cli.Exit(fmt.Sprintf("failed searching existing states: %v", err), 1)
}
stateModule := chain.GetStateModule()
currentHeight := int(stateModule.CurrentLocalHeight())
currentStateIndex := currentHeight / syncInterval
if currentStateIndex <= stateObjCount {
log.Info("no new states to upload",
zap.Int("number of uploaded state objects", stateObjCount),
zap.Int("latest state is uploaded for block", stateObjCount*syncInterval),
zap.Int("current height", currentHeight),
zap.Int("StateSyncInterval", syncInterval))
return nil
}
log.Info("starting uploading",
zap.Int("number of uploaded state objects", stateObjCount),
zap.Int("next state to upload for block", stateObjCount*syncInterval),
zap.Int("current height", currentHeight),
zap.Int("StateSyncInterval", syncInterval),
zap.Int("number of states to upload", currentStateIndex-stateObjCount))
for state := stateObjCount; state < currentStateIndex; state++ {
height := uint32(state * syncInterval)
stateRoot, err := stateModule.GetStateRoot(height)
if err != nil {
return cli.Exit(fmt.Sprintf("failed to get state root for height %d: %v", height, err), 1)
}
h, err := chain.GetHeader(chain.GetHeaderHash(height))
if err != nil {
return cli.Exit(fmt.Sprintf("failed to get header %d: %v", height, err), 1)
}
var (
hdr object.Object
prmObjectPutInit client.PrmObjectPutInit
attrs = []object.Attribute{
*object.NewAttribute(attr, strconv.Itoa(int(height))),
*object.NewAttribute("Timestamp", strconv.FormatInt(time.Now().Unix(), 10)),
*object.NewAttribute("StateRoot", stateRoot.Root.StringLE()),
*object.NewAttribute("StateSyncInterval", strconv.Itoa(syncInterval)),
*object.NewAttribute("BlockTime", strconv.FormatUint(h.Timestamp, 10)),
}
)
hdr.SetContainerID(containerID)
hdr.SetOwner(signer.UserID())
hdr.SetAttributes(attrs...)
err = retry(func() error {
writer, err := p.ObjectPutInit(ctx.Context, hdr, signer, prmObjectPutInit)
if err != nil {
return err
}
start := time.Now()
wrt := gio.NewBinWriterFromIO(writer)
wrt.WriteB(byte(0))
wrt.WriteU32LE(uint32(chain.GetConfig().Magic))
wrt.WriteU32LE(height)
wrt.WriteBytes(stateRoot.Root[:])
err = traverseMPT(stateRoot.Root, store, wrt)
if err != nil {
_ = writer.Close()
return err
}
err = writer.Close()
if err != nil {
return err
}
duration := time.Since(start)
res := writer.GetResult()
log.Info("uploaded state object",
zap.String("object ID", res.StoredObjectID().String()),
zap.Uint32("height", height),
zap.Duration("time spent", duration))
return nil
}, maxRetries, debug)
if err != nil {
return cli.Exit(fmt.Sprintf("failed to upload object at height %d: %v", height, err), 1)
}
}
return nil
}
func searchStateIndex(ctx *cli.Context, p neofs.PoolWrapper, containerID cid.ID, privKeys *keys.PrivateKey,
attributeKey string, syncInterval int, maxRetries uint, debug bool,
) (int, error) {
var (
doneCh = make(chan struct{})
errCh = make(chan error)
objCount = 0
)
go func() {
defer close(doneCh)
for i := 0; ; i++ {
indexIDs := searchObjects(ctx.Context, p, containerID, privKeys,
attributeKey, uint(i*syncInterval), uint(i*syncInterval)+1, 1, maxRetries, debug, errCh)
resOIDs := make([]oid.ID, 0, 1)
for id := range indexIDs {
resOIDs = append(resOIDs, id)
}
if len(resOIDs) == 0 {
break
}
if len(resOIDs) > 1 {
fmt.Fprintf(ctx.App.Writer, "WARN: %d duplicated state objects with %s: %d found: %s\n", len(resOIDs), attributeKey, i, resOIDs)
}
objCount++
}
}()
select {
case err := <-errCh:
return objCount, err
case <-doneCh:
return objCount, nil
}
}
func traverseMPT(root util.Uint256, store storage.Store, writer *gio.BinWriter) error {
cache := storage.NewMemCachedStore(store)
billet := mpt.NewBillet(root, mpt.ModeAll, mpt.DummySTTempStoragePrefix, cache)
err := billet.Traverse(func(pathToNode []byte, node mpt.Node, nodeBytes []byte) bool {
writer.WriteVarBytes(nodeBytes)
return writer.Err != nil
}, false)
if err != nil {
return fmt.Errorf("billet traversal error: %w", err)
}
return nil
}

View file

@ -204,8 +204,6 @@ func TestUploadBin(t *testing.T) {
e.In.WriteString("one\r") e.In.WriteString("one\r")
e.RunWithErrorCheckExit(t, "failed to load account", append(args, "--cid", "test", "--wallet", "./not-exist.json", "--rpc-endpoint", "https://test")...) e.RunWithErrorCheckExit(t, "failed to load account", append(args, "--cid", "test", "--wallet", "./not-exist.json", "--rpc-endpoint", "https://test")...)
e.In.WriteString("one\r") e.In.WriteString("one\r")
e.RunWithErrorCheckExit(t, "failed to decode container ID", append(args, "--cid", "test", "--wallet", testcli.ValidatorWallet, "--rpc-endpoint", "https://test")...)
e.In.WriteString("one\r")
e.RunWithErrorCheckExit(t, "failed to create RPC client", append(args, "--cid", "9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG", "--wallet", testcli.ValidatorWallet, "--rpc-endpoint", "https://test")...) e.RunWithErrorCheckExit(t, "failed to create RPC client", append(args, "--cid", "9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG", "--wallet", testcli.ValidatorWallet, "--rpc-endpoint", "https://test")...)
e.In.WriteString("one\r") e.In.WriteString("one\r")
e.RunWithErrorCheckExit(t, "failed to dial NeoFS pool", append(args, "--cid", "9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG", "--wallet", testcli.ValidatorWallet, "--rpc-endpoint", "http://"+e.RPC.Addresses()[0])...) e.RunWithErrorCheckExit(t, "failed to dial NeoFS pool", append(args, "--cid", "9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG", "--wallet", testcli.ValidatorWallet, "--rpc-endpoint", "http://"+e.RPC.Addresses()[0])...)

View file

@ -73,8 +73,8 @@ parameter.
Once all blocks available in the NeoFS container are processed, the service Once all blocks available in the NeoFS container are processed, the service
shuts down automatically. shuts down automatically.
### NeoFS Upload Command ### NeoFS block uploading command
The `upload-bin` command is designed to fetch blocks from the RPC node and upload The `util upload-bin` command is designed to fetch blocks from the RPC node and upload
them to the NeoFS container. It also creates and uploads index files. Below is an them to the NeoFS container. It also creates and uploads index files. Below is an
example usage of the command: example usage of the command:
@ -102,3 +102,25 @@ will upload the entire block sequence starting from genesis since no migration i
supported yet by this command. Please, add a comment to the supported yet by this command. Please, add a comment to the
[#3744](https://github.com/nspcc-dev/neo-go/issues/3744) issue if you need this [#3744](https://github.com/nspcc-dev/neo-go/issues/3744) issue if you need this
functionality. functionality.
### NeoFS state uploading command
The `util upload-state` command is used to start a node, traverse the MPT over the
smart contract storage, and upload MPT nodes to a NeoFS container at every
`StateSyncInterval` number of blocks. Below is an example usage of the command:
```shell
./bin/neo-go util upload-state --cid 9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG --wallet-config ./wallet-config.yml --state-attribute State -m -fsr st1.t5.fs.neo.org:8080 -fsr st2.t5.fs.neo.org:8080 -fsr st3.t5.fs.neo.org:8080
```
Run `./bin/neo-go util upload-state --help` to see the full list of supported options.
This command works as follows:
1. Searches for the state objects stored in NeoFS to find the latest uploaded object.
2. Checks if new state objects could be uploaded given the current local state height.
3. Traverses the MPT nodes (pre-order) starting from the stateroot at the height of the
latest uploaded state object down to its children.
4. Uploads the MPT nodes to the NeoFS container.
5. Repeats steps 3-4 with a step equal to the `StateSyncInterval` number of blocks.
If the command is interrupted, it can be resumed. It starts the uploading process
from the last uploaded state object.

View file

@ -61,7 +61,8 @@ const (
defaultTimePerBlock = 15 * time.Second defaultTimePerBlock = 15 * time.Second
// HeaderVerificationGasLimit is the maximum amount of GAS for block header verification. // HeaderVerificationGasLimit is the maximum amount of GAS for block header verification.
HeaderVerificationGasLimit = 3_00000000 // 3 GAS HeaderVerificationGasLimit = 3_00000000 // 3 GAS
defaultStateSyncInterval = 40000 // DefaultStateSyncInterval is the default interval for state sync.
DefaultStateSyncInterval = 40000
// defaultBlockTimesCache should be sufficient for tryRunGC() to get in // defaultBlockTimesCache should be sufficient for tryRunGC() to get in
// sync with storeBlock(). Most of the time they differ by some thousands of // sync with storeBlock(). Most of the time they differ by some thousands of
@ -310,7 +311,7 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl
return nil, errors.New("P2PStateExchangeExtensions can be enabled either on MPT-complete node (KeepOnlyLatestState=false) or on light GC-enabled node (RemoveUntraceableBlocks=true)") return nil, errors.New("P2PStateExchangeExtensions can be enabled either on MPT-complete node (KeepOnlyLatestState=false) or on light GC-enabled node (RemoveUntraceableBlocks=true)")
} }
if cfg.StateSyncInterval <= 0 { if cfg.StateSyncInterval <= 0 {
cfg.StateSyncInterval = defaultStateSyncInterval cfg.StateSyncInterval = DefaultStateSyncInterval
log.Info("StateSyncInterval is not set or wrong, using default value", log.Info("StateSyncInterval is not set or wrong, using default value",
zap.Int("StateSyncInterval", cfg.StateSyncInterval)) zap.Int("StateSyncInterval", cfg.StateSyncInterval))
} }
@ -320,7 +321,7 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl
return nil, errors.New("NeoFSStateSyncExtensions are enabled, but NeoFSBlockFetcher is off") return nil, errors.New("NeoFSStateSyncExtensions are enabled, but NeoFSBlockFetcher is off")
} }
if cfg.StateSyncInterval <= 0 { if cfg.StateSyncInterval <= 0 {
cfg.StateSyncInterval = defaultStateSyncInterval cfg.StateSyncInterval = DefaultStateSyncInterval
log.Info("StateSyncInterval is not set or wrong, using default value", log.Info("StateSyncInterval is not set or wrong, using default value",
zap.Int("StateSyncInterval", cfg.StateSyncInterval)) zap.Int("StateSyncInterval", cfg.StateSyncInterval))
} }

View file

@ -51,17 +51,6 @@ type Ledger interface {
HeaderHeight() uint32 HeaderHeight() uint32
} }
// poolWrapper wraps a NeoFS pool to adapt its Close method to return an error.
type poolWrapper struct {
*pool.Pool
}
// Close closes the pool and returns nil.
func (p poolWrapper) Close() error {
p.Pool.Close()
return nil
}
type indexedOID struct { type indexedOID struct {
Index int Index int
OID oid.ID OID oid.ID
@ -81,7 +70,7 @@ type Service struct {
headerSizeMap map[int]int headerSizeMap map[int]int
chain Ledger chain Ledger
pool poolWrapper pool neofs.PoolWrapper
enqueue func(obj bqueue.Indexable) error enqueue func(obj bqueue.Indexable) error
account *wallet.Account account *wallet.Account
@ -166,7 +155,7 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, put fun
} }
return &Service{ return &Service{
chain: chain, chain: chain,
pool: poolWrapper{Pool: p}, pool: neofs.PoolWrapper{Pool: p},
log: logger, log: logger,
cfg: cfg, cfg: cfg,
operationMode: opt, operationMode: opt,

View file

@ -2,6 +2,8 @@ package neofs
import ( import (
"time" "time"
"github.com/nspcc-dev/neofs-sdk-go/pool"
) )
// Constants related to NeoFS block storage. // Constants related to NeoFS block storage.
@ -16,6 +18,8 @@ const (
DefaultBlockAttribute = "Block" DefaultBlockAttribute = "Block"
// DefaultIndexFileAttribute is the default attribute name for index file objects. // DefaultIndexFileAttribute is the default attribute name for index file objects.
DefaultIndexFileAttribute = "Index" DefaultIndexFileAttribute = "Index"
// DefaultStateAttribute is the default attribute name for state objects.
DefaultStateAttribute = "State"
// DefaultSearchBatchSize is a number of objects to search in a batch. We need to // DefaultSearchBatchSize is a number of objects to search in a batch. We need to
// search with EQ filter to avoid partially-completed SEARCH responses. If EQ search // search with EQ filter to avoid partially-completed SEARCH responses. If EQ search
@ -50,3 +54,14 @@ const (
// MaxBackoff is the maximum backoff duration. // MaxBackoff is the maximum backoff duration.
MaxBackoff = 20 * time.Second MaxBackoff = 20 * time.Second
) )
// PoolWrapper wraps a NeoFS pool to adapt its Close method to return an error.
type PoolWrapper struct {
*pool.Pool
}
// Close closes the pool and returns nil.
func (p PoolWrapper) Close() error {
p.Pool.Close()
return nil
}