Mirror of https://github.com/nspcc-dev/neo-go.git, synced 2024-11-21 23:29:38 +00:00
cli: add uploading index file during block uploading in upload bin
Signed-off-by: Ekaterina Pavlova <ekt@morphbits.io>
parent: 35d12779d6
commit: a0a0358cb1
1 changed file with 173 additions and 114 deletions
@@ -65,6 +65,8 @@ func uploadBin(ctx *cli.Context) error {
 	rpcNeoFS := ctx.StringSlice("fs-rpc-endpoint")
 	containerIDStr := ctx.String("container")
 	attr := ctx.String("block-attribute")
+	attrIndex := ctx.String("index-attribute")
+	indexFileSize := ctx.Uint("index-file-size")
 	numWorkers := ctx.Int("workers")
 	maxParallelSearches := ctx.Int("searchers")
 	acc, _, err := options.GetAccFromContext(ctx)
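The two added flags move the index-file parameters up into uploadBin itself (previously uploadIndexFiles read them internally, as the uploadIndexFiles hunk further down shows), since both uploadIndexFiles and uploadBlocks now need them. A rough sketch of what they size, assuming the 32-byte NeoFS object ID and an illustrative index-file-size (both numbers are assumptions, not taken from this diff):

```go
package main

import "fmt"

// oidSize mirrors the 32-byte NeoFS object ID; 128000 entries is an
// illustrative index-file-size, not necessarily the CLI default.
const oidSize = 32

func main() {
	indexFileSize := uint(128000)
	// The index buffer holds one OID slot per block of the index file:
	fmt.Println(indexFileSize*oidSize, "bytes") // 4096000 bytes, ~4 MB
}
```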
@@ -142,17 +144,18 @@ func uploadBin(ctx *cli.Context) error {
 	}
 	fmt.Fprintln(ctx.App.Writer, "First block of latest incomplete batch uploaded to NeoFS container:", oldestMissingBlockIndex)
 
+	partUploadedBlocks, err := uploadIndexFiles(ctx, p, containerID, acc, signer, uint(oldestMissingBlockIndex), attr, attrIndex, indexFileSize, homomorphicHashingDisabled, maxParallelSearches)
+	if err != nil {
+		return cli.Exit(fmt.Errorf("failed to upload index files: %w", err), 1)
+	}
+
 	if !ctx.Bool("skip-blocks-uploading") {
-		err = uploadBlocks(ctx, p, rpc, signer, containerID, acc, attr, oldestMissingBlockIndex, uint(currentBlockHeight), homomorphicHashingDisabled, numWorkers)
+		err = uploadBlocks(ctx, p, rpc, signer, containerID, acc, attr, attrIndex, partUploadedBlocks, oldestMissingBlockIndex, uint(currentBlockHeight), indexFileSize, homomorphicHashingDisabled, numWorkers)
 		if err != nil {
 			return cli.Exit(fmt.Errorf("failed to upload blocks: %w", err), 1)
 		}
 	}
-
-	err = uploadIndexFiles(ctx, p, containerID, acc, signer, uint(currentBlockHeight), attr, homomorphicHashingDisabled, maxParallelSearches)
-	if err != nil {
-		return cli.Exit(fmt.Errorf("failed to upload index files: %w", err), 1)
-	}
 	return nil
 }
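This reordering is the heart of the commit: uploadIndexFiles now runs first, covers only the already-uploaded range (up to oldestMissingBlockIndex instead of currentBlockHeight), and hands back the partially filled buffer of the last, incomplete index file; uploadBlocks then keeps filling that buffer and flushes index files itself as they complete. A minimal, self-contained sketch of the handoff, with all names as hypothetical stand-ins for the neo-go helpers:

```go
package main

import "fmt"

const (
	oidSize       = 4 // tiny sizes keep the example readable
	indexFileSize = 4
)

// uploadIndexFiles stands in for the real function: it returns the buffer
// for the tail index file, prefilled for blocks already in the container.
func uploadIndexFiles(upTo int) []byte {
	buf := make([]byte, indexFileSize*oidSize)
	for i := 0; i < upTo%indexFileSize; i++ {
		buf[i*oidSize] = 1 // fake OID for block i of the tail file
	}
	return buf
}

// uploadBlocks continues from the partial buffer and flushes complete files.
func uploadBlocks(buf []byte, from, height int) {
	for idx := from; idx <= height; idx++ {
		buf[(idx%indexFileSize)*oidSize] = 1 // slot for this block's OID
		if (idx+1)%indexFileSize == 0 {
			fmt.Println("index file", idx/indexFileSize, "complete; upload and clear")
			clear(buf)
		}
	}
}

func main() {
	oldestMissing, height := 6, 11
	buf := uploadIndexFiles(oldestMissing) // slots 0-1 of file 1 prefilled
	uploadBlocks(buf, oldestMissing, height)
}
```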
@@ -242,93 +245,124 @@ func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID
 }
 
 // uploadBlocks uploads the blocks to the container using the pool.
-func uploadBlocks(ctx *cli.Context, p *pool.Pool, rpc *rpcclient.Client, signer user.Signer, containerID cid.ID, acc *wallet.Account, attr string, oldestMissingBlockIndex int, currentBlockHeight uint, homomorphicHashingDisabled bool, numWorkers int) error {
+func uploadBlocks(ctx *cli.Context, p *pool.Pool, rpc *rpcclient.Client, signer user.Signer, containerID cid.ID, acc *wallet.Account, attr, attrIndex string, buffer []byte, oldestMissingBlockIndex int, currentBlockHeight uint, indexFileSize uint, homomorphicHashingDisabled bool, numWorkers int) error {
 	if oldestMissingBlockIndex > int(currentBlockHeight) {
 		fmt.Fprintf(ctx.App.Writer, "No new blocks to upload. Need to upload starting from %d, current height %d\n", oldestMissingBlockIndex, currentBlockHeight)
 		return nil
 	}
-	for batchStart := oldestMissingBlockIndex; batchStart <= int(currentBlockHeight); batchStart += searchBatchSize {
-		var (
-			batchEnd = min(batchStart+searchBatchSize, int(currentBlockHeight)+1)
-			errCh    = make(chan error)
-			doneCh   = make(chan struct{})
-			wg       sync.WaitGroup
-		)
-		fmt.Fprintf(ctx.App.Writer, "Processing batch from %d to %d\n", batchStart, batchEnd-1)
-		wg.Add(numWorkers)
-		for i := range numWorkers {
-			go func(i int) {
-				defer wg.Done()
-				for blockIndex := batchStart + i; blockIndex < batchEnd; blockIndex += numWorkers {
-					var blk *block.Block
-					errGet := retry(func() error {
-						var errGetBlock error
-						blk, errGetBlock = rpc.GetBlockByIndex(uint32(blockIndex))
-						if errGetBlock != nil {
-							return fmt.Errorf("failed to fetch block %d: %w", blockIndex, errGetBlock)
-						}
-						return nil
-					})
-					if errGet != nil {
-						select {
-						case errCh <- errGet:
-						default:
-						}
-						return
-					}
-
-					bw := io.NewBufBinWriter()
-					blk.EncodeBinary(bw.BinWriter)
-					if bw.Err != nil {
-						select {
-						case errCh <- fmt.Errorf("failed to encode block %d: %w", blockIndex, bw.Err):
-						default:
-						}
-						return
-					}
-					attrs := []object.Attribute{
-						*object.NewAttribute(attr, strconv.Itoa(int(blk.Index))),
-						*object.NewAttribute("Primary", strconv.Itoa(int(blk.PrimaryIndex))),
-						*object.NewAttribute("Hash", blk.Hash().StringLE()),
-						*object.NewAttribute("PrevHash", blk.PrevHash.StringLE()),
-						*object.NewAttribute("Timestamp", strconv.FormatUint(blk.Timestamp, 10)),
-					}
-
-					objBytes := bw.Bytes()
-					errRetr := retry(func() error {
-						return uploadObj(ctx.Context, p, signer, acc.PrivateKey().GetScriptHash(), containerID, objBytes, attrs, homomorphicHashingDisabled)
-					})
-					if errRetr != nil {
-						select {
-						case errCh <- errRetr:
-						default:
-						}
-						return
-					}
-				}
-			}(i)
-		}
-
-		go func() {
-			wg.Wait()
-			close(doneCh)
-		}()
-
-		select {
-		case err := <-errCh:
-			return fmt.Errorf("upload error: %w", err)
-		case <-doneCh:
-		}
-
-		fmt.Fprintf(ctx.App.Writer, "Successfully uploaded batch of blocks: from %d to %d\n", batchStart, batchEnd-1)
-	}
+	for indexFileStart := oldestMissingBlockIndex - oldestMissingBlockIndex%int(indexFileSize); indexFileStart < int(currentBlockHeight); indexFileStart += int(indexFileSize) {
+		indexFileEnd := min(indexFileStart+int(indexFileSize), int(currentBlockHeight)+1)
+		if len(buffer) != 0 {
+			indexFileStart = oldestMissingBlockIndex
+		}
+		fmt.Fprintf(ctx.App.Writer, "Processing index file from %d to %d\n", indexFileStart, indexFileEnd-1)
+
+		for batchStart := indexFileStart; batchStart < indexFileEnd; batchStart += searchBatchSize {
+			var (
+				batchEnd = min(batchStart+searchBatchSize, int(indexFileEnd)+1)
+				errCh    = make(chan error)
+				doneCh   = make(chan struct{})
+				wg       sync.WaitGroup
+			)
+			fmt.Fprintf(ctx.App.Writer, "Processing batch from %d to %d\n", batchStart, batchEnd-1)
+			wg.Add(numWorkers)
+			for i := range numWorkers {
+				go func(i int) {
+					defer wg.Done()
+					for blockIndex := batchStart + i; blockIndex < batchEnd; blockIndex += numWorkers {
+						var blk *block.Block
+						errGet := retry(func() error {
+							var errGetBlock error
+							blk, errGetBlock = rpc.GetBlockByIndex(uint32(blockIndex))
+							if errGetBlock != nil {
+								return fmt.Errorf("failed to fetch block %d: %w", blockIndex, errGetBlock)
+							}
+							return nil
+						})
+						if errGet != nil {
+							select {
+							case errCh <- errGet:
+							default:
+							}
+							return
+						}
+
+						bw := io.NewBufBinWriter()
+						blk.EncodeBinary(bw.BinWriter)
+						if bw.Err != nil {
+							select {
+							case errCh <- fmt.Errorf("failed to encode block %d: %w", blockIndex, bw.Err):
+							default:
+							}
+							return
+						}
+						attrs := []object.Attribute{
+							*object.NewAttribute(attr, strconv.Itoa(int(blk.Index))),
+							*object.NewAttribute("Primary", strconv.Itoa(int(blk.PrimaryIndex))),
+							*object.NewAttribute("Hash", blk.Hash().StringLE()),
+							*object.NewAttribute("PrevHash", blk.PrevHash.StringLE()),
+							*object.NewAttribute("Timestamp", strconv.FormatUint(blk.Timestamp, 10)),
+						}
+
+						var (
+							objBytes = bw.Bytes()
+							res      oid.ID
+						)
+						errRetr := retry(func() error {
+							var errUpload error
+							res, errUpload = uploadObj(ctx.Context, p, signer, acc.PrivateKey().GetScriptHash(), containerID, objBytes, attrs, homomorphicHashingDisabled)
+							return errUpload
+						})
+						if errRetr != nil {
+							select {
+							case errCh <- errRetr:
+							default:
+							}
+							return
+						}
+
+						res.Encode(buffer[blockIndex%int(indexFileSize)*oidSize:])
+					}
+				}(i)
+			}
+
+			go func() {
+				wg.Wait()
+				close(doneCh)
+			}()
+
+			select {
+			case err := <-errCh:
+				return fmt.Errorf("upload error: %w", err)
+			case <-doneCh:
+			}
+
+			fmt.Fprintf(ctx.App.Writer, "Successfully uploaded batch of blocks: from %d to %d\n", batchStart, batchEnd-1)
+		}
+
+		err := checkBuffer(buffer, uint(indexFileStart/int(indexFileSize)), indexFileSize, indexFileSize)
+		if err != nil {
+			return fmt.Errorf("failed to check buffer: %w", err)
+		}
+		attrs := []object.Attribute{
+			*object.NewAttribute(attrIndex, strconv.Itoa(indexFileStart/int(indexFileSize))),
+			*object.NewAttribute("IndexSize", strconv.Itoa(int(indexFileSize))),
+		}
+		err = retry(func() error {
+			var errUpload error
+			_, errUpload = uploadObj(ctx.Context, p, signer, acc.PrivateKey().GetScriptHash(), containerID, buffer, attrs, homomorphicHashingDisabled)
+			return errUpload
+		})
+		if err != nil {
+			return fmt.Errorf("failed to upload index file: %w", err)
+		}
+		fmt.Println("Successfully uploaded index file ", indexFileStart/int(indexFileSize))
+		clear(buffer)
+	}
 	return nil
 }
 
 // uploadIndexFiles uploads missing index files to the container.
-func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, account *wallet.Account, signer user.Signer, currentHeight uint, blockAttributeKey string, homomorphicHashingDisabled bool, maxParallelSearches int) error {
-	attributeKey := ctx.String("index-attribute")
-	indexFileSize := ctx.Uint("index-file-size")
+func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, account *wallet.Account, signer user.Signer, lastMissingBlock uint, blockAttributeKey, attributeKey string, indexFileSize uint, homomorphicHashingDisabled bool, maxParallelSearches int) ([]byte, error) {
 	fmt.Fprintln(ctx.App.Writer, "Uploading index files...")
 
 	prm := client.PrmObjectSearch{}
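The key new line in the reworked worker loop is `res.Encode(buffer[blockIndex%int(indexFileSize)*oidSize:])`: each uploaded block's OID is written into its slot of the shared index buffer, so a complete index file can be flushed as soon as its last block lands. The offset arithmetic, sketched standalone with illustrative constants:

```go
package main

import "fmt"

const (
	oidSize       = 32     // NeoFS object ID length in bytes
	indexFileSize = 128000 // illustrative number of slots per index file
)

// slotOffset returns the byte offset of a block's OID slot in the buffer.
func slotOffset(blockIndex int) int {
	return blockIndex % indexFileSize * oidSize
}

func main() {
	fmt.Println(slotOffset(0))      // 0
	fmt.Println(slotOffset(1))      // 32
	fmt.Println(slotOffset(128001)) // 32: slot 1 of the next index file
}
```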
@@ -343,28 +377,28 @@ func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
 		return errSearchIndex
 	})
 	if errSearch != nil {
-		return fmt.Errorf("index files search failed: %w", errSearch)
+		return nil, fmt.Errorf("index files search failed: %w", errSearch)
 	}
 
+	var (
+		buffer = make([]byte, indexFileSize*oidSize)
+		doneCh = make(chan []byte, indexFileSize*oidSize)
+		errCh  = make(chan error)
+	)
 	existingIndexCount := uint(len(objectIDs))
-	expectedIndexCount := currentHeight / indexFileSize
+	expectedIndexCount := lastMissingBlock / indexFileSize
+	tail := lastMissingBlock % indexFileSize
 	if existingIndexCount >= expectedIndexCount {
 		fmt.Fprintf(ctx.App.Writer, "Index files are up to date. Existing: %d, expected: %d\n", existingIndexCount, expectedIndexCount)
-		return nil
+		if tail == 0 || existingIndexCount > expectedIndexCount+1 {
+			return buffer, nil
+		}
 	}
 
-	var (
-		buffer   = make([]byte, indexFileSize*oidSize)
-		doneCh   = make(chan struct{})
-		errCh    = make(chan error)
-		emptyOid = make([]byte, oidSize)
-	)
-
 	go func() {
 		defer close(doneCh)
 
-		// Main processing loop for each index file.
-		for i := existingIndexCount; i < expectedIndexCount; i++ {
+		// Main processing loop for each index file. One more loop is needed for tail search.
+		for i := existingIndexCount; i < expectedIndexCount+1; i++ {
 			// Start block parsing goroutines.
 			var (
 				// processedIndices is a mapping from position in buffer to the block index.
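`expectedIndexCount` and the new `tail` split the first missing block's position into complete index files plus a partial remainder, which is why one extra loop iteration is now needed for the tail search. For example:

```go
package main

import "fmt"

func main() {
	indexFileSize := uint(1000) // illustrative
	lastMissingBlock := uint(2500)
	expectedIndexCount := lastMissingBlock / indexFileSize // 2 complete files
	tail := lastMissingBlock % indexFileSize               // 500 slots used in the 3rd
	fmt.Println(expectedIndexCount, tail)                  // 2 500
}
```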
@@ -416,13 +450,16 @@ func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
 			close(oidCh)
 			wg.Wait()
 			fmt.Fprintf(ctx.App.Writer, "Index file %d generated, checking for the missing blocks...\n", i)
 
+			bufferLength := indexFileSize
+			if i == expectedIndexCount {
+				bufferLength = tail
+			}
 			// Check if there are empty OIDs in the generated index file. This may happen
 			// if searchObjects has returned not all blocks within the requested range, ref.
 			// #3645. In this case, retry the search for every missing object.
 			var count int
-			for idx := range indexFileSize {
+			for idx := range bufferLength {
 				if _, ok := processedIndices.Load(idx); !ok {
 					count++
 					objIDs = searchObjects(ctx.Context, p, containerID, account, blockAttributeKey, i*indexFileSize+idx, i*indexFileSize+idx+1, 1, errCh)
@@ -445,14 +482,20 @@ func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
 
 			// Check if there are empty OIDs in the generated index file. If it happens at
 			// this stage, then there's a bug in the code.
-			for k := 0; k < len(buffer); k += oidSize {
-				if slices.Compare(buffer[k:k+oidSize], emptyOid) == 0 {
-					select {
-					case errCh <- fmt.Errorf("empty OID found in index file %d at position %d (block index %d)", i, k/oidSize, i*indexFileSize+uint(k/oidSize)):
-					default:
-					}
-					return
-				}
+			err := checkBuffer(buffer, i, bufferLength, indexFileSize)
+			if err != nil {
+				select {
+				case errCh <- err:
+				default:
+				}
+				return
+			}
+			if i == expectedIndexCount {
+				select {
+				case doneCh <- buffer:
+				default:
+				}
+				return
 			}
 
 			// Upload index file.
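checkBuffer's error is forwarded with the same non-blocking send used throughout this file: the first error wins and later ones are dropped, so goroutines never block once a failure has been reported. A standalone sketch of the pattern (the buffered channel here stands in for a consumer that has not yet arrived):

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	errCh := make(chan error, 1) // capacity 1 models the single consumer
	report := func(err error) {
		select {
		case errCh <- err: // delivered to the waiting select
		default: // an error is already pending; drop this one
		}
	}
	report(errors.New("first failure"))
	report(errors.New("second failure")) // silently dropped
	fmt.Println(<-errCh)                 // first failure
}
```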
@@ -460,8 +503,10 @@ func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
 				*object.NewAttribute(attributeKey, strconv.Itoa(int(i))),
 				*object.NewAttribute("IndexSize", strconv.Itoa(int(indexFileSize))),
 			}
-			err := retry(func() error {
-				return uploadObj(ctx.Context, p, signer, account.PrivateKey().GetScriptHash(), containerID, buffer, attrs, homomorphicHashingDisabled)
+			err = retry(func() error {
+				var errUpload error
+				_, errUpload = uploadObj(ctx.Context, p, signer, account.PrivateKey().GetScriptHash(), containerID, buffer, attrs, homomorphicHashingDisabled)
+				return errUpload
 			})
 			if err != nil {
 				select {
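Since uploadObj now returns an OID alongside the error, its retried invocations switch to the closure-capture form shown above. A generic, self-contained sketch of that pattern, with a hypothetical retry helper standing in for the CLI's:

```go
package main

import (
	"errors"
	"fmt"
)

// retry re-runs fn until it succeeds or attempts run out (hypothetical
// stand-in for the CLI's retry helper).
func retry(fn func() error) error {
	var err error
	for range 3 {
		if err = fn(); err == nil {
			return nil
		}
	}
	return err
}

func main() {
	var res string // stands in for oid.ID
	attempt := 0
	err := retry(func() error {
		attempt++
		if attempt < 2 {
			return errors.New("transient failure")
		}
		res = "object-id" // captured by the closure, visible after retry
		return nil
	})
	fmt.Println(res, err) // object-id <nil>
}
```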
@@ -477,11 +522,10 @@ func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
 
 	select {
 	case err := <-errCh:
-		return err
-	case <-doneCh:
+		return nil, err
+	case res := <-doneCh:
+		return res, nil
 	}
-
-	return nil
 }
 
 // searchObjects searches in parallel for objects with attribute GE startIndex and LT
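The final select now receives the tail buffer itself from doneCh rather than a bare completion signal, which is how the partially filled index file reaches uploadBin and, from there, uploadBlocks. A minimal sketch of the payload-carrying handoff:

```go
package main

import "fmt"

func main() {
	var (
		errCh  = make(chan error)
		doneCh = make(chan []byte)
	)
	go func() {
		defer close(doneCh)
		doneCh <- []byte("tail index buffer") // producer hands the buffer back
	}()
	select {
	case err := <-errCh:
		fmt.Println("error:", err)
	case res := <-doneCh:
		fmt.Println("received", len(res), "bytes") // returned to the caller
	}
}
```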
@@ -542,7 +586,7 @@ func searchObjects(ctx context.Context, p *pool.Pool, containerID cid.ID, accoun
 }
 
 // uploadObj uploads object to the container using provided settings.
-func uploadObj(ctx context.Context, p *pool.Pool, signer user.Signer, owner util.Uint160, containerID cid.ID, objData []byte, attrs []object.Attribute, homomorphicHashingDisabled bool) error {
+func uploadObj(ctx context.Context, p *pool.Pool, signer user.Signer, owner util.Uint160, containerID cid.ID, objData []byte, attrs []object.Attribute, homomorphicHashingDisabled bool) (oid.ID, error) {
 	var (
 		ownerID user.ID
 		hdr     object.Object
@@ -550,6 +594,7 @@ func uploadObj(ctx context.Context, p *pool.Pool, signer user.Signer, owner util
 		chHomomorphic    checksum.Checksum
 		v                = new(version.Version)
 		prmObjectPutInit client.PrmObjectPutInit
+		res              oid.ID
 	)
 
 	ownerID.SetScriptHash(owner)
@@ -570,23 +615,27 @@ func uploadObj(ctx context.Context, p *pool.Pool, signer user.Signer, owner util
 
 	err := hdr.SetIDWithSignature(signer)
 	if err != nil {
-		return err
+		return res, err
 	}
 	err = hdr.CheckHeaderVerificationFields()
 	if err != nil {
-		return err
+		return res, err
 	}
+	err = hdr.CalculateAndSetID()
+	if err != nil {
+		return res, err
+	}
 
 	writer, err := p.ObjectPutInit(ctx, hdr, signer, prmObjectPutInit)
 	if err != nil {
-		return fmt.Errorf("failed to initiate object upload: %w", err)
+		return res, fmt.Errorf("failed to initiate object upload: %w", err)
 	}
 	defer writer.Close()
 	_, err = writer.Write(objData)
 	if err != nil {
-		return fmt.Errorf("failed to write object data: %w", err)
+		return res, fmt.Errorf("failed to write object data: %w", err)
 	}
-	return nil
+	res, _ = hdr.ID()
+	return res, nil
 }
 
 func getBlockIndex(header *object.Object, attribute string) (int, error) {
@@ -602,3 +651,13 @@ func getBlockIndex(header *object.Object, attribute string) (int, error) {
 	}
 	return -1, fmt.Errorf("attribute %s not found", attribute)
 }
+
+func checkBuffer(buffer []byte, i uint, bufferLength uint, indexFileSize uint) error {
+	emptyOid := make([]byte, oidSize)
+	for k := 0; uint(k) < bufferLength*oidSize; k += oidSize {
+		if slices.Compare(buffer[k:k+oidSize], emptyOid) == 0 {
+			return fmt.Errorf("empty OID found in index file %d at position %d (block index %d)", i, k/oidSize, i*indexFileSize+uint(k/oidSize))
+		}
+	}
+	return nil
+}
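The extracted checkBuffer helper now serves both uploadBlocks and uploadIndexFiles. A standalone copy demonstrating its behavior (the real function lives in the CLI package and uses the package-level oidSize constant):

```go
package main

import (
	"fmt"
	"slices"
)

const oidSize = 32

func checkBuffer(buffer []byte, i uint, bufferLength uint, indexFileSize uint) error {
	emptyOid := make([]byte, oidSize)
	for k := 0; uint(k) < bufferLength*oidSize; k += oidSize {
		if slices.Compare(buffer[k:k+oidSize], emptyOid) == 0 {
			return fmt.Errorf("empty OID found in index file %d at position %d (block index %d)", i, k/oidSize, i*indexFileSize+uint(k/oidSize))
		}
	}
	return nil
}

func main() {
	buf := make([]byte, 2*oidSize)
	buf[0] = 1 // slot 0 written, slot 1 left empty
	fmt.Println(checkBuffer(buf, 0, 2, 2))
	// empty OID found in index file 0 at position 1 (block index 1)
}
```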