From fe0164ef47029250ad23e8dd55734df1a27bd47f Mon Sep 17 00:00:00 2001 From: Ekaterina Pavlova Date: Tue, 19 Nov 2024 13:37:42 +0300 Subject: [PATCH] *: migrate to SEARCH with strict equality comparator Depends on https://github.com/nspcc-dev/neo-go/issues/3645. It should be reverted once the issue is resolved. Close #3670 Signed-off-by: Ekaterina Pavlova --- cli/util/upload_bin.go | 57 ++++++++++++----------- pkg/services/blockfetcher/blockfetcher.go | 3 ++ 2 files changed, 34 insertions(+), 26 deletions(-) diff --git a/cli/util/upload_bin.go b/cli/util/upload_bin.go index dfe599637..f49584db4 100644 --- a/cli/util/upload_bin.go +++ b/cli/util/upload_bin.go @@ -36,7 +36,8 @@ const ( uploadBatchSize = 10000 // Number of objects to search in a batch. // For searching the oldest missing block it should be equal to uploadBatchSize. - searchBatchSize = uploadBatchSize + // But due to incomplete search results #3645 the object.MatchStringEqual is used. + searchBatchSize = 1 // Size of object ID. oidSize = sha256.Size ) @@ -178,10 +179,10 @@ func retry(action func() error, maxRetries int) error { } type searchResult struct { - startIndex int - endIndex int - numOIDs int - err error + batchStartIndex int + index int + numOIDs int + err error } // fetchLatestMissingBlockIndex searches the container for the latest full batch of blocks @@ -208,13 +209,12 @@ func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID } wg.Add(1) - go func(i, startIndex, endIndex int) { + go func(i, startIndex int) { defer wg.Done() prm := client.PrmObjectSearch{} filters := object.NewSearchFilters() - filters.AddFilter(attributeKey, fmt.Sprintf("%d", startIndex), object.MatchNumGE) - filters.AddFilter(attributeKey, fmt.Sprintf("%d", endIndex), object.MatchNumLT) + filters.AddFilter(attributeKey, fmt.Sprintf("%d", startIndex), object.MatchStringEqual) prm.SetFilters(filters) var ( objectIDs []oid.ID @@ -224,21 +224,21 @@ func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID objectIDs, err = neofs.ObjectSearch(ctx, p, priv, containerID.String(), prm) return err }, maxRetries) - results[i] = searchResult{startIndex: startIndex, endIndex: endIndex, numOIDs: len(objectIDs), err: err} - }(i, startIndex, endIndex) + results[i] = searchResult{batchStartIndex: startIndex / uploadBatchSize, index: startIndex, numOIDs: len(objectIDs), err: err} + }(i, startIndex) } wg.Wait() for i := len(results) - 1; i >= 0; i-- { if results[i].err != nil { - return 0, fmt.Errorf("blocks search failed for batch with indexes from %d to %d: %w", results[i].startIndex, results[i].endIndex-1, results[i].err) + return 0, fmt.Errorf("blocks search failed for batch starting with index %d, block index %d: %w", results[i].batchStartIndex, results[i].index, results[i].err) } if results[i].numOIDs == 0 { emptyBatchFound = true continue } if emptyBatchFound || (batch == numBatches && i == len(results)-1) { - return results[i].startIndex, nil + return results[i].batchStartIndex * uploadBatchSize, nil } } } @@ -341,24 +341,29 @@ func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun attributeKey := ctx.String("index-attribute") indexFileSize := ctx.Uint("index-file-size") fmt.Fprintln(ctx.App.Writer, "Uploading index files...") - - prm := client.PrmObjectSearch{} - filters := object.NewSearchFilters() - filters.AddFilter(attributeKey, fmt.Sprintf("%d", 0), object.MatchNumGE) - filters.AddFilter("IndexSize", fmt.Sprintf("%d", indexFileSize), object.MatchStringEqual) - prm.SetFilters(filters) + expectedIndexCount := (oldestMissingBlockIndex - 1) / indexFileSize var objectIDs []oid.ID - errSearch := retry(func() error { - var errSearchIndex error - objectIDs, errSearchIndex = neofs.ObjectSearch(ctx.Context, p, account.PrivateKey(), containerID.String(), prm) - return errSearchIndex - }, maxRetries) - if errSearch != nil { - return fmt.Errorf("index files search failed: %w", errSearch) + for i := range expectedIndexCount { + prm := client.PrmObjectSearch{} + filters := object.NewSearchFilters() + filters.AddFilter(attributeKey, fmt.Sprintf("%d", i), object.MatchStringEqual) + filters.AddFilter("IndexSize", fmt.Sprintf("%d", indexFileSize), object.MatchStringEqual) + prm.SetFilters(filters) + var resOIDs []oid.ID + errSearch := retry(func() error { + var errSearchIndex error + resOIDs, errSearchIndex = neofs.ObjectSearch(ctx.Context, p, account.PrivateKey(), containerID.String(), prm) + return errSearchIndex + }, maxRetries) + if errSearch != nil { + return fmt.Errorf("index files search failed: %w", errSearch) + } + if len(resOIDs) != 0 { + objectIDs = append(objectIDs, resOIDs[0]) + } } existingIndexCount := uint(len(objectIDs)) - expectedIndexCount := (oldestMissingBlockIndex - 1) / indexFileSize if existingIndexCount >= expectedIndexCount { fmt.Fprintf(ctx.App.Writer, "Index files are up to date. Existing: %d, expected: %d\n", existingIndexCount, expectedIndexCount) return nil diff --git a/pkg/services/blockfetcher/blockfetcher.go b/pkg/services/blockfetcher/blockfetcher.go index 5889c72c3..9534d2041 100644 --- a/pkg/services/blockfetcher/blockfetcher.go +++ b/pkg/services/blockfetcher/blockfetcher.go @@ -179,6 +179,9 @@ func (bfs *Service) oidDownloader() { var err error if bfs.cfg.SkipIndexFilesSearch { + //Depends on https://github.com/nspcc-dev/neo-go/issues/3645. + //We need to search with EQ filter to prevent not full results. + bfs.cfg.OIDBatchSize = 1 err = bfs.fetchOIDsBySearch() } else { err = bfs.fetchOIDsFromIndexFiles()