mirror of
https://github.com/nspcc-dev/neo-go.git
synced 2024-12-22 19:19:09 +00:00
cli: update upload-bin
to create index files during block uploading
Close #3655 Close #3652 Signed-off-by: Ekaterina Pavlova <ekt@morphbits.io>
This commit is contained in:
parent
5f92da21fa
commit
62615f8c7e
3 changed files with 159 additions and 271 deletions
|
@ -86,10 +86,6 @@ func NewCommands() []*cli.Command {
|
||||||
Usage: "Number of concurrent searches for blocks",
|
Usage: "Number of concurrent searches for blocks",
|
||||||
Value: 20,
|
Value: 20,
|
||||||
},
|
},
|
||||||
&cli.BoolFlag{
|
|
||||||
Name: "skip-blocks-uploading",
|
|
||||||
Usage: "Skip blocks uploading and upload only index files",
|
|
||||||
},
|
|
||||||
&cli.UintFlag{
|
&cli.UintFlag{
|
||||||
Name: "retries",
|
Name: "retries",
|
||||||
Usage: "Maximum number of Neo/NeoFS node request retries",
|
Usage: "Maximum number of Neo/NeoFS node request retries",
|
||||||
|
@ -184,7 +180,7 @@ func NewCommands() []*cli.Command {
|
||||||
{
|
{
|
||||||
Name: "upload-bin",
|
Name: "upload-bin",
|
||||||
Usage: "Fetch blocks from RPC node and upload them to the NeoFS container",
|
Usage: "Fetch blocks from RPC node and upload them to the NeoFS container",
|
||||||
UsageText: "neo-go util upload-bin --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --block-attribute block --index-attribute index --rpc-endpoint <node> [--timeout <time>] --wallet <wallet> [--wallet-config <config>] [--address <address>] [--workers <num>] [--searchers <num>] [--index-file-size <size>] [--skip-blocks-uploading] [--retries <num>] [--debug]",
|
UsageText: "neo-go util upload-bin --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --block-attribute block --index-attribute index --rpc-endpoint <node> [--timeout <time>] --wallet <wallet> [--wallet-config <config>] [--address <address>] [--workers <num>] [--searchers <num>] [--index-file-size <size>] [--retries <num>] [--debug]",
|
||||||
Action: uploadBin,
|
Action: uploadBin,
|
||||||
Flags: uploadBinFlags,
|
Flags: uploadBinFlags,
|
||||||
},
|
},
|
||||||
|
|
|
@ -12,7 +12,6 @@ import (
|
||||||
"github.com/nspcc-dev/neo-go/cli/cmdargs"
|
"github.com/nspcc-dev/neo-go/cli/cmdargs"
|
||||||
"github.com/nspcc-dev/neo-go/cli/options"
|
"github.com/nspcc-dev/neo-go/cli/options"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
|
||||||
"github.com/nspcc-dev/neo-go/pkg/io"
|
"github.com/nspcc-dev/neo-go/pkg/io"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs"
|
"github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs"
|
||||||
|
@ -32,12 +31,11 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// Number of objects to upload in a batch. All batches of uploadBatchSize size
|
// Number of objects to search in a batch. We need to search with EQ filter to
|
||||||
// except the most recent one are guaranteed to be completed and don't contain gaps.
|
// avoid partially-completed SEARCH responses. If EQ search haven't found object
|
||||||
uploadBatchSize = 10000
|
// the object will be uploaded one more time which may lead to duplicating objects.
|
||||||
// Number of objects to search in a batch. If it is larger than uploadBatchSize,
|
// We will have a risk of duplicates until #3645 is resolved (NeoFS guarantees
|
||||||
// it may lead to many duplicate uploads. We need to search with EQ filter to
|
// search results).
|
||||||
// avoid partially-completed SEARCH responses.
|
|
||||||
searchBatchSize = 1
|
searchBatchSize = 1
|
||||||
// Size of object ID.
|
// Size of object ID.
|
||||||
oidSize = sha256.Size
|
oidSize = sha256.Size
|
||||||
|
@ -79,10 +77,12 @@ func uploadBin(ctx *cli.Context) error {
|
||||||
rpcNeoFS := ctx.StringSlice("fs-rpc-endpoint")
|
rpcNeoFS := ctx.StringSlice("fs-rpc-endpoint")
|
||||||
containerIDStr := ctx.String("container")
|
containerIDStr := ctx.String("container")
|
||||||
attr := ctx.String("block-attribute")
|
attr := ctx.String("block-attribute")
|
||||||
numWorkers := ctx.Int("workers")
|
numWorkers := ctx.Uint("workers")
|
||||||
maxParallelSearches := ctx.Int("searchers")
|
maxParallelSearches := ctx.Uint("searchers")
|
||||||
maxRetries := int(ctx.Uint("retries"))
|
maxRetries := ctx.Uint("retries")
|
||||||
debug := ctx.Bool("debug")
|
debug := ctx.Bool("debug")
|
||||||
|
indexFileSize := ctx.Uint("index-file-size")
|
||||||
|
indexAttrKey := ctx.String("index-attribute")
|
||||||
acc, _, err := options.GetAccFromContext(ctx)
|
acc, _, err := options.GetAccFromContext(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cli.Exit(fmt.Sprintf("failed to load account: %v", err), 1)
|
return cli.Exit(fmt.Sprintf("failed to load account: %v", err), 1)
|
||||||
|
@ -151,30 +151,20 @@ func uploadBin(ctx *cli.Context) error {
|
||||||
return cli.Exit(fmt.Sprintf("failed to get current block height from RPC: %v", err), 1)
|
return cli.Exit(fmt.Sprintf("failed to get current block height from RPC: %v", err), 1)
|
||||||
}
|
}
|
||||||
fmt.Fprintln(ctx.App.Writer, "Chain block height:", currentBlockHeight)
|
fmt.Fprintln(ctx.App.Writer, "Chain block height:", currentBlockHeight)
|
||||||
|
i, buf, err := searchIndexFile(ctx, pWrapper, containerID, acc, signer, indexFileSize, attr, indexAttrKey, maxParallelSearches, maxRetries)
|
||||||
oldestMissingBlockIndex, errBlock := fetchLatestMissingBlockIndex(ctx.Context, p, containerID, acc.PrivateKey(), attr, int(currentBlockHeight), maxParallelSearches, maxRetries)
|
|
||||||
if errBlock != nil {
|
|
||||||
return cli.Exit(fmt.Errorf("failed to fetch the oldest missing block index from container: %w", errBlock), 1)
|
|
||||||
}
|
|
||||||
fmt.Fprintln(ctx.App.Writer, "First block of latest incomplete batch uploaded to NeoFS container:", oldestMissingBlockIndex)
|
|
||||||
|
|
||||||
if !ctx.Bool("skip-blocks-uploading") {
|
|
||||||
err = uploadBlocks(ctx, pWrapper, rpc, signer, containerID, acc, attr, oldestMissingBlockIndex, uint(currentBlockHeight), homomorphicHashingDisabled, numWorkers, maxRetries, debug)
|
|
||||||
if err != nil {
|
|
||||||
return cli.Exit(fmt.Errorf("failed to upload blocks: %w", err), 1)
|
|
||||||
}
|
|
||||||
oldestMissingBlockIndex = int(currentBlockHeight) + 1
|
|
||||||
}
|
|
||||||
|
|
||||||
err = uploadIndexFiles(ctx, pWrapper, containerID, acc, signer, uint(oldestMissingBlockIndex), attr, homomorphicHashingDisabled, maxParallelSearches, maxRetries, debug)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cli.Exit(fmt.Errorf("failed to upload index files: %w", err), 1)
|
return cli.Exit(fmt.Errorf("failed to find objects: %w", err), 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = uploadBlocksAndIndexFiles(ctx, pWrapper, rpc, signer, containerID, acc, attr, indexAttrKey, buf, i, indexFileSize, uint(currentBlockHeight), homomorphicHashingDisabled, numWorkers, maxRetries, debug)
|
||||||
|
if err != nil {
|
||||||
|
return cli.Exit(fmt.Errorf("failed to upload objects: %w", err), 1)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// retry function with exponential backoff.
|
// retry function with exponential backoff.
|
||||||
func retry(action func() error, maxRetries int) error {
|
func retry(action func() error, maxRetries uint) error {
|
||||||
var err error
|
var err error
|
||||||
backoff := initialBackoff
|
backoff := initialBackoff
|
||||||
for range maxRetries {
|
for range maxRetries {
|
||||||
|
@ -190,98 +180,33 @@ func retry(action func() error, maxRetries int) error {
|
||||||
return err // Return the last error after exhausting retries.
|
return err // Return the last error after exhausting retries.
|
||||||
}
|
}
|
||||||
|
|
||||||
type searchResult struct {
|
// uploadBlocksAndIndexFiles uploads the blocks and index files to the container using the pool.
|
||||||
startIndex int
|
func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.Client, signer user.Signer, containerID cid.ID, acc *wallet.Account, attr, indexAttributeKey string, buf []byte, currentIndexFileID, indexFileSize, currentBlockHeight uint, homomorphicHashingDisabled bool, numWorkers, maxRetries uint, debug bool) error {
|
||||||
endIndex int
|
if currentIndexFileID*indexFileSize >= currentBlockHeight {
|
||||||
numOIDs int
|
fmt.Fprintf(ctx.App.Writer, "No new blocks to upload. Need to upload starting from %d, current height %d\n", currentIndexFileID*indexFileSize, currentBlockHeight)
|
||||||
err error
|
|
||||||
}
|
|
||||||
|
|
||||||
// fetchLatestMissingBlockIndex searches the container for the latest full batch of blocks
|
|
||||||
// starting from the currentHeight and going backwards. It returns the index of first block
|
|
||||||
// in the next batch.
|
|
||||||
func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID cid.ID, priv *keys.PrivateKey, attributeKey string, currentHeight int, maxParallelSearches, maxRetries int) (int, error) {
|
|
||||||
var (
|
|
||||||
wg sync.WaitGroup
|
|
||||||
numBatches = currentHeight / searchBatchSize
|
|
||||||
emptyBatchFound bool
|
|
||||||
pWrapper = poolWrapper{p}
|
|
||||||
)
|
|
||||||
|
|
||||||
for batch := numBatches; batch > -maxParallelSearches; batch -= maxParallelSearches {
|
|
||||||
results := make([]searchResult, maxParallelSearches)
|
|
||||||
|
|
||||||
for i := range maxParallelSearches {
|
|
||||||
startIndex := (batch + i) * searchBatchSize
|
|
||||||
endIndex := startIndex + searchBatchSize
|
|
||||||
if endIndex <= 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if startIndex < 0 {
|
|
||||||
startIndex = 0
|
|
||||||
}
|
|
||||||
|
|
||||||
wg.Add(1)
|
|
||||||
go func(i, startIndex, endIndex int) {
|
|
||||||
defer wg.Done()
|
|
||||||
|
|
||||||
prm := client.PrmObjectSearch{}
|
|
||||||
filters := object.NewSearchFilters()
|
|
||||||
if endIndex == startIndex+1 {
|
|
||||||
filters.AddFilter(attributeKey, fmt.Sprintf("%d", startIndex), object.MatchStringEqual)
|
|
||||||
} else {
|
|
||||||
filters.AddFilter(attributeKey, fmt.Sprintf("%d", startIndex), object.MatchNumGE)
|
|
||||||
filters.AddFilter(attributeKey, fmt.Sprintf("%d", endIndex), object.MatchNumLT)
|
|
||||||
}
|
|
||||||
prm.SetFilters(filters)
|
|
||||||
var (
|
|
||||||
objectIDs []oid.ID
|
|
||||||
err error
|
|
||||||
)
|
|
||||||
err = retry(func() error {
|
|
||||||
objectIDs, err = neofs.ObjectSearch(ctx, pWrapper, priv, containerID.String(), prm)
|
|
||||||
return err
|
|
||||||
}, maxRetries)
|
|
||||||
results[i] = searchResult{startIndex: startIndex, endIndex: endIndex, numOIDs: len(objectIDs), err: err}
|
|
||||||
}(i, startIndex, endIndex)
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
for i := len(results) - 1; i >= 0; i-- {
|
|
||||||
if results[i].err != nil {
|
|
||||||
return 0, fmt.Errorf("blocks search failed for batch with indexes from %d to %d: %w", results[i].startIndex, results[i].endIndex-1, results[i].err)
|
|
||||||
}
|
|
||||||
if results[i].numOIDs == 0 {
|
|
||||||
emptyBatchFound = true
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if emptyBatchFound || (batch == numBatches && i == len(results)-1) {
|
|
||||||
return results[i].startIndex / uploadBatchSize * uploadBatchSize, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return 0, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// uploadBlocks uploads the blocks to the container using the pool.
|
|
||||||
func uploadBlocks(ctx *cli.Context, p poolWrapper, rpc *rpcclient.Client, signer user.Signer, containerID cid.ID, acc *wallet.Account, attr string, oldestMissingBlockIndex int, currentBlockHeight uint, homomorphicHashingDisabled bool, numWorkers, maxRetries int, debug bool) error {
|
|
||||||
if oldestMissingBlockIndex > int(currentBlockHeight) {
|
|
||||||
fmt.Fprintf(ctx.App.Writer, "No new blocks to upload. Need to upload starting from %d, current height %d\n", oldestMissingBlockIndex, currentBlockHeight)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
for batchStart := oldestMissingBlockIndex; batchStart <= int(currentBlockHeight); batchStart += uploadBatchSize {
|
fmt.Fprintln(ctx.App.Writer, "Uploading blocks and index files...")
|
||||||
|
for indexFileStart := currentIndexFileID * indexFileSize; indexFileStart < currentBlockHeight; indexFileStart += indexFileSize {
|
||||||
var (
|
var (
|
||||||
batchEnd = min(batchStart+uploadBatchSize, int(currentBlockHeight)+1)
|
indexFileEnd = min(indexFileStart+indexFileSize, currentBlockHeight)
|
||||||
errCh = make(chan error)
|
errCh = make(chan error)
|
||||||
doneCh = make(chan struct{})
|
doneCh = make(chan struct{})
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
|
emptyOID = make([]byte, oidSize)
|
||||||
)
|
)
|
||||||
fmt.Fprintf(ctx.App.Writer, "Processing batch from %d to %d\n", batchStart, batchEnd-1)
|
fmt.Fprintf(ctx.App.Writer, "Processing batch from %d to %d\n", indexFileStart, indexFileEnd-1)
|
||||||
wg.Add(numWorkers)
|
wg.Add(int(numWorkers))
|
||||||
for i := range numWorkers {
|
for i := range numWorkers {
|
||||||
go func(i int) {
|
go func(i uint) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
for blockIndex := batchStart + i; blockIndex < batchEnd; blockIndex += numWorkers {
|
for blockIndex := indexFileStart + i; blockIndex < indexFileEnd; blockIndex += numWorkers {
|
||||||
|
if slices.Compare(buf[blockIndex%indexFileSize*oidSize:blockIndex%indexFileSize*oidSize+oidSize], emptyOID) != 0 {
|
||||||
|
if debug {
|
||||||
|
fmt.Fprintf(ctx.App.Writer, "Block %d is already uploaded\n", blockIndex)
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
var blk *block.Block
|
var blk *block.Block
|
||||||
errGet := retry(func() error {
|
errGet := retry(func() error {
|
||||||
var errGetBlock error
|
var errGetBlock error
|
||||||
|
@ -316,9 +241,13 @@ func uploadBlocks(ctx *cli.Context, p poolWrapper, rpc *rpcclient.Client, signer
|
||||||
*object.NewAttribute("Timestamp", strconv.FormatUint(blk.Timestamp, 10)),
|
*object.NewAttribute("Timestamp", strconv.FormatUint(blk.Timestamp, 10)),
|
||||||
}
|
}
|
||||||
|
|
||||||
objBytes := bw.Bytes()
|
var (
|
||||||
|
objBytes = bw.Bytes()
|
||||||
|
resOid oid.ID
|
||||||
|
)
|
||||||
errRetr := retry(func() error {
|
errRetr := retry(func() error {
|
||||||
resOid, errUpload := uploadObj(ctx.Context, p, signer, acc.PrivateKey().GetScriptHash(), containerID, objBytes, attrs, homomorphicHashingDisabled)
|
var errUpload error
|
||||||
|
resOid, errUpload = uploadObj(ctx.Context, p, signer, acc.PrivateKey().GetScriptHash(), containerID, objBytes, attrs, homomorphicHashingDisabled)
|
||||||
if errUpload != nil {
|
if errUpload != nil {
|
||||||
return errUpload
|
return errUpload
|
||||||
}
|
}
|
||||||
|
@ -334,6 +263,7 @@ func uploadBlocks(ctx *cli.Context, p poolWrapper, rpc *rpcclient.Client, signer
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
resOid.Encode(buf[blockIndex%indexFileSize*oidSize:])
|
||||||
}
|
}
|
||||||
}(i)
|
}(i)
|
||||||
}
|
}
|
||||||
|
@ -348,195 +278,151 @@ func uploadBlocks(ctx *cli.Context, p poolWrapper, rpc *rpcclient.Client, signer
|
||||||
return fmt.Errorf("upload error: %w", err)
|
return fmt.Errorf("upload error: %w", err)
|
||||||
case <-doneCh:
|
case <-doneCh:
|
||||||
}
|
}
|
||||||
|
fmt.Fprintf(ctx.App.Writer, "Successfully processed batch of blocks: from %d to %d\n", indexFileStart, indexFileEnd-1)
|
||||||
|
|
||||||
fmt.Fprintf(ctx.App.Writer, "Successfully uploaded batch of blocks: from %d to %d\n", batchStart, batchEnd-1)
|
// Additional check for empty OIDs in the buffer.
|
||||||
|
for k := uint(0); k < (indexFileEnd-indexFileStart)*oidSize; k += oidSize {
|
||||||
|
if slices.Compare(buf[k:k+oidSize], emptyOID) == 0 {
|
||||||
|
return fmt.Errorf("empty OID found in index file %d at position %d (block index %d)", indexFileStart/indexFileSize, k/oidSize, indexFileStart/indexFileSize*indexFileSize+k/oidSize)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if indexFileEnd-indexFileStart == indexFileSize {
|
||||||
|
attrs := []object.Attribute{
|
||||||
|
*object.NewAttribute(indexAttributeKey, strconv.Itoa(int(indexFileStart/indexFileSize))),
|
||||||
|
*object.NewAttribute("IndexSize", strconv.Itoa(int(indexFileSize))),
|
||||||
|
}
|
||||||
|
err := retry(func() error {
|
||||||
|
var errUpload error
|
||||||
|
_, errUpload = uploadObj(ctx.Context, p, signer, acc.PrivateKey().GetScriptHash(), containerID, buf, attrs, homomorphicHashingDisabled)
|
||||||
|
return errUpload
|
||||||
|
}, maxRetries)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to upload index file: %w", err)
|
||||||
|
}
|
||||||
|
fmt.Println("Successfully uploaded index file ", indexFileStart/indexFileSize)
|
||||||
|
}
|
||||||
|
clear(buf)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// uploadIndexFiles uploads missing index files to the container.
|
// searchIndexFile returns the ID and buffer for the next index file to be uploaded.
|
||||||
func uploadIndexFiles(ctx *cli.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, signer user.Signer, oldestMissingBlockIndex uint, blockAttributeKey string, homomorphicHashingDisabled bool, maxParallelSearches, maxRetries int, debug bool) error {
|
func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, signer user.Signer, indexFileSize uint, blockAttributeKey, attributeKey string, maxParallelSearches, maxRetries uint) (uint, []byte, error) {
|
||||||
var (
|
var (
|
||||||
attributeKey = ctx.String("index-attribute")
|
// buf is used to store OIDs of the uploaded blocks.
|
||||||
indexFileSize = ctx.Uint("index-file-size")
|
buf = make([]byte, indexFileSize*oidSize)
|
||||||
|
doneCh = make(chan struct{})
|
||||||
|
errCh = make(chan error)
|
||||||
|
|
||||||
buffer = make([]byte, indexFileSize*oidSize)
|
|
||||||
doneCh = make(chan struct{})
|
|
||||||
errCh = make(chan error)
|
|
||||||
emptyOid = make([]byte, oidSize)
|
|
||||||
|
|
||||||
expectedIndexCount = (oldestMissingBlockIndex - 1) / indexFileSize
|
|
||||||
existingIndexCount = uint(0)
|
existingIndexCount = uint(0)
|
||||||
filters = object.NewSearchFilters()
|
filters = object.NewSearchFilters()
|
||||||
)
|
)
|
||||||
fmt.Fprintln(ctx.App.Writer, "Uploading index files...")
|
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer close(doneCh)
|
defer close(doneCh)
|
||||||
// Search for existing index files.
|
// Search for existing index files.
|
||||||
filters.AddFilter("IndexSize", fmt.Sprintf("%d", indexFileSize), object.MatchStringEqual)
|
filters.AddFilter("IndexSize", fmt.Sprintf("%d", indexFileSize), object.MatchStringEqual)
|
||||||
indexIDs := searchObjects(ctx.Context, p, containerID, account, attributeKey, 0, expectedIndexCount, maxParallelSearches, maxRetries, errCh, filters)
|
for i := 0; ; i++ {
|
||||||
for range indexIDs {
|
indexIDs := searchObjects(ctx.Context, p, containerID, account, attributeKey, uint(i), uint(i+1), 1, maxRetries, errCh, filters)
|
||||||
existingIndexCount++
|
count := 0
|
||||||
}
|
for range indexIDs {
|
||||||
|
count++
|
||||||
if existingIndexCount >= expectedIndexCount {
|
if count > 1 {
|
||||||
fmt.Fprintf(ctx.App.Writer, "Index files are up to date. Existing: %d, expected: %d\n", existingIndexCount, expectedIndexCount)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
fmt.Fprintf(ctx.App.Writer, "Current index files count: %d, expected: %d\n", existingIndexCount, expectedIndexCount)
|
|
||||||
|
|
||||||
// Main processing loop for each index file.
|
|
||||||
for i := existingIndexCount; i < expectedIndexCount; i++ {
|
|
||||||
// Start block parsing goroutines.
|
|
||||||
var (
|
|
||||||
// processedIndices is a mapping from position in buffer to the block index.
|
|
||||||
processedIndices sync.Map
|
|
||||||
wg sync.WaitGroup
|
|
||||||
oidCh = make(chan oid.ID, 2*maxParallelSearches)
|
|
||||||
)
|
|
||||||
wg.Add(maxParallelSearches)
|
|
||||||
for range maxParallelSearches {
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
for id := range oidCh {
|
|
||||||
var obj object.Object
|
|
||||||
errRetr := retry(func() error {
|
|
||||||
var errGet error
|
|
||||||
obj, _, errGet = p.ObjectGetInit(ctx.Context, containerID, id, signer, client.PrmObjectGet{})
|
|
||||||
return errGet
|
|
||||||
}, maxRetries)
|
|
||||||
if errRetr != nil {
|
|
||||||
select {
|
|
||||||
case errCh <- fmt.Errorf("failed to fetch object %s: %w", id.String(), errRetr):
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
blockIndex, err := getBlockIndex(obj, blockAttributeKey)
|
|
||||||
if err != nil {
|
|
||||||
select {
|
|
||||||
case errCh <- fmt.Errorf("failed to get block index from object %s: %w", id.String(), err):
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
pos := uint(blockIndex) % indexFileSize
|
|
||||||
if _, ok := processedIndices.LoadOrStore(pos, blockIndex); !ok {
|
|
||||||
id.Encode(buffer[pos*oidSize:])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Search for blocks within the index file range.
|
|
||||||
startIndex := i * indexFileSize
|
|
||||||
endIndex := startIndex + indexFileSize
|
|
||||||
objIDs := searchObjects(ctx.Context, p, containerID, account, blockAttributeKey, startIndex, endIndex, maxParallelSearches, maxRetries, errCh)
|
|
||||||
for id := range objIDs {
|
|
||||||
oidCh <- id
|
|
||||||
}
|
|
||||||
close(oidCh)
|
|
||||||
wg.Wait()
|
|
||||||
fmt.Fprintf(ctx.App.Writer, "Index file %d generated, checking for the missing blocks...\n", i)
|
|
||||||
|
|
||||||
// Check if there are empty OIDs in the generated index file. This may happen
|
|
||||||
// if searchObjects has returned not all blocks within the requested range, ref.
|
|
||||||
// #3645. In this case, retry the search for every missing object.
|
|
||||||
var count int
|
|
||||||
for idx := range indexFileSize {
|
|
||||||
if _, ok := processedIndices.Load(idx); !ok {
|
|
||||||
count++
|
|
||||||
fmt.Fprintf(ctx.App.Writer, "Index file %d: fetching missing block %d\n", i, i*indexFileSize+idx)
|
|
||||||
objIDs = searchObjects(ctx.Context, p, containerID, account, blockAttributeKey, i*indexFileSize+idx, i*indexFileSize+idx+1, 1, maxRetries, errCh)
|
|
||||||
// Block object duplicates are allowed, we're OK with the first found result.
|
|
||||||
id, ok := <-objIDs
|
|
||||||
for range objIDs {
|
|
||||||
}
|
|
||||||
if !ok {
|
|
||||||
select {
|
|
||||||
case errCh <- fmt.Errorf("index file %d: block %d is missing from the storage", i, i*indexFileSize+idx):
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
processedIndices.Store(idx, id)
|
|
||||||
id.Encode(buffer[idx*oidSize:])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
fmt.Fprintf(ctx.App.Writer, "%d missing block(s) processed for index file %d, uploading index file...\n", count, i)
|
|
||||||
|
|
||||||
// Check if there are empty OIDs in the generated index file. If it happens at
|
|
||||||
// this stage, then there's a bug in the code.
|
|
||||||
for k := 0; k < len(buffer); k += oidSize {
|
|
||||||
if slices.Compare(buffer[k:k+oidSize], emptyOid) == 0 {
|
|
||||||
select {
|
select {
|
||||||
case errCh <- fmt.Errorf("empty OID found in index file %d at position %d (block index %d)", i, k/oidSize, i*indexFileSize+uint(k/oidSize)):
|
case errCh <- fmt.Errorf("duplicated index file %d found", i):
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if count == 0 {
|
||||||
// Upload index file.
|
break
|
||||||
attrs := []object.Attribute{
|
|
||||||
*object.NewAttribute(attributeKey, strconv.Itoa(int(i))),
|
|
||||||
*object.NewAttribute("IndexSize", strconv.Itoa(int(indexFileSize))),
|
|
||||||
}
|
}
|
||||||
err := retry(func() error {
|
existingIndexCount++
|
||||||
resOid, errUpload := uploadObj(ctx.Context, p, signer, account.PrivateKey().GetScriptHash(), containerID, buffer, attrs, homomorphicHashingDisabled)
|
|
||||||
if errUpload != nil {
|
|
||||||
return errUpload
|
|
||||||
}
|
|
||||||
if debug {
|
|
||||||
fmt.Fprintf(ctx.App.Writer, "Uploaded idex file %d with object ID: %s\n", i, resOid.String())
|
|
||||||
}
|
|
||||||
return errUpload
|
|
||||||
}, maxRetries)
|
|
||||||
if err != nil {
|
|
||||||
select {
|
|
||||||
case errCh <- fmt.Errorf("failed to upload index file %d: %w", i, err):
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
fmt.Fprintf(ctx.App.Writer, "Uploaded index file %d\n", i)
|
|
||||||
clear(buffer)
|
|
||||||
}
|
}
|
||||||
|
fmt.Fprintf(ctx.App.Writer, "Current index files count: %d\n", existingIndexCount)
|
||||||
|
|
||||||
|
// Start block parsing goroutines.
|
||||||
|
var (
|
||||||
|
// processedIndices is a mapping from position in buffer to the block index.
|
||||||
|
// It prevents duplicates.
|
||||||
|
processedIndices sync.Map
|
||||||
|
wg sync.WaitGroup
|
||||||
|
oidCh = make(chan oid.ID, 2*maxParallelSearches)
|
||||||
|
)
|
||||||
|
wg.Add(int(maxParallelSearches))
|
||||||
|
for range maxParallelSearches {
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
for id := range oidCh {
|
||||||
|
var obj object.Object
|
||||||
|
errRetr := retry(func() error {
|
||||||
|
var errGet error
|
||||||
|
obj, _, errGet = p.ObjectGetInit(ctx.Context, containerID, id, signer, client.PrmObjectGet{})
|
||||||
|
return errGet
|
||||||
|
}, maxRetries)
|
||||||
|
if errRetr != nil {
|
||||||
|
select {
|
||||||
|
case errCh <- fmt.Errorf("failed to fetch object %s: %w", id.String(), errRetr):
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
blockIndex, err := getBlockIndex(obj, blockAttributeKey)
|
||||||
|
if err != nil {
|
||||||
|
select {
|
||||||
|
case errCh <- fmt.Errorf("failed to get block index from object %s: %w", id.String(), err):
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
pos := uint(blockIndex) % indexFileSize
|
||||||
|
if _, ok := processedIndices.LoadOrStore(pos, blockIndex); !ok {
|
||||||
|
id.Encode(buf[pos*oidSize:])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Search for blocks within the index file range.
|
||||||
|
objIDs := searchObjects(ctx.Context, p, containerID, account, blockAttributeKey, existingIndexCount*indexFileSize, (existingIndexCount+1)*indexFileSize, maxParallelSearches, maxRetries, errCh)
|
||||||
|
for id := range objIDs {
|
||||||
|
oidCh <- id
|
||||||
|
}
|
||||||
|
close(oidCh)
|
||||||
|
wg.Wait()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case err := <-errCh:
|
case err := <-errCh:
|
||||||
return err
|
return existingIndexCount, nil, err
|
||||||
case <-doneCh:
|
case <-doneCh:
|
||||||
|
return existingIndexCount, buf, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// searchObjects searches in parallel for objects with attribute GE startIndex and LT
|
// searchObjects searches in parallel for objects with attribute GE startIndex and LT
|
||||||
// endIndex. It returns a buffered channel of resulting object IDs and closes it once
|
// endIndex. It returns a buffered channel of resulting object IDs and closes it once
|
||||||
// OID search is finished. Errors are sent to errCh in a non-blocking way.
|
// OID search is finished. Errors are sent to errCh in a non-blocking way.
|
||||||
func searchObjects(ctx context.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, blockAttributeKey string, startIndex, endIndex uint, maxParallelSearches, maxRetries int, errCh chan error, additionalFilters ...object.SearchFilters) chan oid.ID {
|
func searchObjects(ctx context.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, blockAttributeKey string, startIndex, endIndex, maxParallelSearches, maxRetries uint, errCh chan error, additionalFilters ...object.SearchFilters) chan oid.ID {
|
||||||
var res = make(chan oid.ID, 2*searchBatchSize)
|
var res = make(chan oid.ID, 2*searchBatchSize)
|
||||||
go func() {
|
go func() {
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
defer close(res)
|
defer close(res)
|
||||||
|
|
||||||
for i := int(startIndex); i < int(endIndex); i += searchBatchSize * maxParallelSearches {
|
for i := startIndex; i < endIndex; i += searchBatchSize * maxParallelSearches {
|
||||||
for j := range maxParallelSearches {
|
for j := range maxParallelSearches {
|
||||||
start := i + j*searchBatchSize
|
start := i + j*searchBatchSize
|
||||||
end := start + searchBatchSize
|
end := start + searchBatchSize
|
||||||
|
|
||||||
if start >= int(endIndex) {
|
if start >= endIndex {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
if end > int(endIndex) {
|
if end > endIndex {
|
||||||
end = int(endIndex)
|
end = endIndex
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(start, end int) {
|
go func(start, end uint) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
prm := client.PrmObjectSearch{}
|
prm := client.PrmObjectSearch{}
|
||||||
|
|
|
@ -85,7 +85,7 @@ NAME:
|
||||||
neo-go util upload-bin - Fetch blocks from RPC node and upload them to the NeoFS container
|
neo-go util upload-bin - Fetch blocks from RPC node and upload them to the NeoFS container
|
||||||
|
|
||||||
USAGE:
|
USAGE:
|
||||||
neo-go util upload-bin --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --block-attribute block --index-attribute index --rpc-endpoint <node> [--timeout <time>] --wallet <wallet> [--wallet-config <config>] [--address <address>] [--workers <num>] [--searchers <num>] [--index-file-size <size>] [--skip-blocks-uploading] [--retries <num>] [--debug]
|
neo-go util upload-bin --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --block-attribute block --index-attribute index --rpc-endpoint <node> [--timeout <time>] --wallet <wallet> [--wallet-config <config>] [--address <address>] [--workers <num>] [--searchers <num>] [--index-file-size <size>] [--retries <num>] [--debug]
|
||||||
|
|
||||||
OPTIONS:
|
OPTIONS:
|
||||||
--fs-rpc-endpoint value, --fsr value [ --fs-rpc-endpoint value, --fsr value ] List of NeoFS storage node RPC addresses (comma-separated or multiple --fs-rpc-endpoint flags)
|
--fs-rpc-endpoint value, --fsr value [ --fs-rpc-endpoint value, --fsr value ] List of NeoFS storage node RPC addresses (comma-separated or multiple --fs-rpc-endpoint flags)
|
||||||
|
@ -96,7 +96,6 @@ OPTIONS:
|
||||||
--index-file-size value Size of index file (default: 128000)
|
--index-file-size value Size of index file (default: 128000)
|
||||||
--workers value Number of workers to fetch, upload and search blocks concurrently (default: 50)
|
--workers value Number of workers to fetch, upload and search blocks concurrently (default: 50)
|
||||||
--searchers value Number of concurrent searches for blocks (default: 20)
|
--searchers value Number of concurrent searches for blocks (default: 20)
|
||||||
--skip-blocks-uploading Skip blocks uploading and upload only index files (default: false)
|
|
||||||
--retries value Maximum number of Neo/NeoFS node request retries (default: 5)
|
--retries value Maximum number of Neo/NeoFS node request retries (default: 5)
|
||||||
--debug, -d Enable debug logging (LOTS of output, overrides configuration) (default: false)
|
--debug, -d Enable debug logging (LOTS of output, overrides configuration) (default: false)
|
||||||
--rpc-endpoint value, -r value RPC node address
|
--rpc-endpoint value, -r value RPC node address
|
||||||
|
@ -108,11 +107,18 @@ OPTIONS:
|
||||||
|
|
||||||
This command works as follows:
|
This command works as follows:
|
||||||
1. Fetches the current block height from the RPC node.
|
1. Fetches the current block height from the RPC node.
|
||||||
2. Searches for the oldest half-filled batch of block objects stored in NeoFS.
|
2. Searches for the index files stored in NeoFS.
|
||||||
3. Fetches missing blocks from the RPC node and uploads them to the NeoFS container
|
3. Searches for the stored blocks from the latest incomplete index file.
|
||||||
starting from the oldest half-filled batch.
|
4. Fetches missing blocks from the RPC node and uploads them to the NeoFS container.
|
||||||
4. After uploading the blocks, it creates index files for the newly uploaded blocks.
|
5. After uploading the blocks, it creates index file based on the uploaded block OIDs.
|
||||||
5. Uploads the created index files to the NeoFS container.
|
6. Uploads the created index file to the NeoFS container.
|
||||||
|
7. Repeats steps 4-6 until the current block height is reached.
|
||||||
|
|
||||||
If the command is interrupted, it can be resumed. It starts the uploading process
|
If the command is interrupted, it can be resumed. It starts the uploading process
|
||||||
from the oldest half-filled batch of blocks.
|
from the last uploaded index file.
|
||||||
|
|
||||||
|
For a given block sequence, only one type of index file is supported. If new index
|
||||||
|
files are needed (different `index-file-size` or `index-attribute`), the entire
|
||||||
|
block sequence must be uploaded from the beginning. Please, add a comment to the
|
||||||
|
[#3744](https://github.com/nspcc-dev/neo-go/issues/3744) issue if you need this
|
||||||
|
functionality.
|
Loading…
Reference in a new issue