From 43609dd984a97838075969354aa28aaac68fc12f Mon Sep 17 00:00:00 2001 From: Ekaterina Pavlova Date: Thu, 21 Nov 2024 17:56:55 +0300 Subject: [PATCH] *: migrate to SEARCH with strict equality comparator Close #3670 Signed-off-by: Ekaterina Pavlova --- cli/util/upload_bin.go | 21 +++++++++++++++------ docs/neofs-blockstorage.md | 3 +-- pkg/services/blockfetcher/blockfetcher.go | 11 ++++++++--- 3 files changed, 24 insertions(+), 11 deletions(-) diff --git a/cli/util/upload_bin.go b/cli/util/upload_bin.go index 4c15c1ebc..b41b5592f 100644 --- a/cli/util/upload_bin.go +++ b/cli/util/upload_bin.go @@ -36,8 +36,9 @@ const ( // except the most recent one are guaranteed to be completed and don't contain gaps. uploadBatchSize = 10000 // Number of objects to search in a batch. If it is larger than uploadBatchSize, - // it may lead to many duplicate uploads. - searchBatchSize = uploadBatchSize + // it may lead to many duplicate uploads. We need to search with EQ filter to + // avoid partially-completed SEARCH responses. + searchBatchSize = 1 // Size of object ID. oidSize = sha256.Size ) @@ -214,8 +215,12 @@ func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID prm := client.PrmObjectSearch{} filters := object.NewSearchFilters() - filters.AddFilter(attributeKey, fmt.Sprintf("%d", startIndex), object.MatchNumGE) - filters.AddFilter(attributeKey, fmt.Sprintf("%d", endIndex), object.MatchNumLT) + if endIndex == startIndex+1 { + filters.AddFilter(attributeKey, fmt.Sprintf("%d", startIndex), object.MatchStringEqual) + } else { + filters.AddFilter(attributeKey, fmt.Sprintf("%d", startIndex), object.MatchNumGE) + filters.AddFilter(attributeKey, fmt.Sprintf("%d", endIndex), object.MatchNumLT) + } prm.SetFilters(filters) var ( objectIDs []oid.ID @@ -527,8 +532,12 @@ func searchObjects(ctx context.Context, p *pool.Pool, containerID cid.ID, accoun if len(additionalFilters) != 0 { filters = additionalFilters[0] } - filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", start), object.MatchNumGE) - filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT) + if end == start+1 { + filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", start), object.MatchStringEqual) + } else { + filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", start), object.MatchNumGE) + filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT) + } prm.SetFilters(filters) var objIDs []oid.ID diff --git a/docs/neofs-blockstorage.md b/docs/neofs-blockstorage.md index 46dfd5502..b251090ef 100644 --- a/docs/neofs-blockstorage.md +++ b/docs/neofs-blockstorage.md @@ -47,8 +47,7 @@ parameter. Depending on the mode, the service either: - Searches for index files by index file attribute and reads block OIDs from index file object-by-object. - - Searches batches of blocks directly by block attribute (the batch size is - configured via `OIDBatchSize` parameter). + - Searches blocks one by one directly by block attribute. Once the OIDs are retrieved, they are immediately redirected to the block downloading routines for further processing. The channel that diff --git a/pkg/services/blockfetcher/blockfetcher.go b/pkg/services/blockfetcher/blockfetcher.go index 5889c72c3..3b815dcdb 100644 --- a/pkg/services/blockfetcher/blockfetcher.go +++ b/pkg/services/blockfetcher/blockfetcher.go @@ -342,7 +342,8 @@ func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error { // fetchOIDsBySearch fetches block OIDs from NeoFS by searching through the Block objects. func (bfs *Service) fetchOIDsBySearch() error { startIndex := bfs.chain.BlockHeight() - batchSize := uint32(bfs.cfg.OIDBatchSize) + //We need to search with EQ filter to avoid partially-completed SEARCH responses. + batchSize := uint32(1) for { select { @@ -351,8 +352,12 @@ func (bfs *Service) fetchOIDsBySearch() error { default: prm := client.PrmObjectSearch{} filters := object.NewSearchFilters() - filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex), object.MatchNumGE) - filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex+batchSize-1), object.MatchNumLE) + if startIndex == startIndex+batchSize-1 { + filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex), object.MatchStringEqual) + } else { + filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex), object.MatchNumGE) + filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex+batchSize-1), object.MatchNumLE) + } prm.SetFilters(filters) ctx, cancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout) blockOids, err := bfs.objectSearch(ctx, prm)