Merge pull request #3691 from nspcc-dev/uploader-eq-search

upload-bin, NeoFSBlockFetcher: migrate to SEARCH with strict equality comparator
This commit is contained in:
Anna Shaleva 2024-11-25 13:43:12 +03:00 committed by GitHub
commit 171e01be3c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 56 additions and 42 deletions

View file

@ -32,8 +32,13 @@ import (
) )
const ( const (
// Number of objects to search in a batch for finding max block in container. // Number of objects to upload in a batch. All batches of uploadBatchSize size
searchBatchSize = 10000 // except the most recent one are guaranteed to be completed and don't contain gaps.
uploadBatchSize = 10000
// Number of objects to search in a batch. If it is larger than uploadBatchSize,
// it may lead to many duplicate uploads. We need to search with EQ filter to
// avoid partially-completed SEARCH responses.
searchBatchSize = 1
// Size of object ID. // Size of object ID.
oidSize = sha256.Size oidSize = sha256.Size
) )
@ -138,7 +143,7 @@ func uploadBin(ctx *cli.Context) error {
oldestMissingBlockIndex, errBlock := fetchLatestMissingBlockIndex(ctx.Context, p, containerID, acc.PrivateKey(), attr, int(currentBlockHeight), maxParallelSearches, maxRetries) oldestMissingBlockIndex, errBlock := fetchLatestMissingBlockIndex(ctx.Context, p, containerID, acc.PrivateKey(), attr, int(currentBlockHeight), maxParallelSearches, maxRetries)
if errBlock != nil { if errBlock != nil {
return cli.Exit(fmt.Errorf("failed to fetch the oldest missing block index from container: %w", err), 1) return cli.Exit(fmt.Errorf("failed to fetch the oldest missing block index from container: %w", errBlock), 1)
} }
fmt.Fprintln(ctx.App.Writer, "First block of latest incomplete batch uploaded to NeoFS container:", oldestMissingBlockIndex) fmt.Fprintln(ctx.App.Writer, "First block of latest incomplete batch uploaded to NeoFS container:", oldestMissingBlockIndex)
@ -210,8 +215,12 @@ func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID
prm := client.PrmObjectSearch{} prm := client.PrmObjectSearch{}
filters := object.NewSearchFilters() filters := object.NewSearchFilters()
filters.AddFilter(attributeKey, fmt.Sprintf("%d", startIndex), object.MatchNumGE) if endIndex == startIndex+1 {
filters.AddFilter(attributeKey, fmt.Sprintf("%d", endIndex), object.MatchNumLT) filters.AddFilter(attributeKey, fmt.Sprintf("%d", startIndex), object.MatchStringEqual)
} else {
filters.AddFilter(attributeKey, fmt.Sprintf("%d", startIndex), object.MatchNumGE)
filters.AddFilter(attributeKey, fmt.Sprintf("%d", endIndex), object.MatchNumLT)
}
prm.SetFilters(filters) prm.SetFilters(filters)
var ( var (
objectIDs []oid.ID objectIDs []oid.ID
@ -235,7 +244,7 @@ func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID
continue continue
} }
if emptyBatchFound || (batch == numBatches && i == len(results)-1) { if emptyBatchFound || (batch == numBatches && i == len(results)-1) {
return results[i].startIndex, nil return results[i].startIndex / uploadBatchSize * uploadBatchSize, nil
} }
} }
} }
@ -248,9 +257,9 @@ func uploadBlocks(ctx *cli.Context, p *pool.Pool, rpc *rpcclient.Client, signer
fmt.Fprintf(ctx.App.Writer, "No new blocks to upload. Need to upload starting from %d, current height %d\n", oldestMissingBlockIndex, currentBlockHeight) fmt.Fprintf(ctx.App.Writer, "No new blocks to upload. Need to upload starting from %d, current height %d\n", oldestMissingBlockIndex, currentBlockHeight)
return nil return nil
} }
for batchStart := oldestMissingBlockIndex; batchStart <= int(currentBlockHeight); batchStart += searchBatchSize { for batchStart := oldestMissingBlockIndex; batchStart <= int(currentBlockHeight); batchStart += uploadBatchSize {
var ( var (
batchEnd = min(batchStart+searchBatchSize, int(currentBlockHeight)+1) batchEnd = min(batchStart+uploadBatchSize, int(currentBlockHeight)+1)
errCh = make(chan error) errCh = make(chan error)
doneCh = make(chan struct{}) doneCh = make(chan struct{})
wg sync.WaitGroup wg sync.WaitGroup
@ -335,41 +344,35 @@ func uploadBlocks(ctx *cli.Context, p *pool.Pool, rpc *rpcclient.Client, signer
// uploadIndexFiles uploads missing index files to the container. // uploadIndexFiles uploads missing index files to the container.
func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, account *wallet.Account, signer user.Signer, oldestMissingBlockIndex uint, blockAttributeKey string, homomorphicHashingDisabled bool, maxParallelSearches, maxRetries int, debug bool) error { func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, account *wallet.Account, signer user.Signer, oldestMissingBlockIndex uint, blockAttributeKey string, homomorphicHashingDisabled bool, maxParallelSearches, maxRetries int, debug bool) error {
attributeKey := ctx.String("index-attribute")
indexFileSize := ctx.Uint("index-file-size")
fmt.Fprintln(ctx.App.Writer, "Uploading index files...")
prm := client.PrmObjectSearch{}
filters := object.NewSearchFilters()
filters.AddFilter(attributeKey, fmt.Sprintf("%d", 0), object.MatchNumGE)
filters.AddFilter("IndexSize", fmt.Sprintf("%d", indexFileSize), object.MatchStringEqual)
prm.SetFilters(filters)
var objectIDs []oid.ID
errSearch := retry(func() error {
var errSearchIndex error
objectIDs, errSearchIndex = neofs.ObjectSearch(ctx.Context, p, account.PrivateKey(), containerID.String(), prm)
return errSearchIndex
}, maxRetries)
if errSearch != nil {
return fmt.Errorf("index files search failed: %w", errSearch)
}
existingIndexCount := uint(len(objectIDs))
expectedIndexCount := (oldestMissingBlockIndex - 1) / indexFileSize
if existingIndexCount >= expectedIndexCount {
fmt.Fprintf(ctx.App.Writer, "Index files are up to date. Existing: %d, expected: %d\n", existingIndexCount, expectedIndexCount)
return nil
}
fmt.Fprintf(ctx.App.Writer, "Current index files count: %d, expected: %d\n", existingIndexCount, expectedIndexCount)
var ( var (
attributeKey = ctx.String("index-attribute")
indexFileSize = ctx.Uint("index-file-size")
buffer = make([]byte, indexFileSize*oidSize) buffer = make([]byte, indexFileSize*oidSize)
doneCh = make(chan struct{}) doneCh = make(chan struct{})
errCh = make(chan error) errCh = make(chan error)
emptyOid = make([]byte, oidSize) emptyOid = make([]byte, oidSize)
expectedIndexCount = (oldestMissingBlockIndex - 1) / indexFileSize
existingIndexCount = uint(0)
filters = object.NewSearchFilters()
) )
fmt.Fprintln(ctx.App.Writer, "Uploading index files...")
go func() { go func() {
defer close(doneCh) defer close(doneCh)
// Search for existing index files.
filters.AddFilter("IndexSize", fmt.Sprintf("%d", indexFileSize), object.MatchStringEqual)
indexIDs := searchObjects(ctx.Context, p, containerID, account, attributeKey, 0, expectedIndexCount, maxParallelSearches, maxRetries, errCh, filters)
for range indexIDs {
existingIndexCount++
}
if existingIndexCount >= expectedIndexCount {
fmt.Fprintf(ctx.App.Writer, "Index files are up to date. Existing: %d, expected: %d\n", existingIndexCount, expectedIndexCount)
return
}
fmt.Fprintf(ctx.App.Writer, "Current index files count: %d, expected: %d\n", existingIndexCount, expectedIndexCount)
// Main processing loop for each index file. // Main processing loop for each index file.
for i := existingIndexCount; i < expectedIndexCount; i++ { for i := existingIndexCount; i < expectedIndexCount; i++ {
@ -502,7 +505,7 @@ func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
// searchObjects searches in parallel for objects with attribute GE startIndex and LT // searchObjects searches in parallel for objects with attribute GE startIndex and LT
// endIndex. It returns a buffered channel of resulting object IDs and closes it once // endIndex. It returns a buffered channel of resulting object IDs and closes it once
// OID search is finished. Errors are sent to errCh in a non-blocking way. // OID search is finished. Errors are sent to errCh in a non-blocking way.
func searchObjects(ctx context.Context, p *pool.Pool, containerID cid.ID, account *wallet.Account, blockAttributeKey string, startIndex, endIndex uint, maxParallelSearches, maxRetries int, errCh chan error) chan oid.ID { func searchObjects(ctx context.Context, p *pool.Pool, containerID cid.ID, account *wallet.Account, blockAttributeKey string, startIndex, endIndex uint, maxParallelSearches, maxRetries int, errCh chan error, additionalFilters ...object.SearchFilters) chan oid.ID {
var res = make(chan oid.ID, 2*searchBatchSize) var res = make(chan oid.ID, 2*searchBatchSize)
go func() { go func() {
var wg sync.WaitGroup var wg sync.WaitGroup
@ -526,8 +529,15 @@ func searchObjects(ctx context.Context, p *pool.Pool, containerID cid.ID, accoun
prm := client.PrmObjectSearch{} prm := client.PrmObjectSearch{}
filters := object.NewSearchFilters() filters := object.NewSearchFilters()
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", start), object.MatchNumGE) if len(additionalFilters) != 0 {
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT) filters = additionalFilters[0]
}
if end == start+1 {
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", start), object.MatchStringEqual)
} else {
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", start), object.MatchNumGE)
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT)
}
prm.SetFilters(filters) prm.SetFilters(filters)
var objIDs []oid.ID var objIDs []oid.ID

View file

@ -47,8 +47,7 @@ parameter.
Depending on the mode, the service either: Depending on the mode, the service either:
- Searches for index files by index file attribute and reads block OIDs from index - Searches for index files by index file attribute and reads block OIDs from index
file object-by-object. file object-by-object.
- Searches batches of blocks directly by block attribute (the batch size is - Searches blocks one by one directly by block attribute.
configured via `OIDBatchSize` parameter).
Once the OIDs are retrieved, they are immediately redirected to the Once the OIDs are retrieved, they are immediately redirected to the
block downloading routines for further processing. The channel that block downloading routines for further processing. The channel that

View file

@ -342,7 +342,8 @@ func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error {
// fetchOIDsBySearch fetches block OIDs from NeoFS by searching through the Block objects. // fetchOIDsBySearch fetches block OIDs from NeoFS by searching through the Block objects.
func (bfs *Service) fetchOIDsBySearch() error { func (bfs *Service) fetchOIDsBySearch() error {
startIndex := bfs.chain.BlockHeight() startIndex := bfs.chain.BlockHeight()
batchSize := uint32(bfs.cfg.OIDBatchSize) //We need to search with EQ filter to avoid partially-completed SEARCH responses.
batchSize := uint32(1)
for { for {
select { select {
@ -351,8 +352,12 @@ func (bfs *Service) fetchOIDsBySearch() error {
default: default:
prm := client.PrmObjectSearch{} prm := client.PrmObjectSearch{}
filters := object.NewSearchFilters() filters := object.NewSearchFilters()
filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex), object.MatchNumGE) if startIndex == startIndex+batchSize-1 {
filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex+batchSize-1), object.MatchNumLE) filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex), object.MatchStringEqual)
} else {
filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex), object.MatchNumGE)
filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex+batchSize-1), object.MatchNumLE)
}
prm.SetFilters(filters) prm.SetFilters(filters)
ctx, cancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout) ctx, cancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout)
blockOids, err := bfs.objectSearch(ctx, prm) blockOids, err := bfs.objectSearch(ctx, prm)