cli: add flag for skipping block uploading in upload-bin

Signed-off-by: Ekaterina Pavlova <ekt@morphbits.io>
This commit is contained in:
Ekaterina Pavlova 2024-10-30 12:08:40 +03:00
parent a4633ce2c7
commit 6a93f70728
3 changed files with 100 additions and 82 deletions

View file

@ -86,6 +86,10 @@ func NewCommands() []*cli.Command {
Usage: "Number of concurrent searches for blocks", Usage: "Number of concurrent searches for blocks",
Value: 20, Value: 20,
}, },
&cli.BoolFlag{
Name: "skip-blocks-uploading",
Usage: "Skip blocks uploading and upload only index files",
},
}, options.RPC...) }, options.RPC...)
uploadBinFlags = append(uploadBinFlags, options.Wallet...) uploadBinFlags = append(uploadBinFlags, options.Wallet...)
return []*cli.Command{ return []*cli.Command{
@ -168,7 +172,7 @@ func NewCommands() []*cli.Command {
{ {
Name: "upload-bin", Name: "upload-bin",
Usage: "Fetch blocks from RPC node and upload them to the NeoFS container", Usage: "Fetch blocks from RPC node and upload them to the NeoFS container",
UsageText: "neo-go util upload-bin --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --block-attribute block --index-attribute index --rpc-endpoint <node> [--timeout <time>] --wallet <wallet> [--wallet-config <config>] [--address <address>] [--workers <num>] [--searchers <num>] [--index-file-size <size>]", UsageText: "neo-go util upload-bin --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --block-attribute block --index-attribute index --rpc-endpoint <node> [--timeout <time>] --wallet <wallet> [--wallet-config <config>] [--address <address>] [--workers <num>] [--searchers <num>] [--index-file-size <size>] [--skip-blocks-uploading]",
Action: uploadBin, Action: uploadBin,
Flags: uploadBinFlags, Flags: uploadBinFlags,
}, },

View file

@ -14,6 +14,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs" "github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/nspcc-dev/neo-go/pkg/wallet"
@ -134,89 +135,17 @@ func uploadBin(ctx *cli.Context) error {
} }
fmt.Fprintln(ctx.App.Writer, "Chain block height:", currentBlockHeight) fmt.Fprintln(ctx.App.Writer, "Chain block height:", currentBlockHeight)
lastMissingBlockIndex, err := fetchLatestMissingBlockIndex(ctx.Context, p, containerID, acc.PrivateKey(), attr, int(currentBlockHeight), maxParallelSearches) oldestMissingBlockIndex, errBlock := fetchLatestMissingBlockIndex(ctx.Context, p, containerID, acc.PrivateKey(), attr, int(currentBlockHeight), maxParallelSearches)
if errBlock != nil {
return cli.Exit(fmt.Errorf("failed to fetch the oldest missing block index from container: %w", err), 1)
}
fmt.Fprintln(ctx.App.Writer, "First block of latest incomplete batch uploaded to NeoFS container:", oldestMissingBlockIndex)
if !ctx.Bool("skip-blocks-uploading") {
err = uploadBlocks(ctx, p, rpc, signer, containerID, acc, attr, oldestMissingBlockIndex, uint(currentBlockHeight), homomorphicHashingDisabled, numWorkers)
if err != nil { if err != nil {
return cli.Exit(fmt.Errorf("failed to fetch the latest missing block index from container: %w", err), 1) return cli.Exit(fmt.Errorf("failed to upload blocks: %w", err), 1)
} }
fmt.Fprintln(ctx.App.Writer, "First block of latest incomplete batch uploaded to NeoFS container:", lastMissingBlockIndex)
if lastMissingBlockIndex > int(currentBlockHeight) {
fmt.Fprintf(ctx.App.Writer, "No new blocks to upload. Need to upload starting from %d, current height %d\n", lastMissingBlockIndex, currentBlockHeight)
return nil
}
for batchStart := lastMissingBlockIndex; batchStart <= int(currentBlockHeight); batchStart += searchBatchSize {
var (
batchEnd = min(batchStart+searchBatchSize, int(currentBlockHeight)+1)
errorCh = make(chan error)
doneCh = make(chan struct{})
wg sync.WaitGroup
)
fmt.Fprintf(ctx.App.Writer, "Processing batch from %d to %d\n", batchStart, batchEnd-1)
wg.Add(numWorkers)
for i := range numWorkers {
go func(i int) {
defer wg.Done()
for blockIndex := batchStart + i; blockIndex < batchEnd; blockIndex += numWorkers {
var blk *block.Block
errGet := retry(func() error {
var errGetBlock error
blk, errGetBlock = rpc.GetBlockByIndex(uint32(blockIndex))
if errGetBlock != nil {
return fmt.Errorf("failed to fetch block %d: %w", blockIndex, errGetBlock)
}
return nil
})
if errGet != nil {
select {
case errorCh <- errGet:
default:
}
return
}
bw := io.NewBufBinWriter()
blk.EncodeBinary(bw.BinWriter)
if bw.Err != nil {
errorCh <- fmt.Errorf("failed to encode block %d: %w", blockIndex, bw.Err)
return
}
attrs := []object.Attribute{
*object.NewAttribute(attr, strconv.Itoa(int(blk.Index))),
*object.NewAttribute("Primary", strconv.Itoa(int(blk.PrimaryIndex))),
*object.NewAttribute("Hash", blk.Hash().StringLE()),
*object.NewAttribute("PrevHash", blk.PrevHash.StringLE()),
*object.NewAttribute("Timestamp", strconv.FormatUint(blk.Timestamp, 10)),
}
objBytes := bw.Bytes()
errRetr := retry(func() error {
return uploadObj(ctx.Context, p, signer, acc.PrivateKey().GetScriptHash(), containerID, objBytes, attrs, homomorphicHashingDisabled)
})
if errRetr != nil {
select {
case errorCh <- errRetr:
default:
}
return
}
}
}(i)
}
go func() {
wg.Wait()
close(doneCh)
}()
select {
case err = <-errorCh:
return cli.Exit(fmt.Errorf("upload error: %w", err), 1)
case <-doneCh:
}
fmt.Fprintf(ctx.App.Writer, "Successfully uploaded batch of blocks: from %d to %d\n", batchStart, batchEnd-1)
} }
err = updateIndexFiles(ctx, p, containerID, *acc, signer, uint(currentBlockHeight), attr, homomorphicHashingDisabled, maxParallelSearches) err = updateIndexFiles(ctx, p, containerID, *acc, signer, uint(currentBlockHeight), attr, homomorphicHashingDisabled, maxParallelSearches)
@ -310,6 +239,90 @@ func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID
return 0, nil return 0, nil
} }
// uploadBlocks uploads the blocks to the container using the pool.
func uploadBlocks(ctx *cli.Context, p *pool.Pool, rpc *rpcclient.Client, signer user.Signer, containerID cid.ID, acc *wallet.Account, attr string, oldestMissingBlockIndex int, currentBlockHeight uint, homomorphicHashingDisabled bool, numWorkers int) error {
if oldestMissingBlockIndex > int(currentBlockHeight) {
fmt.Fprintf(ctx.App.Writer, "No new blocks to upload. Need to upload starting from %d, current height %d\n", oldestMissingBlockIndex, currentBlockHeight)
return nil
}
for batchStart := oldestMissingBlockIndex; batchStart <= int(currentBlockHeight); batchStart += searchBatchSize {
var (
batchEnd = min(batchStart+searchBatchSize, int(currentBlockHeight)+1)
errCh = make(chan error)
doneCh = make(chan struct{})
wg sync.WaitGroup
)
fmt.Fprintf(ctx.App.Writer, "Processing batch from %d to %d\n", batchStart, batchEnd-1)
wg.Add(numWorkers)
for i := range numWorkers {
go func(i int) {
defer wg.Done()
for blockIndex := batchStart + i; blockIndex < batchEnd; blockIndex += numWorkers {
var blk *block.Block
errGet := retry(func() error {
var errGetBlock error
blk, errGetBlock = rpc.GetBlockByIndex(uint32(blockIndex))
if errGetBlock != nil {
return fmt.Errorf("failed to fetch block %d: %w", blockIndex, errGetBlock)
}
return nil
})
if errGet != nil {
select {
case errCh <- errGet:
default:
}
return
}
bw := io.NewBufBinWriter()
blk.EncodeBinary(bw.BinWriter)
if bw.Err != nil {
select {
case errCh <- fmt.Errorf("failed to encode block %d: %w", blockIndex, bw.Err):
default:
}
return
}
attrs := []object.Attribute{
*object.NewAttribute(attr, strconv.Itoa(int(blk.Index))),
*object.NewAttribute("Primary", strconv.Itoa(int(blk.PrimaryIndex))),
*object.NewAttribute("Hash", blk.Hash().StringLE()),
*object.NewAttribute("PrevHash", blk.PrevHash.StringLE()),
*object.NewAttribute("Timestamp", strconv.FormatUint(blk.Timestamp, 10)),
}
objBytes := bw.Bytes()
errRetr := retry(func() error {
return uploadObj(ctx.Context, p, signer, acc.PrivateKey().GetScriptHash(), containerID, objBytes, attrs, homomorphicHashingDisabled)
})
if errRetr != nil {
select {
case errCh <- errRetr:
default:
}
return
}
}
}(i)
}
go func() {
wg.Wait()
close(doneCh)
}()
select {
case err := <-errCh:
return fmt.Errorf("upload error: %w", err)
case <-doneCh:
}
fmt.Fprintf(ctx.App.Writer, "Successfully uploaded batch of blocks: from %d to %d\n", batchStart, batchEnd-1)
}
return nil
}
// updateIndexFiles updates the index files in the container. // updateIndexFiles updates the index files in the container.
func updateIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, account wallet.Account, signer user.Signer, currentHeight uint, blockAttributeKey string, homomorphicHashingDisabled bool, maxParallelSearches int) error { func updateIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, account wallet.Account, signer user.Signer, currentHeight uint, blockAttributeKey string, homomorphicHashingDisabled bool, maxParallelSearches int) error {
attributeKey := ctx.String("index-attribute") attributeKey := ctx.String("index-attribute")

View file

@ -87,7 +87,7 @@ NAME:
neo-go util upload-bin - Fetch blocks from RPC node and upload them to the NeoFS container neo-go util upload-bin - Fetch blocks from RPC node and upload them to the NeoFS container
USAGE: USAGE:
neo-go util upload-bin --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --block-attribute block --index-attribute index --rpc-endpoint <node> [--timeout <time>] --wallet <wallet> [--wallet-config <config>] [--address <address>] [--workers <num>] [--searchers <num>] [--index-file-size <size>] neo-go util upload-bin --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --block-attribute block --index-attribute index --rpc-endpoint <node> [--timeout <time>] --wallet <wallet> [--wallet-config <config>] [--address <address>] [--workers <num>] [--searchers <num>] [--index-file-size <size>] [--skip-blocks-uploading]
OPTIONS: OPTIONS:
--fs-rpc-endpoint value, --fsr value [ --fs-rpc-endpoint value, --fsr value ] List of NeoFS storage node RPC addresses (comma-separated or multiple --fs-rpc-endpoint flags) --fs-rpc-endpoint value, --fsr value [ --fs-rpc-endpoint value, --fsr value ] List of NeoFS storage node RPC addresses (comma-separated or multiple --fs-rpc-endpoint flags)
@ -98,6 +98,7 @@ OPTIONS:
--index-file-size value Size of index file (default: 128000) --index-file-size value Size of index file (default: 128000)
--workers value Number of workers to fetch, upload and search blocks concurrently (default: 50) --workers value Number of workers to fetch, upload and search blocks concurrently (default: 50)
--searchers value Number of concurrent searches for blocks (default: 20) --searchers value Number of concurrent searches for blocks (default: 20)
--skip-blocks-uploading Skip blocks uploading and upload only index files (default: false)
--rpc-endpoint value, -r value RPC node address --rpc-endpoint value, -r value RPC node address
--timeout value, -s value Timeout for the operation (default: 10s) --timeout value, -s value Timeout for the operation (default: 10s)
--wallet value, -w value Wallet to use to get the key for transaction signing; conflicts with --wallet-config flag --wallet value, -w value Wallet to use to get the key for transaction signing; conflicts with --wallet-config flag