*: migrate to SEARCH with strict equality comparator

Close #3670

Signed-off-by: Ekaterina Pavlova <ekt@morphbits.io>
This commit is contained in:
Ekaterina Pavlova 2024-11-21 17:56:55 +03:00
parent 616c805ac5
commit 43609dd984
3 changed files with 24 additions and 11 deletions

View file

@ -36,8 +36,9 @@ const (
// except the most recent one are guaranteed to be completed and don't contain gaps.
uploadBatchSize = 10000
// Number of objects to search in a batch. If it is larger than uploadBatchSize,
// it may lead to many duplicate uploads.
searchBatchSize = uploadBatchSize
// it may lead to many duplicate uploads. We need to search with EQ filter to
// avoid partially-completed SEARCH responses.
searchBatchSize = 1
// Size of object ID.
oidSize = sha256.Size
)
@ -214,8 +215,12 @@ func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID
prm := client.PrmObjectSearch{}
filters := object.NewSearchFilters()
filters.AddFilter(attributeKey, fmt.Sprintf("%d", startIndex), object.MatchNumGE)
filters.AddFilter(attributeKey, fmt.Sprintf("%d", endIndex), object.MatchNumLT)
if endIndex == startIndex+1 {
filters.AddFilter(attributeKey, fmt.Sprintf("%d", startIndex), object.MatchStringEqual)
} else {
filters.AddFilter(attributeKey, fmt.Sprintf("%d", startIndex), object.MatchNumGE)
filters.AddFilter(attributeKey, fmt.Sprintf("%d", endIndex), object.MatchNumLT)
}
prm.SetFilters(filters)
var (
objectIDs []oid.ID
@ -527,8 +532,12 @@ func searchObjects(ctx context.Context, p *pool.Pool, containerID cid.ID, accoun
if len(additionalFilters) != 0 {
filters = additionalFilters[0]
}
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", start), object.MatchNumGE)
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT)
if end == start+1 {
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", start), object.MatchStringEqual)
} else {
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", start), object.MatchNumGE)
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT)
}
prm.SetFilters(filters)
var objIDs []oid.ID

View file

@ -47,8 +47,7 @@ parameter.
Depending on the mode, the service either:
- Searches for index files by index file attribute and reads block OIDs from index
file object-by-object.
- Searches batches of blocks directly by block attribute (the batch size is
configured via `OIDBatchSize` parameter).
- Searches blocks one by one directly by block attribute.
Once the OIDs are retrieved, they are immediately redirected to the
block downloading routines for further processing. The channel that

View file

@ -342,7 +342,8 @@ func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error {
// fetchOIDsBySearch fetches block OIDs from NeoFS by searching through the Block objects.
func (bfs *Service) fetchOIDsBySearch() error {
startIndex := bfs.chain.BlockHeight()
batchSize := uint32(bfs.cfg.OIDBatchSize)
//We need to search with EQ filter to avoid partially-completed SEARCH responses.
batchSize := uint32(1)
for {
select {
@ -351,8 +352,12 @@ func (bfs *Service) fetchOIDsBySearch() error {
default:
prm := client.PrmObjectSearch{}
filters := object.NewSearchFilters()
filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex), object.MatchNumGE)
filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex+batchSize-1), object.MatchNumLE)
if startIndex == startIndex+batchSize-1 {
filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex), object.MatchStringEqual)
} else {
filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex), object.MatchNumGE)
filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex+batchSize-1), object.MatchNumLE)
}
prm.SetFilters(filters)
ctx, cancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout)
blockOids, err := bfs.objectSearch(ctx, prm)