From a0a0358cb11510911696f1d2ec505f8b6cb7f3c3 Mon Sep 17 00:00:00 2001
From: Ekaterina Pavlova
Date: Sat, 2 Nov 2024 03:35:50 +0300
Subject: [PATCH] cli: upload index files during block uploading in `upload bin`

Signed-off-by: Ekaterina Pavlova
---
 cli/util/uploader.go | 287 ++++++++++++++++++++++++++-----------------
 1 file changed, 173 insertions(+), 114 deletions(-)

diff --git a/cli/util/uploader.go b/cli/util/uploader.go
index 8ef315ab2..510fa6428 100644
--- a/cli/util/uploader.go
+++ b/cli/util/uploader.go
@@ -65,6 +65,8 @@ func uploadBin(ctx *cli.Context) error {
 	rpcNeoFS := ctx.StringSlice("fs-rpc-endpoint")
 	containerIDStr := ctx.String("container")
 	attr := ctx.String("block-attribute")
+	attrIndex := ctx.String("index-attribute")
+	indexFileSize := ctx.Uint("index-file-size")
 	numWorkers := ctx.Int("workers")
 	maxParallelSearches := ctx.Int("searchers")
 	acc, _, err := options.GetAccFromContext(ctx)
@@ -142,17 +144,18 @@ func uploadBin(ctx *cli.Context) error {
 	}
 	fmt.Fprintln(ctx.App.Writer, "First block of latest incomplete batch uploaded to NeoFS container:", oldestMissingBlockIndex)
 
+	partUploadedBlocks, err := uploadIndexFiles(ctx, p, containerID, acc, signer, uint(oldestMissingBlockIndex), attr, attrIndex, indexFileSize, homomorphicHashingDisabled, maxParallelSearches)
+	if err != nil {
+		return cli.Exit(fmt.Errorf("failed to upload index files: %w", err), 1)
+	}
+
 	if !ctx.Bool("skip-blocks-uploading") {
-		err = uploadBlocks(ctx, p, rpc, signer, containerID, acc, attr, oldestMissingBlockIndex, uint(currentBlockHeight), homomorphicHashingDisabled, numWorkers)
+		err = uploadBlocks(ctx, p, rpc, signer, containerID, acc, attr, attrIndex, partUploadedBlocks, oldestMissingBlockIndex, uint(currentBlockHeight), indexFileSize, homomorphicHashingDisabled, numWorkers)
 		if err != nil {
 			return cli.Exit(fmt.Errorf("failed to upload blocks: %w", err), 1)
 		}
 	}
 
-	err = uploadIndexFiles(ctx, p, containerID, acc, signer, uint(currentBlockHeight), attr, homomorphicHashingDisabled, maxParallelSearches)
-	if err != nil {
-		return cli.Exit(fmt.Errorf("failed to upload index files: %w", err), 1)
-	}
 	return nil
 }
 
@@ -242,93 +245,124 @@ func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID
 }
 
 // uploadBlocks uploads the blocks to the container using the pool.
-func uploadBlocks(ctx *cli.Context, p *pool.Pool, rpc *rpcclient.Client, signer user.Signer, containerID cid.ID, acc *wallet.Account, attr string, oldestMissingBlockIndex int, currentBlockHeight uint, homomorphicHashingDisabled bool, numWorkers int) error {
+func uploadBlocks(ctx *cli.Context, p *pool.Pool, rpc *rpcclient.Client, signer user.Signer, containerID cid.ID, acc *wallet.Account, attr, attrIndex string, buffer []byte, oldestMissingBlockIndex int, currentBlockHeight uint, indexFileSize uint, homomorphicHashingDisabled bool, numWorkers int) error {
 	if oldestMissingBlockIndex > int(currentBlockHeight) {
 		fmt.Fprintf(ctx.App.Writer, "No new blocks to upload. Need to upload starting from %d, current height %d\n", oldestMissingBlockIndex, currentBlockHeight)
 		return nil
 	}
-	for batchStart := oldestMissingBlockIndex; batchStart <= int(currentBlockHeight); batchStart += searchBatchSize {
-		var (
-			batchEnd = min(batchStart+searchBatchSize, int(currentBlockHeight)+1)
-			errCh    = make(chan error)
-			doneCh   = make(chan struct{})
-			wg       sync.WaitGroup
-		)
-		fmt.Fprintf(ctx.App.Writer, "Processing batch from %d to %d\n", batchStart, batchEnd-1)
-		wg.Add(numWorkers)
-		for i := range numWorkers {
-			go func(i int) {
-				defer wg.Done()
-				for blockIndex := batchStart + i; blockIndex < batchEnd; blockIndex += numWorkers {
-					var blk *block.Block
-					errGet := retry(func() error {
-						var errGetBlock error
-						blk, errGetBlock = rpc.GetBlockByIndex(uint32(blockIndex))
-						if errGetBlock != nil {
-							return fmt.Errorf("failed to fetch block %d: %w", blockIndex, errGetBlock)
-						}
-						return nil
-					})
-					if errGet != nil {
-						select {
-						case errCh <- errGet:
-						default:
-						}
-						return
-					}
-
-					bw := io.NewBufBinWriter()
-					blk.EncodeBinary(bw.BinWriter)
-					if bw.Err != nil {
-						select {
-						case errCh <- fmt.Errorf("failed to encode block %d: %w", blockIndex, bw.Err):
-						default:
-						}
-						return
-					}
-					attrs := []object.Attribute{
-						*object.NewAttribute(attr, strconv.Itoa(int(blk.Index))),
-						*object.NewAttribute("Primary", strconv.Itoa(int(blk.PrimaryIndex))),
-						*object.NewAttribute("Hash", blk.Hash().StringLE()),
-						*object.NewAttribute("PrevHash", blk.PrevHash.StringLE()),
-						*object.NewAttribute("Timestamp", strconv.FormatUint(blk.Timestamp, 10)),
-					}
-
-					objBytes := bw.Bytes()
-					errRetr := retry(func() error {
-						return uploadObj(ctx.Context, p, signer, acc.PrivateKey().GetScriptHash(), containerID, objBytes, attrs, homomorphicHashingDisabled)
-					})
-					if errRetr != nil {
-						select {
-						case errCh <- errRetr:
-						default:
-						}
-						return
-					}
-				}
-			}(i)
+	for indexFileStart := oldestMissingBlockIndex - oldestMissingBlockIndex%int(indexFileSize); indexFileStart < int(currentBlockHeight); indexFileStart += int(indexFileSize) {
+		indexFileEnd := min(indexFileStart+int(indexFileSize), int(currentBlockHeight)+1)
+		if len(buffer) != 0 {
+			indexFileStart = oldestMissingBlockIndex
 		}
+		fmt.Fprintf(ctx.App.Writer, "Processing index file from %d to %d\n", indexFileStart, indexFileEnd-1)
 
-		go func() {
-			wg.Wait()
-			close(doneCh)
-		}()
+		for batchStart := indexFileStart; batchStart < indexFileEnd; batchStart += searchBatchSize {
+			var (
+				batchEnd = min(batchStart+searchBatchSize, int(indexFileEnd)+1)
+				errCh    = make(chan error)
+				doneCh   = make(chan struct{})
+				wg       sync.WaitGroup
+			)
+			fmt.Fprintf(ctx.App.Writer, "Processing batch from %d to %d\n", batchStart, batchEnd-1)
+			wg.Add(numWorkers)
+			for i := range numWorkers {
+				go func(i int) {
+					defer wg.Done()
+					for blockIndex := batchStart + i; blockIndex < batchEnd; blockIndex += numWorkers {
+						var blk *block.Block
+						errGet := retry(func() error {
+							var errGetBlock error
+							blk, errGetBlock = rpc.GetBlockByIndex(uint32(blockIndex))
+							if errGetBlock != nil {
+								return fmt.Errorf("failed to fetch block %d: %w", blockIndex, errGetBlock)
+							}
+							return nil
+						})
+						if errGet != nil {
+							select {
+							case errCh <- errGet:
+							default:
+							}
+							return
+						}
 
-		select {
-		case err := <-errCh:
-			return fmt.Errorf("upload error: %w", err)
-		case <-doneCh:
+						bw := io.NewBufBinWriter()
+						blk.EncodeBinary(bw.BinWriter)
+						if bw.Err != nil {
+							select {
+							case errCh <- fmt.Errorf("failed to encode block %d: %w", blockIndex, bw.Err):
+							default:
+							}
+							return
+						}
+						attrs := []object.Attribute{
+							*object.NewAttribute(attr, strconv.Itoa(int(blk.Index))),
+							*object.NewAttribute("Primary", strconv.Itoa(int(blk.PrimaryIndex))),
+							*object.NewAttribute("Hash", blk.Hash().StringLE()),
+							*object.NewAttribute("PrevHash", blk.PrevHash.StringLE()),
+							*object.NewAttribute("Timestamp", strconv.FormatUint(blk.Timestamp, 10)),
+						}
+
+						var (
+							objBytes = bw.Bytes()
+							res      oid.ID
+						)
+						errRetr := retry(func() error {
+							var errUpload error
+							res, errUpload = uploadObj(ctx.Context, p, signer, acc.PrivateKey().GetScriptHash(), containerID, objBytes, attrs, homomorphicHashingDisabled)
+							return errUpload
+						})
+						if errRetr != nil {
+							select {
+							case errCh <- errRetr:
+							default:
+							}
+							return
+						}
+
+						res.Encode(buffer[blockIndex%int(indexFileSize)*oidSize:])
+					}
+				}(i)
+			}
+
+			go func() {
+				wg.Wait()
+				close(doneCh)
+			}()
+
+			select {
+			case err := <-errCh:
+				return fmt.Errorf("upload error: %w", err)
+			case <-doneCh:
+			}
+
+			fmt.Fprintf(ctx.App.Writer, "Successfully uploaded batch of blocks: from %d to %d\n", batchStart, batchEnd-1)
 		}
-
-		fmt.Fprintf(ctx.App.Writer, "Successfully uploaded batch of blocks: from %d to %d\n", batchStart, batchEnd-1)
+		err := checkBuffer(buffer, uint(indexFileStart/int(indexFileSize)), indexFileSize, indexFileSize)
+		if err != nil {
+			return fmt.Errorf("failed to check buffer: %w", err)
+		}
+		attrs := []object.Attribute{
+			*object.NewAttribute(attrIndex, strconv.Itoa(indexFileStart/int(indexFileSize))),
+			*object.NewAttribute("IndexSize", strconv.Itoa(int(indexFileSize))),
+		}
+		err = retry(func() error {
+			var errUpload error
+			_, errUpload = uploadObj(ctx.Context, p, signer, acc.PrivateKey().GetScriptHash(), containerID, buffer, attrs, homomorphicHashingDisabled)
+			return errUpload
+		})
+		if err != nil {
+			return fmt.Errorf("failed to upload index file: %w", err)
+		}
+		fmt.Fprintln(ctx.App.Writer, "Successfully uploaded index file", indexFileStart/int(indexFileSize))
+		clear(buffer)
 	}
 	return nil
 }
 
 // uploadIndexFiles uploads missing index files to the container.
-func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, account *wallet.Account, signer user.Signer, currentHeight uint, blockAttributeKey string, homomorphicHashingDisabled bool, maxParallelSearches int) error {
-	attributeKey := ctx.String("index-attribute")
-	indexFileSize := ctx.Uint("index-file-size")
+func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, account *wallet.Account, signer user.Signer, lastMissingBlock uint, blockAttributeKey, attributeKey string, indexFileSize uint, homomorphicHashingDisabled bool, maxParallelSearches int) ([]byte, error) {
 	fmt.Fprintln(ctx.App.Writer, "Uploading index files...")
 
 	prm := client.PrmObjectSearch{}
@@ -343,28 +377,28 @@ func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
 		return errSearchIndex
 	})
 	if errSearch != nil {
-		return fmt.Errorf("index files search failed: %w", errSearch)
+		return nil, fmt.Errorf("index files search failed: %w", errSearch)
 	}
-
+	var (
+		buffer = make([]byte, indexFileSize*oidSize)
+		doneCh = make(chan []byte, indexFileSize*oidSize)
+		errCh  = make(chan error)
+	)
 	existingIndexCount := uint(len(objectIDs))
-	expectedIndexCount := currentHeight / indexFileSize
+	expectedIndexCount := lastMissingBlock / indexFileSize
+	tail := lastMissingBlock % indexFileSize
 	if existingIndexCount >= expectedIndexCount {
 		fmt.Fprintf(ctx.App.Writer, "Index files are up to date. Existing: %d, expected: %d\n", existingIndexCount, expectedIndexCount)
-		return nil
+		if tail == 0 || existingIndexCount > expectedIndexCount+1 {
+			return buffer, nil
+		}
 	}
 
-	var (
-		buffer   = make([]byte, indexFileSize*oidSize)
-		doneCh   = make(chan struct{})
-		errCh    = make(chan error)
-		emptyOid = make([]byte, oidSize)
-	)
-
 	go func() {
 		defer close(doneCh)
-		// Main processing loop for each index file.
-		for i := existingIndexCount; i < expectedIndexCount; i++ {
+		// Main processing loop for each index file. One more loop is needed for tail search.
+		for i := existingIndexCount; i < expectedIndexCount+1; i++ {
 			// Start block parsing goroutines.
 			var (
 				// processedIndices is a mapping from position in buffer to the block index.
@@ -416,13 +450,16 @@ func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
 			close(oidCh)
 			wg.Wait()
 			fmt.Fprintf(ctx.App.Writer, "Index file %d generated, checking for the missing blocks...\n", i)
-
+			bufferLength := indexFileSize
+			if i == expectedIndexCount {
+				bufferLength = tail
+			}
 			// Check if there are empty OIDs in the generated index file. This may happen
 			// if searchObjects has returned not all blocks within the requested range, ref.
 			// #3645. In this case, retry the search for every missing object.
 			var count int
-			for idx := range indexFileSize {
+			for idx := range bufferLength {
 				if _, ok := processedIndices.Load(idx); !ok {
 					count++
 					objIDs = searchObjects(ctx.Context, p, containerID, account, blockAttributeKey, i*indexFileSize+idx, i*indexFileSize+idx+1, 1, errCh)
@@ -445,14 +482,20 @@ func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
 
 			// Check if there are empty OIDs in the generated index file. If it happens at
 			// this stage, then there's a bug in the code.
-			for k := 0; k < len(buffer); k += oidSize {
-				if slices.Compare(buffer[k:k+oidSize], emptyOid) == 0 {
-					select {
-					case errCh <- fmt.Errorf("empty OID found in index file %d at position %d (block index %d)", i, k/oidSize, i*indexFileSize+uint(k/oidSize)):
-					default:
-					}
-					return
+			err := checkBuffer(buffer, i, bufferLength, indexFileSize)
+			if err != nil {
+				select {
+				case errCh <- err:
+				default:
 				}
+				return
+			}
+			if i == expectedIndexCount {
+				select {
+				case doneCh <- buffer:
+				default:
+				}
+				return
 			}
 
 			// Upload index file.
@@ -460,8 +503,10 @@ func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
 				*object.NewAttribute(attributeKey, strconv.Itoa(int(i))),
 				*object.NewAttribute("IndexSize", strconv.Itoa(int(indexFileSize))),
 			}
-			err := retry(func() error {
-				return uploadObj(ctx.Context, p, signer, account.PrivateKey().GetScriptHash(), containerID, buffer, attrs, homomorphicHashingDisabled)
+			err = retry(func() error {
+				var errUpload error
+				_, errUpload = uploadObj(ctx.Context, p, signer, account.PrivateKey().GetScriptHash(), containerID, buffer, attrs, homomorphicHashingDisabled)
+				return errUpload
 			})
 			if err != nil {
 				select {
@@ -477,11 +522,10 @@ func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
 
 	select {
 	case err := <-errCh:
-		return err
-	case <-doneCh:
+		return nil, err
+	case res := <-doneCh:
+		return res, nil
 	}
-
-	return nil
 }
 
 // searchObjects searches in parallel for objects with attribute GE startIndex and LT
@@ -542,7 +586,7 @@ func searchObjects(ctx context.Context, p *pool.Pool, containerID cid.ID, accoun
 }
 
 // uploadObj uploads object to the container using provided settings.
-func uploadObj(ctx context.Context, p *pool.Pool, signer user.Signer, owner util.Uint160, containerID cid.ID, objData []byte, attrs []object.Attribute, homomorphicHashingDisabled bool) error {
+func uploadObj(ctx context.Context, p *pool.Pool, signer user.Signer, owner util.Uint160, containerID cid.ID, objData []byte, attrs []object.Attribute, homomorphicHashingDisabled bool) (oid.ID, error) {
 	var (
 		ownerID          user.ID
 		hdr              object.Object
@@ -550,6 +594,7 @@ func uploadObj(ctx context.Context, p *pool.Pool, signer user.Signer, owner util
 		chHomomorphic    checksum.Checksum
 		v                = new(version.Version)
 		prmObjectPutInit client.PrmObjectPutInit
+		res              oid.ID
 	)
 
 	ownerID.SetScriptHash(owner)
@@ -570,23 +615,27 @@ func uploadObj(ctx context.Context, p *pool.Pool, signer user.Signer, owner util
 	err := hdr.SetIDWithSignature(signer)
 	if err != nil {
-		return err
+		return res, err
 	}
 	err = hdr.CheckHeaderVerificationFields()
 	if err != nil {
-		return err
+		return res, err
+	}
+	err = hdr.CalculateAndSetID()
+	if err != nil {
+		return res, err
 	}
-
 	writer, err := p.ObjectPutInit(ctx, hdr, signer, prmObjectPutInit)
 	if err != nil {
-		return fmt.Errorf("failed to initiate object upload: %w", err)
+		return res, fmt.Errorf("failed to initiate object upload: %w", err)
 	}
 	defer writer.Close()
 
 	_, err = writer.Write(objData)
 	if err != nil {
-		return fmt.Errorf("failed to write object data: %w", err)
+		return res, fmt.Errorf("failed to write object data: %w", err)
 	}
-	return nil
+	res, _ = hdr.ID()
+	return res, nil
 }
 
 func getBlockIndex(header *object.Object, attribute string) (int, error) {
@@ -602,3 +651,13 @@ func getBlockIndex(header *object.Object, attribute string) (int, error) {
 	}
 	return -1, fmt.Errorf("attribute %s not found", attribute)
 }
+
+func checkBuffer(buffer []byte, i uint, bufferLength uint, indexFileSize uint) error {
+	emptyOid := make([]byte, oidSize)
+	for k := 0; uint(k) < bufferLength*oidSize; k += oidSize {
+		if slices.Compare(buffer[k:k+oidSize], emptyOid) == 0 {
+			return fmt.Errorf("empty OID found in index file %d at position %d (block index %d)", i, k/oidSize, i*indexFileSize+uint(k/oidSize))
+		}
+	}
+	return nil
+}
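Editorial note (not part of the patch): the change relies on a flat index-file layout in which an index file is a byte buffer of indexFileSize OIDs, the OID of block N occupies slot N % indexFileSize (byte offset N % indexFileSize * oidSize), and an all-zero slot marks a block whose object has not been uploaded yet. The standalone Go sketch below illustrates that layout; packIndexEntry and findEmptySlots are hypothetical names invented for the illustration, and oidSize = 32 is an assumption matching the 32-byte NeoFS object IDs that the patch writes via res.Encode and scans in checkBuffer.

package main

import (
	"bytes"
	"fmt"
)

// oidSize mirrors the 32-byte NeoFS object ID size assumed by the patch.
const oidSize = 32

// packIndexEntry writes a block's OID into its slot of the index-file buffer,
// mirroring res.Encode(buffer[blockIndex%int(indexFileSize)*oidSize:]) above.
// Hypothetical helper, for illustration only.
func packIndexEntry(buffer []byte, blockIndex, indexFileSize uint, oid [oidSize]byte) {
	offset := blockIndex % indexFileSize * oidSize
	copy(buffer[offset:offset+oidSize], oid[:])
}

// findEmptySlots reports the slots that still hold an all-zero OID, i.e.
// blocks whose objects are missing (cf. the checkBuffer helper in the patch).
// Hypothetical helper, for illustration only.
func findEmptySlots(buffer []byte, bufferLength uint) []uint {
	var missing []uint
	emptyOid := make([]byte, oidSize)
	for k := uint(0); k < bufferLength*oidSize; k += oidSize {
		if bytes.Equal(buffer[k:k+oidSize], emptyOid) {
			missing = append(missing, k/oidSize)
		}
	}
	return missing
}

func main() {
	const indexFileSize = 4 // tiny index file, just for the example
	buffer := make([]byte, indexFileSize*oidSize)

	// Pretend blocks 8 and 10 were uploaded; they land in slots 0 and 2
	// of index file #2 (8/4 == 10/4 == 2).
	packIndexEntry(buffer, 8, indexFileSize, [oidSize]byte{0xAA})
	packIndexEntry(buffer, 10, indexFileSize, [oidSize]byte{0xBB})

	fmt.Println(findEmptySlots(buffer, indexFileSize)) // prints [1 3]
}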
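For concreteness on the tail handling (illustrative numbers, not from the patch): with indexFileSize = 128000 and lastMissingBlock = 300500, uploadIndexFiles computes expectedIndexCount = 300500 / 128000 = 2 complete index files and tail = 300500 % 128000 = 44500. The extra loop iteration (i == expectedIndexCount) then checks only the first 44500 slots of the third, still-incomplete file and hands that partially filled buffer back through doneCh, so uploadBlocks can fill the remaining slots as it uploads blocks and upload the index file itself only once checkBuffer finds no empty OIDs.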