*: migrate to SEARCH with strict equality comparator

Close #3670

Signed-off-by: Ekaterina Pavlova <ekt@morphbits.io>
This commit is contained in:
Ekaterina Pavlova 2024-11-21 17:56:55 +03:00
parent 4b6db2dc33
commit c1311871d6
2 changed files with 22 additions and 7 deletions

View file

@ -38,7 +38,8 @@ const (
uploadBatchSize = 10000
// Number of objects to search in a batch.
// If it is larger than uploadBatchSize, it may lead to many duplicate uploads.
searchBatchSize = uploadBatchSize
// We need to search with EQ filter to avoid partially-completed SEARCH responses.
searchBatchSize = 1
// Size of object ID.
oidSize = sha256.Size
)
@ -215,8 +216,12 @@ func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID
prm := client.PrmObjectSearch{}
filters := object.NewSearchFilters()
filters.AddFilter(attributeKey, fmt.Sprintf("%d", startIndex), object.MatchNumGE)
filters.AddFilter(attributeKey, fmt.Sprintf("%d", endIndex), object.MatchNumLT)
if endIndex == startIndex+1 {
filters.AddFilter(attributeKey, fmt.Sprintf("%d", startIndex), object.MatchStringEqual)
} else {
filters.AddFilter(attributeKey, fmt.Sprintf("%d", startIndex), object.MatchNumGE)
filters.AddFilter(attributeKey, fmt.Sprintf("%d", endIndex), object.MatchNumLT)
}
prm.SetFilters(filters)
var (
objectIDs []oid.ID
@ -528,8 +533,12 @@ func searchObjects(ctx context.Context, p *pool.Pool, containerID cid.ID, accoun
if len(additionalFilters) != 0 {
filters = additionalFilters[0]
}
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", start), object.MatchNumGE)
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT)
if end == start+1 {
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", start), object.MatchStringEqual)
} else {
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", start), object.MatchNumGE)
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT)
}
prm.SetFilters(filters)
var objIDs []oid.ID

View file

@ -179,6 +179,8 @@ func (bfs *Service) oidDownloader() {
var err error
if bfs.cfg.SkipIndexFilesSearch {
//We need to search with EQ filter to avoid partially-completed SEARCH responses.
bfs.cfg.OIDBatchSize = 1
err = bfs.fetchOIDsBySearch()
} else {
err = bfs.fetchOIDsFromIndexFiles()
@ -351,8 +353,12 @@ func (bfs *Service) fetchOIDsBySearch() error {
default:
prm := client.PrmObjectSearch{}
filters := object.NewSearchFilters()
filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex), object.MatchNumGE)
filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex+batchSize-1), object.MatchNumLE)
if startIndex == startIndex+batchSize-1 {
filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex), object.MatchStringEqual)
} else {
filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex), object.MatchNumGE)
filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex+batchSize-1), object.MatchNumLE)
}
prm.SetFilters(filters)
ctx, cancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout)
blockOids, err := bfs.objectSearch(ctx, prm)