mirror of
https://github.com/nspcc-dev/neo-go.git
synced 2024-12-31 09:54:16 +00:00
cli: add workers
and searchers
flags to upload-bin
command
Number of workers to fetch, upload and search blocks concurrently is now configurable. Close #3641 Signed-off-by: Ekaterina Pavlova <ekt@morphbits.io>
This commit is contained in:
parent
4b10f23aec
commit
36e855609d
3 changed files with 20 additions and 11 deletions
|
@ -76,6 +76,16 @@ func NewCommands() []*cli.Command {
|
|||
Usage: "Size of index file",
|
||||
Value: 128000,
|
||||
},
|
||||
&cli.UintFlag{
|
||||
Name: "workers",
|
||||
Usage: "Number of workers to fetch and upload blocks concurrently",
|
||||
Value: 50,
|
||||
},
|
||||
&cli.UintFlag{
|
||||
Name: "searchers",
|
||||
Usage: "Number of concurrent searches for blocks",
|
||||
Value: 20,
|
||||
},
|
||||
}, options.RPC...)
|
||||
uploadBinFlags = append(uploadBinFlags, options.Wallet...)
|
||||
return []*cli.Command{
|
||||
|
@ -158,7 +168,7 @@ func NewCommands() []*cli.Command {
|
|||
{
|
||||
Name: "upload-bin",
|
||||
Usage: "Fetch blocks from RPC node and upload them to the NeoFS container",
|
||||
UsageText: "neo-go util upload-bin --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --block-attribute block --index-attribute index --rpc-endpoint <node> [--timeout <time>] --wallet <wallet> [--wallet-config <config>] [--address <address>]",
|
||||
UsageText: "neo-go util upload-bin --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --block-attribute block --index-attribute index --rpc-endpoint <node> [--timeout <time>] --wallet <wallet> [--wallet-config <config>] [--address <address>] [--workers <num>] [--searchers <num>]",
|
||||
Action: uploadBin,
|
||||
Flags: uploadBinFlags,
|
||||
},
|
||||
|
|
|
@ -33,13 +33,8 @@ import (
|
|||
const (
|
||||
// Number of objects to search in a batch for finding max block in container.
|
||||
searchBatchSize = 10000
|
||||
// Control the number of concurrent searches.
|
||||
maxParallelSearches = 40
|
||||
// Size of object ID.
|
||||
oidSize = sha256.Size
|
||||
|
||||
// Number of workers to fetch and upload blocks concurrently.
|
||||
numWorkers = 100
|
||||
)
|
||||
|
||||
// Constants related to retry mechanism.
|
||||
|
@ -69,6 +64,8 @@ func uploadBin(ctx *cli.Context) error {
|
|||
rpcNeoFS := ctx.StringSlice("fs-rpc-endpoint")
|
||||
containerIDStr := ctx.String("container")
|
||||
attr := ctx.String("block-attribute")
|
||||
numWorkers := ctx.Int("workers")
|
||||
maxParallelSearches := ctx.Int("searchers")
|
||||
acc, _, err := options.GetAccFromContext(ctx)
|
||||
if err != nil {
|
||||
return cli.Exit(fmt.Sprintf("failed to load account: %v", err), 1)
|
||||
|
@ -137,7 +134,7 @@ func uploadBin(ctx *cli.Context) error {
|
|||
}
|
||||
fmt.Fprintln(ctx.App.Writer, "Chain block height:", currentBlockHeight)
|
||||
|
||||
lastMissingBlockIndex, err := fetchLatestMissingBlockIndex(ctx.Context, p, containerID, acc.PrivateKey(), attr, int(currentBlockHeight))
|
||||
lastMissingBlockIndex, err := fetchLatestMissingBlockIndex(ctx.Context, p, containerID, acc.PrivateKey(), attr, int(currentBlockHeight), maxParallelSearches)
|
||||
if err != nil {
|
||||
return cli.Exit(fmt.Errorf("failed to fetch the latest missing block index from container: %w", err), 1)
|
||||
}
|
||||
|
@ -222,7 +219,7 @@ func uploadBin(ctx *cli.Context) error {
|
|||
fmt.Fprintf(ctx.App.Writer, "Successfully uploaded batch of blocks: from %d to %d\n", batchStart, batchEnd-1)
|
||||
}
|
||||
|
||||
err = updateIndexFiles(ctx, p, containerID, *acc, signer, uint(currentBlockHeight), attr, homomorphicHashingDisabled)
|
||||
err = updateIndexFiles(ctx, p, containerID, *acc, signer, uint(currentBlockHeight), attr, homomorphicHashingDisabled, maxParallelSearches)
|
||||
if err != nil {
|
||||
return cli.Exit(fmt.Errorf("failed to update index files after upload: %w", err), 1)
|
||||
}
|
||||
|
@ -255,7 +252,7 @@ type searchResult struct {
|
|||
|
||||
// fetchLatestMissingBlockIndex searches the container for the last full block batch,
|
||||
// starting from the currentHeight and going backwards.
|
||||
func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID cid.ID, priv *keys.PrivateKey, attributeKey string, currentHeight int) (int, error) {
|
||||
func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID cid.ID, priv *keys.PrivateKey, attributeKey string, currentHeight int, maxParallelSearches int) (int, error) {
|
||||
var (
|
||||
wg sync.WaitGroup
|
||||
numBatches = currentHeight/searchBatchSize + 1
|
||||
|
@ -314,7 +311,7 @@ func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID
|
|||
}
|
||||
|
||||
// updateIndexFiles updates the index files in the container.
|
||||
func updateIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, account wallet.Account, signer user.Signer, currentHeight uint, blockAttributeKey string, homomorphicHashingDisabled bool) error {
|
||||
func updateIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, account wallet.Account, signer user.Signer, currentHeight uint, blockAttributeKey string, homomorphicHashingDisabled bool, maxParallelSearches int) error {
|
||||
attributeKey := ctx.String("index-attribute")
|
||||
indexFileSize := ctx.Uint("index-file-size")
|
||||
fmt.Fprintln(ctx.App.Writer, "Updating index files...")
|
||||
|
|
|
@ -87,7 +87,7 @@ NAME:
|
|||
neo-go util upload-bin - Fetch blocks from RPC node and upload them to the NeoFS container
|
||||
|
||||
USAGE:
|
||||
neo-go util upload-bin --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --block-attribute block --index-attribute index --rpc-endpoint <node> [--timeout <time>] --wallet <wallet> [--wallet-config <config>] [--address <address>]
|
||||
neo-go util upload-bin --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --block-attribute block --index-attribute index --rpc-endpoint <node> [--timeout <time>] --wallet <wallet> [--wallet-config <config>] [--address <address>] [--workers <num>] [--searchers <num>]
|
||||
|
||||
OPTIONS:
|
||||
--fs-rpc-endpoint value, --fsr value [ --fs-rpc-endpoint value, --fsr value ] List of NeoFS storage node RPC addresses (comma-separated or multiple --fs-rpc-endpoint flags)
|
||||
|
@ -96,6 +96,8 @@ OPTIONS:
|
|||
--index-attribute value Attribute key of the index file object
|
||||
--address value Address to use for signing the uploading and searching transactions in NeoFS
|
||||
--index-file-size value Size of index file (default: 128000)
|
||||
--workers value Number of workers to fetch, upload and search blocks concurrently (default: 50)
|
||||
--searchers value Number of concurrent searches for blocks (default: 20)
|
||||
--rpc-endpoint value, -r value RPC node address
|
||||
--timeout value, -s value Timeout for the operation (default: 10s)
|
||||
--wallet value, -w value Wallet to use to get the key for transaction signing; conflicts with --wallet-config flag
|
||||
|
|
Loading…
Reference in a new issue