mirror of
https://github.com/nspcc-dev/neo-go.git
synced 2024-12-23 03:41:34 +00:00
Merge pull request #3650 from nspcc-dev/index-files
cli: add retries if search results are not expected length in `upload-bin`
This commit is contained in:
commit
21d6887969
3 changed files with 294 additions and 197 deletions
|
@ -86,6 +86,10 @@ func NewCommands() []*cli.Command {
|
||||||
Usage: "Number of concurrent searches for blocks",
|
Usage: "Number of concurrent searches for blocks",
|
||||||
Value: 20,
|
Value: 20,
|
||||||
},
|
},
|
||||||
|
&cli.BoolFlag{
|
||||||
|
Name: "skip-blocks-uploading",
|
||||||
|
Usage: "Skip blocks uploading and upload only index files",
|
||||||
|
},
|
||||||
}, options.RPC...)
|
}, options.RPC...)
|
||||||
uploadBinFlags = append(uploadBinFlags, options.Wallet...)
|
uploadBinFlags = append(uploadBinFlags, options.Wallet...)
|
||||||
return []*cli.Command{
|
return []*cli.Command{
|
||||||
|
@ -168,7 +172,7 @@ func NewCommands() []*cli.Command {
|
||||||
{
|
{
|
||||||
Name: "upload-bin",
|
Name: "upload-bin",
|
||||||
Usage: "Fetch blocks from RPC node and upload them to the NeoFS container",
|
Usage: "Fetch blocks from RPC node and upload them to the NeoFS container",
|
||||||
UsageText: "neo-go util upload-bin --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --block-attribute block --index-attribute index --rpc-endpoint <node> [--timeout <time>] --wallet <wallet> [--wallet-config <config>] [--address <address>] [--workers <num>] [--searchers <num>] [--index-file-size <size>]",
|
UsageText: "neo-go util upload-bin --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --block-attribute block --index-attribute index --rpc-endpoint <node> [--timeout <time>] --wallet <wallet> [--wallet-config <config>] [--address <address>] [--workers <num>] [--searchers <num>] [--index-file-size <size>] [--skip-blocks-uploading]",
|
||||||
Action: uploadBin,
|
Action: uploadBin,
|
||||||
Flags: uploadBinFlags,
|
Flags: uploadBinFlags,
|
||||||
},
|
},
|
||||||
|
|
|
@ -14,6 +14,7 @@ import (
|
||||||
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/io"
|
"github.com/nspcc-dev/neo-go/pkg/io"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs"
|
"github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/wallet"
|
"github.com/nspcc-dev/neo-go/pkg/wallet"
|
||||||
|
@ -109,6 +110,7 @@ func uploadBin(ctx *cli.Context) error {
|
||||||
return cli.Exit(fmt.Errorf("failed to get network info: %w", err), 1)
|
return cli.Exit(fmt.Errorf("failed to get network info: %w", err), 1)
|
||||||
}
|
}
|
||||||
homomorphicHashingDisabled := net.HomomorphicHashingDisabled()
|
homomorphicHashingDisabled := net.HomomorphicHashingDisabled()
|
||||||
|
|
||||||
var containerObj container.Container
|
var containerObj container.Container
|
||||||
err = retry(func() error {
|
err = retry(func() error {
|
||||||
containerObj, err = p.ContainerGet(ctx.Context, containerID, client.PrmContainerGet{})
|
containerObj, err = p.ContainerGet(ctx.Context, containerID, client.PrmContainerGet{})
|
||||||
|
@ -125,7 +127,7 @@ func uploadBin(ctx *cli.Context) error {
|
||||||
}
|
}
|
||||||
magic := strconv.Itoa(int(v.Protocol.Network))
|
magic := strconv.Itoa(int(v.Protocol.Network))
|
||||||
if containerMagic != magic {
|
if containerMagic != magic {
|
||||||
return cli.Exit(fmt.Sprintf("Container magic %s does not match the network magic %s", containerMagic, magic), 1)
|
return cli.Exit(fmt.Sprintf("container magic %s does not match the network magic %s", containerMagic, magic), 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
currentBlockHeight, err := rpc.GetBlockCount()
|
currentBlockHeight, err := rpc.GetBlockCount()
|
||||||
|
@ -134,94 +136,22 @@ func uploadBin(ctx *cli.Context) error {
|
||||||
}
|
}
|
||||||
fmt.Fprintln(ctx.App.Writer, "Chain block height:", currentBlockHeight)
|
fmt.Fprintln(ctx.App.Writer, "Chain block height:", currentBlockHeight)
|
||||||
|
|
||||||
lastMissingBlockIndex, err := fetchLatestMissingBlockIndex(ctx.Context, p, containerID, acc.PrivateKey(), attr, int(currentBlockHeight), maxParallelSearches)
|
oldestMissingBlockIndex, errBlock := fetchLatestMissingBlockIndex(ctx.Context, p, containerID, acc.PrivateKey(), attr, int(currentBlockHeight), maxParallelSearches)
|
||||||
if err != nil {
|
if errBlock != nil {
|
||||||
return cli.Exit(fmt.Errorf("failed to fetch the latest missing block index from container: %w", err), 1)
|
return cli.Exit(fmt.Errorf("failed to fetch the oldest missing block index from container: %w", err), 1)
|
||||||
}
|
}
|
||||||
|
fmt.Fprintln(ctx.App.Writer, "First block of latest incomplete batch uploaded to NeoFS container:", oldestMissingBlockIndex)
|
||||||
|
|
||||||
fmt.Fprintln(ctx.App.Writer, "First block of latest incomplete batch uploaded to NeoFS container:", lastMissingBlockIndex)
|
if !ctx.Bool("skip-blocks-uploading") {
|
||||||
|
err = uploadBlocks(ctx, p, rpc, signer, containerID, acc, attr, oldestMissingBlockIndex, uint(currentBlockHeight), homomorphicHashingDisabled, numWorkers)
|
||||||
if lastMissingBlockIndex > int(currentBlockHeight) {
|
if err != nil {
|
||||||
fmt.Fprintf(ctx.App.Writer, "No new blocks to upload. Need to upload starting from %d, current height %d\n", lastMissingBlockIndex, currentBlockHeight)
|
return cli.Exit(fmt.Errorf("failed to upload blocks: %w", err), 1)
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
for batchStart := lastMissingBlockIndex; batchStart <= int(currentBlockHeight); batchStart += searchBatchSize {
|
|
||||||
var (
|
|
||||||
batchEnd = min(batchStart+searchBatchSize, int(currentBlockHeight)+1)
|
|
||||||
errorCh = make(chan error)
|
|
||||||
doneCh = make(chan struct{})
|
|
||||||
wg sync.WaitGroup
|
|
||||||
)
|
|
||||||
fmt.Fprintf(ctx.App.Writer, "Processing batch from %d to %d\n", batchStart, batchEnd-1)
|
|
||||||
wg.Add(numWorkers)
|
|
||||||
for i := range numWorkers {
|
|
||||||
go func(i int) {
|
|
||||||
defer wg.Done()
|
|
||||||
for blockIndex := batchStart + i; blockIndex < batchEnd; blockIndex += numWorkers {
|
|
||||||
var blk *block.Block
|
|
||||||
errGet := retry(func() error {
|
|
||||||
var errGetBlock error
|
|
||||||
blk, errGetBlock = rpc.GetBlockByIndex(uint32(blockIndex))
|
|
||||||
if errGetBlock != nil {
|
|
||||||
return fmt.Errorf("failed to fetch block %d: %w", blockIndex, errGetBlock)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
if errGet != nil {
|
|
||||||
select {
|
|
||||||
case errorCh <- errGet:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
bw := io.NewBufBinWriter()
|
|
||||||
blk.EncodeBinary(bw.BinWriter)
|
|
||||||
if bw.Err != nil {
|
|
||||||
errorCh <- fmt.Errorf("failed to encode block %d: %w", blockIndex, bw.Err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
attrs := []object.Attribute{
|
|
||||||
*object.NewAttribute(attr, strconv.Itoa(int(blk.Index))),
|
|
||||||
*object.NewAttribute("Primary", strconv.Itoa(int(blk.PrimaryIndex))),
|
|
||||||
*object.NewAttribute("Hash", blk.Hash().StringLE()),
|
|
||||||
*object.NewAttribute("PrevHash", blk.PrevHash.StringLE()),
|
|
||||||
*object.NewAttribute("Timestamp", strconv.FormatUint(blk.Timestamp, 10)),
|
|
||||||
}
|
|
||||||
|
|
||||||
objBytes := bw.Bytes()
|
|
||||||
errRetr := retry(func() error {
|
|
||||||
return uploadObj(ctx.Context, p, signer, acc.PrivateKey().GetScriptHash(), containerID, objBytes, attrs, homomorphicHashingDisabled)
|
|
||||||
})
|
|
||||||
if errRetr != nil {
|
|
||||||
select {
|
|
||||||
case errorCh <- errRetr:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}(i)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
|
||||||
wg.Wait()
|
|
||||||
close(doneCh)
|
|
||||||
}()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case err = <-errorCh:
|
|
||||||
return cli.Exit(fmt.Errorf("upload error: %w", err), 1)
|
|
||||||
case <-doneCh:
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Fprintf(ctx.App.Writer, "Successfully uploaded batch of blocks: from %d to %d\n", batchStart, batchEnd-1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
err = updateIndexFiles(ctx, p, containerID, *acc, signer, uint(currentBlockHeight), attr, homomorphicHashingDisabled, maxParallelSearches)
|
err = uploadIndexFiles(ctx, p, containerID, acc, signer, uint(currentBlockHeight), attr, homomorphicHashingDisabled, maxParallelSearches)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cli.Exit(fmt.Errorf("failed to update index files after upload: %w", err), 1)
|
return cli.Exit(fmt.Errorf("failed to upload index files: %w", err), 1)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -250,8 +180,9 @@ type searchResult struct {
|
||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
// fetchLatestMissingBlockIndex searches the container for the last full block batch,
|
// fetchLatestMissingBlockIndex searches the container for the latest full batch of blocks
|
||||||
// starting from the currentHeight and going backwards.
|
// starting from the currentHeight and going backwards. It returns the index of first block
|
||||||
|
// in the next batch.
|
||||||
func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID cid.ID, priv *keys.PrivateKey, attributeKey string, currentHeight int, maxParallelSearches int) (int, error) {
|
func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID cid.ID, priv *keys.PrivateKey, attributeKey string, currentHeight int, maxParallelSearches int) (int, error) {
|
||||||
var (
|
var (
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
|
@ -296,7 +227,7 @@ func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID
|
||||||
|
|
||||||
for i := len(results) - 1; i >= 0; i-- {
|
for i := len(results) - 1; i >= 0; i-- {
|
||||||
if results[i].err != nil {
|
if results[i].err != nil {
|
||||||
return 0, fmt.Errorf("search of index files failed for batch with indexes from %d to %d: %w", results[i].startIndex, results[i].endIndex-1, results[i].err)
|
return 0, fmt.Errorf("blocks search failed for batch with indexes from %d to %d: %w", results[i].startIndex, results[i].endIndex-1, results[i].err)
|
||||||
}
|
}
|
||||||
if results[i].numOIDs < searchBatchSize {
|
if results[i].numOIDs < searchBatchSize {
|
||||||
emptyBatchFound = true
|
emptyBatchFound = true
|
||||||
|
@ -310,11 +241,95 @@ func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateIndexFiles updates the index files in the container.
|
// uploadBlocks uploads the blocks to the container using the pool.
|
||||||
func updateIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, account wallet.Account, signer user.Signer, currentHeight uint, blockAttributeKey string, homomorphicHashingDisabled bool, maxParallelSearches int) error {
|
func uploadBlocks(ctx *cli.Context, p *pool.Pool, rpc *rpcclient.Client, signer user.Signer, containerID cid.ID, acc *wallet.Account, attr string, oldestMissingBlockIndex int, currentBlockHeight uint, homomorphicHashingDisabled bool, numWorkers int) error {
|
||||||
|
if oldestMissingBlockIndex > int(currentBlockHeight) {
|
||||||
|
fmt.Fprintf(ctx.App.Writer, "No new blocks to upload. Need to upload starting from %d, current height %d\n", oldestMissingBlockIndex, currentBlockHeight)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
for batchStart := oldestMissingBlockIndex; batchStart <= int(currentBlockHeight); batchStart += searchBatchSize {
|
||||||
|
var (
|
||||||
|
batchEnd = min(batchStart+searchBatchSize, int(currentBlockHeight)+1)
|
||||||
|
errCh = make(chan error)
|
||||||
|
doneCh = make(chan struct{})
|
||||||
|
wg sync.WaitGroup
|
||||||
|
)
|
||||||
|
fmt.Fprintf(ctx.App.Writer, "Processing batch from %d to %d\n", batchStart, batchEnd-1)
|
||||||
|
wg.Add(numWorkers)
|
||||||
|
for i := range numWorkers {
|
||||||
|
go func(i int) {
|
||||||
|
defer wg.Done()
|
||||||
|
for blockIndex := batchStart + i; blockIndex < batchEnd; blockIndex += numWorkers {
|
||||||
|
var blk *block.Block
|
||||||
|
errGet := retry(func() error {
|
||||||
|
var errGetBlock error
|
||||||
|
blk, errGetBlock = rpc.GetBlockByIndex(uint32(blockIndex))
|
||||||
|
if errGetBlock != nil {
|
||||||
|
return fmt.Errorf("failed to fetch block %d: %w", blockIndex, errGetBlock)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if errGet != nil {
|
||||||
|
select {
|
||||||
|
case errCh <- errGet:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
bw := io.NewBufBinWriter()
|
||||||
|
blk.EncodeBinary(bw.BinWriter)
|
||||||
|
if bw.Err != nil {
|
||||||
|
select {
|
||||||
|
case errCh <- fmt.Errorf("failed to encode block %d: %w", blockIndex, bw.Err):
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
attrs := []object.Attribute{
|
||||||
|
*object.NewAttribute(attr, strconv.Itoa(int(blk.Index))),
|
||||||
|
*object.NewAttribute("Primary", strconv.Itoa(int(blk.PrimaryIndex))),
|
||||||
|
*object.NewAttribute("Hash", blk.Hash().StringLE()),
|
||||||
|
*object.NewAttribute("PrevHash", blk.PrevHash.StringLE()),
|
||||||
|
*object.NewAttribute("Timestamp", strconv.FormatUint(blk.Timestamp, 10)),
|
||||||
|
}
|
||||||
|
|
||||||
|
objBytes := bw.Bytes()
|
||||||
|
errRetr := retry(func() error {
|
||||||
|
return uploadObj(ctx.Context, p, signer, acc.PrivateKey().GetScriptHash(), containerID, objBytes, attrs, homomorphicHashingDisabled)
|
||||||
|
})
|
||||||
|
if errRetr != nil {
|
||||||
|
select {
|
||||||
|
case errCh <- errRetr:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
wg.Wait()
|
||||||
|
close(doneCh)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case err := <-errCh:
|
||||||
|
return fmt.Errorf("upload error: %w", err)
|
||||||
|
case <-doneCh:
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Fprintf(ctx.App.Writer, "Successfully uploaded batch of blocks: from %d to %d\n", batchStart, batchEnd-1)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// uploadIndexFiles uploads missing index files to the container.
|
||||||
|
func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, account *wallet.Account, signer user.Signer, currentHeight uint, blockAttributeKey string, homomorphicHashingDisabled bool, maxParallelSearches int) error {
|
||||||
attributeKey := ctx.String("index-attribute")
|
attributeKey := ctx.String("index-attribute")
|
||||||
indexFileSize := ctx.Uint("index-file-size")
|
indexFileSize := ctx.Uint("index-file-size")
|
||||||
fmt.Fprintln(ctx.App.Writer, "Updating index files...")
|
fmt.Fprintln(ctx.App.Writer, "Uploading index files...")
|
||||||
|
|
||||||
prm := client.PrmObjectSearch{}
|
prm := client.PrmObjectSearch{}
|
||||||
filters := object.NewSearchFilters()
|
filters := object.NewSearchFilters()
|
||||||
|
@ -328,129 +343,206 @@ func updateIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
|
||||||
return errSearchIndex
|
return errSearchIndex
|
||||||
})
|
})
|
||||||
if errSearch != nil {
|
if errSearch != nil {
|
||||||
return fmt.Errorf("search of index files failed: %w", errSearch)
|
return fmt.Errorf("index files search failed: %w", errSearch)
|
||||||
}
|
}
|
||||||
|
|
||||||
existingIndexCount := uint(len(objectIDs))
|
existingIndexCount := uint(len(objectIDs))
|
||||||
expectedIndexCount := currentHeight / indexFileSize
|
expectedIndexCount := currentHeight / indexFileSize
|
||||||
|
|
||||||
if existingIndexCount >= expectedIndexCount {
|
if existingIndexCount >= expectedIndexCount {
|
||||||
fmt.Fprintf(ctx.App.Writer, "Index files are up to date. Existing: %d, expected: %d\n", existingIndexCount, expectedIndexCount)
|
fmt.Fprintf(ctx.App.Writer, "Index files are up to date. Existing: %d, expected: %d\n", existingIndexCount, expectedIndexCount)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
errCh = make(chan error)
|
buffer = make([]byte, indexFileSize*oidSize)
|
||||||
buffer = make([]byte, indexFileSize*oidSize)
|
doneCh = make(chan struct{})
|
||||||
oidCh = make(chan oid.ID, indexFileSize)
|
errCh = make(chan error)
|
||||||
oidFetcherToProcessor = make(chan struct{}, indexFileSize)
|
|
||||||
|
|
||||||
emptyOid = make([]byte, oidSize)
|
emptyOid = make([]byte, oidSize)
|
||||||
)
|
)
|
||||||
defer close(oidCh)
|
|
||||||
for range maxParallelSearches {
|
go func() {
|
||||||
go func() {
|
defer close(doneCh)
|
||||||
for id := range oidCh {
|
|
||||||
var obj *object.Object
|
// Main processing loop for each index file.
|
||||||
errRetr := retry(func() error {
|
for i := existingIndexCount; i < expectedIndexCount; i++ {
|
||||||
var errGetHead error
|
// Start block parsing goroutines.
|
||||||
obj, errGetHead = p.ObjectHead(context.Background(), containerID, id, signer, client.PrmObjectHead{})
|
var (
|
||||||
return errGetHead
|
// processedIndices is a mapping from position in buffer to the block index.
|
||||||
})
|
processedIndices sync.Map
|
||||||
if errRetr != nil {
|
wg sync.WaitGroup
|
||||||
select {
|
oidCh = make(chan oid.ID, 2*maxParallelSearches)
|
||||||
case errCh <- fmt.Errorf("failed to fetch object %s: %w", id.String(), errRetr):
|
)
|
||||||
default:
|
wg.Add(maxParallelSearches)
|
||||||
|
for range maxParallelSearches {
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
for id := range oidCh {
|
||||||
|
var obj object.Object
|
||||||
|
errRetr := retry(func() error {
|
||||||
|
var errGet error
|
||||||
|
obj, _, errGet = p.ObjectGetInit(ctx.Context, containerID, id, signer, client.PrmObjectGet{})
|
||||||
|
return errGet
|
||||||
|
})
|
||||||
|
if errRetr != nil {
|
||||||
|
select {
|
||||||
|
case errCh <- fmt.Errorf("failed to fetch object %s: %w", id.String(), errRetr):
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
blockIndex, err := getBlockIndex(obj, blockAttributeKey)
|
||||||
|
if err != nil {
|
||||||
|
select {
|
||||||
|
case errCh <- fmt.Errorf("failed to get block index from object %s: %w", id.String(), err):
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
pos := uint(blockIndex) % indexFileSize
|
||||||
|
if _, ok := processedIndices.LoadOrStore(pos, blockIndex); !ok {
|
||||||
|
id.Encode(buffer[pos*oidSize:])
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return
|
}()
|
||||||
}
|
|
||||||
blockIndex, err := getBlockIndex(obj, blockAttributeKey)
|
|
||||||
if err != nil {
|
|
||||||
select {
|
|
||||||
case errCh <- fmt.Errorf("failed to get block index from object %s: %w", id.String(), err):
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
offset := (uint(blockIndex) % indexFileSize) * oidSize
|
|
||||||
id.Encode(buffer[offset:])
|
|
||||||
oidFetcherToProcessor <- struct{}{}
|
|
||||||
}
|
}
|
||||||
}()
|
|
||||||
|
// Search for blocks within the index file range.
|
||||||
|
startIndex := i * indexFileSize
|
||||||
|
endIndex := startIndex + indexFileSize
|
||||||
|
objIDs := searchObjects(ctx.Context, p, containerID, account, blockAttributeKey, startIndex, endIndex, maxParallelSearches, errCh)
|
||||||
|
for id := range objIDs {
|
||||||
|
oidCh <- id
|
||||||
|
}
|
||||||
|
close(oidCh)
|
||||||
|
wg.Wait()
|
||||||
|
fmt.Fprintf(ctx.App.Writer, "Index file %d generated, checking for the missing blocks...\n", i)
|
||||||
|
|
||||||
|
// Check if there are empty OIDs in the generated index file. This may happen
|
||||||
|
// if searchObjects has returned not all blocks within the requested range, ref.
|
||||||
|
// #3645. In this case, retry the search for every missing object.
|
||||||
|
var count int
|
||||||
|
|
||||||
|
for idx := range indexFileSize {
|
||||||
|
if _, ok := processedIndices.Load(idx); !ok {
|
||||||
|
count++
|
||||||
|
objIDs = searchObjects(ctx.Context, p, containerID, account, blockAttributeKey, i*indexFileSize+idx, i*indexFileSize+idx+1, 1, errCh)
|
||||||
|
// Block object duplicates are allowed, we're OK with the first found result.
|
||||||
|
id, ok := <-objIDs
|
||||||
|
for range objIDs {
|
||||||
|
}
|
||||||
|
if !ok {
|
||||||
|
select {
|
||||||
|
case errCh <- fmt.Errorf("block %d is missing from the storage", i*indexFileSize+idx):
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
processedIndices.Store(idx, id)
|
||||||
|
id.Encode(buffer[idx*oidSize:])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fmt.Fprintf(ctx.App.Writer, "%d missing block(s) processed for index file %d, uploading index file...\n", count, i)
|
||||||
|
|
||||||
|
// Check if there are empty OIDs in the generated index file. If it happens at
|
||||||
|
// this stage, then there's a bug in the code.
|
||||||
|
for k := 0; k < len(buffer); k += oidSize {
|
||||||
|
if slices.Compare(buffer[k:k+oidSize], emptyOid) == 0 {
|
||||||
|
select {
|
||||||
|
case errCh <- fmt.Errorf("empty OID found in index file %d at position %d (block index %d)", i, k/oidSize, i*indexFileSize+uint(k/oidSize)):
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Upload index file.
|
||||||
|
attrs := []object.Attribute{
|
||||||
|
*object.NewAttribute(attributeKey, strconv.Itoa(int(i))),
|
||||||
|
*object.NewAttribute("IndexSize", strconv.Itoa(int(indexFileSize))),
|
||||||
|
}
|
||||||
|
err := retry(func() error {
|
||||||
|
return uploadObj(ctx.Context, p, signer, account.PrivateKey().GetScriptHash(), containerID, buffer, attrs, homomorphicHashingDisabled)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
select {
|
||||||
|
case errCh <- fmt.Errorf("failed to upload index file %d: %w", i, err):
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
fmt.Fprintf(ctx.App.Writer, "Uploaded index file %d\n", i)
|
||||||
|
clear(buffer)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case err := <-errCh:
|
||||||
|
return err
|
||||||
|
case <-doneCh:
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := existingIndexCount; i < expectedIndexCount; i++ {
|
|
||||||
startIndex := i * indexFileSize
|
|
||||||
endIndex := startIndex + indexFileSize
|
|
||||||
go func() {
|
|
||||||
for j := int(startIndex); j < int(endIndex); j += searchBatchSize {
|
|
||||||
remaining := int(endIndex) - j
|
|
||||||
end := j + min(searchBatchSize, remaining)
|
|
||||||
|
|
||||||
prm = client.PrmObjectSearch{}
|
|
||||||
filters = object.NewSearchFilters()
|
|
||||||
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", j), object.MatchNumGE)
|
|
||||||
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT)
|
|
||||||
prm.SetFilters(filters)
|
|
||||||
var objIDs []oid.ID
|
|
||||||
err := retry(func() error {
|
|
||||||
var errSearchIndex error
|
|
||||||
objIDs, errSearchIndex = neofs.ObjectSearch(ctx.Context, p, account.PrivateKey(), containerID.String(), prm)
|
|
||||||
return errSearchIndex
|
|
||||||
})
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
select {
|
|
||||||
case errCh <- fmt.Errorf("failed to search for objects from %d to %d for index file %d: %w", j, end, i, err):
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, id := range objIDs {
|
|
||||||
oidCh <- id
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
var completed int
|
|
||||||
waitLoop:
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case err := <-errCh:
|
|
||||||
return err
|
|
||||||
case <-oidFetcherToProcessor:
|
|
||||||
completed++
|
|
||||||
if completed == int(indexFileSize) {
|
|
||||||
break waitLoop
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Check if there are any empty oids in the created index file.
|
|
||||||
// This could happen if object payload is empty ->
|
|
||||||
// attribute is not set correctly -> empty oid is added to the index file.
|
|
||||||
for k := 0; k < len(buffer); k += oidSize {
|
|
||||||
if slices.Compare(buffer[k:k+oidSize], emptyOid) == 0 {
|
|
||||||
return fmt.Errorf("empty oid found in index file %d at position %d (block index %d)", i, k/oidSize, i+uint(k/oidSize))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
attrs := []object.Attribute{
|
|
||||||
*object.NewAttribute(attributeKey, strconv.Itoa(int(i))),
|
|
||||||
*object.NewAttribute("IndexSize", strconv.Itoa(int(indexFileSize))),
|
|
||||||
}
|
|
||||||
err := retry(func() error {
|
|
||||||
return uploadObj(ctx.Context, p, signer, account.PrivateKey().GetScriptHash(), containerID, buffer, attrs, homomorphicHashingDisabled)
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to upload index file %d: %w", i, err)
|
|
||||||
}
|
|
||||||
fmt.Fprintf(ctx.App.Writer, "Uploaded index file %d\n", i)
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// uploadObj uploads the block to the container using the pool.
|
// searchObjects searches in parallel for objects with attribute GE startIndex and LT
|
||||||
func uploadObj(ctx context.Context, p *pool.Pool, signer user.Signer, owner util.Uint160, containerID cid.ID, objData []byte, attrs []object.Attribute, HomomorphicHashingDisabled bool) error {
|
// endIndex. It returns a buffered channel of resulting object IDs and closes it once
|
||||||
|
// OID search is finished. Errors are sent to errCh in a non-blocking way.
|
||||||
|
func searchObjects(ctx context.Context, p *pool.Pool, containerID cid.ID, account *wallet.Account, blockAttributeKey string, startIndex, endIndex uint, maxParallelSearches int, errCh chan error) chan oid.ID {
|
||||||
|
var res = make(chan oid.ID, 2*searchBatchSize)
|
||||||
|
go func() {
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
defer close(res)
|
||||||
|
|
||||||
|
for i := int(startIndex); i < int(endIndex); i += searchBatchSize * maxParallelSearches {
|
||||||
|
for j := range maxParallelSearches {
|
||||||
|
start := i + j*searchBatchSize
|
||||||
|
end := start + searchBatchSize
|
||||||
|
|
||||||
|
if start >= int(endIndex) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if end > int(endIndex) {
|
||||||
|
end = int(endIndex)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func(start, end int) {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
prm := client.PrmObjectSearch{}
|
||||||
|
filters := object.NewSearchFilters()
|
||||||
|
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", start), object.MatchNumGE)
|
||||||
|
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT)
|
||||||
|
prm.SetFilters(filters)
|
||||||
|
|
||||||
|
var objIDs []oid.ID
|
||||||
|
err := retry(func() error {
|
||||||
|
var errBlockSearch error
|
||||||
|
objIDs, errBlockSearch = neofs.ObjectSearch(ctx, p, account.PrivateKey(), containerID.String(), prm)
|
||||||
|
return errBlockSearch
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
select {
|
||||||
|
case errCh <- fmt.Errorf("failed to search for block(s) from %d to %d: %w", start, end, err):
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, id := range objIDs {
|
||||||
|
res <- id
|
||||||
|
}
|
||||||
|
}(start, end)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
|
// uploadObj uploads object to the container using provided settings.
|
||||||
|
func uploadObj(ctx context.Context, p *pool.Pool, signer user.Signer, owner util.Uint160, containerID cid.ID, objData []byte, attrs []object.Attribute, homomorphicHashingDisabled bool) error {
|
||||||
var (
|
var (
|
||||||
ownerID user.ID
|
ownerID user.ID
|
||||||
hdr object.Object
|
hdr object.Object
|
||||||
|
@ -469,7 +561,7 @@ func uploadObj(ctx context.Context, p *pool.Pool, signer user.Signer, owner util
|
||||||
hdr.SetCreationEpoch(1)
|
hdr.SetCreationEpoch(1)
|
||||||
v.SetMajor(1)
|
v.SetMajor(1)
|
||||||
hdr.SetVersion(v)
|
hdr.SetVersion(v)
|
||||||
if !HomomorphicHashingDisabled {
|
if !homomorphicHashingDisabled {
|
||||||
checksum.Calculate(&chHomomorphic, checksum.TZ, objData)
|
checksum.Calculate(&chHomomorphic, checksum.TZ, objData)
|
||||||
hdr.SetPayloadHomomorphicHash(chHomomorphic)
|
hdr.SetPayloadHomomorphicHash(chHomomorphic)
|
||||||
}
|
}
|
||||||
|
@ -497,7 +589,7 @@ func uploadObj(ctx context.Context, p *pool.Pool, signer user.Signer, owner util
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getBlockIndex(header *object.Object, attribute string) (int, error) {
|
func getBlockIndex(header object.Object, attribute string) (int, error) {
|
||||||
for _, attr := range header.UserAttributes() {
|
for _, attr := range header.UserAttributes() {
|
||||||
if attr.Key() == attribute {
|
if attr.Key() == attribute {
|
||||||
value := attr.Value()
|
value := attr.Value()
|
||||||
|
|
|
@ -87,7 +87,7 @@ NAME:
|
||||||
neo-go util upload-bin - Fetch blocks from RPC node and upload them to the NeoFS container
|
neo-go util upload-bin - Fetch blocks from RPC node and upload them to the NeoFS container
|
||||||
|
|
||||||
USAGE:
|
USAGE:
|
||||||
neo-go util upload-bin --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --block-attribute block --index-attribute index --rpc-endpoint <node> [--timeout <time>] --wallet <wallet> [--wallet-config <config>] [--address <address>] [--workers <num>] [--searchers <num>] [--index-file-size <size>]
|
neo-go util upload-bin --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --block-attribute block --index-attribute index --rpc-endpoint <node> [--timeout <time>] --wallet <wallet> [--wallet-config <config>] [--address <address>] [--workers <num>] [--searchers <num>] [--index-file-size <size>] [--skip-blocks-uploading]
|
||||||
|
|
||||||
OPTIONS:
|
OPTIONS:
|
||||||
--fs-rpc-endpoint value, --fsr value [ --fs-rpc-endpoint value, --fsr value ] List of NeoFS storage node RPC addresses (comma-separated or multiple --fs-rpc-endpoint flags)
|
--fs-rpc-endpoint value, --fsr value [ --fs-rpc-endpoint value, --fsr value ] List of NeoFS storage node RPC addresses (comma-separated or multiple --fs-rpc-endpoint flags)
|
||||||
|
@ -98,6 +98,7 @@ OPTIONS:
|
||||||
--index-file-size value Size of index file (default: 128000)
|
--index-file-size value Size of index file (default: 128000)
|
||||||
--workers value Number of workers to fetch, upload and search blocks concurrently (default: 50)
|
--workers value Number of workers to fetch, upload and search blocks concurrently (default: 50)
|
||||||
--searchers value Number of concurrent searches for blocks (default: 20)
|
--searchers value Number of concurrent searches for blocks (default: 20)
|
||||||
|
--skip-blocks-uploading Skip blocks uploading and upload only index files (default: false)
|
||||||
--rpc-endpoint value, -r value RPC node address
|
--rpc-endpoint value, -r value RPC node address
|
||||||
--timeout value, -s value Timeout for the operation (default: 10s)
|
--timeout value, -s value Timeout for the operation (default: 10s)
|
||||||
--wallet value, -w value Wallet to use to get the key for transaction signing; conflicts with --wallet-config flag
|
--wallet value, -w value Wallet to use to get the key for transaction signing; conflicts with --wallet-config flag
|
||||||
|
|
Loading…
Reference in a new issue