Merge pull request #3751 from nspcc-dev/debug-upload-bin

cli: add debug on retry in `upload-bin`
Anna Shaleva 2024-12-13 16:33:43 +03:00 committed by GitHub
commit e0c8bebd05

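The change threads a new `debug` flag through the `upload-bin` retry machinery: `retry` itself, `searchIndexFile`, `searchObjects` and every retried NeoFS call now receive it, and when it is set each failed attempt is printed with its attempt number and error before the next backoff sleep.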

@@ -91,7 +91,7 @@ func uploadBin(ctx *cli.Context) error {
 		var errNet error
 		net, errNet = p.NetworkInfo(ctx.Context, client.PrmNetworkInfo{})
 		return errNet
-	}, maxRetries)
+	}, maxRetries, debug)
 	if err != nil {
 		return cli.Exit(fmt.Errorf("failed to get network info: %w", err), 1)
 	}
@@ -101,7 +101,7 @@ func uploadBin(ctx *cli.Context) error {
 	err = retry(func() error {
 		containerObj, err = p.ContainerGet(ctx.Context, containerID, client.PrmContainerGet{})
 		return err
-	}, maxRetries)
+	}, maxRetries, debug)
 	if err != nil {
 		return cli.Exit(fmt.Errorf("failed to get container with ID %s: %w", containerID, err), 1)
 	}
@@ -121,7 +121,7 @@ func uploadBin(ctx *cli.Context) error {
 		return cli.Exit(fmt.Sprintf("failed to get current block height from RPC: %v", err), 1)
 	}
 	fmt.Fprintln(ctx.App.Writer, "Chain block height:", currentBlockHeight)
-	i, buf, err := searchIndexFile(ctx, pWrapper, containerID, acc, signer, indexFileSize, attr, indexAttrKey, maxParallelSearches, maxRetries)
+	i, buf, err := searchIndexFile(ctx, pWrapper, containerID, acc, signer, indexFileSize, attr, indexAttrKey, maxParallelSearches, maxRetries, debug)
 	if err != nil {
 		return cli.Exit(fmt.Errorf("failed to find objects: %w", err), 1)
 	}
@@ -134,13 +134,16 @@ func uploadBin(ctx *cli.Context) error {
 }
 
 // retry function with exponential backoff.
-func retry(action func() error, maxRetries uint) error {
+func retry(action func() error, maxRetries uint, debug bool) error {
 	var err error
 	backoff := neofs.InitialBackoff
-	for range maxRetries {
+	for i := range maxRetries {
 		if err = action(); err == nil {
 			return nil // Success, no retry needed.
 		}
+		if debug {
+			fmt.Printf("Retry %d: %v\n", i, err)
+		}
 		time.Sleep(backoff) // Backoff before retrying.
 		backoff *= time.Duration(neofs.BackoffFactor)
 		if backoff > neofs.MaxBackoff {
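The hunk above is cut off at the backoff cap, so here is a minimal, self-contained sketch of the whole helper after this change. The constant values and the final clamp are assumptions standing in for `neofs.InitialBackoff`, `neofs.BackoffFactor` and `neofs.MaxBackoff`, which are not visible in the diff:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Stand-ins (assumed values) for neofs.InitialBackoff, neofs.BackoffFactor
// and neofs.MaxBackoff, which live in the neofs package.
const (
	initialBackoff = 500 * time.Millisecond
	backoffFactor  = 2
	maxBackoff     = 20 * time.Second
)

// retry runs action up to maxRetries times with exponential backoff and, when
// debug is set, reports each failed attempt before sleeping.
func retry(action func() error, maxRetries uint, debug bool) error {
	var err error
	backoff := initialBackoff
	for i := range maxRetries {
		if err = action(); err == nil {
			return nil // Success, no retry needed.
		}
		if debug {
			fmt.Printf("Retry %d: %v\n", i, err)
		}
		time.Sleep(backoff) // Backoff before retrying.
		backoff *= time.Duration(backoffFactor)
		if backoff > maxBackoff {
			backoff = maxBackoff // Assumed clamp; the hunk is cut off here.
		}
	}
	return err
}

func main() {
	attempts := 0
	err := retry(func() error {
		attempts++
		if attempts < 3 {
			return errors.New("transient failure")
		}
		return nil
	}, 5, true) // With debug on, prints "Retry 0: ..." and "Retry 1: ...".
	fmt.Println("final result:", err)
}
```

Note the debug print sits before the sleep, so the attempt number and error are visible while the backoff is still pending rather than after it.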
@@ -185,7 +188,7 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C
 					return fmt.Errorf("failed to fetch block %d: %w", blockIndex, errGetBlock)
 				}
 				return nil
-			}, maxRetries)
+			}, maxRetries, debug)
 			if errGet != nil {
 				select {
 				case errCh <- errGet:
@@ -225,7 +228,7 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C
 					fmt.Fprintf(ctx.App.Writer, "Uploaded block %d with object ID: %s\n", blockIndex, resOid.String())
 				}
 				return errUpload
-			}, maxRetries)
+			}, maxRetries, debug)
 			if errRetr != nil {
 				select {
 				case errCh <- errRetr:
@@ -265,7 +268,7 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C
 			var errUpload error
 			_, errUpload = uploadObj(ctx.Context, p, signer, acc.PrivateKey().GetScriptHash(), containerID, buf, attrs, homomorphicHashingDisabled)
 			return errUpload
-		}, maxRetries)
+		}, maxRetries, debug)
 		if err != nil {
 			return fmt.Errorf("failed to upload index file: %w", err)
 		}
@@ -277,7 +280,7 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C
 }
 
 // searchIndexFile returns the ID and buffer for the next index file to be uploaded.
-func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, signer user.Signer, indexFileSize uint, blockAttributeKey, attributeKey string, maxParallelSearches, maxRetries uint) (uint, []byte, error) {
+func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, signer user.Signer, indexFileSize uint, blockAttributeKey, attributeKey string, maxParallelSearches, maxRetries uint, debug bool) (uint, []byte, error) {
 	var (
 		// buf is used to store OIDs of the uploaded blocks.
 		buf = make([]byte, indexFileSize*neofs.OIDSize)
@@ -292,7 +295,7 @@ func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, accoun
 	// Search for existing index files.
 	filters.AddFilter("IndexSize", fmt.Sprintf("%d", indexFileSize), object.MatchStringEqual)
 	for i := 0; ; i++ {
-		indexIDs := searchObjects(ctx.Context, p, containerID, account, attributeKey, uint(i), uint(i+1), 1, maxRetries, errCh, filters)
+		indexIDs := searchObjects(ctx.Context, p, containerID, account, attributeKey, uint(i), uint(i+1), 1, maxRetries, debug, errCh, filters)
 		count := 0
 		for range indexIDs {
 			count++
@@ -329,7 +332,7 @@ func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, accoun
 				var errGet error
 				obj, _, errGet = p.ObjectGetInit(ctx.Context, containerID, id, signer, client.PrmObjectGet{})
 				return errGet
-			}, maxRetries)
+			}, maxRetries, debug)
 			if errRetr != nil {
 				select {
 				case errCh <- fmt.Errorf("failed to fetch object %s: %w", id.String(), errRetr):
@@ -354,7 +357,7 @@ func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, accoun
 	}
 
 	// Search for blocks within the index file range.
-	objIDs := searchObjects(ctx.Context, p, containerID, account, blockAttributeKey, existingIndexCount*indexFileSize, (existingIndexCount+1)*indexFileSize, maxParallelSearches, maxRetries, errCh)
+	objIDs := searchObjects(ctx.Context, p, containerID, account, blockAttributeKey, existingIndexCount*indexFileSize, (existingIndexCount+1)*indexFileSize, maxParallelSearches, maxRetries, debug, errCh)
 	for id := range objIDs {
 		oidCh <- id
 	}
@@ -373,7 +376,7 @@ func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, accoun
 // searchObjects searches in parallel for objects with attribute GE startIndex and LT
 // endIndex. It returns a buffered channel of resulting object IDs and closes it once
 // OID search is finished. Errors are sent to errCh in a non-blocking way.
-func searchObjects(ctx context.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, blockAttributeKey string, startIndex, endIndex, maxParallelSearches, maxRetries uint, errCh chan error, additionalFilters ...object.SearchFilters) chan oid.ID {
+func searchObjects(ctx context.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, blockAttributeKey string, startIndex, endIndex, maxParallelSearches, maxRetries uint, debug bool, errCh chan error, additionalFilters ...object.SearchFilters) chan oid.ID {
 	var res = make(chan oid.ID, 2*neofs.DefaultSearchBatchSize)
 	go func() {
 		var wg sync.WaitGroup
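The doc comment above describes the shape of `searchObjects`: split `[startIndex, endIndex)` across up to `maxParallelSearches` workers, stream results into a buffered channel, and close it once all workers finish. A stripped-down sketch of that fan-out pattern, with integers standing in for OIDs and all names illustrative:

```go
package main

import (
	"fmt"
	"sync"
)

// searchParallel fans a [start, end) range out to at most workers goroutines,
// streams results into a buffered channel, and closes it once every worker is
// done, mirroring the structure searchObjects' doc comment describes.
func searchParallel(start, end, workers uint) chan uint {
	res := make(chan uint, 16) // Buffered so producers rarely block.
	go func() {
		var wg sync.WaitGroup
		step := (end - start + workers - 1) / workers // Sub-range per worker.
		for lo := start; lo < end; lo += step {
			hi := min(lo+step, end)
			wg.Add(1)
			go func(lo, hi uint) {
				defer wg.Done()
				for i := lo; i < hi; i++ {
					res <- i // Real code sends OIDs found for this sub-range.
				}
			}(lo, hi)
		}
		wg.Wait()
		close(res) // Tell consumers the search is finished.
	}()
	return res
}

func main() {
	for id := range searchParallel(0, 10, 3) {
		fmt.Println(id)
	}
}
```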
@@ -413,7 +416,7 @@ func searchObjects(ctx context.Context, p poolWrapper, containerID cid.ID, accou
 				var errBlockSearch error
 				objIDs, errBlockSearch = neofs.ObjectSearch(ctx, p, account.PrivateKey(), containerID.String(), prm)
 				return errBlockSearch
-			}, maxRetries)
+			}, maxRetries, debug)
 			if err != nil {
 				select {
 				case errCh <- fmt.Errorf("failed to search for block(s) from %d to %d: %w", start, end, err):
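Several hunks end mid-`select`, but the doc comment says errors go to `errCh` "in a non-blocking way", which implies a `default` branch after each `case`. A minimal sketch of that idiom; the `default` branch is an assumption, since it is not visible in the diff:

```go
package main

import (
	"errors"
	"fmt"
)

// reportErr forwards err to errCh if there is room and silently drops it
// otherwise, so worker goroutines never stall on error reporting.
func reportErr(errCh chan error, err error) {
	select {
	case errCh <- err:
	default: // Channel already holds an error: drop this one.
	}
}

func main() {
	errCh := make(chan error, 1) // Room for a single error.
	reportErr(errCh, errors.New("first failure"))
	reportErr(errCh, errors.New("second failure")) // Dropped: channel is full.
	fmt.Println(<-errCh)                           // Prints "first failure".
}
```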