cli: adjust index file creation in upload-bin

In case of incomplete search result it will try to find each missed oid
and process it. In case of duplicates the first found will be in index
file.

Close #3647

Signed-off-by: Ekaterina Pavlova <ekt@morphbits.io>
This commit is contained in:
Ekaterina Pavlova 2024-11-01 19:50:05 +03:00
parent 365bbe08ed
commit 35d12779d6

View file

@ -354,115 +354,193 @@ func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
} }
var ( var (
errCh = make(chan error) buffer = make([]byte, indexFileSize*oidSize)
buffer = make([]byte, indexFileSize*oidSize) doneCh = make(chan struct{})
oidCh = make(chan oid.ID, indexFileSize) errCh = make(chan error)
oidFetcherToProcessor = make(chan struct{}, indexFileSize)
emptyOid = make([]byte, oidSize) emptyOid = make([]byte, oidSize)
) )
defer close(oidCh)
for range maxParallelSearches { go func() {
go func() { defer close(doneCh)
for id := range oidCh {
var obj *object.Object // Main processing loop for each index file.
errRetr := retry(func() error { for i := existingIndexCount; i < expectedIndexCount; i++ {
var errGetHead error // Start block parsing goroutines.
obj, errGetHead = p.ObjectHead(context.Background(), containerID, id, signer, client.PrmObjectHead{}) var (
return errGetHead // processedIndices is a mapping from position in buffer to the block index.
}) processedIndices sync.Map
if errRetr != nil { wg sync.WaitGroup
oidCh = make(chan oid.ID, 2*maxParallelSearches)
)
wg.Add(maxParallelSearches)
for range maxParallelSearches {
go func() {
defer wg.Done()
for id := range oidCh {
var obj *object.Object
errRetr := retry(func() error {
var errGetHead error
obj, errGetHead = p.ObjectHead(context.Background(), containerID, id, signer, client.PrmObjectHead{})
return errGetHead
})
if errRetr != nil {
select {
case errCh <- fmt.Errorf("failed to fetch object %s: %w", id.String(), errRetr):
default:
}
return
}
blockIndex, err := getBlockIndex(obj, blockAttributeKey)
if err != nil {
select {
case errCh <- fmt.Errorf("failed to get block index from object %s: %w", id.String(), err):
default:
}
return
}
pos := uint(blockIndex) % indexFileSize
if _, ok := processedIndices.LoadOrStore(pos, blockIndex); !ok {
id.Encode(buffer[pos*oidSize:])
}
}
}()
}
// Search for blocks within the index file range.
startIndex := i * indexFileSize
endIndex := startIndex + indexFileSize
objIDs := searchObjects(ctx.Context, p, containerID, account, blockAttributeKey, startIndex, endIndex, maxParallelSearches, errCh)
for id := range objIDs {
oidCh <- id
}
close(oidCh)
wg.Wait()
fmt.Fprintf(ctx.App.Writer, "Index file %d generated, checking for the missing blocks...\n", i)
// Check if there are empty OIDs in the generated index file. This may happen
// if searchObjects has returned not all blocks within the requested range, ref.
// #3645. In this case, retry the search for every missing object.
var count int
for idx := range indexFileSize {
if _, ok := processedIndices.Load(idx); !ok {
count++
objIDs = searchObjects(ctx.Context, p, containerID, account, blockAttributeKey, i*indexFileSize+idx, i*indexFileSize+idx+1, 1, errCh)
// Block object duplicates are allowed, we're OK with the first found result.
id, ok := <-objIDs
for range objIDs {
}
if !ok {
select {
case errCh <- fmt.Errorf("block %d is missing from the storage", i*indexFileSize+idx):
default:
}
return
}
processedIndices.Store(idx, id)
id.Encode(buffer[idx*oidSize:])
}
}
fmt.Fprintf(ctx.App.Writer, "%d missing block(s) processed for index file %d, uploading index file...\n", count, i)
// Check if there are empty OIDs in the generated index file. If it happens at
// this stage, then there's a bug in the code.
for k := 0; k < len(buffer); k += oidSize {
if slices.Compare(buffer[k:k+oidSize], emptyOid) == 0 {
select { select {
case errCh <- fmt.Errorf("failed to fetch object %s: %w", id.String(), errRetr): case errCh <- fmt.Errorf("empty OID found in index file %d at position %d (block index %d)", i, k/oidSize, i*indexFileSize+uint(k/oidSize)):
default: default:
} }
return return
} }
blockIndex, err := getBlockIndex(obj, blockAttributeKey)
if err != nil {
select {
case errCh <- fmt.Errorf("failed to get block index from object %s: %w", id.String(), err):
default:
}
return
}
offset := (uint(blockIndex) % indexFileSize) * oidSize
id.Encode(buffer[offset:])
oidFetcherToProcessor <- struct{}{}
} }
}()
}
for i := existingIndexCount; i < expectedIndexCount; i++ { // Upload index file.
startIndex := i * indexFileSize attrs := []object.Attribute{
endIndex := startIndex + indexFileSize *object.NewAttribute(attributeKey, strconv.Itoa(int(i))),
go func() { *object.NewAttribute("IndexSize", strconv.Itoa(int(indexFileSize))),
for j := int(startIndex); j < int(endIndex); j += searchBatchSize {
remaining := int(endIndex) - j
end := j + min(searchBatchSize, remaining)
prm = client.PrmObjectSearch{}
filters = object.NewSearchFilters()
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", j), object.MatchNumGE)
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT)
prm.SetFilters(filters)
var objIDs []oid.ID
err := retry(func() error {
var errSearchIndex error
objIDs, errSearchIndex = neofs.ObjectSearch(ctx.Context, p, account.PrivateKey(), containerID.String(), prm)
return errSearchIndex
})
if err != nil {
select {
case errCh <- fmt.Errorf("failed to search for objects from %d to %d for index file %d: %w", j, end, i, err):
default:
}
return
}
for _, id := range objIDs {
oidCh <- id
}
} }
}() err := retry(func() error {
return uploadObj(ctx.Context, p, signer, account.PrivateKey().GetScriptHash(), containerID, buffer, attrs, homomorphicHashingDisabled)
var completed int })
waitLoop: if err != nil {
for { select {
select { case errCh <- fmt.Errorf("failed to upload index file %d: %w", i, err):
case err := <-errCh: default:
return err
case <-oidFetcherToProcessor:
completed++
if completed == int(indexFileSize) {
break waitLoop
} }
return
} }
fmt.Fprintf(ctx.App.Writer, "Uploaded index file %d\n", i)
clear(buffer)
} }
// Check if there are empty OIDs in the generated index file. If it happens at }()
// this stage, then there's a bug in the code.
for k := 0; k < len(buffer); k += oidSize { select {
if slices.Compare(buffer[k:k+oidSize], emptyOid) == 0 { case err := <-errCh:
return fmt.Errorf("empty OID found in index file %d at position %d (block index %d)", i, k/oidSize, i+uint(k/oidSize)) return err
} case <-doneCh:
}
attrs := []object.Attribute{
*object.NewAttribute(attributeKey, strconv.Itoa(int(i))),
*object.NewAttribute("IndexSize", strconv.Itoa(int(indexFileSize))),
}
err := retry(func() error {
return uploadObj(ctx.Context, p, signer, account.PrivateKey().GetScriptHash(), containerID, buffer, attrs, homomorphicHashingDisabled)
})
if err != nil {
return fmt.Errorf("failed to upload index file %d: %w", i, err)
}
fmt.Fprintf(ctx.App.Writer, "Uploaded index file %d\n", i)
} }
return nil return nil
} }
// searchObjects searches in parallel for objects with attribute GE startIndex and LT
// endIndex. It returns a buffered channel of resulting object IDs and closes it once
// OID search is finished. Errors are sent to errCh in a non-blocking way.
func searchObjects(ctx context.Context, p *pool.Pool, containerID cid.ID, account *wallet.Account, blockAttributeKey string, startIndex, endIndex uint, maxParallelSearches int, errCh chan error) chan oid.ID {
var res = make(chan oid.ID, 2*searchBatchSize)
go func() {
var wg sync.WaitGroup
defer close(res)
for i := int(startIndex); i < int(endIndex); i += searchBatchSize * maxParallelSearches {
for j := range maxParallelSearches {
start := i + j*searchBatchSize
end := start + searchBatchSize
if start >= int(endIndex) {
break
}
if end > int(endIndex) {
end = int(endIndex)
}
wg.Add(1)
go func(start, end int) {
defer wg.Done()
prm := client.PrmObjectSearch{}
filters := object.NewSearchFilters()
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", start), object.MatchNumGE)
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT)
prm.SetFilters(filters)
var objIDs []oid.ID
err := retry(func() error {
var errBlockSearch error
objIDs, errBlockSearch = neofs.ObjectSearch(ctx, p, account.PrivateKey(), containerID.String(), prm)
return errBlockSearch
})
if err != nil {
select {
case errCh <- fmt.Errorf("failed to search for block(s) from %d to %d: %w", start, end, err):
default:
}
return
}
for _, id := range objIDs {
res <- id
}
}(start, end)
}
wg.Wait()
}
}()
return res
}
// uploadObj uploads object to the container using provided settings. // uploadObj uploads object to the container using provided settings.
func uploadObj(ctx context.Context, p *pool.Pool, signer user.Signer, owner util.Uint160, containerID cid.ID, objData []byte, attrs []object.Attribute, homomorphicHashingDisabled bool) error { func uploadObj(ctx context.Context, p *pool.Pool, signer user.Signer, owner util.Uint160, containerID cid.ID, objData []byte, attrs []object.Attribute, homomorphicHashingDisabled bool) error {
var ( var (