*: migrate to SEARCH with strict equality comparator

Depends on https://github.com/nspcc-dev/neo-go/issues/3645. It should be
reverted once the issue is resolved.

Close #3670

Signed-off-by: Ekaterina Pavlova <ekt@morphbits.io>
This commit is contained in:
Ekaterina Pavlova 2024-11-19 13:37:42 +03:00
parent c6f68cae34
commit fe0164ef47
2 changed files with 34 additions and 26 deletions

View file

@ -36,7 +36,8 @@ const (
uploadBatchSize = 10000
// Number of objects to search in a batch.
// For searching the oldest missing block it should be equal to uploadBatchSize.
searchBatchSize = uploadBatchSize
// But due to incomplete search results #3645 the object.MatchStringEqual is used.
searchBatchSize = 1
// Size of object ID.
oidSize = sha256.Size
)
@ -178,10 +179,10 @@ func retry(action func() error, maxRetries int) error {
}
type searchResult struct {
startIndex int
endIndex int
numOIDs int
err error
batchStartIndex int
index int
numOIDs int
err error
}
// fetchLatestMissingBlockIndex searches the container for the latest full batch of blocks
@ -208,13 +209,12 @@ func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID
}
wg.Add(1)
go func(i, startIndex, endIndex int) {
go func(i, startIndex int) {
defer wg.Done()
prm := client.PrmObjectSearch{}
filters := object.NewSearchFilters()
filters.AddFilter(attributeKey, fmt.Sprintf("%d", startIndex), object.MatchNumGE)
filters.AddFilter(attributeKey, fmt.Sprintf("%d", endIndex), object.MatchNumLT)
filters.AddFilter(attributeKey, fmt.Sprintf("%d", startIndex), object.MatchStringEqual)
prm.SetFilters(filters)
var (
objectIDs []oid.ID
@ -224,21 +224,21 @@ func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID
objectIDs, err = neofs.ObjectSearch(ctx, p, priv, containerID.String(), prm)
return err
}, maxRetries)
results[i] = searchResult{startIndex: startIndex, endIndex: endIndex, numOIDs: len(objectIDs), err: err}
}(i, startIndex, endIndex)
results[i] = searchResult{batchStartIndex: startIndex / uploadBatchSize, index: startIndex, numOIDs: len(objectIDs), err: err}
}(i, startIndex)
}
wg.Wait()
for i := len(results) - 1; i >= 0; i-- {
if results[i].err != nil {
return 0, fmt.Errorf("blocks search failed for batch with indexes from %d to %d: %w", results[i].startIndex, results[i].endIndex-1, results[i].err)
return 0, fmt.Errorf("blocks search failed for batch starting with index %d, block index %d: %w", results[i].batchStartIndex, results[i].index, results[i].err)
}
if results[i].numOIDs == 0 {
emptyBatchFound = true
continue
}
if emptyBatchFound || (batch == numBatches && i == len(results)-1) {
return results[i].startIndex, nil
return results[i].batchStartIndex * uploadBatchSize, nil
}
}
}
@ -341,24 +341,29 @@ func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
attributeKey := ctx.String("index-attribute")
indexFileSize := ctx.Uint("index-file-size")
fmt.Fprintln(ctx.App.Writer, "Uploading index files...")
prm := client.PrmObjectSearch{}
filters := object.NewSearchFilters()
filters.AddFilter(attributeKey, fmt.Sprintf("%d", 0), object.MatchNumGE)
filters.AddFilter("IndexSize", fmt.Sprintf("%d", indexFileSize), object.MatchStringEqual)
prm.SetFilters(filters)
expectedIndexCount := (oldestMissingBlockIndex - 1) / indexFileSize
var objectIDs []oid.ID
errSearch := retry(func() error {
var errSearchIndex error
objectIDs, errSearchIndex = neofs.ObjectSearch(ctx.Context, p, account.PrivateKey(), containerID.String(), prm)
return errSearchIndex
}, maxRetries)
if errSearch != nil {
return fmt.Errorf("index files search failed: %w", errSearch)
for i := range expectedIndexCount {
prm := client.PrmObjectSearch{}
filters := object.NewSearchFilters()
filters.AddFilter(attributeKey, fmt.Sprintf("%d", i), object.MatchStringEqual)
filters.AddFilter("IndexSize", fmt.Sprintf("%d", indexFileSize), object.MatchStringEqual)
prm.SetFilters(filters)
var resOIDs []oid.ID
errSearch := retry(func() error {
var errSearchIndex error
resOIDs, errSearchIndex = neofs.ObjectSearch(ctx.Context, p, account.PrivateKey(), containerID.String(), prm)
return errSearchIndex
}, maxRetries)
if errSearch != nil {
return fmt.Errorf("index files search failed: %w", errSearch)
}
if len(resOIDs) != 0 {
objectIDs = append(objectIDs, resOIDs[0])
}
}
existingIndexCount := uint(len(objectIDs))
expectedIndexCount := (oldestMissingBlockIndex - 1) / indexFileSize
if existingIndexCount >= expectedIndexCount {
fmt.Fprintf(ctx.App.Writer, "Index files are up to date. Existing: %d, expected: %d\n", existingIndexCount, expectedIndexCount)
return nil

View file

@ -179,6 +179,9 @@ func (bfs *Service) oidDownloader() {
var err error
if bfs.cfg.SkipIndexFilesSearch {
//Depends on https://github.com/nspcc-dev/neo-go/issues/3645.
//We need to search with EQ filter to prevent not full results.
bfs.cfg.OIDBatchSize = 1
err = bfs.fetchOIDsBySearch()
} else {
err = bfs.fetchOIDsFromIndexFiles()