Merge pull request #3643 from nspcc-dev/uploader-workers

cli: add flag workers to `upload-bin` command
This commit is contained in:
Anna Shaleva 2024-10-24 12:26:51 +03:00 committed by GitHub
commit b8a65d3c37
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 20 additions and 11 deletions

View file

@ -76,6 +76,16 @@ func NewCommands() []*cli.Command {
Usage: "Size of index file", Usage: "Size of index file",
Value: 128000, Value: 128000,
}, },
&cli.UintFlag{
Name: "workers",
Usage: "Number of workers to fetch and upload blocks concurrently",
Value: 50,
},
&cli.UintFlag{
Name: "searchers",
Usage: "Number of concurrent searches for blocks",
Value: 20,
},
}, options.RPC...) }, options.RPC...)
uploadBinFlags = append(uploadBinFlags, options.Wallet...) uploadBinFlags = append(uploadBinFlags, options.Wallet...)
return []*cli.Command{ return []*cli.Command{
@ -158,7 +168,7 @@ func NewCommands() []*cli.Command {
{ {
Name: "upload-bin", Name: "upload-bin",
Usage: "Fetch blocks from RPC node and upload them to the NeoFS container", Usage: "Fetch blocks from RPC node and upload them to the NeoFS container",
UsageText: "neo-go util upload-bin --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --block-attribute block --index-attribute index --rpc-endpoint <node> [--timeout <time>] --wallet <wallet> [--wallet-config <config>] [--address <address>]", UsageText: "neo-go util upload-bin --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --block-attribute block --index-attribute index --rpc-endpoint <node> [--timeout <time>] --wallet <wallet> [--wallet-config <config>] [--address <address>] [--workers <num>] [--searchers <num>] [--index-file-size <size>]",
Action: uploadBin, Action: uploadBin,
Flags: uploadBinFlags, Flags: uploadBinFlags,
}, },

View file

@ -33,13 +33,8 @@ import (
const ( const (
// Number of objects to search in a batch for finding max block in container. // Number of objects to search in a batch for finding max block in container.
searchBatchSize = 10000 searchBatchSize = 10000
// Control the number of concurrent searches.
maxParallelSearches = 40
// Size of object ID. // Size of object ID.
oidSize = sha256.Size oidSize = sha256.Size
// Number of workers to fetch and upload blocks concurrently.
numWorkers = 100
) )
// Constants related to retry mechanism. // Constants related to retry mechanism.
@ -69,6 +64,8 @@ func uploadBin(ctx *cli.Context) error {
rpcNeoFS := ctx.StringSlice("fs-rpc-endpoint") rpcNeoFS := ctx.StringSlice("fs-rpc-endpoint")
containerIDStr := ctx.String("container") containerIDStr := ctx.String("container")
attr := ctx.String("block-attribute") attr := ctx.String("block-attribute")
numWorkers := ctx.Int("workers")
maxParallelSearches := ctx.Int("searchers")
acc, _, err := options.GetAccFromContext(ctx) acc, _, err := options.GetAccFromContext(ctx)
if err != nil { if err != nil {
return cli.Exit(fmt.Sprintf("failed to load account: %v", err), 1) return cli.Exit(fmt.Sprintf("failed to load account: %v", err), 1)
@ -137,7 +134,7 @@ func uploadBin(ctx *cli.Context) error {
} }
fmt.Fprintln(ctx.App.Writer, "Chain block height:", currentBlockHeight) fmt.Fprintln(ctx.App.Writer, "Chain block height:", currentBlockHeight)
lastMissingBlockIndex, err := fetchLatestMissingBlockIndex(ctx.Context, p, containerID, acc.PrivateKey(), attr, int(currentBlockHeight)) lastMissingBlockIndex, err := fetchLatestMissingBlockIndex(ctx.Context, p, containerID, acc.PrivateKey(), attr, int(currentBlockHeight), maxParallelSearches)
if err != nil { if err != nil {
return cli.Exit(fmt.Errorf("failed to fetch the latest missing block index from container: %w", err), 1) return cli.Exit(fmt.Errorf("failed to fetch the latest missing block index from container: %w", err), 1)
} }
@ -222,7 +219,7 @@ func uploadBin(ctx *cli.Context) error {
fmt.Fprintf(ctx.App.Writer, "Successfully uploaded batch of blocks: from %d to %d\n", batchStart, batchEnd-1) fmt.Fprintf(ctx.App.Writer, "Successfully uploaded batch of blocks: from %d to %d\n", batchStart, batchEnd-1)
} }
err = updateIndexFiles(ctx, p, containerID, *acc, signer, uint(currentBlockHeight), attr, homomorphicHashingDisabled) err = updateIndexFiles(ctx, p, containerID, *acc, signer, uint(currentBlockHeight), attr, homomorphicHashingDisabled, maxParallelSearches)
if err != nil { if err != nil {
return cli.Exit(fmt.Errorf("failed to update index files after upload: %w", err), 1) return cli.Exit(fmt.Errorf("failed to update index files after upload: %w", err), 1)
} }
@ -255,7 +252,7 @@ type searchResult struct {
// fetchLatestMissingBlockIndex searches the container for the last full block batch, // fetchLatestMissingBlockIndex searches the container for the last full block batch,
// starting from the currentHeight and going backwards. // starting from the currentHeight and going backwards.
func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID cid.ID, priv *keys.PrivateKey, attributeKey string, currentHeight int) (int, error) { func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID cid.ID, priv *keys.PrivateKey, attributeKey string, currentHeight int, maxParallelSearches int) (int, error) {
var ( var (
wg sync.WaitGroup wg sync.WaitGroup
numBatches = currentHeight/searchBatchSize + 1 numBatches = currentHeight/searchBatchSize + 1
@ -314,7 +311,7 @@ func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID
} }
// updateIndexFiles updates the index files in the container. // updateIndexFiles updates the index files in the container.
func updateIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, account wallet.Account, signer user.Signer, currentHeight uint, blockAttributeKey string, homomorphicHashingDisabled bool) error { func updateIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, account wallet.Account, signer user.Signer, currentHeight uint, blockAttributeKey string, homomorphicHashingDisabled bool, maxParallelSearches int) error {
attributeKey := ctx.String("index-attribute") attributeKey := ctx.String("index-attribute")
indexFileSize := ctx.Uint("index-file-size") indexFileSize := ctx.Uint("index-file-size")
fmt.Fprintln(ctx.App.Writer, "Updating index files...") fmt.Fprintln(ctx.App.Writer, "Updating index files...")

View file

@ -87,7 +87,7 @@ NAME:
neo-go util upload-bin - Fetch blocks from RPC node and upload them to the NeoFS container neo-go util upload-bin - Fetch blocks from RPC node and upload them to the NeoFS container
USAGE: USAGE:
neo-go util upload-bin --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --block-attribute block --index-attribute index --rpc-endpoint <node> [--timeout <time>] --wallet <wallet> [--wallet-config <config>] [--address <address>] neo-go util upload-bin --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --block-attribute block --index-attribute index --rpc-endpoint <node> [--timeout <time>] --wallet <wallet> [--wallet-config <config>] [--address <address>] [--workers <num>] [--searchers <num>] [--index-file-size <size>]
OPTIONS: OPTIONS:
--fs-rpc-endpoint value, --fsr value [ --fs-rpc-endpoint value, --fsr value ] List of NeoFS storage node RPC addresses (comma-separated or multiple --fs-rpc-endpoint flags) --fs-rpc-endpoint value, --fsr value [ --fs-rpc-endpoint value, --fsr value ] List of NeoFS storage node RPC addresses (comma-separated or multiple --fs-rpc-endpoint flags)
@ -96,6 +96,8 @@ OPTIONS:
--index-attribute value Attribute key of the index file object --index-attribute value Attribute key of the index file object
--address value Address to use for signing the uploading and searching transactions in NeoFS --address value Address to use for signing the uploading and searching transactions in NeoFS
--index-file-size value Size of index file (default: 128000) --index-file-size value Size of index file (default: 128000)
--workers value Number of workers to fetch and upload blocks concurrently (default: 50)
--searchers value Number of concurrent searches for blocks (default: 20)
--rpc-endpoint value, -r value RPC node address --rpc-endpoint value, -r value RPC node address
--timeout value, -s value Timeout for the operation (default: 10s) --timeout value, -s value Timeout for the operation (default: 10s)
--wallet value, -w value Wallet to use to get the key for transaction signing; conflicts with --wallet-config flag --wallet value, -w value Wallet to use to get the key for transaction signing; conflicts with --wallet-config flag