Merge pull request #3735 from nspcc-dev/index-files-put
cli: update `upload-bin` to create index files during block uploading
Commit cb4b21fcf4

3 changed files with 159 additions and 271 deletions
@@ -86,10 +86,6 @@ func NewCommands() []*cli.Command {
 			Usage: "Number of concurrent searches for blocks",
 			Value: 20,
 		},
-		&cli.BoolFlag{
-			Name:  "skip-blocks-uploading",
-			Usage: "Skip blocks uploading and upload only index files",
-		},
 		&cli.UintFlag{
 			Name:  "retries",
 			Usage: "Maximum number of Neo/NeoFS node request retries",
@@ -184,7 +180,7 @@ func NewCommands() []*cli.Command {
 		{
 			Name:      "upload-bin",
 			Usage:     "Fetch blocks from RPC node and upload them to the NeoFS container",
-			UsageText: "neo-go util upload-bin --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --block-attribute block --index-attribute index --rpc-endpoint <node> [--timeout <time>] --wallet <wallet> [--wallet-config <config>] [--address <address>] [--workers <num>] [--searchers <num>] [--index-file-size <size>] [--skip-blocks-uploading] [--retries <num>] [--debug]",
+			UsageText: "neo-go util upload-bin --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --block-attribute block --index-attribute index --rpc-endpoint <node> [--timeout <time>] --wallet <wallet> [--wallet-config <config>] [--address <address>] [--workers <num>] [--searchers <num>] [--index-file-size <size>] [--retries <num>] [--debug]",
 			Action:    uploadBin,
 			Flags:     uploadBinFlags,
 		},
@@ -12,7 +12,6 @@ import (
 	"github.com/nspcc-dev/neo-go/cli/cmdargs"
 	"github.com/nspcc-dev/neo-go/cli/options"
 	"github.com/nspcc-dev/neo-go/pkg/core/block"
-	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
 	"github.com/nspcc-dev/neo-go/pkg/io"
 	"github.com/nspcc-dev/neo-go/pkg/rpcclient"
 	"github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs"
@@ -32,12 +31,11 @@
 )
 
 const (
-	// Number of objects to upload in a batch. All batches of uploadBatchSize size
-	// except the most recent one are guaranteed to be completed and don't contain gaps.
-	uploadBatchSize = 10000
-	// Number of objects to search in a batch. If it is larger than uploadBatchSize,
-	// it may lead to many duplicate uploads. We need to search with EQ filter to
-	// avoid partially-completed SEARCH responses.
+	// Number of objects to search in a batch. We need to search with EQ filter to
+	// avoid partially-completed SEARCH responses. If EQ search haven't found object
+	// the object will be uploaded one more time which may lead to duplicating objects.
+	// We will have a risk of duplicates until #3645 is resolved (NeoFS guarantees
+	// search results).
 	searchBatchSize = 1
 	// Size of object ID.
 	oidSize = sha256.Size
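The EQ-filter constraint described in the new comment can be illustrated with a minimal sketch. It is not part of this diff and is meant to sit next to the code in this file, so `poolWrapper`, the `neofs.ObjectSearch` helper and the imports (`client`, `object`, `oid`, `cid`, `keys`, `fmt`, `context`) are the ones already used here:

```go
// Sketch only: search for a single block index with an exact-match (EQ) filter,
// mirroring the calls that appear in this diff.
func searchOneBlock(ctx context.Context, pWrapper poolWrapper, priv *keys.PrivateKey,
	containerID cid.ID, attributeKey string, blockIndex int) ([]oid.ID, error) {
	filters := object.NewSearchFilters()
	// An EQ filter asks for exactly one attribute value, so an incomplete SEARCH
	// response can only mean "not found" (and the block is re-uploaded), never a
	// silent gap somewhere inside a range.
	filters.AddFilter(attributeKey, fmt.Sprintf("%d", blockIndex), object.MatchStringEqual)

	prm := client.PrmObjectSearch{}
	prm.SetFilters(filters)
	return neofs.ObjectSearch(ctx, pWrapper, priv, containerID.String(), prm)
}
```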
@@ -79,10 +77,12 @@ func uploadBin(ctx *cli.Context) error {
 	rpcNeoFS := ctx.StringSlice("fs-rpc-endpoint")
 	containerIDStr := ctx.String("container")
 	attr := ctx.String("block-attribute")
-	numWorkers := ctx.Int("workers")
-	maxParallelSearches := ctx.Int("searchers")
-	maxRetries := int(ctx.Uint("retries"))
+	numWorkers := ctx.Uint("workers")
+	maxParallelSearches := ctx.Uint("searchers")
+	maxRetries := ctx.Uint("retries")
 	debug := ctx.Bool("debug")
+	indexFileSize := ctx.Uint("index-file-size")
+	indexAttrKey := ctx.String("index-attribute")
 	acc, _, err := options.GetAccFromContext(ctx)
 	if err != nil {
 		return cli.Exit(fmt.Sprintf("failed to load account: %v", err), 1)
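For context on the `ctx.Int` to `ctx.Uint` switch: the values come straight from the `cli.UintFlag` definitions shown earlier, so reading them as `uint` drops the later conversions. A small standalone sketch (hypothetical demo command, assuming the urfave/cli v2 API these files use):

```go
package main

import (
	"fmt"
	"os"

	"github.com/urfave/cli/v2"
)

func main() {
	app := &cli.App{
		Commands: []*cli.Command{{
			Name: "upload-bin-demo", // hypothetical command, for illustration only
			Flags: []cli.Flag{
				&cli.UintFlag{Name: "retries", Usage: "Maximum number of request retries", Value: 5},
				&cli.UintFlag{Name: "workers", Usage: "Number of workers", Value: 50},
			},
			Action: func(ctx *cli.Context) error {
				// ctx.Uint returns uint directly, so no int(...) casts are needed downstream.
				maxRetries := ctx.Uint("retries")
				numWorkers := ctx.Uint("workers")
				fmt.Println(maxRetries, numWorkers)
				return nil
			},
		}},
	}
	if err := app.Run(os.Args); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}
```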
@@ -151,30 +151,20 @@ func uploadBin(ctx *cli.Context) error {
 		return cli.Exit(fmt.Sprintf("failed to get current block height from RPC: %v", err), 1)
 	}
 	fmt.Fprintln(ctx.App.Writer, "Chain block height:", currentBlockHeight)
-
-	oldestMissingBlockIndex, errBlock := fetchLatestMissingBlockIndex(ctx.Context, p, containerID, acc.PrivateKey(), attr, int(currentBlockHeight), maxParallelSearches, maxRetries)
-	if errBlock != nil {
-		return cli.Exit(fmt.Errorf("failed to fetch the oldest missing block index from container: %w", errBlock), 1)
-	}
-	fmt.Fprintln(ctx.App.Writer, "First block of latest incomplete batch uploaded to NeoFS container:", oldestMissingBlockIndex)
-
-	if !ctx.Bool("skip-blocks-uploading") {
-		err = uploadBlocks(ctx, pWrapper, rpc, signer, containerID, acc, attr, oldestMissingBlockIndex, uint(currentBlockHeight), homomorphicHashingDisabled, numWorkers, maxRetries, debug)
-		if err != nil {
-			return cli.Exit(fmt.Errorf("failed to upload blocks: %w", err), 1)
-		}
-		oldestMissingBlockIndex = int(currentBlockHeight) + 1
-	}
-
-	err = uploadIndexFiles(ctx, pWrapper, containerID, acc, signer, uint(oldestMissingBlockIndex), attr, homomorphicHashingDisabled, maxParallelSearches, maxRetries, debug)
+	i, buf, err := searchIndexFile(ctx, pWrapper, containerID, acc, signer, indexFileSize, attr, indexAttrKey, maxParallelSearches, maxRetries)
 	if err != nil {
-		return cli.Exit(fmt.Errorf("failed to upload index files: %w", err), 1)
+		return cli.Exit(fmt.Errorf("failed to find objects: %w", err), 1)
 	}
+
+	err = uploadBlocksAndIndexFiles(ctx, pWrapper, rpc, signer, containerID, acc, attr, indexAttrKey, buf, i, indexFileSize, uint(currentBlockHeight), homomorphicHashingDisabled, numWorkers, maxRetries, debug)
+	if err != nil {
+		return cli.Exit(fmt.Errorf("failed to upload objects: %w", err), 1)
+	}
 	return nil
 }
 
 // retry function with exponential backoff.
-func retry(action func() error, maxRetries int) error {
+func retry(action func() error, maxRetries uint) error {
 	var err error
 	backoff := initialBackoff
 	for range maxRetries {
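The hunk above only changes `retry`'s signature; its body (outside this diff) retries the action with exponential backoff starting from `initialBackoff`. A self-contained sketch of that pattern, with an illustrative backoff value and doubling factor rather than the exact ones used in uploader.go:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

const initialBackoff = 500 * time.Millisecond // illustrative value, assumption

// retrySketch calls action up to maxRetries times, sleeping between attempts
// and doubling the delay each time (exponential backoff).
func retrySketch(action func() error, maxRetries uint) error {
	var err error
	backoff := initialBackoff
	for range maxRetries {
		if err = action(); err == nil {
			return nil
		}
		time.Sleep(backoff)
		backoff *= 2
	}
	return err // Return the last error after exhausting retries.
}

func main() {
	attempts := 0
	err := retrySketch(func() error {
		attempts++
		if attempts < 3 {
			return errors.New("transient failure")
		}
		return nil
	}, 5)
	fmt.Println(attempts, err) // 3 <nil>
}
```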
@@ -190,98 +180,33 @@ func retry(action func() error, maxRetries int) error {
 	return err // Return the last error after exhausting retries.
 }
 
-type searchResult struct {
-	startIndex int
-	endIndex   int
-	numOIDs    int
-	err        error
-}
-
-// fetchLatestMissingBlockIndex searches the container for the latest full batch of blocks
-// starting from the currentHeight and going backwards. It returns the index of first block
-// in the next batch.
-func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID cid.ID, priv *keys.PrivateKey, attributeKey string, currentHeight int, maxParallelSearches, maxRetries int) (int, error) {
-	var (
-		wg              sync.WaitGroup
-		numBatches      = currentHeight / searchBatchSize
-		emptyBatchFound bool
-		pWrapper        = poolWrapper{p}
-	)
-
-	for batch := numBatches; batch > -maxParallelSearches; batch -= maxParallelSearches {
-		results := make([]searchResult, maxParallelSearches)
-
-		for i := range maxParallelSearches {
-			startIndex := (batch + i) * searchBatchSize
-			endIndex := startIndex + searchBatchSize
-			if endIndex <= 0 {
-				continue
-			}
-			if startIndex < 0 {
-				startIndex = 0
-			}
-
-			wg.Add(1)
-			go func(i, startIndex, endIndex int) {
-				defer wg.Done()
-
-				prm := client.PrmObjectSearch{}
-				filters := object.NewSearchFilters()
-				if endIndex == startIndex+1 {
-					filters.AddFilter(attributeKey, fmt.Sprintf("%d", startIndex), object.MatchStringEqual)
-				} else {
-					filters.AddFilter(attributeKey, fmt.Sprintf("%d", startIndex), object.MatchNumGE)
-					filters.AddFilter(attributeKey, fmt.Sprintf("%d", endIndex), object.MatchNumLT)
-				}
-				prm.SetFilters(filters)
-				var (
-					objectIDs []oid.ID
-					err       error
-				)
-				err = retry(func() error {
-					objectIDs, err = neofs.ObjectSearch(ctx, pWrapper, priv, containerID.String(), prm)
-					return err
-				}, maxRetries)
-				results[i] = searchResult{startIndex: startIndex, endIndex: endIndex, numOIDs: len(objectIDs), err: err}
-			}(i, startIndex, endIndex)
-		}
-		wg.Wait()
-
-		for i := len(results) - 1; i >= 0; i-- {
-			if results[i].err != nil {
-				return 0, fmt.Errorf("blocks search failed for batch with indexes from %d to %d: %w", results[i].startIndex, results[i].endIndex-1, results[i].err)
-			}
-			if results[i].numOIDs == 0 {
-				emptyBatchFound = true
-				continue
-			}
-			if emptyBatchFound || (batch == numBatches && i == len(results)-1) {
-				return results[i].startIndex / uploadBatchSize * uploadBatchSize, nil
-			}
-		}
-	}
-	return 0, nil
-}
-
-// uploadBlocks uploads the blocks to the container using the pool.
-func uploadBlocks(ctx *cli.Context, p poolWrapper, rpc *rpcclient.Client, signer user.Signer, containerID cid.ID, acc *wallet.Account, attr string, oldestMissingBlockIndex int, currentBlockHeight uint, homomorphicHashingDisabled bool, numWorkers, maxRetries int, debug bool) error {
-	if oldestMissingBlockIndex > int(currentBlockHeight) {
-		fmt.Fprintf(ctx.App.Writer, "No new blocks to upload. Need to upload starting from %d, current height %d\n", oldestMissingBlockIndex, currentBlockHeight)
+// uploadBlocksAndIndexFiles uploads the blocks and index files to the container using the pool.
+func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.Client, signer user.Signer, containerID cid.ID, acc *wallet.Account, attr, indexAttributeKey string, buf []byte, currentIndexFileID, indexFileSize, currentBlockHeight uint, homomorphicHashingDisabled bool, numWorkers, maxRetries uint, debug bool) error {
+	if currentIndexFileID*indexFileSize >= currentBlockHeight {
+		fmt.Fprintf(ctx.App.Writer, "No new blocks to upload. Need to upload starting from %d, current height %d\n", currentIndexFileID*indexFileSize, currentBlockHeight)
 		return nil
 	}
-	for batchStart := oldestMissingBlockIndex; batchStart <= int(currentBlockHeight); batchStart += uploadBatchSize {
+	fmt.Fprintln(ctx.App.Writer, "Uploading blocks and index files...")
+	for indexFileStart := currentIndexFileID * indexFileSize; indexFileStart < currentBlockHeight; indexFileStart += indexFileSize {
 		var (
-			batchEnd = min(batchStart+uploadBatchSize, int(currentBlockHeight)+1)
-			errCh    = make(chan error)
-			doneCh   = make(chan struct{})
-			wg       sync.WaitGroup
+			indexFileEnd = min(indexFileStart+indexFileSize, currentBlockHeight)
+			errCh        = make(chan error)
+			doneCh       = make(chan struct{})
+			wg           sync.WaitGroup
+			emptyOID     = make([]byte, oidSize)
 		)
-		fmt.Fprintf(ctx.App.Writer, "Processing batch from %d to %d\n", batchStart, batchEnd-1)
-		wg.Add(numWorkers)
+		fmt.Fprintf(ctx.App.Writer, "Processing batch from %d to %d\n", indexFileStart, indexFileEnd-1)
+		wg.Add(int(numWorkers))
 		for i := range numWorkers {
-			go func(i int) {
+			go func(i uint) {
 				defer wg.Done()
-				for blockIndex := batchStart + i; blockIndex < batchEnd; blockIndex += numWorkers {
+				for blockIndex := indexFileStart + i; blockIndex < indexFileEnd; blockIndex += numWorkers {
+					if slices.Compare(buf[blockIndex%indexFileSize*oidSize:blockIndex%indexFileSize*oidSize+oidSize], emptyOID) != 0 {
+						if debug {
+							fmt.Fprintf(ctx.App.Writer, "Block %d is already uploaded\n", blockIndex)
+						}
+						continue
+					}
 					var blk *block.Block
 					errGet := retry(func() error {
 						var errGetBlock error
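`uploadBlocksAndIndexFiles` treats the index file as a flat byte buffer with one 32-byte OID slot per block, at position `blockIndex % indexFileSize`; the check added above skips blocks whose slot is already filled. A self-contained sketch of that slot arithmetic (default sizes taken from the command help; the helper name is illustrative):

```go
package main

import (
	"bytes"
	"crypto/sha256"
	"fmt"
)

const (
	oidSize       = sha256.Size // 32 bytes per object ID
	indexFileSize = 128000      // default --index-file-size from the command help
)

// slotRange returns the byte range inside an index-file buffer that holds the
// OID of the given block, mirroring buf[blockIndex%indexFileSize*oidSize : ...+oidSize].
func slotRange(blockIndex uint) (uint, uint) {
	start := blockIndex % indexFileSize * oidSize
	return start, start + oidSize
}

func main() {
	buf := make([]byte, indexFileSize*oidSize)
	emptyOID := make([]byte, oidSize)

	start, end := slotRange(128001) // second block covered by index file #1
	fmt.Println(bytes.Equal(buf[start:end], emptyOID)) // true: not uploaded yet

	copy(buf[start:end], bytes.Repeat([]byte{0xAB}, oidSize)) // pretend its OID was stored
	fmt.Println(bytes.Equal(buf[start:end], emptyOID))        // false: slot is filled
}
```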
@@ -316,9 +241,13 @@ func uploadBlocks(ctx *cli.Context, p poolWrapper, rpc *rpcclient.Client, signer
 						*object.NewAttribute("Timestamp", strconv.FormatUint(blk.Timestamp, 10)),
 					}
 
-					objBytes := bw.Bytes()
+					var (
+						objBytes = bw.Bytes()
+						resOid   oid.ID
+					)
 					errRetr := retry(func() error {
-						resOid, errUpload := uploadObj(ctx.Context, p, signer, acc.PrivateKey().GetScriptHash(), containerID, objBytes, attrs, homomorphicHashingDisabled)
+						var errUpload error
+						resOid, errUpload = uploadObj(ctx.Context, p, signer, acc.PrivateKey().GetScriptHash(), containerID, objBytes, attrs, homomorphicHashingDisabled)
 						if errUpload != nil {
 							return errUpload
 						}
@@ -334,6 +263,7 @@ func uploadBlocks(ctx *cli.Context, p poolWrapper, rpc *rpcclient.Client, signer
 						}
 						return
 					}
+					resOid.Encode(buf[blockIndex%indexFileSize*oidSize:])
 				}
 			}(i)
 		}
@@ -348,195 +278,151 @@ func uploadBlocks(ctx *cli.Context, p poolWrapper, rpc *rpcclient.Client, signer
 			return fmt.Errorf("upload error: %w", err)
 		case <-doneCh:
 		}
+		fmt.Fprintf(ctx.App.Writer, "Successfully processed batch of blocks: from %d to %d\n", indexFileStart, indexFileEnd-1)
 
-		fmt.Fprintf(ctx.App.Writer, "Successfully uploaded batch of blocks: from %d to %d\n", batchStart, batchEnd-1)
+		// Additional check for empty OIDs in the buffer.
+		for k := uint(0); k < (indexFileEnd-indexFileStart)*oidSize; k += oidSize {
+			if slices.Compare(buf[k:k+oidSize], emptyOID) == 0 {
+				return fmt.Errorf("empty OID found in index file %d at position %d (block index %d)", indexFileStart/indexFileSize, k/oidSize, indexFileStart/indexFileSize*indexFileSize+k/oidSize)
+			}
+		}
+		if indexFileEnd-indexFileStart == indexFileSize {
+			attrs := []object.Attribute{
+				*object.NewAttribute(indexAttributeKey, strconv.Itoa(int(indexFileStart/indexFileSize))),
+				*object.NewAttribute("IndexSize", strconv.Itoa(int(indexFileSize))),
+			}
+			err := retry(func() error {
+				var errUpload error
+				_, errUpload = uploadObj(ctx.Context, p, signer, acc.PrivateKey().GetScriptHash(), containerID, buf, attrs, homomorphicHashingDisabled)
+				return errUpload
+			}, maxRetries)
+			if err != nil {
+				return fmt.Errorf("failed to upload index file: %w", err)
+			}
+			fmt.Println("Successfully uploaded index file ", indexFileStart/indexFileSize)
+		}
+		clear(buf)
 	}
 	return nil
 }
 
-// uploadIndexFiles uploads missing index files to the container.
-func uploadIndexFiles(ctx *cli.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, signer user.Signer, oldestMissingBlockIndex uint, blockAttributeKey string, homomorphicHashingDisabled bool, maxParallelSearches, maxRetries int, debug bool) error {
+// searchIndexFile returns the ID and buffer for the next index file to be uploaded.
+func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, signer user.Signer, indexFileSize uint, blockAttributeKey, attributeKey string, maxParallelSearches, maxRetries uint) (uint, []byte, error) {
 	var (
-		attributeKey  = ctx.String("index-attribute")
-		indexFileSize = ctx.Uint("index-file-size")
+		// buf is used to store OIDs of the uploaded blocks.
+		buf    = make([]byte, indexFileSize*oidSize)
+		doneCh = make(chan struct{})
+		errCh  = make(chan error)
 
-		buffer   = make([]byte, indexFileSize*oidSize)
-		doneCh   = make(chan struct{})
-		errCh    = make(chan error)
-		emptyOid = make([]byte, oidSize)
-
-		expectedIndexCount = (oldestMissingBlockIndex - 1) / indexFileSize
 		existingIndexCount = uint(0)
 		filters            = object.NewSearchFilters()
 	)
-	fmt.Fprintln(ctx.App.Writer, "Uploading index files...")
-
 	go func() {
 		defer close(doneCh)
 		// Search for existing index files.
 		filters.AddFilter("IndexSize", fmt.Sprintf("%d", indexFileSize), object.MatchStringEqual)
-		indexIDs := searchObjects(ctx.Context, p, containerID, account, attributeKey, 0, expectedIndexCount, maxParallelSearches, maxRetries, errCh, filters)
-		for range indexIDs {
-			existingIndexCount++
-		}
-
-		if existingIndexCount >= expectedIndexCount {
-			fmt.Fprintf(ctx.App.Writer, "Index files are up to date. Existing: %d, expected: %d\n", existingIndexCount, expectedIndexCount)
-			return
-		}
-		fmt.Fprintf(ctx.App.Writer, "Current index files count: %d, expected: %d\n", existingIndexCount, expectedIndexCount)
-
-		// Main processing loop for each index file.
-		for i := existingIndexCount; i < expectedIndexCount; i++ {
-			// Start block parsing goroutines.
-			var (
-				// processedIndices is a mapping from position in buffer to the block index.
-				processedIndices sync.Map
-				wg               sync.WaitGroup
-				oidCh            = make(chan oid.ID, 2*maxParallelSearches)
-			)
-			wg.Add(maxParallelSearches)
-			for range maxParallelSearches {
-				go func() {
-					defer wg.Done()
-					for id := range oidCh {
-						var obj object.Object
-						errRetr := retry(func() error {
-							var errGet error
-							obj, _, errGet = p.ObjectGetInit(ctx.Context, containerID, id, signer, client.PrmObjectGet{})
-							return errGet
-						}, maxRetries)
-						if errRetr != nil {
-							select {
-							case errCh <- fmt.Errorf("failed to fetch object %s: %w", id.String(), errRetr):
-							default:
-							}
-							return
-						}
-						blockIndex, err := getBlockIndex(obj, blockAttributeKey)
-						if err != nil {
-							select {
-							case errCh <- fmt.Errorf("failed to get block index from object %s: %w", id.String(), err):
-							default:
-							}
-							return
-						}
-						pos := uint(blockIndex) % indexFileSize
-						if _, ok := processedIndices.LoadOrStore(pos, blockIndex); !ok {
-							id.Encode(buffer[pos*oidSize:])
-						}
-					}
-				}()
-			}
-
-			// Search for blocks within the index file range.
-			startIndex := i * indexFileSize
-			endIndex := startIndex + indexFileSize
-			objIDs := searchObjects(ctx.Context, p, containerID, account, blockAttributeKey, startIndex, endIndex, maxParallelSearches, maxRetries, errCh)
-			for id := range objIDs {
-				oidCh <- id
-			}
-			close(oidCh)
-			wg.Wait()
-			fmt.Fprintf(ctx.App.Writer, "Index file %d generated, checking for the missing blocks...\n", i)
-
-			// Check if there are empty OIDs in the generated index file. This may happen
-			// if searchObjects has returned not all blocks within the requested range, ref.
-			// #3645. In this case, retry the search for every missing object.
-			var count int
-			for idx := range indexFileSize {
-				if _, ok := processedIndices.Load(idx); !ok {
-					count++
-					fmt.Fprintf(ctx.App.Writer, "Index file %d: fetching missing block %d\n", i, i*indexFileSize+idx)
-					objIDs = searchObjects(ctx.Context, p, containerID, account, blockAttributeKey, i*indexFileSize+idx, i*indexFileSize+idx+1, 1, maxRetries, errCh)
-					// Block object duplicates are allowed, we're OK with the first found result.
-					id, ok := <-objIDs
-					for range objIDs {
-					}
-					if !ok {
-						select {
-						case errCh <- fmt.Errorf("index file %d: block %d is missing from the storage", i, i*indexFileSize+idx):
-						default:
-						}
-						return
-					}
-					processedIndices.Store(idx, id)
-					id.Encode(buffer[idx*oidSize:])
-				}
-			}
-			fmt.Fprintf(ctx.App.Writer, "%d missing block(s) processed for index file %d, uploading index file...\n", count, i)
-
-			// Check if there are empty OIDs in the generated index file. If it happens at
-			// this stage, then there's a bug in the code.
-			for k := 0; k < len(buffer); k += oidSize {
-				if slices.Compare(buffer[k:k+oidSize], emptyOid) == 0 {
+		for i := 0; ; i++ {
+			indexIDs := searchObjects(ctx.Context, p, containerID, account, attributeKey, uint(i), uint(i+1), 1, maxRetries, errCh, filters)
+			count := 0
+			for range indexIDs {
+				count++
+				if count > 1 {
 					select {
-					case errCh <- fmt.Errorf("empty OID found in index file %d at position %d (block index %d)", i, k/oidSize, i*indexFileSize+uint(k/oidSize)):
+					case errCh <- fmt.Errorf("duplicated index file %d found", i):
 					default:
 					}
 					return
 				}
 			}
-
-			// Upload index file.
-			attrs := []object.Attribute{
-				*object.NewAttribute(attributeKey, strconv.Itoa(int(i))),
-				*object.NewAttribute("IndexSize", strconv.Itoa(int(indexFileSize))),
+			if count == 0 {
+				break
 			}
-			err := retry(func() error {
-				resOid, errUpload := uploadObj(ctx.Context, p, signer, account.PrivateKey().GetScriptHash(), containerID, buffer, attrs, homomorphicHashingDisabled)
-				if errUpload != nil {
-					return errUpload
-				}
-				if debug {
-					fmt.Fprintf(ctx.App.Writer, "Uploaded idex file %d with object ID: %s\n", i, resOid.String())
-				}
-				return errUpload
-			}, maxRetries)
-			if err != nil {
-				select {
-				case errCh <- fmt.Errorf("failed to upload index file %d: %w", i, err):
-				default:
-				}
-				return
-			}
-			fmt.Fprintf(ctx.App.Writer, "Uploaded index file %d\n", i)
-			clear(buffer)
+			existingIndexCount++
 		}
+		fmt.Fprintf(ctx.App.Writer, "Current index files count: %d\n", existingIndexCount)
+
+		// Start block parsing goroutines.
+		var (
+			// processedIndices is a mapping from position in buffer to the block index.
+			// It prevents duplicates.
+			processedIndices sync.Map
+			wg               sync.WaitGroup
+			oidCh            = make(chan oid.ID, 2*maxParallelSearches)
+		)
+		wg.Add(int(maxParallelSearches))
+		for range maxParallelSearches {
+			go func() {
+				defer wg.Done()
+				for id := range oidCh {
+					var obj object.Object
+					errRetr := retry(func() error {
+						var errGet error
+						obj, _, errGet = p.ObjectGetInit(ctx.Context, containerID, id, signer, client.PrmObjectGet{})
+						return errGet
+					}, maxRetries)
+					if errRetr != nil {
+						select {
+						case errCh <- fmt.Errorf("failed to fetch object %s: %w", id.String(), errRetr):
+						default:
+						}
+						return
+					}
+					blockIndex, err := getBlockIndex(obj, blockAttributeKey)
+					if err != nil {
+						select {
+						case errCh <- fmt.Errorf("failed to get block index from object %s: %w", id.String(), err):
+						default:
+						}
+						return
+					}
+					pos := uint(blockIndex) % indexFileSize
+					if _, ok := processedIndices.LoadOrStore(pos, blockIndex); !ok {
+						id.Encode(buf[pos*oidSize:])
+					}
+				}
+			}()
+		}
+
+		// Search for blocks within the index file range.
+		objIDs := searchObjects(ctx.Context, p, containerID, account, blockAttributeKey, existingIndexCount*indexFileSize, (existingIndexCount+1)*indexFileSize, maxParallelSearches, maxRetries, errCh)
+		for id := range objIDs {
+			oidCh <- id
+		}
+		close(oidCh)
+		wg.Wait()
 	}()
 
 	select {
 	case err := <-errCh:
-		return err
+		return existingIndexCount, nil, err
 	case <-doneCh:
+		return existingIndexCount, buf, nil
 	}
-
-	return nil
 }
 
 // searchObjects searches in parallel for objects with attribute GE startIndex and LT
 // endIndex. It returns a buffered channel of resulting object IDs and closes it once
 // OID search is finished. Errors are sent to errCh in a non-blocking way.
-func searchObjects(ctx context.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, blockAttributeKey string, startIndex, endIndex uint, maxParallelSearches, maxRetries int, errCh chan error, additionalFilters ...object.SearchFilters) chan oid.ID {
+func searchObjects(ctx context.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, blockAttributeKey string, startIndex, endIndex, maxParallelSearches, maxRetries uint, errCh chan error, additionalFilters ...object.SearchFilters) chan oid.ID {
 	var res = make(chan oid.ID, 2*searchBatchSize)
 	go func() {
 		var wg sync.WaitGroup
 		defer close(res)
 
-		for i := int(startIndex); i < int(endIndex); i += searchBatchSize * maxParallelSearches {
+		for i := startIndex; i < endIndex; i += searchBatchSize * maxParallelSearches {
 			for j := range maxParallelSearches {
 				start := i + j*searchBatchSize
 				end := start + searchBatchSize
 
-				if start >= int(endIndex) {
+				if start >= endIndex {
 					break
 				}
-				if end > int(endIndex) {
-					end = int(endIndex)
+				if end > endIndex {
+					end = endIndex
 				}
 
 				wg.Add(1)
-				go func(start, end int) {
+				go func(start, end uint) {
 					defer wg.Done()
 
 					prm := client.PrmObjectSearch{}
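`searchIndexFile` discovers how many complete index files are already stored by probing index-attribute values 0, 1, 2, ... with single-value searches until one returns nothing, and reports an error if any value returns more than one object. A self-contained sketch of just that counting logic, with the NeoFS search replaced by a hypothetical `countHits` callback:

```go
package main

import "fmt"

// existingIndexFiles probes index-file attribute values 0, 1, 2, ... and stops
// at the first value with no object behind it. countHits stands in for the
// EQ searchObjects call used in uploader.go.
func existingIndexFiles(countHits func(i uint) int) (uint, error) {
	var existing uint
	for i := uint(0); ; i++ {
		switch n := countHits(i); {
		case n == 0:
			return existing, nil // first gap: everything before it is complete
		case n > 1:
			return existing, fmt.Errorf("duplicated index file %d found", i)
		default:
			existing++
		}
	}
}

func main() {
	stored := map[uint]int{0: 1, 1: 1, 2: 1} // index files 0..2 already uploaded
	n, err := existingIndexFiles(func(i uint) int { return stored[i] })
	fmt.Println(n, err) // 3 <nil>

	stored[1] = 2 // simulate a duplicated index file
	_, err = existingIndexFiles(func(i uint) int { return stored[i] })
	fmt.Println(err) // duplicated index file 1 found
}
```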
@@ -85,7 +85,7 @@ NAME:
    neo-go util upload-bin - Fetch blocks from RPC node and upload them to the NeoFS container
 
 USAGE:
-   neo-go util upload-bin --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --block-attribute block --index-attribute index --rpc-endpoint <node> [--timeout <time>] --wallet <wallet> [--wallet-config <config>] [--address <address>] [--workers <num>] [--searchers <num>] [--index-file-size <size>] [--skip-blocks-uploading] [--retries <num>] [--debug]
+   neo-go util upload-bin --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --block-attribute block --index-attribute index --rpc-endpoint <node> [--timeout <time>] --wallet <wallet> [--wallet-config <config>] [--address <address>] [--workers <num>] [--searchers <num>] [--index-file-size <size>] [--retries <num>] [--debug]
 
 OPTIONS:
    --fs-rpc-endpoint value, --fsr value [ --fs-rpc-endpoint value, --fsr value ]  List of NeoFS storage node RPC addresses (comma-separated or multiple --fs-rpc-endpoint flags)
@@ -96,7 +96,6 @@ OPTIONS:
    --index-file-size value           Size of index file (default: 128000)
    --workers value                   Number of workers to fetch, upload and search blocks concurrently (default: 50)
    --searchers value                 Number of concurrent searches for blocks (default: 20)
-   --skip-blocks-uploading           Skip blocks uploading and upload only index files (default: false)
    --retries value                   Maximum number of Neo/NeoFS node request retries (default: 5)
    --debug, -d                       Enable debug logging (LOTS of output, overrides configuration) (default: false)
    --rpc-endpoint value, -r value    RPC node address
@@ -108,11 +107,18 @@ OPTIONS:
 
 This command works as follows:
  1. Fetches the current block height from the RPC node.
- 2. Searches for the oldest half-filled batch of block objects stored in NeoFS.
- 3. Fetches missing blocks from the RPC node and uploads them to the NeoFS container
-    starting from the oldest half-filled batch.
- 4. After uploading the blocks, it creates index files for the newly uploaded blocks.
- 5. Uploads the created index files to the NeoFS container.
+ 2. Searches for the index files stored in NeoFS.
+ 3. Searches for the stored blocks from the latest incomplete index file.
+ 4. Fetches missing blocks from the RPC node and uploads them to the NeoFS container.
+ 5. After uploading the blocks, it creates index file based on the uploaded block OIDs.
+ 6. Uploads the created index file to the NeoFS container.
+ 7. Repeats steps 4-6 until the current block height is reached.
 
 If the command is interrupted, it can be resumed. It starts the uploading process
-from the oldest half-filled batch of blocks.
+from the last uploaded index file.
+
+For a given block sequence, only one type of index file is supported. If new index
+files are needed (different `index-file-size` or `index-attribute`), the entire
+block sequence must be uploaded from the beginning. Please, add a comment to the
+[#3744](https://github.com/nspcc-dev/neo-go/issues/3744) issue if you need this
+functionality.
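A worked example of the numbers behind steps 5-7 and the resume rule (the chain height and index-file count are illustrative; the defaults come from the option list above):

```go
package main

import "fmt"

const (
	indexFileSize = 128000 // --index-file-size default
	oidSize       = 32     // sha256.Size, one OID per block
)

func main() {
	const chainHeight = 5_000_000 // example height, illustrative only
	const existingIndexFiles = 30 // complete index files already stored in NeoFS
	resumeFrom := existingIndexFiles * indexFileSize

	fmt.Println("index file object size:", indexFileSize*oidSize, "bytes") // 4096000 bytes
	fmt.Println("blocks covered by existing index files:", resumeFrom)     // 3840000
	fmt.Println("blocks still to upload:", chainHeight-resumeFrom)         // 1160000
}
```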