cli: refactor index file search in upload-bin command

Align index file searching logic with block search.

Signed-off-by: Ekaterina Pavlova <ekt@morphbits.io>
This commit is contained in:
Ekaterina Pavlova 2024-11-21 17:54:25 +03:00
parent 5dab154582
commit 616c805ac5

View file

@ -339,41 +339,35 @@ func uploadBlocks(ctx *cli.Context, p *pool.Pool, rpc *rpcclient.Client, signer
// uploadIndexFiles uploads missing index files to the container.
func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, account *wallet.Account, signer user.Signer, oldestMissingBlockIndex uint, blockAttributeKey string, homomorphicHashingDisabled bool, maxParallelSearches, maxRetries int, debug bool) error {
attributeKey := ctx.String("index-attribute")
indexFileSize := ctx.Uint("index-file-size")
fmt.Fprintln(ctx.App.Writer, "Uploading index files...")
prm := client.PrmObjectSearch{}
filters := object.NewSearchFilters()
filters.AddFilter(attributeKey, fmt.Sprintf("%d", 0), object.MatchNumGE)
filters.AddFilter("IndexSize", fmt.Sprintf("%d", indexFileSize), object.MatchStringEqual)
prm.SetFilters(filters)
var objectIDs []oid.ID
errSearch := retry(func() error {
var errSearchIndex error
objectIDs, errSearchIndex = neofs.ObjectSearch(ctx.Context, p, account.PrivateKey(), containerID.String(), prm)
return errSearchIndex
}, maxRetries)
if errSearch != nil {
return fmt.Errorf("index files search failed: %w", errSearch)
}
existingIndexCount := uint(len(objectIDs))
expectedIndexCount := (oldestMissingBlockIndex - 1) / indexFileSize
if existingIndexCount >= expectedIndexCount {
fmt.Fprintf(ctx.App.Writer, "Index files are up to date. Existing: %d, expected: %d\n", existingIndexCount, expectedIndexCount)
return nil
}
fmt.Fprintf(ctx.App.Writer, "Current index files count: %d, expected: %d\n", existingIndexCount, expectedIndexCount)
var (
attributeKey = ctx.String("index-attribute")
indexFileSize = ctx.Uint("index-file-size")
buffer = make([]byte, indexFileSize*oidSize)
doneCh = make(chan struct{})
errCh = make(chan error)
emptyOid = make([]byte, oidSize)
expectedIndexCount = (oldestMissingBlockIndex - 1) / indexFileSize
existingIndexCount = uint(0)
filters = object.NewSearchFilters()
)
fmt.Fprintln(ctx.App.Writer, "Uploading index files...")
go func() {
defer close(doneCh)
// Search for existing index files.
filters.AddFilter("IndexSize", fmt.Sprintf("%d", indexFileSize), object.MatchStringEqual)
indexIDs := searchObjects(ctx.Context, p, containerID, account, attributeKey, 0, expectedIndexCount, maxParallelSearches, maxRetries, errCh, filters)
for range indexIDs {
existingIndexCount++
}
if existingIndexCount >= expectedIndexCount {
fmt.Fprintf(ctx.App.Writer, "Index files are up to date. Existing: %d, expected: %d\n", existingIndexCount, expectedIndexCount)
return
}
fmt.Fprintf(ctx.App.Writer, "Current index files count: %d, expected: %d\n", existingIndexCount, expectedIndexCount)
// Main processing loop for each index file.
for i := existingIndexCount; i < expectedIndexCount; i++ {
@ -506,7 +500,7 @@ func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
// searchObjects searches in parallel for objects with attribute GE startIndex and LT
// endIndex. It returns a buffered channel of resulting object IDs and closes it once
// OID search is finished. Errors are sent to errCh in a non-blocking way.
func searchObjects(ctx context.Context, p *pool.Pool, containerID cid.ID, account *wallet.Account, blockAttributeKey string, startIndex, endIndex uint, maxParallelSearches, maxRetries int, errCh chan error) chan oid.ID {
func searchObjects(ctx context.Context, p *pool.Pool, containerID cid.ID, account *wallet.Account, blockAttributeKey string, startIndex, endIndex uint, maxParallelSearches, maxRetries int, errCh chan error, additionalFilters ...object.SearchFilters) chan oid.ID {
var res = make(chan oid.ID, 2*searchBatchSize)
go func() {
var wg sync.WaitGroup
@ -530,6 +524,9 @@ func searchObjects(ctx context.Context, p *pool.Pool, containerID cid.ID, accoun
prm := client.PrmObjectSearch{}
filters := object.NewSearchFilters()
if len(additionalFilters) != 0 {
filters = additionalFilters[0]
}
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", start), object.MatchNumGE)
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT)
prm.SetFilters(filters)