From 46cbfab2642a20291dd0016ea1b55bf4860a040e Mon Sep 17 00:00:00 2001 From: Ekaterina Pavlova Date: Fri, 13 Dec 2024 12:48:04 +0300 Subject: [PATCH] cli: add more debug info about retry to the `upload-bin` Signed-off-by: Ekaterina Pavlova --- cli/util/upload_bin.go | 31 +++++++++++++++++-------------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/cli/util/upload_bin.go b/cli/util/upload_bin.go index 990ce52de..274bcad31 100644 --- a/cli/util/upload_bin.go +++ b/cli/util/upload_bin.go @@ -91,7 +91,7 @@ func uploadBin(ctx *cli.Context) error { var errNet error net, errNet = p.NetworkInfo(ctx.Context, client.PrmNetworkInfo{}) return errNet - }, maxRetries) + }, maxRetries, debug) if err != nil { return cli.Exit(fmt.Errorf("failed to get network info: %w", err), 1) } @@ -101,7 +101,7 @@ func uploadBin(ctx *cli.Context) error { err = retry(func() error { containerObj, err = p.ContainerGet(ctx.Context, containerID, client.PrmContainerGet{}) return err - }, maxRetries) + }, maxRetries, debug) if err != nil { return cli.Exit(fmt.Errorf("failed to get container with ID %s: %w", containerID, err), 1) } @@ -121,7 +121,7 @@ func uploadBin(ctx *cli.Context) error { return cli.Exit(fmt.Sprintf("failed to get current block height from RPC: %v", err), 1) } fmt.Fprintln(ctx.App.Writer, "Chain block height:", currentBlockHeight) - i, buf, err := searchIndexFile(ctx, pWrapper, containerID, acc, signer, indexFileSize, attr, indexAttrKey, maxParallelSearches, maxRetries) + i, buf, err := searchIndexFile(ctx, pWrapper, containerID, acc, signer, indexFileSize, attr, indexAttrKey, maxParallelSearches, maxRetries, debug) if err != nil { return cli.Exit(fmt.Errorf("failed to find objects: %w", err), 1) } @@ -134,13 +134,16 @@ func uploadBin(ctx *cli.Context) error { } // retry function with exponential backoff. -func retry(action func() error, maxRetries uint) error { +func retry(action func() error, maxRetries uint, debug bool) error { var err error backoff := neofs.InitialBackoff - for range maxRetries { + for i := range maxRetries { if err = action(); err == nil { return nil // Success, no retry needed. } + if debug { + fmt.Printf("Retry %d: %v\n", i, err) + } time.Sleep(backoff) // Backoff before retrying. backoff *= time.Duration(neofs.BackoffFactor) if backoff > neofs.MaxBackoff { @@ -185,7 +188,7 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C return fmt.Errorf("failed to fetch block %d: %w", blockIndex, errGetBlock) } return nil - }, maxRetries) + }, maxRetries, debug) if errGet != nil { select { case errCh <- errGet: @@ -225,7 +228,7 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C fmt.Fprintf(ctx.App.Writer, "Uploaded block %d with object ID: %s\n", blockIndex, resOid.String()) } return errUpload - }, maxRetries) + }, maxRetries, debug) if errRetr != nil { select { case errCh <- errRetr: @@ -265,7 +268,7 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C var errUpload error _, errUpload = uploadObj(ctx.Context, p, signer, acc.PrivateKey().GetScriptHash(), containerID, buf, attrs, homomorphicHashingDisabled) return errUpload - }, maxRetries) + }, maxRetries, debug) if err != nil { return fmt.Errorf("failed to upload index file: %w", err) } @@ -277,7 +280,7 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C } // searchIndexFile returns the ID and buffer for the next index file to be uploaded. -func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, signer user.Signer, indexFileSize uint, blockAttributeKey, attributeKey string, maxParallelSearches, maxRetries uint) (uint, []byte, error) { +func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, signer user.Signer, indexFileSize uint, blockAttributeKey, attributeKey string, maxParallelSearches, maxRetries uint, debug bool) (uint, []byte, error) { var ( // buf is used to store OIDs of the uploaded blocks. buf = make([]byte, indexFileSize*neofs.OIDSize) @@ -292,7 +295,7 @@ func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, accoun // Search for existing index files. filters.AddFilter("IndexSize", fmt.Sprintf("%d", indexFileSize), object.MatchStringEqual) for i := 0; ; i++ { - indexIDs := searchObjects(ctx.Context, p, containerID, account, attributeKey, uint(i), uint(i+1), 1, maxRetries, errCh, filters) + indexIDs := searchObjects(ctx.Context, p, containerID, account, attributeKey, uint(i), uint(i+1), 1, maxRetries, debug, errCh, filters) count := 0 for range indexIDs { count++ @@ -329,7 +332,7 @@ func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, accoun var errGet error obj, _, errGet = p.ObjectGetInit(ctx.Context, containerID, id, signer, client.PrmObjectGet{}) return errGet - }, maxRetries) + }, maxRetries, debug) if errRetr != nil { select { case errCh <- fmt.Errorf("failed to fetch object %s: %w", id.String(), errRetr): @@ -354,7 +357,7 @@ func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, accoun } // Search for blocks within the index file range. - objIDs := searchObjects(ctx.Context, p, containerID, account, blockAttributeKey, existingIndexCount*indexFileSize, (existingIndexCount+1)*indexFileSize, maxParallelSearches, maxRetries, errCh) + objIDs := searchObjects(ctx.Context, p, containerID, account, blockAttributeKey, existingIndexCount*indexFileSize, (existingIndexCount+1)*indexFileSize, maxParallelSearches, maxRetries, debug, errCh) for id := range objIDs { oidCh <- id } @@ -373,7 +376,7 @@ func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, accoun // searchObjects searches in parallel for objects with attribute GE startIndex and LT // endIndex. It returns a buffered channel of resulting object IDs and closes it once // OID search is finished. Errors are sent to errCh in a non-blocking way. -func searchObjects(ctx context.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, blockAttributeKey string, startIndex, endIndex, maxParallelSearches, maxRetries uint, errCh chan error, additionalFilters ...object.SearchFilters) chan oid.ID { +func searchObjects(ctx context.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, blockAttributeKey string, startIndex, endIndex, maxParallelSearches, maxRetries uint, debug bool, errCh chan error, additionalFilters ...object.SearchFilters) chan oid.ID { var res = make(chan oid.ID, 2*neofs.DefaultSearchBatchSize) go func() { var wg sync.WaitGroup @@ -413,7 +416,7 @@ func searchObjects(ctx context.Context, p poolWrapper, containerID cid.ID, accou var errBlockSearch error objIDs, errBlockSearch = neofs.ObjectSearch(ctx, p, account.PrivateKey(), containerID.String(), prm) return errBlockSearch - }, maxRetries) + }, maxRetries, debug) if err != nil { select { case errCh <- fmt.Errorf("failed to search for block(s) from %d to %d: %w", start, end, err):