cli: add upload-bin

This command is used to keep the container with blocks for the
blockfetcher service up to date.
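
An example invocation (endpoint addresses, container ID, RPC node and
wallet path are placeholders):

    neo-go util upload-bin \
        --fs-rpc-endpoint st1.local.fs.neo.org:8080 \
        --container 9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG \
        --block-attribute block --index-attribute index \
        --rpc-endpoint https://rpc.node:10332 \
        --wallet ./wallet.json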

Close #3578

Signed-off-by: Ekaterina Pavlova <ekt@morphbits.io>
Ekaterina Pavlova 2024-10-15 15:27:28 +03:00
parent 2188b27e5c
commit be768691c5
3 changed files with 509 additions and 0 deletions


@@ -6,6 +6,7 @@ import (
"fmt"
"os"
"github.com/nspcc-dev/neo-go/cli/cmdargs"
"github.com/nspcc-dev/neo-go/cli/flags"
"github.com/nspcc-dev/neo-go/cli/options"
"github.com/nspcc-dev/neo-go/cli/txctx"
@@ -32,6 +33,51 @@ func NewCommands() []*cli.Command {
txctx.AwaitFlag,
}, options.RPC...)
txCancelFlags = append(txCancelFlags, options.Wallet...)
uploadBinFlags := append([]cli.Flag{
&cli.StringSliceFlag{
Name: "fs-rpc-endpoint",
Aliases: []string{"fsr"},
Usage: "List of NeoFS storage node RPC addresses (comma-separated or multiple --fs-rpc-endpoint flags)",
Required: true,
Action: func(ctx *cli.Context, fsRpcEndpoints []string) error {
for _, endpoint := range fsRpcEndpoints {
if endpoint == "" {
return cli.Exit("NeoFS RPC endpoint cannot contain empty values", 1)
}
}
return nil
},
},
&cli.StringFlag{
Name: "container",
Aliases: []string{"cid"},
Usage: "NeoFS container ID to upload blocks to",
Required: true,
Action: cmdargs.EnsureNotEmpty("container"),
},
&cli.StringFlag{
Name: "block-attribute",
Usage: "Attribute key of the block object",
Required: true,
Action: cmdargs.EnsureNotEmpty("block-attribute"),
},
&cli.StringFlag{
Name: "index-attribute",
Usage: "Attribute key of the index file object",
Required: true,
Action: cmdargs.EnsureNotEmpty("index-attribute"),
},
&flags.AddressFlag{
Name: "address",
Usage: "Address to use for signing the uploading and searching transactions in NeoFS",
},
&cli.UintFlag{
Name: "index-file-size",
Usage: "Size of index file",
Value: 128000,
},
}, options.RPC...)
uploadBinFlags = append(uploadBinFlags, options.Wallet...)
return []*cli.Command{
{
Name: "util",
@@ -109,6 +155,13 @@ func NewCommands() []*cli.Command {
},
},
},
{
Name: "upload-bin",
Usage: "Fetch blocks from RPC node and upload them to the NeoFS container",
UsageText: "neo-go util upload-bin --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --block-attribute block --index-attribute index --rpc-endpoint <node> [--timeout <time>] --wallet <wallet> [--wallet-config <config>] [--address <address>]",
Action: uploadBin,
Flags: uploadBinFlags,
},
},
},
}

434 cli/util/uploader.go Normal file

@@ -0,0 +1,434 @@
package util
import (
"context"
"crypto/sha256"
"fmt"
"strconv"
"sync"
"time"
"github.com/nspcc-dev/neo-go/cli/cmdargs"
"github.com/nspcc-dev/neo-go/cli/options"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/wallet"
"github.com/nspcc-dev/neofs-sdk-go/checksum"
"github.com/nspcc-dev/neofs-sdk-go/client"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/pool"
"github.com/nspcc-dev/neofs-sdk-go/user"
"github.com/nspcc-dev/neofs-sdk-go/version"
"github.com/urfave/cli/v2"
)
const (
searchBatchSize = 10000 // Number of objects to search in a batch for finding max block in container.
maxParallelSearches = 40 // Control the number of concurrent searches.
oidSize = sha256.Size // Size of object ID.
numWorkers = 100 // Number of workers to fetch and upload blocks concurrently.
)
func uploadBin(ctx *cli.Context) error {
if err := cmdargs.EnsureNone(ctx); err != nil {
return err
}
rpcNeoFS := ctx.StringSlice("fs-rpc-endpoint")
containerIDStr := ctx.String("container")
attr := ctx.String("block-attribute")
acc, _, err := options.GetAccFromContext(ctx)
if err != nil {
return cli.Exit(fmt.Sprintf("failed to load account: %v", err), 1)
}
var containerID cid.ID
if err = containerID.DecodeString(containerIDStr); err != nil {
return cli.Exit(fmt.Sprintf("failed to decode container ID: %v", err), 1)
}
gctx, cancel := options.GetTimeoutContext(ctx)
defer cancel()
rpc, err := options.GetRPCClient(gctx, ctx)
if err != nil {
return cli.Exit(fmt.Sprintf("failed to create RPC client: %v", err), 1)
}
currentBlockHeight, err := rpc.GetBlockCount()
if err != nil {
return cli.Exit(fmt.Sprintf("failed to get current block height from RPC: %v", err), 1)
}
fmt.Fprintln(ctx.App.Writer, "Chain block height:", currentBlockHeight)
signer := user.NewAutoIDSignerRFC6979(acc.PrivateKey().PrivateKey)
p, err := pool.New(pool.NewFlatNodeParams(rpcNeoFS), signer, pool.DefaultOptions())
if err != nil {
return cli.Exit(fmt.Sprintf("failed to create NeoFS pool: %v", err), 1)
}
if err = p.Dial(context.Background()); err != nil {
return cli.Exit(fmt.Sprintf("failed to dial NeoFS pool: %v", err), 1)
}
defer p.Close()
net, err := p.NetworkInfo(ctx.Context, client.PrmNetworkInfo{})
if err != nil {
return cli.Exit(fmt.Errorf("failed to get network info: %w", err), 1)
}
homomorphicHashingDisabled := net.HomomorphicHashingDisabled()
lastMissingBlockIndex, err := fetchLatestMissingBlockIndex(ctx.Context, p, containerID, acc.PrivateKey(), attr, int(currentBlockHeight))
if err != nil {
return cli.Exit(fmt.Errorf("failed to fetch max block index from container: %w", err), 1)
}
fmt.Fprintln(ctx.App.Writer, "First block of latest incomplete batch uploaded to NeoFS container:", lastMissingBlockIndex)
if lastMissingBlockIndex > int(currentBlockHeight) {
fmt.Fprintf(ctx.App.Writer, "No new blocks to upload. Need to upload starting from %d, current height %d\n", lastMissingBlockIndex, currentBlockHeight)
return nil
}
for batchStart := lastMissingBlockIndex; batchStart <= int(currentBlockHeight); batchStart += searchBatchSize {
batchEnd := min(batchStart+searchBatchSize, int(currentBlockHeight)+1)
fmt.Fprintf(ctx.App.Writer, "Processing batch from %d to %d\n", batchStart, batchEnd)
errorChan := make(chan error)
doneChan := make(chan struct{})
var wg sync.WaitGroup
wg.Add(numWorkers)
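// Blocks are distributed among workers in a strided fashion: worker i
// handles indexes batchStart+i, batchStart+i+numWorkers and so on, so the
// whole batch is covered without overlap.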
for i := range numWorkers {
go func(i int) {
defer wg.Done()
for blockIndex := batchStart + i; blockIndex < batchEnd; blockIndex += numWorkers {
var blk *block.Block
errGet := retry(func() error {
var errGetBlock error
blk, errGetBlock = rpc.GetBlockByIndex(uint32(blockIndex))
if errGetBlock != nil {
return fmt.Errorf("failed to fetch block %d: %w", blockIndex, errGetBlock)
}
return nil
})
if errGet != nil {
select {
case errorChan <- errGet:
default:
}
return
}
bw := io.NewBufBinWriter()
blk.EncodeBinary(bw.BinWriter)
if bw.Err != nil {
select {
case errorChan <- fmt.Errorf("failed to encode block %d: %w", blockIndex, bw.Err):
default:
}
return
}
attrs := []object.Attribute{
*object.NewAttribute(attr, strconv.Itoa(int(blk.Index))),
*object.NewAttribute("primary", strconv.Itoa(int(blk.PrimaryIndex))),
*object.NewAttribute("hash", blk.Hash().StringLE()),
*object.NewAttribute("prevHash", blk.PrevHash.StringLE()),
*object.NewAttribute("timestamp", strconv.FormatUint(blk.Timestamp, 10)),
}
errUpload := retry(func() error {
return uploadObj(ctx.Context, p, signer, acc.PrivateKey().GetScriptHash(), containerID, bw.Bytes(), attrs, homomorphicHashingDisabled)
})
if errUpload != nil {
select {
case errorChan <- errUpload:
default:
}
return
}
}
}(i)
}
go func() {
wg.Wait()
close(doneChan)
}()
select {
case err := <-errorChan:
return cli.Exit(fmt.Errorf("upload error: %w", err), 1)
case <-doneChan:
}
fmt.Fprintf(ctx.App.Writer, "Successfully uploaded batch of blocks: from %d to %d\n", batchStart, batchEnd)
}
err = updateIndexFiles(ctx, p, containerID, *acc, signer, uint(currentBlockHeight), attr, homomorphicHashingDisabled)
if err != nil {
return cli.Exit(fmt.Errorf("failed to update index files after upload: %w", err), 1)
}
return nil
}
const (
maxRetries = 3 // Maximum number of retries
initialBackoff = 500 * time.Millisecond // Initial backoff duration
backoffFactor = 2 // Backoff multiplier
maxBackoff = 10 * time.Second // Maximum backoff duration
)
// retry performs the given action up to maxRetries times with exponential backoff,
// returning the last error if all attempts fail.
func retry(action func() error) error {
var err error
backoff := initialBackoff
for range maxRetries {
if err = action(); err == nil {
return nil // Success, no retry needed
}
time.Sleep(backoff) // Backoff before retrying
backoff *= time.Duration(backoffFactor)
if backoff > maxBackoff {
backoff = maxBackoff
}
}
return err // Return the last error after exhausting retries
}
type searchResult struct {
startIndex int
numOIDs int
err error
}
// fetchLatestMissingBlockIndex searches the container for the latest full batch
// of blocks, starting from currentHeight and going backwards, and returns the
// start index of the latest incomplete batch, i.e. the index uploading should
// resume from (0 if the container is empty).
func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID cid.ID, priv *keys.PrivateKey, attributeKey string, currentHeight int) (int, error) {
var (
wg sync.WaitGroup
maxFullBatchStartIndex = -1
numBatches = currentHeight/searchBatchSize + 1
)
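// Up to maxParallelSearches consecutive batches are searched concurrently,
// moving from the chain tip backwards; a partially filled batch marks the
// point from which uploading has to resume.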
for batch := numBatches; ; batch -= maxParallelSearches {
if batch < 0 {
batch = 0
}
results := make([]searchResult, maxParallelSearches)
for i := range maxParallelSearches {
currentBatch := batch - i
startIndex := currentBatch * searchBatchSize
endIndex := startIndex + searchBatchSize
if currentBatch <= 0 {
startIndex = 0
endIndex = searchBatchSize
}
wg.Add(1)
go func(i, startIndex, endIndex int) {
defer wg.Done()
prm := client.PrmObjectSearch{}
filters := object.NewSearchFilters()
filters.AddFilter(attributeKey, fmt.Sprintf("%d", startIndex), object.MatchNumGE)
filters.AddFilter(attributeKey, fmt.Sprintf("%d", endIndex), object.MatchNumLT)
prm.SetFilters(filters)
objectIDs, err := neofs.ObjectSearch(ctx, p, priv, containerID.String(), prm)
results[i] = searchResult{startIndex: startIndex, numOIDs: len(objectIDs), err: err}
}(i, startIndex, endIndex)
}
wg.Wait()
for _, res := range results {
if res.err != nil {
return 0, res.err
}
if res.numOIDs > 0 {
if res.numOIDs == searchBatchSize {
maxFullBatchStartIndex = max(maxFullBatchStartIndex, res.startIndex)
} else {
return res.startIndex, nil
}
}
}
if maxFullBatchStartIndex != -1 {
return maxFullBatchStartIndex + searchBatchSize, nil
}
if batch == 0 {
break
}
}
return 0, nil
}
// updateIndexFiles creates the missing index files in the container; every
// index file is a flat list of OIDs of indexFileSize consecutive blocks.
func updateIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, account wallet.Account, signer user.Signer, currentHeight uint, blockAttributeKey string, homomorphicHashingDisabled bool) error {
attributeKey := ctx.String("index-attribute")
indexFileSize := ctx.Uint("index-file-size")
fmt.Fprintln(ctx.App.Writer, "Updating index files...")
prm := client.PrmObjectSearch{}
filters := object.NewSearchFilters()
filters.AddFilter(attributeKey, fmt.Sprintf("%d", 0), object.MatchNumGE)
filters.AddFilter("size", fmt.Sprintf("%d", indexFileSize), object.MatchStringEqual)
prm.SetFilters(filters)
objectIDs, err := neofs.ObjectSearch(ctx.Context, p, account.PrivateKey(), containerID.String(), prm)
if err != nil {
return fmt.Errorf("search of index files failed: %w", err)
}
existingIndexCount := uint(len(objectIDs))
expectedIndexCount := currentHeight / indexFileSize
if existingIndexCount >= expectedIndexCount {
fmt.Fprintf(ctx.App.Writer, "Index files are up to date. Existing: %d, expected: %d\n", existingIndexCount, expectedIndexCount)
return nil
}
var (
errCh = make(chan error)
buffer = make([]byte, indexFileSize*oidSize)
oidCh = make(chan oid.ID, indexFileSize)
oidFetcherToProcessor = make(chan struct{}, indexFileSize)
)
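// The OID of block N occupies oidSize bytes of the shared buffer at offset
// (N % indexFileSize) * oidSize, so concurrent workers fill disjoint slices
// of the buffer without overlap.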
defer close(oidCh)
for range maxParallelSearches {
go func() {
for id := range oidCh {
err := retry(func() error {
obj, err := p.ObjectHead(context.Background(), containerID, id, signer, client.PrmObjectHead{})
if err != nil {
return fmt.Errorf("failed to fetch object %s: %w", id.String(), err)
}
blockIndex, err := getBlockIndex(obj, blockAttributeKey)
if err != nil {
return fmt.Errorf("failed to get block index from object %s: %w", id.String(), err)
}
offset := (uint(blockIndex) % indexFileSize) * oidSize
id.Encode(buffer[offset:])
oidFetcherToProcessor <- struct{}{}
return nil
})
if err != nil {
select {
case errCh <- err:
default:
}
return
}
}
}()
}
for i := existingIndexCount; i < expectedIndexCount; i++ {
startIndex := i * indexFileSize
endIndex := startIndex + indexFileSize
go func() {
for j := int(startIndex); j < int(endIndex); j += searchBatchSize {
remaining := int(endIndex) - j
end := j + min(searchBatchSize, remaining)
prm := client.PrmObjectSearch{}
filters := object.NewSearchFilters()
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", j), object.MatchNumGE)
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT)
prm.SetFilters(filters)
objectIDs, err := neofs.ObjectSearch(ctx.Context, p, account.PrivateKey(), containerID.String(), prm)
if err != nil {
errCh <- fmt.Errorf("no OIDs found for index files: %w", err)
return
}
for _, id := range objectIDs {
oidCh <- id
}
}
}()
var completed int
waitLoop:
for {
select {
case err := <-errCh:
return err
case <-oidFetcherToProcessor:
completed++
if completed == int(indexFileSize) {
break waitLoop
}
}
}
attrs := []object.Attribute{
*object.NewAttribute(attributeKey, strconv.Itoa(int(i))),
*object.NewAttribute("size", strconv.Itoa(int(indexFileSize))),
}
err = uploadObj(ctx.Context, p, signer, account.PrivateKey().GetScriptHash(), containerID, buffer, attrs, homomorphicHashingDisabled)
if err != nil {
return fmt.Errorf("failed to upload index file %d: %w", i, err)
}
fmt.Fprintf(ctx.App.Writer, "Uploaded index file %d\n", i)
}
return nil
}
// uploadObj uploads the block to the container using the pool.
func uploadObj(ctx context.Context, p *pool.Pool, signer user.Signer, owner util.Uint160, containerID cid.ID, objData []byte, attrs []object.Attribute, homomorphicHashingDisabled bool) error {
var (
ownerID user.ID
hdr object.Object
chSHA256 checksum.Checksum
chHomomorphic checksum.Checksum
v = new(version.Version)
prmObjectPutInit client.PrmObjectPutInit
)
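// The object header is built manually: payload size, container, owner,
// attributes and checksums all have to be set before the object ID is
// calculated and signed.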
ownerID.SetScriptHash(owner)
hdr.SetPayload(objData)
hdr.SetPayloadSize(uint64(len(objData)))
hdr.SetContainerID(containerID)
hdr.SetOwnerID(&ownerID)
hdr.SetAttributes(attrs...)
hdr.SetCreationEpoch(1)
v.SetMajor(1)
hdr.SetVersion(v)
if !homomorphicHashingDisabled {
checksum.Calculate(&chHomomorphic, checksum.TZ, objData)
hdr.SetPayloadHomomorphicHash(chHomomorphic)
}
checksum.Calculate(&chSHA256, checksum.SHA256, objData)
hdr.SetPayloadChecksum(chSHA256)
err := hdr.SetIDWithSignature(signer)
if err != nil {
return err
}
err = hdr.CheckHeaderVerificationFields()
if err != nil {
return err
}
writer, err := p.ObjectPutInit(ctx, hdr, signer, prmObjectPutInit)
if err != nil {
return fmt.Errorf("failed to initiate object upload: %w", err)
}
defer writer.Close()
_, err = writer.Write(objData)
if err != nil {
return fmt.Errorf("failed to write object data: %w", err)
}
return nil
}
func getBlockIndex(header *object.Object, attribute string) (int, error) {
for _, attr := range header.UserAttributes() {
if attr.Key() == attribute {
return strconv.Atoi(attr.Value())
}
}
return -1, fmt.Errorf("attribute %s not found", attribute)
}


@@ -190,3 +190,25 @@ func TestAwaitUtilCancelTx(t *testing.T) {
t.Fatal(fmt.Errorf("unexpected error: %w", err))
}
}
func TestUploadBin(t *testing.T) {
e := testcli.NewExecutor(t, true)
args := []string{
"neo-go", "util", "upload-bin",
"--cid", "test",
"--wallet", "./not-exist.json",
"--block-attribute", "block",
"--index-attribute", "oid-index",
"--fsr", "st1.local.fs.neo.org:8080",
}
e.In.WriteString("one\r")
e.RunWithErrorCheckExit(t, "failed to load account", append(args, "--cid", "test", "--wallet", "./not-exist.json", "--rpc-endpoint", "https://test")...)
e.In.WriteString("one\r")
e.RunWithErrorCheckExit(t, "failed to decode container ID", append(args, "--cid", "test", "--wallet", testcli.ValidatorWallet, "--rpc-endpoint", "https://test")...)
e.In.WriteString("one\r")
e.RunWithErrorCheckExit(t, "failed to create RPC client", append(args, "--cid", "9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG", "--wallet", testcli.ValidatorWallet, "--rpc-endpoint", "https://test")...)
e.In.WriteString("one\r")
e.RunWithErrorCheck(t, "failed to dial NeoFS pool", append(args, "--cid", "9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG", "--wallet", testcli.ValidatorWallet, "--rpc-endpoint", "http://"+e.RPC.Addresses()[0])...)
e.CheckNextLine(t, "Chain block height:")
e.CheckEOF(t)
}