diff --git a/cli/util/upload_bin.go b/cli/util/upload_bin.go index 495912f83..4c15c1ebc 100644 --- a/cli/util/upload_bin.go +++ b/cli/util/upload_bin.go @@ -339,41 +339,35 @@ func uploadBlocks(ctx *cli.Context, p *pool.Pool, rpc *rpcclient.Client, signer // uploadIndexFiles uploads missing index files to the container. func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, account *wallet.Account, signer user.Signer, oldestMissingBlockIndex uint, blockAttributeKey string, homomorphicHashingDisabled bool, maxParallelSearches, maxRetries int, debug bool) error { - attributeKey := ctx.String("index-attribute") - indexFileSize := ctx.Uint("index-file-size") - fmt.Fprintln(ctx.App.Writer, "Uploading index files...") - - prm := client.PrmObjectSearch{} - filters := object.NewSearchFilters() - filters.AddFilter(attributeKey, fmt.Sprintf("%d", 0), object.MatchNumGE) - filters.AddFilter("IndexSize", fmt.Sprintf("%d", indexFileSize), object.MatchStringEqual) - prm.SetFilters(filters) - var objectIDs []oid.ID - errSearch := retry(func() error { - var errSearchIndex error - objectIDs, errSearchIndex = neofs.ObjectSearch(ctx.Context, p, account.PrivateKey(), containerID.String(), prm) - return errSearchIndex - }, maxRetries) - if errSearch != nil { - return fmt.Errorf("index files search failed: %w", errSearch) - } - - existingIndexCount := uint(len(objectIDs)) - expectedIndexCount := (oldestMissingBlockIndex - 1) / indexFileSize - if existingIndexCount >= expectedIndexCount { - fmt.Fprintf(ctx.App.Writer, "Index files are up to date. Existing: %d, expected: %d\n", existingIndexCount, expectedIndexCount) - return nil - } - fmt.Fprintf(ctx.App.Writer, "Current index files count: %d, expected: %d\n", existingIndexCount, expectedIndexCount) var ( + attributeKey = ctx.String("index-attribute") + indexFileSize = ctx.Uint("index-file-size") + buffer = make([]byte, indexFileSize*oidSize) doneCh = make(chan struct{}) errCh = make(chan error) emptyOid = make([]byte, oidSize) + + expectedIndexCount = (oldestMissingBlockIndex - 1) / indexFileSize + existingIndexCount = uint(0) + filters = object.NewSearchFilters() ) + fmt.Fprintln(ctx.App.Writer, "Uploading index files...") go func() { defer close(doneCh) + // Search for existing index files. + filters.AddFilter("IndexSize", fmt.Sprintf("%d", indexFileSize), object.MatchStringEqual) + indexIDs := searchObjects(ctx.Context, p, containerID, account, attributeKey, 0, expectedIndexCount, maxParallelSearches, maxRetries, errCh, filters) + for range indexIDs { + existingIndexCount++ + } + + if existingIndexCount >= expectedIndexCount { + fmt.Fprintf(ctx.App.Writer, "Index files are up to date. Existing: %d, expected: %d\n", existingIndexCount, expectedIndexCount) + return + } + fmt.Fprintf(ctx.App.Writer, "Current index files count: %d, expected: %d\n", existingIndexCount, expectedIndexCount) // Main processing loop for each index file. for i := existingIndexCount; i < expectedIndexCount; i++ { @@ -506,7 +500,7 @@ func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun // searchObjects searches in parallel for objects with attribute GE startIndex and LT // endIndex. It returns a buffered channel of resulting object IDs and closes it once // OID search is finished. Errors are sent to errCh in a non-blocking way. -func searchObjects(ctx context.Context, p *pool.Pool, containerID cid.ID, account *wallet.Account, blockAttributeKey string, startIndex, endIndex uint, maxParallelSearches, maxRetries int, errCh chan error) chan oid.ID { +func searchObjects(ctx context.Context, p *pool.Pool, containerID cid.ID, account *wallet.Account, blockAttributeKey string, startIndex, endIndex uint, maxParallelSearches, maxRetries int, errCh chan error, additionalFilters ...object.SearchFilters) chan oid.ID { var res = make(chan oid.ID, 2*searchBatchSize) go func() { var wg sync.WaitGroup @@ -530,6 +524,9 @@ func searchObjects(ctx context.Context, p *pool.Pool, containerID cid.ID, accoun prm := client.PrmObjectSearch{} filters := object.NewSearchFilters() + if len(additionalFilters) != 0 { + filters = additionalFilters[0] + } filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", start), object.MatchNumGE) filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT) prm.SetFilters(filters)