diff --git a/cli/util/uploader.go b/cli/util/uploader.go index 022c0ad8a..96df4a2d2 100644 --- a/cli/util/uploader.go +++ b/cli/util/uploader.go @@ -4,6 +4,7 @@ import ( "context" "crypto/sha256" "fmt" + "slices" "strconv" "sync" "time" @@ -20,6 +21,7 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/client" "github.com/nspcc-dev/neofs-sdk-go/container" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + "github.com/nspcc-dev/neofs-sdk-go/netmap" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/pool" @@ -99,7 +101,13 @@ func uploadBin(ctx *cli.Context) error { return cli.Exit(fmt.Sprintf("failed to dial NeoFS pool: %v", err), 1) } defer p.Close() - net, err := p.NetworkInfo(ctx.Context, client.PrmNetworkInfo{}) + + var net netmap.NetworkInfo + err = retry(func() error { + var errNet error + net, errNet = p.NetworkInfo(ctx.Context, client.PrmNetworkInfo{}) + return errNet + }) if err != nil { return cli.Exit(fmt.Errorf("failed to get network info: %w", err), 1) } @@ -155,16 +163,17 @@ func uploadBin(ctx *cli.Context) error { defer wg.Done() for blockIndex := batchStart + i; blockIndex < batchEnd; blockIndex += numWorkers { var blk *block.Block - err = retry(func() error { - blk, err = rpc.GetBlockByIndex(uint32(blockIndex)) - if err != nil { - return fmt.Errorf("failed to fetch block %d: %w", blockIndex, err) + errGet := retry(func() error { + var errGetBlock error + blk, errGetBlock = rpc.GetBlockByIndex(uint32(blockIndex)) + if errGetBlock != nil { + return fmt.Errorf("failed to fetch block %d: %w", blockIndex, errGetBlock) } return nil }) - if err != nil { + if errGet != nil { select { - case errorCh <- err: + case errorCh <- errGet: default: } return @@ -185,12 +194,12 @@ func uploadBin(ctx *cli.Context) error { } objBytes := bw.Bytes() - err = retry(func() error { + errRetr := retry(func() error { return uploadObj(ctx.Context, p, signer, acc.PrivateKey().GetScriptHash(), containerID, objBytes, attrs, homomorphicHashingDisabled) }) - if err != nil { + if errRetr != nil { select { - case errorCh <- err: + case errorCh <- errRetr: default: } return @@ -205,7 +214,7 @@ func uploadBin(ctx *cli.Context) error { }() select { - case err := <-errorCh: + case err = <-errorCh: return cli.Exit(fmt.Errorf("upload error: %w", err), 1) case <-doneCh: } @@ -315,16 +324,14 @@ func updateIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun filters.AddFilter(attributeKey, fmt.Sprintf("%d", 0), object.MatchNumGE) filters.AddFilter("IndexSize", fmt.Sprintf("%d", indexFileSize), object.MatchStringEqual) prm.SetFilters(filters) - var ( - objectIDs []oid.ID - err error - ) - err = retry(func() error { - objectIDs, err = neofs.ObjectSearch(ctx.Context, p, account.PrivateKey(), containerID.String(), prm) - return err + var objectIDs []oid.ID + errSearch := retry(func() error { + var errSearchIndex error + objectIDs, errSearchIndex = neofs.ObjectSearch(ctx.Context, p, account.PrivateKey(), containerID.String(), prm) + return errSearchIndex }) - if err != nil { - return fmt.Errorf("search of index files failed: %w", err) + if errSearch != nil { + return fmt.Errorf("search of index files failed: %w", errSearch) } existingIndexCount := uint(len(objectIDs)) @@ -340,21 +347,25 @@ func updateIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun buffer = make([]byte, indexFileSize*oidSize) oidCh = make(chan oid.ID, indexFileSize) oidFetcherToProcessor = make(chan struct{}, indexFileSize) + + emptyOid = make([]byte, oidSize) ) defer close(oidCh) for range maxParallelSearches { go func() { for id := range oidCh { var obj *object.Object - err = retry(func() error { - obj, err = p.ObjectHead(context.Background(), containerID, id, signer, client.PrmObjectHead{}) - return err + errRetr := retry(func() error { + var errGetHead error + obj, errGetHead = p.ObjectHead(context.Background(), containerID, id, signer, client.PrmObjectHead{}) + return errGetHead }) - if err != nil { + if errRetr != nil { select { - case errCh <- fmt.Errorf("failed to fetch object %s: %w", id.String(), err): + case errCh <- fmt.Errorf("failed to fetch object %s: %w", id.String(), errRetr): default: } + return } blockIndex, err := getBlockIndex(obj, blockAttributeKey) if err != nil { @@ -362,6 +373,7 @@ func updateIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun case errCh <- fmt.Errorf("failed to get block index from object %s: %w", id.String(), err): default: } + return } offset := (uint(blockIndex) % indexFileSize) * oidSize id.Encode(buffer[offset:]) @@ -384,13 +396,17 @@ func updateIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT) prm.SetFilters(filters) var objIDs []oid.ID - err = retry(func() error { - objIDs, err = neofs.ObjectSearch(ctx.Context, p, account.PrivateKey(), containerID.String(), prm) - return err + err := retry(func() error { + var errSearchIndex error + objIDs, errSearchIndex = neofs.ObjectSearch(ctx.Context, p, account.PrivateKey(), containerID.String(), prm) + return errSearchIndex }) if err != nil { - errCh <- fmt.Errorf("failed to search for objects from %d to %d for index file %d: %w", j, end, i, err) + select { + case errCh <- fmt.Errorf("failed to search for objects from %d to %d for index file %d: %w", j, end, i, err): + default: + } return } @@ -413,11 +429,21 @@ func updateIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun } } } + // Check if there are any empty oids in the created index file. + // This could happen if object payload is empty -> + // attribute is not set correctly -> empty oid is added to the index file. + for k := 0; k < len(buffer); k += oidSize { + if slices.Compare(buffer[k:k+oidSize], emptyOid) == 0 { + return fmt.Errorf("empty oid found in index file %d at position %d (block index %d)", i, k/oidSize, i+uint(k/oidSize)) + } + } attrs := []object.Attribute{ *object.NewAttribute(attributeKey, strconv.Itoa(int(i))), *object.NewAttribute("IndexSize", strconv.Itoa(int(indexFileSize))), } - err = uploadObj(ctx.Context, p, signer, account.PrivateKey().GetScriptHash(), containerID, buffer, attrs, homomorphicHashingDisabled) + err := retry(func() error { + return uploadObj(ctx.Context, p, signer, account.PrivateKey().GetScriptHash(), containerID, buffer, attrs, homomorphicHashingDisabled) + }) if err != nil { return fmt.Errorf("failed to upload index file %d: %w", i, err) } @@ -477,7 +503,12 @@ func uploadObj(ctx context.Context, p *pool.Pool, signer user.Signer, owner util func getBlockIndex(header *object.Object, attribute string) (int, error) { for _, attr := range header.UserAttributes() { if attr.Key() == attribute { - return strconv.Atoi(attr.Value()) + value := attr.Value() + blockIndex, err := strconv.Atoi(value) + if err != nil { + return -1, fmt.Errorf("attribute %s has invalid value: %s, error: %w", attribute, value, err) + } + return blockIndex, nil } } return -1, fmt.Errorf("attribute %s not found", attribute)