Merge pull request #3686 from nspcc-dev/fix-mode

cli: fix `skip-blocks-uploading` mode
Anna Shaleva 2024-11-15 17:39:00 +03:00 committed by GitHub
commit 4a96bd1dc1
3 changed files with 37 additions and 17 deletions
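
In substance, the change threads a new `--debug` flag from the CLI context down through `uploadBlocks` and `uploadIndexFiles`, and makes `uploadObj` return the ID of the stored object so the caller can report it. Below is a minimal, self-contained sketch of that pattern; the `objectID`, `retry` and `uploadObj` names are illustrative stand-ins under that reading of the diff, not the neo-go or NeoFS SDK API.

```go
// Sketch of the pattern introduced by this change: the upload helper returns
// the stored object ID instead of discarding it, and the caller prints it
// only when --debug is enabled, all inside a retry wrapper. Types and helper
// names are stand-ins for illustration.
package main

import (
	"errors"
	"fmt"
	"os"
)

// objectID stands in for oid.ID from the NeoFS SDK.
type objectID string

// retry mirrors the CLI's retry helper: re-run action until it succeeds or
// maxRetries attempts are exhausted.
func retry(action func() error, maxRetries int) error {
	var err error
	for i := 0; i < maxRetries; i++ {
		if err = action(); err == nil {
			return nil
		}
	}
	return err
}

// uploadObj stands in for the real uploader; after this change it returns
// the ID of the stored object together with the error.
func uploadObj(data []byte) (objectID, error) {
	if len(data) == 0 {
		return "", errors.New("object ID is empty")
	}
	return objectID(fmt.Sprintf("oid-%d", len(data))), nil
}

func main() {
	debug := true // corresponds to debug := ctx.Bool("debug") in the CLI
	blockIndex := 42
	payload := []byte("serialized block")

	err := retry(func() error {
		resOid, errUpload := uploadObj(payload)
		if errUpload != nil {
			return errUpload
		}
		if debug {
			fmt.Fprintf(os.Stdout, "Uploaded block %d with object ID: %s\n", blockIndex, resOid)
		}
		return nil
	}, 5)
	if err != nil {
		fmt.Fprintln(os.Stderr, "failed to upload block:", err)
	}
}
```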


@@ -101,6 +101,7 @@ func NewCommands() []*cli.Command {
return nil
},
},
options.Debug,
}, options.RPC...)
uploadBinFlags = append(uploadBinFlags, options.Wallet...)
return []*cli.Command{
@@ -183,7 +184,7 @@ func NewCommands() []*cli.Command {
{
Name: "upload-bin",
Usage: "Fetch blocks from RPC node and upload them to the NeoFS container",
UsageText: "neo-go util upload-bin --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --block-attribute block --index-attribute index --rpc-endpoint <node> [--timeout <time>] --wallet <wallet> [--wallet-config <config>] [--address <address>] [--workers <num>] [--searchers <num>] [--index-file-size <size>] [--skip-blocks-uploading] [--retries <num>]",
UsageText: "neo-go util upload-bin --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --block-attribute block --index-attribute index --rpc-endpoint <node> [--timeout <time>] --wallet <wallet> [--wallet-config <config>] [--address <address>] [--workers <num>] [--searchers <num>] [--index-file-size <size>] [--skip-blocks-uploading] [--retries <num>] [--debug]",
Action: uploadBin,
Flags: uploadBinFlags,
},
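
For context on the first hunk: `options.Debug` is appended to `uploadBinFlags` and `[--debug]` is advertised in the usage string. Assuming the CLI is built on urfave/cli v2 (consistent with the `*cli.Context` and `cli.Exit` usage visible elsewhere in the diff), a boolean flag of this kind is declared and read roughly as in the sketch below; the command and flag definitions are illustrative stand-ins, not the actual neo-go ones (in neo-go the flag arrives prepackaged as `options.Debug`).

```go
// Sketch: declaring and consuming a boolean --debug flag with urfave/cli v2.
package main

import (
	"fmt"
	"log"
	"os"

	"github.com/urfave/cli/v2"
)

func main() {
	app := &cli.App{
		Commands: []*cli.Command{
			{
				Name:  "upload-bin",
				Usage: "illustrative stand-in for the real upload-bin command",
				Flags: []cli.Flag{
					&cli.BoolFlag{
						Name:    "debug",
						Aliases: []string{"d"},
						Usage:   "Enable debug logging",
					},
				},
				Action: func(ctx *cli.Context) error {
					// Mirrors `debug := ctx.Bool("debug")` in uploadBin.
					if ctx.Bool("debug") {
						fmt.Fprintln(ctx.App.Writer, "debug logging enabled")
					}
					return nil
				},
			},
		},
	}
	if err := app.Run(os.Args); err != nil {
		log.Fatal(err)
	}
}
```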


@@ -66,6 +66,7 @@ func uploadBin(ctx *cli.Context) error {
numWorkers := ctx.Int("workers")
maxParallelSearches := ctx.Int("searchers")
maxRetries := int(ctx.Uint("retries"))
debug := ctx.Bool("debug")
acc, _, err := options.GetAccFromContext(ctx)
if err != nil {
return cli.Exit(fmt.Sprintf("failed to load account: %v", err), 1)
@@ -142,13 +143,14 @@ func uploadBin(ctx *cli.Context) error {
fmt.Fprintln(ctx.App.Writer, "First block of latest incomplete batch uploaded to NeoFS container:", oldestMissingBlockIndex)
if !ctx.Bool("skip-blocks-uploading") {
err = uploadBlocks(ctx, p, rpc, signer, containerID, acc, attr, oldestMissingBlockIndex, uint(currentBlockHeight), homomorphicHashingDisabled, numWorkers, maxRetries)
err = uploadBlocks(ctx, p, rpc, signer, containerID, acc, attr, oldestMissingBlockIndex, uint(currentBlockHeight), homomorphicHashingDisabled, numWorkers, maxRetries, debug)
if err != nil {
return cli.Exit(fmt.Errorf("failed to upload blocks: %w", err), 1)
}
oldestMissingBlockIndex = int(currentBlockHeight) + 1
}
err = uploadIndexFiles(ctx, p, containerID, acc, signer, uint(oldestMissingBlockIndex), attr, homomorphicHashingDisabled, maxParallelSearches, maxRetries)
err = uploadIndexFiles(ctx, p, containerID, acc, signer, uint(oldestMissingBlockIndex), attr, homomorphicHashingDisabled, maxParallelSearches, maxRetries, debug)
if err != nil {
return cli.Exit(fmt.Errorf("failed to upload index files: %w", err), 1)
}
@@ -241,7 +243,7 @@ func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID
}
// uploadBlocks uploads the blocks to the container using the pool.
func uploadBlocks(ctx *cli.Context, p *pool.Pool, rpc *rpcclient.Client, signer user.Signer, containerID cid.ID, acc *wallet.Account, attr string, oldestMissingBlockIndex int, currentBlockHeight uint, homomorphicHashingDisabled bool, numWorkers, maxRetries int) error {
func uploadBlocks(ctx *cli.Context, p *pool.Pool, rpc *rpcclient.Client, signer user.Signer, containerID cid.ID, acc *wallet.Account, attr string, oldestMissingBlockIndex int, currentBlockHeight uint, homomorphicHashingDisabled bool, numWorkers, maxRetries int, debug bool) error {
if oldestMissingBlockIndex > int(currentBlockHeight) {
fmt.Fprintf(ctx.App.Writer, "No new blocks to upload. Need to upload starting from %d, current height %d\n", oldestMissingBlockIndex, currentBlockHeight)
return nil
@@ -295,7 +297,14 @@ func uploadBlocks(ctx *cli.Context, p *pool.Pool, rpc *rpcclient.Client, signer
objBytes := bw.Bytes()
errRetr := retry(func() error {
return uploadObj(ctx.Context, p, signer, acc.PrivateKey().GetScriptHash(), containerID, objBytes, attrs, homomorphicHashingDisabled)
resOid, errUpload := uploadObj(ctx.Context, p, signer, acc.PrivateKey().GetScriptHash(), containerID, objBytes, attrs, homomorphicHashingDisabled)
if errUpload != nil {
return errUpload
}
if debug {
fmt.Fprintf(ctx.App.Writer, "Uploaded block %d with object ID: %s\n", blockIndex, resOid.String())
}
return errUpload
}, maxRetries)
if errRetr != nil {
select {
@@ -325,7 +334,7 @@ func uploadBlocks(ctx *cli.Context, p *pool.Pool, rpc *rpcclient.Client, signer
}
// uploadIndexFiles uploads missing index files to the container.
func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, account *wallet.Account, signer user.Signer, oldestMissingBlockIndex uint, blockAttributeKey string, homomorphicHashingDisabled bool, maxParallelSearches, maxRetries int) error {
func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, account *wallet.Account, signer user.Signer, oldestMissingBlockIndex uint, blockAttributeKey string, homomorphicHashingDisabled bool, maxParallelSearches, maxRetries int, debug bool) error {
attributeKey := ctx.String("index-attribute")
indexFileSize := ctx.Uint("index-file-size")
fmt.Fprintln(ctx.App.Writer, "Uploading index files...")
@@ -460,7 +469,14 @@ func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
*object.NewAttribute("IndexSize", strconv.Itoa(int(indexFileSize))),
}
err := retry(func() error {
return uploadObj(ctx.Context, p, signer, account.PrivateKey().GetScriptHash(), containerID, buffer, attrs, homomorphicHashingDisabled)
resOid, errUpload := uploadObj(ctx.Context, p, signer, account.PrivateKey().GetScriptHash(), containerID, buffer, attrs, homomorphicHashingDisabled)
if errUpload != nil {
return errUpload
}
if debug {
fmt.Fprintf(ctx.App.Writer, "Uploaded idex file %d with object ID: %s\n", i, resOid.String())
}
return errUpload
}, maxRetries)
if err != nil {
select {
@@ -541,7 +557,7 @@ func searchObjects(ctx context.Context, p *pool.Pool, containerID cid.ID, accoun
}
// uploadObj uploads object to the container using provided settings.
func uploadObj(ctx context.Context, p *pool.Pool, signer user.Signer, owner util.Uint160, containerID cid.ID, objData []byte, attrs []object.Attribute, homomorphicHashingDisabled bool) error {
func uploadObj(ctx context.Context, p *pool.Pool, signer user.Signer, owner util.Uint160, containerID cid.ID, objData []byte, attrs []object.Attribute, homomorphicHashingDisabled bool) (oid.ID, error) {
var (
ownerID user.ID
hdr object.Object
@@ -549,6 +565,7 @@ func uploadObj(ctx context.Context, p *pool.Pool, signer user.Signer, owner util
chHomomorphic checksum.Checksum
v = new(version.Version)
prmObjectPutInit client.PrmObjectPutInit
resOID = oid.ID{}
)
ownerID.SetScriptHash(owner)
@@ -569,31 +586,32 @@ func uploadObj(ctx context.Context, p *pool.Pool, signer user.Signer, owner util
err := hdr.SetIDWithSignature(signer)
if err != nil {
return err
return resOID, err
}
err = hdr.CheckHeaderVerificationFields()
if err != nil {
return err
return resOID, err
}
writer, err := p.ObjectPutInit(ctx, hdr, signer, prmObjectPutInit)
if err != nil {
return fmt.Errorf("failed to initiate object upload: %w", err)
return resOID, fmt.Errorf("failed to initiate object upload: %w", err)
}
_, err = writer.Write(objData)
if err != nil {
_ = writer.Close()
return fmt.Errorf("failed to write object data: %w", err)
return resOID, fmt.Errorf("failed to write object data: %w", err)
}
err = writer.Close()
if err != nil {
return fmt.Errorf("failed to close object writer: %w", err)
return resOID, fmt.Errorf("failed to close object writer: %w", err)
}
res := writer.GetResult()
if res.StoredObjectID().Equals(oid.ID{}) {
return fmt.Errorf("object ID is empty")
resOID = res.StoredObjectID()
if resOID.Equals(oid.ID{}) {
return resOID, fmt.Errorf("object ID is empty")
}
return nil
return resOID, nil
}
func getBlockIndex(header object.Object, attribute string) (int, error) {
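
The `uploadObj` rewrite follows the common Go convention for `(value, error)` results: a zero `oid.ID` is declared up front (`resOID = oid.ID{}`), returned unchanged on every failure path, and the final `StoredObjectID` is rejected if it is still the zero value. A compact sketch of that convention with a stand-in ID type (not the SDK's `oid.ID`):

```go
// Sketch of the zero-value return convention used by the new uploadObj
// signature. The id type and storeObject helper are illustrative stand-ins.
package main

import (
	"errors"
	"fmt"
)

// id stands in for oid.ID; its zero value means "no object ID".
type id [32]byte

func (i id) isZero() bool { return i == id{} }

// storeObject mimics a writer whose result may carry an empty ID.
func storeObject(data []byte) (id, error) {
	var res id // zero value, returned as-is on every error path
	if len(data) == 0 {
		return res, errors.New("nothing to store")
	}
	res[0] = byte(len(data)) // pretend the backend assigned an ID
	if res.isZero() {
		return res, errors.New("object ID is empty")
	}
	return res, nil
}

func main() {
	oidRes, err := storeObject([]byte("payload"))
	if err != nil {
		fmt.Println("upload failed:", err)
		return
	}
	fmt.Printf("stored object %x\n", oidRes)
}
```

Callers such as `uploadBlocks` and `uploadIndexFiles` then get a usable ID on success and a zero ID plus an error otherwise, which is what makes the new debug output above possible.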


@@ -87,7 +87,7 @@ NAME:
neo-go util upload-bin - Fetch blocks from RPC node and upload them to the NeoFS container
USAGE:
neo-go util upload-bin --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --block-attribute block --index-attribute index --rpc-endpoint <node> [--timeout <time>] --wallet <wallet> [--wallet-config <config>] [--address <address>] [--workers <num>] [--searchers <num>] [--index-file-size <size>] [--skip-blocks-uploading] [--retries <num>]
neo-go util upload-bin --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --block-attribute block --index-attribute index --rpc-endpoint <node> [--timeout <time>] --wallet <wallet> [--wallet-config <config>] [--address <address>] [--workers <num>] [--searchers <num>] [--index-file-size <size>] [--skip-blocks-uploading] [--retries <num>] [--debug]
OPTIONS:
--fs-rpc-endpoint value, --fsr value [ --fs-rpc-endpoint value, --fsr value ] List of NeoFS storage node RPC addresses (comma-separated or multiple --fs-rpc-endpoint flags)
@@ -100,6 +100,7 @@ OPTIONS:
--searchers value Number of concurrent searches for blocks (default: 20)
--skip-blocks-uploading Skip blocks uploading and upload only index files (default: false)
--retries value Maximum number of Neo/NeoFS node request retries (default: 5)
--debug, -d Enable debug logging (LOTS of output, overrides configuration) (default: false)
--rpc-endpoint value, -r value RPC node address
--timeout value, -s value Timeout for the operation (default: 10s)
--wallet value, -w value Wallet to use to get the key for transaction signing; conflicts with --wallet-config flag