Merge pull request #3638 from nspcc-dev/uploader-retry-errors

cli: fix `upload-bin` error handling
Commit 4b10f23aec, authored by Anna Shaleva on 2024-10-23 16:00:38 +03:00, committed by GitHub.

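Every fallible network call in this diff is funneled through the file's `retry` helper, which is not itself shown in these hunks. A minimal sketch of the assumed shape, with hypothetical `maxRetries`/`retryDelay` constants (the real helper may use different values or backoff behavior):

```go
const (
	maxRetries = 5               // assumed; the real constant may differ
	retryDelay = 5 * time.Second // assumed as well
)

// retry re-runs action until it succeeds or attempts are exhausted,
// returning the last error seen. Sketch only; the helper actually
// defined in this file may back off differently.
func retry(action func() error) error {
	var err error
	for range maxRetries {
		if err = action(); err == nil {
			return nil
		}
		time.Sleep(retryDelay)
	}
	return err
}
```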

@@ -4,6 +4,7 @@ import (
 	"context"
 	"crypto/sha256"
 	"fmt"
+	"slices"
 	"strconv"
 	"sync"
 	"time"
@@ -20,6 +21,7 @@ import (
 	"github.com/nspcc-dev/neofs-sdk-go/client"
 	"github.com/nspcc-dev/neofs-sdk-go/container"
 	cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
+	"github.com/nspcc-dev/neofs-sdk-go/netmap"
 	"github.com/nspcc-dev/neofs-sdk-go/object"
 	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
 	"github.com/nspcc-dev/neofs-sdk-go/pool"
@@ -99,7 +101,13 @@ func uploadBin(ctx *cli.Context) error {
 		return cli.Exit(fmt.Sprintf("failed to dial NeoFS pool: %v", err), 1)
 	}
 	defer p.Close()
-	net, err := p.NetworkInfo(ctx.Context, client.PrmNetworkInfo{})
+	var net netmap.NetworkInfo
+	err = retry(func() error {
+		var errNet error
+		net, errNet = p.NetworkInfo(ctx.Context, client.PrmNetworkInfo{})
+		return errNet
+	})
 	if err != nil {
 		return cli.Exit(fmt.Errorf("failed to get network info: %w", err), 1)
 	}
@@ -155,16 +163,17 @@ func uploadBin(ctx *cli.Context) error {
 			defer wg.Done()
 			for blockIndex := batchStart + i; blockIndex < batchEnd; blockIndex += numWorkers {
 				var blk *block.Block
-				err = retry(func() error {
-					blk, err = rpc.GetBlockByIndex(uint32(blockIndex))
-					if err != nil {
-						return fmt.Errorf("failed to fetch block %d: %w", blockIndex, err)
+				errGet := retry(func() error {
+					var errGetBlock error
+					blk, errGetBlock = rpc.GetBlockByIndex(uint32(blockIndex))
+					if errGetBlock != nil {
+						return fmt.Errorf("failed to fetch block %d: %w", blockIndex, errGetBlock)
 					}
 					return nil
 				})
-				if err != nil {
+				if errGet != nil {
 					select {
-					case errorCh <- err:
+					case errorCh <- errGet:
 					default:
 					}
 					return
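This hunk is the heart of the fix: the workers previously wrote into a single `err` variable captured by every goroutine, which is a data race and also lets a later success from one worker mask an earlier failure from another. Each goroutine now owns its `errGet`/`errGetBlock`. The non-blocking `select` send keeps only the first error so no worker ever stalls on reporting. A sketch of that idiom in isolation, assuming an error channel of capacity 1 (the file's actual channel setup may differ):

```go
errorCh := make(chan error, 1) // capacity 1: remember only the first error

// reportErr hands err to the channel if no error is pending yet and
// drops it otherwise, so no worker ever blocks on reporting.
reportErr := func(err error) {
	select {
	case errorCh <- err:
	default:
	}
}
```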
@@ -185,12 +194,12 @@ func uploadBin(ctx *cli.Context) error {
 				}
 				objBytes := bw.Bytes()
-				err = retry(func() error {
+				errRetr := retry(func() error {
 					return uploadObj(ctx.Context, p, signer, acc.PrivateKey().GetScriptHash(), containerID, objBytes, attrs, homomorphicHashingDisabled)
 				})
-				if err != nil {
+				if errRetr != nil {
 					select {
-					case errorCh <- err:
+					case errorCh <- errRetr:
 					default:
 					}
 					return
@@ -205,7 +214,7 @@ func uploadBin(ctx *cli.Context) error {
 		}()
 		select {
-		case err := <-errorCh:
+		case err = <-errorCh:
 			return cli.Exit(fmt.Errorf("upload error: %w", err), 1)
 		case <-doneCh:
 		}
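A one-character but deliberate change: `case err := <-errorCh` declared a new `err` scoped to that case, shadowing the function-level variable, while `case err = <-errorCh` assigns to the existing one. Behavior here was likely the same either way, since `err` is only used inside the case, but shadow-analysis linters flag the former. An illustration of the difference:

```go
errorCh := make(chan error, 1)
var err error
select {
case err := <-errorCh: // ':=' declares a case-local err; the outer err is untouched
	_ = err
default:
}
_ = err // still nil here, no matter what the case received
```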
@@ -315,16 +324,14 @@ func updateIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
 	filters.AddFilter(attributeKey, fmt.Sprintf("%d", 0), object.MatchNumGE)
 	filters.AddFilter("IndexSize", fmt.Sprintf("%d", indexFileSize), object.MatchStringEqual)
 	prm.SetFilters(filters)
-	var (
-		objectIDs []oid.ID
-		err       error
-	)
-	err = retry(func() error {
-		objectIDs, err = neofs.ObjectSearch(ctx.Context, p, account.PrivateKey(), containerID.String(), prm)
-		return err
+	var objectIDs []oid.ID
+	errSearch := retry(func() error {
+		var errSearchIndex error
+		objectIDs, errSearchIndex = neofs.ObjectSearch(ctx.Context, p, account.PrivateKey(), containerID.String(), prm)
+		return errSearchIndex
 	})
-	if err != nil {
-		return fmt.Errorf("search of index files failed: %w", err)
+	if errSearch != nil {
+		return fmt.Errorf("search of index files failed: %w", errSearch)
 	}
 
 	existingIndexCount := uint(len(objectIDs))
@@ -340,21 +347,25 @@ func updateIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
 		buffer                = make([]byte, indexFileSize*oidSize)
 		oidCh                 = make(chan oid.ID, indexFileSize)
 		oidFetcherToProcessor = make(chan struct{}, indexFileSize)
+		emptyOid              = make([]byte, oidSize)
 	)
 	defer close(oidCh)
 	for range maxParallelSearches {
 		go func() {
 			for id := range oidCh {
 				var obj *object.Object
-				err = retry(func() error {
-					obj, err = p.ObjectHead(context.Background(), containerID, id, signer, client.PrmObjectHead{})
-					return err
+				errRetr := retry(func() error {
+					var errGetHead error
+					obj, errGetHead = p.ObjectHead(context.Background(), containerID, id, signer, client.PrmObjectHead{})
+					return errGetHead
 				})
-				if err != nil {
+				if errRetr != nil {
 					select {
-					case errCh <- fmt.Errorf("failed to fetch object %s: %w", id.String(), err):
+					case errCh <- fmt.Errorf("failed to fetch object %s: %w", id.String(), errRetr):
 					default:
 					}
+					return
 				}
 				blockIndex, err := getBlockIndex(obj, blockAttributeKey)
 				if err != nil {
@@ -362,6 +373,7 @@ func updateIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
 					case errCh <- fmt.Errorf("failed to get block index from object %s: %w", id.String(), err):
 					default:
 					}
+					return
 				}
 				offset := (uint(blockIndex) % indexFileSize) * oidSize
 				id.Encode(buffer[offset:])
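The two added `return` statements matter as much as the renamed variables: before them, a goroutine whose `ObjectHead` failed (or whose block-index attribute didn't parse) reported the error but then fell through, reaching `getBlockIndex` with `obj` still nil and potentially leaving a zero OID slot in the buffer. A compressed sketch of the corrected control flow, with names as in the diff:

```go
if errRetr != nil {
	select {
	case errCh <- fmt.Errorf("failed to fetch object %s: %w", id.String(), errRetr):
	default:
	}
	return // previously missing: without it, obj == nil reached getBlockIndex
}
```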
@@ -384,13 +396,17 @@ func updateIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
 			filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT)
 			prm.SetFilters(filters)
 			var objIDs []oid.ID
-			err = retry(func() error {
-				objIDs, err = neofs.ObjectSearch(ctx.Context, p, account.PrivateKey(), containerID.String(), prm)
-				return err
+			err := retry(func() error {
+				var errSearchIndex error
+				objIDs, errSearchIndex = neofs.ObjectSearch(ctx.Context, p, account.PrivateKey(), containerID.String(), prm)
+				return errSearchIndex
 			})
 			if err != nil {
-				errCh <- fmt.Errorf("failed to search for objects from %d to %d for index file %d: %w", j, end, i, err)
+				select {
+				case errCh <- fmt.Errorf("failed to search for objects from %d to %d for index file %d: %w", j, end, i, err):
+				default:
+				}
 				return
 			}
@@ -413,11 +429,21 @@ func updateIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
 				}
 			}
 		}
+		// Check if there are any empty oids in the created index file.
+		// This could happen if object payload is empty ->
+		// attribute is not set correctly -> empty oid is added to the index file.
+		for k := 0; k < len(buffer); k += oidSize {
+			if slices.Compare(buffer[k:k+oidSize], emptyOid) == 0 {
+				return fmt.Errorf("empty oid found in index file %d at position %d (block index %d)", i, k/oidSize, i+uint(k/oidSize))
+			}
+		}
 		attrs := []object.Attribute{
 			*object.NewAttribute(attributeKey, strconv.Itoa(int(i))),
 			*object.NewAttribute("IndexSize", strconv.Itoa(int(indexFileSize))),
 		}
-		err = uploadObj(ctx.Context, p, signer, account.PrivateKey().GetScriptHash(), containerID, buffer, attrs, homomorphicHashingDisabled)
+		err := retry(func() error {
+			return uploadObj(ctx.Context, p, signer, account.PrivateKey().GetScriptHash(), containerID, buffer, attrs, homomorphicHashingDisabled)
+		})
 		if err != nil {
 			return fmt.Errorf("failed to upload index file %d: %w", i, err)
 		}
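The new scan guards the index file before upload: any OID-sized slot still equal to `emptyOid` means some block's OID was never written (for instance because a worker bailed out on error), and shipping such a file would silently break later lookups. Note that `slices.Compare(a, b) == 0` is equivalent here to the slightly more direct `slices.Equal(a, b)`. A standalone sketch of the check, assuming `oidSize` is the fixed byte length of an OID:

```go
// Sketch: detect all-zero (never written) OID slots in the index buffer.
emptyOid := make([]byte, oidSize)
for k := 0; k < len(buffer); k += oidSize {
	if slices.Equal(buffer[k:k+oidSize], emptyOid) {
		return fmt.Errorf("empty OID at slot %d", k/oidSize)
	}
}
```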
@@ -477,7 +503,12 @@ func uploadObj(ctx context.Context, p *pool.Pool, signer user.Signer, owner util
 func getBlockIndex(header *object.Object, attribute string) (int, error) {
 	for _, attr := range header.UserAttributes() {
 		if attr.Key() == attribute {
-			return strconv.Atoi(attr.Value())
+			value := attr.Value()
+			blockIndex, err := strconv.Atoi(value)
+			if err != nil {
+				return -1, fmt.Errorf("attribute %s has invalid value: %s, error: %w", attribute, value, err)
+			}
+			return blockIndex, nil
 		}
 	}
 	return -1, fmt.Errorf("attribute %s not found", attribute)
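`getBlockIndex` previously returned `strconv.Atoi`'s raw error together with whatever partial value accompanied it; it now wraps the error with the offending attribute key and value and guarantees `-1` on failure. A hypothetical usage sketch (the `"BlockIndex"` key and value are stand-ins, not the file's real attribute name):

```go
// Hypothetical: an object whose index attribute holds a non-numeric value
// now yields a descriptive, wrapped error instead of a bare Atoi failure.
obj := object.New()
obj.SetAttributes(*object.NewAttribute("BlockIndex", "not-a-number"))

idx, err := getBlockIndex(obj, "BlockIndex")
fmt.Println(idx, err)
// -1 attribute BlockIndex has invalid value: not-a-number, error: strconv.Atoi: parsing "not-a-number": invalid syntax
```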