Merge pull request #3684 from nspcc-dev/last-block

cli: fix fetchLatestMissingBlockIndex and uploadIndexFiles in `upload-bin` command
This commit is contained in:
Anna Shaleva 2024-11-15 15:12:30 +03:00 committed by GitHub
commit b97d0b2326
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -148,7 +148,7 @@ func uploadBin(ctx *cli.Context) error {
} }
} }
err = uploadIndexFiles(ctx, p, containerID, acc, signer, uint(currentBlockHeight), attr, homomorphicHashingDisabled, maxParallelSearches, maxRetries) err = uploadIndexFiles(ctx, p, containerID, acc, signer, uint(oldestMissingBlockIndex), attr, homomorphicHashingDisabled, maxParallelSearches, maxRetries)
if err != nil { if err != nil {
return cli.Exit(fmt.Errorf("failed to upload index files: %w", err), 1) return cli.Exit(fmt.Errorf("failed to upload index files: %w", err), 1)
} }
@ -185,7 +185,7 @@ type searchResult struct {
func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID cid.ID, priv *keys.PrivateKey, attributeKey string, currentHeight int, maxParallelSearches, maxRetries int) (int, error) { func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID cid.ID, priv *keys.PrivateKey, attributeKey string, currentHeight int, maxParallelSearches, maxRetries int) (int, error) {
var ( var (
wg sync.WaitGroup wg sync.WaitGroup
numBatches = currentHeight/searchBatchSize + 1 numBatches = currentHeight / searchBatchSize
emptyBatchFound bool emptyBatchFound bool
) )
@ -228,12 +228,12 @@ func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID
if results[i].err != nil { if results[i].err != nil {
return 0, fmt.Errorf("blocks search failed for batch with indexes from %d to %d: %w", results[i].startIndex, results[i].endIndex-1, results[i].err) return 0, fmt.Errorf("blocks search failed for batch with indexes from %d to %d: %w", results[i].startIndex, results[i].endIndex-1, results[i].err)
} }
if results[i].numOIDs < searchBatchSize { if results[i].numOIDs == 0 {
emptyBatchFound = true emptyBatchFound = true
continue continue
} }
if emptyBatchFound || (batch == numBatches && i == len(results)-1) { if emptyBatchFound || (batch == numBatches && i == len(results)-1) {
return results[i].startIndex + searchBatchSize, nil return results[i].startIndex, nil
} }
} }
} }
@ -325,7 +325,7 @@ func uploadBlocks(ctx *cli.Context, p *pool.Pool, rpc *rpcclient.Client, signer
} }
// uploadIndexFiles uploads missing index files to the container. // uploadIndexFiles uploads missing index files to the container.
func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, account *wallet.Account, signer user.Signer, currentHeight uint, blockAttributeKey string, homomorphicHashingDisabled bool, maxParallelSearches, maxRetries int) error { func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, account *wallet.Account, signer user.Signer, oldestMissingBlockIndex uint, blockAttributeKey string, homomorphicHashingDisabled bool, maxParallelSearches, maxRetries int) error {
attributeKey := ctx.String("index-attribute") attributeKey := ctx.String("index-attribute")
indexFileSize := ctx.Uint("index-file-size") indexFileSize := ctx.Uint("index-file-size")
fmt.Fprintln(ctx.App.Writer, "Uploading index files...") fmt.Fprintln(ctx.App.Writer, "Uploading index files...")
@ -346,12 +346,12 @@ func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
} }
existingIndexCount := uint(len(objectIDs)) existingIndexCount := uint(len(objectIDs))
expectedIndexCount := currentHeight / indexFileSize expectedIndexCount := (oldestMissingBlockIndex - 1) / indexFileSize
if existingIndexCount >= expectedIndexCount { if existingIndexCount >= expectedIndexCount {
fmt.Fprintf(ctx.App.Writer, "Index files are up to date. Existing: %d, expected: %d\n", existingIndexCount, expectedIndexCount) fmt.Fprintf(ctx.App.Writer, "Index files are up to date. Existing: %d, expected: %d\n", existingIndexCount, expectedIndexCount)
return nil return nil
} }
fmt.Fprintf(ctx.App.Writer, "Current index files count: %d, expected: %d\n", existingIndexCount, expectedIndexCount)
var ( var (
buffer = make([]byte, indexFileSize*oidSize) buffer = make([]byte, indexFileSize*oidSize)
doneCh = make(chan struct{}) doneCh = make(chan struct{})