diff --git a/cli/util/uploader.go b/cli/util/uploader.go
index 520b04c7b..8ef315ab2 100644
--- a/cli/util/uploader.go
+++ b/cli/util/uploader.go
@@ -354,115 +354,194 @@ func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
 	}
 
 	var (
-		errCh                 = make(chan error)
-		buffer                = make([]byte, indexFileSize*oidSize)
-		oidCh                 = make(chan oid.ID, indexFileSize)
-		oidFetcherToProcessor = make(chan struct{}, indexFileSize)
-
+		buffer   = make([]byte, indexFileSize*oidSize)
+		doneCh   = make(chan struct{})
+		errCh    = make(chan error)
 		emptyOid = make([]byte, oidSize)
 	)
-	defer close(oidCh)
-	for range maxParallelSearches {
-		go func() {
-			for id := range oidCh {
-				var obj *object.Object
-				errRetr := retry(func() error {
-					var errGetHead error
-					obj, errGetHead = p.ObjectHead(context.Background(), containerID, id, signer, client.PrmObjectHead{})
-					return errGetHead
-				})
-				if errRetr != nil {
+
+	go func() {
+		defer close(doneCh)
+
+		// Main processing loop for each index file.
+		for i := existingIndexCount; i < expectedIndexCount; i++ {
+			// Start block parsing goroutines.
+			var (
+				// processedIndices is a mapping from position in buffer to the block index.
+				processedIndices sync.Map
+				wg               sync.WaitGroup
+				oidCh            = make(chan oid.ID, 2*maxParallelSearches)
+			)
+			wg.Add(maxParallelSearches)
+			for range maxParallelSearches {
+				go func() {
+					defer wg.Done()
+					for id := range oidCh {
+						var obj *object.Object
+						errRetr := retry(func() error {
+							var errGetHead error
+							obj, errGetHead = p.ObjectHead(context.Background(), containerID, id, signer, client.PrmObjectHead{})
+							return errGetHead
+						})
+						if errRetr != nil {
+							select {
+							case errCh <- fmt.Errorf("failed to fetch object %s: %w", id.String(), errRetr):
+							default:
+							}
+							return
+						}
+						blockIndex, err := getBlockIndex(obj, blockAttributeKey)
+						if err != nil {
+							select {
+							case errCh <- fmt.Errorf("failed to get block index from object %s: %w", id.String(), err):
+							default:
+							}
+							return
+						}
+						pos := uint(blockIndex) % indexFileSize
+						if _, ok := processedIndices.LoadOrStore(pos, blockIndex); !ok {
+							id.Encode(buffer[pos*oidSize:])
+						}
+					}
+				}()
+			}
+
+			// Search for blocks within the index file range.
+			startIndex := i * indexFileSize
+			endIndex := startIndex + indexFileSize
+			objIDs := searchObjects(ctx.Context, p, containerID, account, blockAttributeKey, startIndex, endIndex, maxParallelSearches, errCh)
+			for id := range objIDs {
+				oidCh <- id
+			}
+			close(oidCh)
+			wg.Wait()
+			fmt.Fprintf(ctx.App.Writer, "Index file %d generated, checking for missing blocks...\n", i)
+
+			// Check if there are empty OIDs in the generated index file. This may happen
+			// if searchObjects has not returned all blocks within the requested range, ref.
+			// #3645. In this case, retry the search for every missing object.
+			var count int
+
+			for idx := range indexFileSize {
+				if _, ok := processedIndices.Load(idx); !ok {
+					count++
+					objIDs = searchObjects(ctx.Context, p, containerID, account, blockAttributeKey, i*indexFileSize+idx, i*indexFileSize+idx+1, 1, errCh)
+					// Block object duplicates are allowed, we're OK with the first found result.
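+					// Drain the remaining IDs so the searchObjects sender goroutine can exit.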
+					id, ok := <-objIDs
+					for range objIDs {
+					}
+					if !ok {
+						select {
+						case errCh <- fmt.Errorf("block %d is missing from the storage", i*indexFileSize+idx):
+						default:
+						}
+						return
+					}
+					processedIndices.Store(idx, id)
+					id.Encode(buffer[idx*oidSize:])
+				}
+			}
+			fmt.Fprintf(ctx.App.Writer, "%d missing block(s) processed for index file %d, uploading index file...\n", count, i)
+
+			// Check if there are empty OIDs in the generated index file. If it happens at
+			// this stage, then there's a bug in the code.
+			for k := 0; k < len(buffer); k += oidSize {
+				if slices.Compare(buffer[k:k+oidSize], emptyOid) == 0 {
 					select {
-					case errCh <- fmt.Errorf("failed to fetch object %s: %w", id.String(), errRetr):
+					case errCh <- fmt.Errorf("empty OID found in index file %d at position %d (block index %d)", i, k/oidSize, i*indexFileSize+uint(k/oidSize)):
 					default:
 					}
 					return
 				}
-				blockIndex, err := getBlockIndex(obj, blockAttributeKey)
-				if err != nil {
-					select {
-					case errCh <- fmt.Errorf("failed to get block index from object %s: %w", id.String(), err):
-					default:
-					}
-					return
-				}
-				offset := (uint(blockIndex) % indexFileSize) * oidSize
-				id.Encode(buffer[offset:])
-				oidFetcherToProcessor <- struct{}{}
 			}
-		}()
-	}
-	for i := existingIndexCount; i < expectedIndexCount; i++ {
-		startIndex := i * indexFileSize
-		endIndex := startIndex + indexFileSize
-		go func() {
-			for j := int(startIndex); j < int(endIndex); j += searchBatchSize {
-				remaining := int(endIndex) - j
-				end := j + min(searchBatchSize, remaining)
-
-				prm = client.PrmObjectSearch{}
-				filters = object.NewSearchFilters()
-				filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", j), object.MatchNumGE)
-				filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT)
-				prm.SetFilters(filters)
-				var objIDs []oid.ID
-				err := retry(func() error {
-					var errSearchIndex error
-					objIDs, errSearchIndex = neofs.ObjectSearch(ctx.Context, p, account.PrivateKey(), containerID.String(), prm)
-					return errSearchIndex
-				})
-
-				if err != nil {
-					select {
-					case errCh <- fmt.Errorf("failed to search for objects from %d to %d for index file %d: %w", j, end, i, err):
-					default:
-					}
-					return
-				}
-
-				for _, id := range objIDs {
-					oidCh <- id
-				}
+			// Upload index file.
+			attrs := []object.Attribute{
+				*object.NewAttribute(attributeKey, strconv.Itoa(int(i))),
+				*object.NewAttribute("IndexSize", strconv.Itoa(int(indexFileSize))),
 			}
-		}()
-
-		var completed int
-	waitLoop:
-		for {
-			select {
-			case err := <-errCh:
-				return err
-			case <-oidFetcherToProcessor:
-				completed++
-				if completed == int(indexFileSize) {
-					break waitLoop
+			err := retry(func() error {
+				return uploadObj(ctx.Context, p, signer, account.PrivateKey().GetScriptHash(), containerID, buffer, attrs, homomorphicHashingDisabled)
+			})
+			if err != nil {
+				select {
+				case errCh <- fmt.Errorf("failed to upload index file %d: %w", i, err):
+				default:
 				}
+				return
 			}
+			fmt.Fprintf(ctx.App.Writer, "Uploaded index file %d\n", i)
+			clear(buffer)
 		}
-		// Check if there are empty OIDs in the generated index file. If it happens at
-		// this stage, then there's a bug in the code.
-		for k := 0; k < len(buffer); k += oidSize {
-			if slices.Compare(buffer[k:k+oidSize], emptyOid) == 0 {
-				return fmt.Errorf("empty OID found in index file %d at position %d (block index %d)", i, k/oidSize, i+uint(k/oidSize))
-			}
-		}
-		attrs := []object.Attribute{
-			*object.NewAttribute(attributeKey, strconv.Itoa(int(i))),
-			*object.NewAttribute("IndexSize", strconv.Itoa(int(indexFileSize))),
-		}
-		err := retry(func() error {
-			return uploadObj(ctx.Context, p, signer, account.PrivateKey().GetScriptHash(), containerID, buffer, attrs, homomorphicHashingDisabled)
-		})
-		if err != nil {
-			return fmt.Errorf("failed to upload index file %d: %w", i, err)
-		}
-		fmt.Fprintf(ctx.App.Writer, "Uploaded index file %d\n", i)
+	}()
+
+	select {
+	case err := <-errCh:
+		return err
+	case <-doneCh:
 	}
 	return nil
 }
 
+// searchObjects searches in parallel for objects with attribute GE startIndex and LT
+// endIndex. It returns a buffered channel of resulting object IDs and closes it once
+// OID search is finished. Errors are sent to errCh in a non-blocking way.
+func searchObjects(ctx context.Context, p *pool.Pool, containerID cid.ID, account *wallet.Account, blockAttributeKey string, startIndex, endIndex uint, maxParallelSearches int, errCh chan error) chan oid.ID {
+	var res = make(chan oid.ID, 2*searchBatchSize)
+	go func() {
+		var wg sync.WaitGroup
+		defer close(res)
+
+		for i := int(startIndex); i < int(endIndex); i += searchBatchSize * maxParallelSearches {
+			for j := range maxParallelSearches {
+				start := i + j*searchBatchSize
+				end := start + searchBatchSize
+
+				if start >= int(endIndex) {
+					break
+				}
+				if end > int(endIndex) {
+					end = int(endIndex)
+				}
+
+				wg.Add(1)
+				go func(start, end int) {
+					defer wg.Done()
+
+					prm := client.PrmObjectSearch{}
+					filters := object.NewSearchFilters()
+					filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", start), object.MatchNumGE)
+					filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT)
+					prm.SetFilters(filters)
+
+					var objIDs []oid.ID
+					err := retry(func() error {
+						var errBlockSearch error
+						objIDs, errBlockSearch = neofs.ObjectSearch(ctx, p, account.PrivateKey(), containerID.String(), prm)
+						return errBlockSearch
+					})
+					if err != nil {
+						select {
+						case errCh <- fmt.Errorf("failed to search for block(s) from %d to %d: %w", start, end, err):
+						default:
+						}
+						return
+					}
+
+					for _, id := range objIDs {
+						res <- id
+					}
+				}(start, end)
+			}
+			wg.Wait()
+		}
+	}()
+
+	return res
+}
+
 // uploadObj uploads object to the container using provided settings.
 func uploadObj(ctx context.Context, p *pool.Pool, signer user.Signer, owner util.Uint160, containerID cid.ID, objData []byte, attrs []object.Attribute, homomorphicHashingDisabled bool) error {
 	var (