mirror of
https://github.com/nspcc-dev/neo-go.git
synced 2025-01-20 15:08:19 +00:00
04516e7d26
Signed-off-by: Ekaterina Pavlova <ekt@morphbits.io>
651 lines
21 KiB
Go
651 lines
21 KiB
Go
package util
|
|
|
|
import (
|
|
"context"
|
|
"crypto/sha256"
|
|
"fmt"
|
|
"slices"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/nspcc-dev/neo-go/cli/cmdargs"
|
|
"github.com/nspcc-dev/neo-go/cli/options"
|
|
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
|
"github.com/nspcc-dev/neo-go/pkg/io"
|
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
|
|
"github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs"
|
|
"github.com/nspcc-dev/neo-go/pkg/util"
|
|
"github.com/nspcc-dev/neo-go/pkg/wallet"
|
|
"github.com/nspcc-dev/neofs-sdk-go/checksum"
|
|
"github.com/nspcc-dev/neofs-sdk-go/client"
|
|
"github.com/nspcc-dev/neofs-sdk-go/container"
|
|
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
|
"github.com/nspcc-dev/neofs-sdk-go/netmap"
|
|
"github.com/nspcc-dev/neofs-sdk-go/object"
|
|
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
|
"github.com/nspcc-dev/neofs-sdk-go/pool"
|
|
"github.com/nspcc-dev/neofs-sdk-go/user"
|
|
"github.com/nspcc-dev/neofs-sdk-go/version"
|
|
"github.com/urfave/cli/v2"
|
|
)
|
|
|
|
const (
|
|
// Number of objects to upload in a batch. All batches of uploadBatchSize size
|
|
// except the most recent one are guaranteed to be completed and don't contain gaps.
|
|
uploadBatchSize = 10000
|
|
// Number of objects to search in a batch. If it is larger than uploadBatchSize,
|
|
// it may lead to many duplicate uploads. We need to search with EQ filter to
|
|
// avoid partially-completed SEARCH responses.
|
|
searchBatchSize = 1
|
|
// Size of object ID.
|
|
oidSize = sha256.Size
|
|
)
|
|
|
|
// Constants related to retry mechanism.
|
|
const (
|
|
// Initial backoff duration.
|
|
initialBackoff = 500 * time.Millisecond
|
|
// Backoff multiplier.
|
|
backoffFactor = 2
|
|
// Maximum backoff duration.
|
|
maxBackoff = 20 * time.Second
|
|
)
|
|
|
|
// Constants related to NeoFS pool request timeouts.
|
|
// Such big values are used to avoid NeoFS pool timeouts during block search and upload.
|
|
const (
|
|
defaultDialTimeout = 10 * time.Minute
|
|
defaultStreamTimeout = 10 * time.Minute
|
|
defaultHealthcheckTimeout = 10 * time.Second
|
|
)
|
|
|
|
// poolWrapper wraps a NeoFS pool to adapt its Close method to return an error.
|
|
type poolWrapper struct {
|
|
*pool.Pool
|
|
}
|
|
|
|
// Close closes the pool and returns nil.
|
|
func (p poolWrapper) Close() error {
|
|
p.Pool.Close()
|
|
return nil
|
|
}
|
|
|
|
func uploadBin(ctx *cli.Context) error {
|
|
if err := cmdargs.EnsureNone(ctx); err != nil {
|
|
return err
|
|
}
|
|
rpcNeoFS := ctx.StringSlice("fs-rpc-endpoint")
|
|
containerIDStr := ctx.String("container")
|
|
attr := ctx.String("block-attribute")
|
|
numWorkers := ctx.Int("workers")
|
|
maxParallelSearches := ctx.Int("searchers")
|
|
maxRetries := int(ctx.Uint("retries"))
|
|
debug := ctx.Bool("debug")
|
|
acc, _, err := options.GetAccFromContext(ctx)
|
|
if err != nil {
|
|
return cli.Exit(fmt.Sprintf("failed to load account: %v", err), 1)
|
|
}
|
|
|
|
var containerID cid.ID
|
|
if err = containerID.DecodeString(containerIDStr); err != nil {
|
|
return cli.Exit(fmt.Sprintf("failed to decode container ID: %v", err), 1)
|
|
}
|
|
|
|
gctx, cancel := options.GetTimeoutContext(ctx)
|
|
defer cancel()
|
|
rpc, err := options.GetRPCClient(gctx, ctx)
|
|
if err != nil {
|
|
return cli.Exit(fmt.Sprintf("failed to create RPC client: %v", err), 1)
|
|
}
|
|
|
|
signer := user.NewAutoIDSignerRFC6979(acc.PrivateKey().PrivateKey)
|
|
|
|
params := pool.DefaultOptions()
|
|
params.SetHealthcheckTimeout(defaultHealthcheckTimeout)
|
|
params.SetNodeDialTimeout(defaultDialTimeout)
|
|
params.SetNodeStreamTimeout(defaultStreamTimeout)
|
|
p, err := pool.New(pool.NewFlatNodeParams(rpcNeoFS), signer, params)
|
|
if err != nil {
|
|
return cli.Exit(fmt.Sprintf("failed to create NeoFS pool: %v", err), 1)
|
|
}
|
|
pWrapper := poolWrapper{p}
|
|
if err = pWrapper.Dial(context.Background()); err != nil {
|
|
return cli.Exit(fmt.Sprintf("failed to dial NeoFS pool: %v", err), 1)
|
|
}
|
|
defer p.Close()
|
|
|
|
var net netmap.NetworkInfo
|
|
err = retry(func() error {
|
|
var errNet error
|
|
net, errNet = p.NetworkInfo(ctx.Context, client.PrmNetworkInfo{})
|
|
return errNet
|
|
}, maxRetries)
|
|
if err != nil {
|
|
return cli.Exit(fmt.Errorf("failed to get network info: %w", err), 1)
|
|
}
|
|
homomorphicHashingDisabled := net.HomomorphicHashingDisabled()
|
|
|
|
var containerObj container.Container
|
|
err = retry(func() error {
|
|
containerObj, err = p.ContainerGet(ctx.Context, containerID, client.PrmContainerGet{})
|
|
return err
|
|
}, maxRetries)
|
|
if err != nil {
|
|
return cli.Exit(fmt.Errorf("failed to get container with ID %s: %w", containerID, err), 1)
|
|
}
|
|
containerMagic := containerObj.Attribute("Magic")
|
|
|
|
v, err := rpc.GetVersion()
|
|
if err != nil {
|
|
return cli.Exit(fmt.Sprintf("failed to get version from RPC: %v", err), 1)
|
|
}
|
|
magic := strconv.Itoa(int(v.Protocol.Network))
|
|
if containerMagic != magic {
|
|
return cli.Exit(fmt.Sprintf("container magic %s does not match the network magic %s", containerMagic, magic), 1)
|
|
}
|
|
|
|
currentBlockHeight, err := rpc.GetBlockCount()
|
|
if err != nil {
|
|
return cli.Exit(fmt.Sprintf("failed to get current block height from RPC: %v", err), 1)
|
|
}
|
|
fmt.Fprintln(ctx.App.Writer, "Chain block height:", currentBlockHeight)
|
|
|
|
oldestMissingBlockIndex, errBlock := fetchLatestMissingBlockIndex(ctx.Context, p, containerID, acc.PrivateKey(), attr, int(currentBlockHeight), maxParallelSearches, maxRetries)
|
|
if errBlock != nil {
|
|
return cli.Exit(fmt.Errorf("failed to fetch the oldest missing block index from container: %w", errBlock), 1)
|
|
}
|
|
fmt.Fprintln(ctx.App.Writer, "First block of latest incomplete batch uploaded to NeoFS container:", oldestMissingBlockIndex)
|
|
|
|
if !ctx.Bool("skip-blocks-uploading") {
|
|
err = uploadBlocks(ctx, pWrapper, rpc, signer, containerID, acc, attr, oldestMissingBlockIndex, uint(currentBlockHeight), homomorphicHashingDisabled, numWorkers, maxRetries, debug)
|
|
if err != nil {
|
|
return cli.Exit(fmt.Errorf("failed to upload blocks: %w", err), 1)
|
|
}
|
|
oldestMissingBlockIndex = int(currentBlockHeight) + 1
|
|
}
|
|
|
|
err = uploadIndexFiles(ctx, pWrapper, containerID, acc, signer, uint(oldestMissingBlockIndex), attr, homomorphicHashingDisabled, maxParallelSearches, maxRetries, debug)
|
|
if err != nil {
|
|
return cli.Exit(fmt.Errorf("failed to upload index files: %w", err), 1)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// retry function with exponential backoff.
|
|
func retry(action func() error, maxRetries int) error {
|
|
var err error
|
|
backoff := initialBackoff
|
|
for range maxRetries {
|
|
if err = action(); err == nil {
|
|
return nil // Success, no retry needed.
|
|
}
|
|
time.Sleep(backoff) // Backoff before retrying.
|
|
backoff *= time.Duration(backoffFactor)
|
|
if backoff > maxBackoff {
|
|
backoff = maxBackoff
|
|
}
|
|
}
|
|
return err // Return the last error after exhausting retries.
|
|
}
|
|
|
|
type searchResult struct {
|
|
startIndex int
|
|
endIndex int
|
|
numOIDs int
|
|
err error
|
|
}
|
|
|
|
// fetchLatestMissingBlockIndex searches the container for the latest full batch of blocks
|
|
// starting from the currentHeight and going backwards. It returns the index of first block
|
|
// in the next batch.
|
|
func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID cid.ID, priv *keys.PrivateKey, attributeKey string, currentHeight int, maxParallelSearches, maxRetries int) (int, error) {
|
|
var (
|
|
wg sync.WaitGroup
|
|
numBatches = currentHeight / searchBatchSize
|
|
emptyBatchFound bool
|
|
pWrapper = poolWrapper{p}
|
|
)
|
|
|
|
for batch := numBatches; batch > -maxParallelSearches; batch -= maxParallelSearches {
|
|
results := make([]searchResult, maxParallelSearches)
|
|
|
|
for i := range maxParallelSearches {
|
|
startIndex := (batch + i) * searchBatchSize
|
|
endIndex := startIndex + searchBatchSize
|
|
if endIndex <= 0 {
|
|
continue
|
|
}
|
|
if startIndex < 0 {
|
|
startIndex = 0
|
|
}
|
|
|
|
wg.Add(1)
|
|
go func(i, startIndex, endIndex int) {
|
|
defer wg.Done()
|
|
|
|
prm := client.PrmObjectSearch{}
|
|
filters := object.NewSearchFilters()
|
|
if endIndex == startIndex+1 {
|
|
filters.AddFilter(attributeKey, fmt.Sprintf("%d", startIndex), object.MatchStringEqual)
|
|
} else {
|
|
filters.AddFilter(attributeKey, fmt.Sprintf("%d", startIndex), object.MatchNumGE)
|
|
filters.AddFilter(attributeKey, fmt.Sprintf("%d", endIndex), object.MatchNumLT)
|
|
}
|
|
prm.SetFilters(filters)
|
|
var (
|
|
objectIDs []oid.ID
|
|
err error
|
|
)
|
|
err = retry(func() error {
|
|
objectIDs, err = neofs.ObjectSearch(ctx, pWrapper, priv, containerID.String(), prm)
|
|
return err
|
|
}, maxRetries)
|
|
results[i] = searchResult{startIndex: startIndex, endIndex: endIndex, numOIDs: len(objectIDs), err: err}
|
|
}(i, startIndex, endIndex)
|
|
}
|
|
wg.Wait()
|
|
|
|
for i := len(results) - 1; i >= 0; i-- {
|
|
if results[i].err != nil {
|
|
return 0, fmt.Errorf("blocks search failed for batch with indexes from %d to %d: %w", results[i].startIndex, results[i].endIndex-1, results[i].err)
|
|
}
|
|
if results[i].numOIDs == 0 {
|
|
emptyBatchFound = true
|
|
continue
|
|
}
|
|
if emptyBatchFound || (batch == numBatches && i == len(results)-1) {
|
|
return results[i].startIndex / uploadBatchSize * uploadBatchSize, nil
|
|
}
|
|
}
|
|
}
|
|
return 0, nil
|
|
}
|
|
|
|
// uploadBlocks uploads the blocks to the container using the pool.
|
|
func uploadBlocks(ctx *cli.Context, p poolWrapper, rpc *rpcclient.Client, signer user.Signer, containerID cid.ID, acc *wallet.Account, attr string, oldestMissingBlockIndex int, currentBlockHeight uint, homomorphicHashingDisabled bool, numWorkers, maxRetries int, debug bool) error {
|
|
if oldestMissingBlockIndex > int(currentBlockHeight) {
|
|
fmt.Fprintf(ctx.App.Writer, "No new blocks to upload. Need to upload starting from %d, current height %d\n", oldestMissingBlockIndex, currentBlockHeight)
|
|
return nil
|
|
}
|
|
for batchStart := oldestMissingBlockIndex; batchStart <= int(currentBlockHeight); batchStart += uploadBatchSize {
|
|
var (
|
|
batchEnd = min(batchStart+uploadBatchSize, int(currentBlockHeight)+1)
|
|
errCh = make(chan error)
|
|
doneCh = make(chan struct{})
|
|
wg sync.WaitGroup
|
|
)
|
|
fmt.Fprintf(ctx.App.Writer, "Processing batch from %d to %d\n", batchStart, batchEnd-1)
|
|
wg.Add(numWorkers)
|
|
for i := range numWorkers {
|
|
go func(i int) {
|
|
defer wg.Done()
|
|
for blockIndex := batchStart + i; blockIndex < batchEnd; blockIndex += numWorkers {
|
|
var blk *block.Block
|
|
errGet := retry(func() error {
|
|
var errGetBlock error
|
|
blk, errGetBlock = rpc.GetBlockByIndex(uint32(blockIndex))
|
|
if errGetBlock != nil {
|
|
return fmt.Errorf("failed to fetch block %d: %w", blockIndex, errGetBlock)
|
|
}
|
|
return nil
|
|
}, maxRetries)
|
|
if errGet != nil {
|
|
select {
|
|
case errCh <- errGet:
|
|
default:
|
|
}
|
|
return
|
|
}
|
|
|
|
bw := io.NewBufBinWriter()
|
|
blk.EncodeBinary(bw.BinWriter)
|
|
if bw.Err != nil {
|
|
select {
|
|
case errCh <- fmt.Errorf("failed to encode block %d: %w", blockIndex, bw.Err):
|
|
default:
|
|
}
|
|
return
|
|
}
|
|
attrs := []object.Attribute{
|
|
*object.NewAttribute(attr, strconv.Itoa(int(blk.Index))),
|
|
*object.NewAttribute("Primary", strconv.Itoa(int(blk.PrimaryIndex))),
|
|
*object.NewAttribute("Hash", blk.Hash().StringLE()),
|
|
*object.NewAttribute("PrevHash", blk.PrevHash.StringLE()),
|
|
*object.NewAttribute("Timestamp", strconv.FormatUint(blk.Timestamp, 10)),
|
|
}
|
|
|
|
objBytes := bw.Bytes()
|
|
errRetr := retry(func() error {
|
|
resOid, errUpload := uploadObj(ctx.Context, p, signer, acc.PrivateKey().GetScriptHash(), containerID, objBytes, attrs, homomorphicHashingDisabled)
|
|
if errUpload != nil {
|
|
return errUpload
|
|
}
|
|
if debug {
|
|
fmt.Fprintf(ctx.App.Writer, "Uploaded block %d with object ID: %s\n", blockIndex, resOid.String())
|
|
}
|
|
return errUpload
|
|
}, maxRetries)
|
|
if errRetr != nil {
|
|
select {
|
|
case errCh <- errRetr:
|
|
default:
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}(i)
|
|
}
|
|
|
|
go func() {
|
|
wg.Wait()
|
|
close(doneCh)
|
|
}()
|
|
|
|
select {
|
|
case err := <-errCh:
|
|
return fmt.Errorf("upload error: %w", err)
|
|
case <-doneCh:
|
|
}
|
|
|
|
fmt.Fprintf(ctx.App.Writer, "Successfully uploaded batch of blocks: from %d to %d\n", batchStart, batchEnd-1)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// uploadIndexFiles uploads missing index files to the container.
|
|
func uploadIndexFiles(ctx *cli.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, signer user.Signer, oldestMissingBlockIndex uint, blockAttributeKey string, homomorphicHashingDisabled bool, maxParallelSearches, maxRetries int, debug bool) error {
|
|
var (
|
|
attributeKey = ctx.String("index-attribute")
|
|
indexFileSize = ctx.Uint("index-file-size")
|
|
|
|
buffer = make([]byte, indexFileSize*oidSize)
|
|
doneCh = make(chan struct{})
|
|
errCh = make(chan error)
|
|
emptyOid = make([]byte, oidSize)
|
|
|
|
expectedIndexCount = (oldestMissingBlockIndex - 1) / indexFileSize
|
|
existingIndexCount = uint(0)
|
|
filters = object.NewSearchFilters()
|
|
)
|
|
fmt.Fprintln(ctx.App.Writer, "Uploading index files...")
|
|
|
|
go func() {
|
|
defer close(doneCh)
|
|
// Search for existing index files.
|
|
filters.AddFilter("IndexSize", fmt.Sprintf("%d", indexFileSize), object.MatchStringEqual)
|
|
indexIDs := searchObjects(ctx.Context, p, containerID, account, attributeKey, 0, expectedIndexCount, maxParallelSearches, maxRetries, errCh, filters)
|
|
for range indexIDs {
|
|
existingIndexCount++
|
|
}
|
|
|
|
if existingIndexCount >= expectedIndexCount {
|
|
fmt.Fprintf(ctx.App.Writer, "Index files are up to date. Existing: %d, expected: %d\n", existingIndexCount, expectedIndexCount)
|
|
return
|
|
}
|
|
fmt.Fprintf(ctx.App.Writer, "Current index files count: %d, expected: %d\n", existingIndexCount, expectedIndexCount)
|
|
|
|
// Main processing loop for each index file.
|
|
for i := existingIndexCount; i < expectedIndexCount; i++ {
|
|
// Start block parsing goroutines.
|
|
var (
|
|
// processedIndices is a mapping from position in buffer to the block index.
|
|
processedIndices sync.Map
|
|
wg sync.WaitGroup
|
|
oidCh = make(chan oid.ID, 2*maxParallelSearches)
|
|
)
|
|
wg.Add(maxParallelSearches)
|
|
for range maxParallelSearches {
|
|
go func() {
|
|
defer wg.Done()
|
|
for id := range oidCh {
|
|
var obj object.Object
|
|
errRetr := retry(func() error {
|
|
var errGet error
|
|
obj, _, errGet = p.ObjectGetInit(ctx.Context, containerID, id, signer, client.PrmObjectGet{})
|
|
return errGet
|
|
}, maxRetries)
|
|
if errRetr != nil {
|
|
select {
|
|
case errCh <- fmt.Errorf("failed to fetch object %s: %w", id.String(), errRetr):
|
|
default:
|
|
}
|
|
return
|
|
}
|
|
blockIndex, err := getBlockIndex(obj, blockAttributeKey)
|
|
if err != nil {
|
|
select {
|
|
case errCh <- fmt.Errorf("failed to get block index from object %s: %w", id.String(), err):
|
|
default:
|
|
}
|
|
return
|
|
}
|
|
pos := uint(blockIndex) % indexFileSize
|
|
if _, ok := processedIndices.LoadOrStore(pos, blockIndex); !ok {
|
|
id.Encode(buffer[pos*oidSize:])
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Search for blocks within the index file range.
|
|
startIndex := i * indexFileSize
|
|
endIndex := startIndex + indexFileSize
|
|
objIDs := searchObjects(ctx.Context, p, containerID, account, blockAttributeKey, startIndex, endIndex, maxParallelSearches, maxRetries, errCh)
|
|
for id := range objIDs {
|
|
oidCh <- id
|
|
}
|
|
close(oidCh)
|
|
wg.Wait()
|
|
fmt.Fprintf(ctx.App.Writer, "Index file %d generated, checking for the missing blocks...\n", i)
|
|
|
|
// Check if there are empty OIDs in the generated index file. This may happen
|
|
// if searchObjects has returned not all blocks within the requested range, ref.
|
|
// #3645. In this case, retry the search for every missing object.
|
|
var count int
|
|
for idx := range indexFileSize {
|
|
if _, ok := processedIndices.Load(idx); !ok {
|
|
count++
|
|
fmt.Fprintf(ctx.App.Writer, "Index file %d: fetching missing block %d\n", i, i*indexFileSize+idx)
|
|
objIDs = searchObjects(ctx.Context, p, containerID, account, blockAttributeKey, i*indexFileSize+idx, i*indexFileSize+idx+1, 1, maxRetries, errCh)
|
|
// Block object duplicates are allowed, we're OK with the first found result.
|
|
id, ok := <-objIDs
|
|
for range objIDs {
|
|
}
|
|
if !ok {
|
|
select {
|
|
case errCh <- fmt.Errorf("index file %d: block %d is missing from the storage", i, i*indexFileSize+idx):
|
|
default:
|
|
}
|
|
return
|
|
}
|
|
processedIndices.Store(idx, id)
|
|
id.Encode(buffer[idx*oidSize:])
|
|
}
|
|
}
|
|
fmt.Fprintf(ctx.App.Writer, "%d missing block(s) processed for index file %d, uploading index file...\n", count, i)
|
|
|
|
// Check if there are empty OIDs in the generated index file. If it happens at
|
|
// this stage, then there's a bug in the code.
|
|
for k := 0; k < len(buffer); k += oidSize {
|
|
if slices.Compare(buffer[k:k+oidSize], emptyOid) == 0 {
|
|
select {
|
|
case errCh <- fmt.Errorf("empty OID found in index file %d at position %d (block index %d)", i, k/oidSize, i*indexFileSize+uint(k/oidSize)):
|
|
default:
|
|
}
|
|
return
|
|
}
|
|
}
|
|
|
|
// Upload index file.
|
|
attrs := []object.Attribute{
|
|
*object.NewAttribute(attributeKey, strconv.Itoa(int(i))),
|
|
*object.NewAttribute("IndexSize", strconv.Itoa(int(indexFileSize))),
|
|
}
|
|
err := retry(func() error {
|
|
resOid, errUpload := uploadObj(ctx.Context, p, signer, account.PrivateKey().GetScriptHash(), containerID, buffer, attrs, homomorphicHashingDisabled)
|
|
if errUpload != nil {
|
|
return errUpload
|
|
}
|
|
if debug {
|
|
fmt.Fprintf(ctx.App.Writer, "Uploaded idex file %d with object ID: %s\n", i, resOid.String())
|
|
}
|
|
return errUpload
|
|
}, maxRetries)
|
|
if err != nil {
|
|
select {
|
|
case errCh <- fmt.Errorf("failed to upload index file %d: %w", i, err):
|
|
default:
|
|
}
|
|
return
|
|
}
|
|
fmt.Fprintf(ctx.App.Writer, "Uploaded index file %d\n", i)
|
|
clear(buffer)
|
|
}
|
|
}()
|
|
|
|
select {
|
|
case err := <-errCh:
|
|
return err
|
|
case <-doneCh:
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// searchObjects searches in parallel for objects with attribute GE startIndex and LT
|
|
// endIndex. It returns a buffered channel of resulting object IDs and closes it once
|
|
// OID search is finished. Errors are sent to errCh in a non-blocking way.
|
|
func searchObjects(ctx context.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, blockAttributeKey string, startIndex, endIndex uint, maxParallelSearches, maxRetries int, errCh chan error, additionalFilters ...object.SearchFilters) chan oid.ID {
|
|
var res = make(chan oid.ID, 2*searchBatchSize)
|
|
go func() {
|
|
var wg sync.WaitGroup
|
|
defer close(res)
|
|
|
|
for i := int(startIndex); i < int(endIndex); i += searchBatchSize * maxParallelSearches {
|
|
for j := range maxParallelSearches {
|
|
start := i + j*searchBatchSize
|
|
end := start + searchBatchSize
|
|
|
|
if start >= int(endIndex) {
|
|
break
|
|
}
|
|
if end > int(endIndex) {
|
|
end = int(endIndex)
|
|
}
|
|
|
|
wg.Add(1)
|
|
go func(start, end int) {
|
|
defer wg.Done()
|
|
|
|
prm := client.PrmObjectSearch{}
|
|
filters := object.NewSearchFilters()
|
|
if len(additionalFilters) != 0 {
|
|
filters = additionalFilters[0]
|
|
}
|
|
if end == start+1 {
|
|
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", start), object.MatchStringEqual)
|
|
} else {
|
|
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", start), object.MatchNumGE)
|
|
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT)
|
|
}
|
|
prm.SetFilters(filters)
|
|
|
|
var objIDs []oid.ID
|
|
err := retry(func() error {
|
|
var errBlockSearch error
|
|
objIDs, errBlockSearch = neofs.ObjectSearch(ctx, p, account.PrivateKey(), containerID.String(), prm)
|
|
return errBlockSearch
|
|
}, maxRetries)
|
|
if err != nil {
|
|
select {
|
|
case errCh <- fmt.Errorf("failed to search for block(s) from %d to %d: %w", start, end, err):
|
|
default:
|
|
}
|
|
return
|
|
}
|
|
|
|
for _, id := range objIDs {
|
|
res <- id
|
|
}
|
|
}(start, end)
|
|
}
|
|
wg.Wait()
|
|
}
|
|
}()
|
|
|
|
return res
|
|
}
|
|
|
|
// uploadObj uploads object to the container using provided settings.
|
|
func uploadObj(ctx context.Context, p poolWrapper, signer user.Signer, owner util.Uint160, containerID cid.ID, objData []byte, attrs []object.Attribute, homomorphicHashingDisabled bool) (oid.ID, error) {
|
|
var (
|
|
ownerID user.ID
|
|
hdr object.Object
|
|
chSHA256 checksum.Checksum
|
|
chHomomorphic checksum.Checksum
|
|
v = new(version.Version)
|
|
prmObjectPutInit client.PrmObjectPutInit
|
|
resOID = oid.ID{}
|
|
)
|
|
|
|
ownerID.SetScriptHash(owner)
|
|
hdr.SetPayload(objData)
|
|
hdr.SetPayloadSize(uint64(len(objData)))
|
|
hdr.SetContainerID(containerID)
|
|
hdr.SetOwnerID(&ownerID)
|
|
hdr.SetAttributes(attrs...)
|
|
hdr.SetCreationEpoch(1)
|
|
v.SetMajor(1)
|
|
hdr.SetVersion(v)
|
|
if !homomorphicHashingDisabled {
|
|
checksum.Calculate(&chHomomorphic, checksum.TZ, objData)
|
|
hdr.SetPayloadHomomorphicHash(chHomomorphic)
|
|
}
|
|
checksum.Calculate(&chSHA256, checksum.SHA256, objData)
|
|
hdr.SetPayloadChecksum(chSHA256)
|
|
|
|
err := hdr.SetIDWithSignature(signer)
|
|
if err != nil {
|
|
return resOID, err
|
|
}
|
|
err = hdr.CheckHeaderVerificationFields()
|
|
if err != nil {
|
|
return resOID, err
|
|
}
|
|
|
|
writer, err := p.ObjectPutInit(ctx, hdr, signer, prmObjectPutInit)
|
|
if err != nil {
|
|
return resOID, fmt.Errorf("failed to initiate object upload: %w", err)
|
|
}
|
|
_, err = writer.Write(objData)
|
|
if err != nil {
|
|
_ = writer.Close()
|
|
return resOID, fmt.Errorf("failed to write object data: %w", err)
|
|
}
|
|
err = writer.Close()
|
|
if err != nil {
|
|
return resOID, fmt.Errorf("failed to close object writer: %w", err)
|
|
}
|
|
res := writer.GetResult()
|
|
resOID = res.StoredObjectID()
|
|
if resOID.Equals(oid.ID{}) {
|
|
return resOID, fmt.Errorf("object ID is empty")
|
|
}
|
|
return resOID, nil
|
|
}
|
|
|
|
func getBlockIndex(header object.Object, attribute string) (int, error) {
|
|
for _, attr := range header.UserAttributes() {
|
|
if attr.Key() == attribute {
|
|
value := attr.Value()
|
|
blockIndex, err := strconv.Atoi(value)
|
|
if err != nil {
|
|
return -1, fmt.Errorf("attribute %s has invalid value: %s, error: %w", attribute, value, err)
|
|
}
|
|
return blockIndex, nil
|
|
}
|
|
}
|
|
return -1, fmt.Errorf("attribute %s not found", attribute)
|
|
}
|