mirror of
https://github.com/nspcc-dev/neo-go.git
synced 2024-12-04 19:19:44 +00:00
Merge pull request #3706 from nspcc-dev/blockfetcher
NeoFS BlockFetcher: add pool with retries
This commit is contained in:
commit
3b7a7dc767
6 changed files with 198 additions and 82 deletions
|
@ -61,6 +61,17 @@ const (
|
|||
defaultHealthcheckTimeout = 10 * time.Second
|
||||
)
|
||||
|
||||
// poolWrapper wraps a NeoFS pool to adapt its Close method to return an error.
|
||||
type poolWrapper struct {
|
||||
*pool.Pool
|
||||
}
|
||||
|
||||
// Close closes the pool and returns nil.
|
||||
func (p poolWrapper) Close() error {
|
||||
p.Pool.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
func uploadBin(ctx *cli.Context) error {
|
||||
if err := cmdargs.EnsureNone(ctx); err != nil {
|
||||
return err
|
||||
|
@ -99,8 +110,8 @@ func uploadBin(ctx *cli.Context) error {
|
|||
if err != nil {
|
||||
return cli.Exit(fmt.Sprintf("failed to create NeoFS pool: %v", err), 1)
|
||||
}
|
||||
|
||||
if err = p.Dial(context.Background()); err != nil {
|
||||
pWrapper := poolWrapper{p}
|
||||
if err = pWrapper.Dial(context.Background()); err != nil {
|
||||
return cli.Exit(fmt.Sprintf("failed to dial NeoFS pool: %v", err), 1)
|
||||
}
|
||||
defer p.Close()
|
||||
|
@ -148,14 +159,14 @@ func uploadBin(ctx *cli.Context) error {
|
|||
fmt.Fprintln(ctx.App.Writer, "First block of latest incomplete batch uploaded to NeoFS container:", oldestMissingBlockIndex)
|
||||
|
||||
if !ctx.Bool("skip-blocks-uploading") {
|
||||
err = uploadBlocks(ctx, p, rpc, signer, containerID, acc, attr, oldestMissingBlockIndex, uint(currentBlockHeight), homomorphicHashingDisabled, numWorkers, maxRetries, debug)
|
||||
err = uploadBlocks(ctx, pWrapper, rpc, signer, containerID, acc, attr, oldestMissingBlockIndex, uint(currentBlockHeight), homomorphicHashingDisabled, numWorkers, maxRetries, debug)
|
||||
if err != nil {
|
||||
return cli.Exit(fmt.Errorf("failed to upload blocks: %w", err), 1)
|
||||
}
|
||||
oldestMissingBlockIndex = int(currentBlockHeight) + 1
|
||||
}
|
||||
|
||||
err = uploadIndexFiles(ctx, p, containerID, acc, signer, uint(oldestMissingBlockIndex), attr, homomorphicHashingDisabled, maxParallelSearches, maxRetries, debug)
|
||||
err = uploadIndexFiles(ctx, pWrapper, containerID, acc, signer, uint(oldestMissingBlockIndex), attr, homomorphicHashingDisabled, maxParallelSearches, maxRetries, debug)
|
||||
if err != nil {
|
||||
return cli.Exit(fmt.Errorf("failed to upload index files: %w", err), 1)
|
||||
}
|
||||
|
@ -194,6 +205,7 @@ func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID
|
|||
wg sync.WaitGroup
|
||||
numBatches = currentHeight / searchBatchSize
|
||||
emptyBatchFound bool
|
||||
pWrapper = poolWrapper{p}
|
||||
)
|
||||
|
||||
for batch := numBatches; batch > -maxParallelSearches; batch -= maxParallelSearches {
|
||||
|
@ -227,7 +239,7 @@ func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID
|
|||
err error
|
||||
)
|
||||
err = retry(func() error {
|
||||
objectIDs, err = neofs.ObjectSearch(ctx, p, priv, containerID.String(), prm)
|
||||
objectIDs, err = neofs.ObjectSearch(ctx, pWrapper, priv, containerID.String(), prm)
|
||||
return err
|
||||
}, maxRetries)
|
||||
results[i] = searchResult{startIndex: startIndex, endIndex: endIndex, numOIDs: len(objectIDs), err: err}
|
||||
|
@ -252,7 +264,7 @@ func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID
|
|||
}
|
||||
|
||||
// uploadBlocks uploads the blocks to the container using the pool.
|
||||
func uploadBlocks(ctx *cli.Context, p *pool.Pool, rpc *rpcclient.Client, signer user.Signer, containerID cid.ID, acc *wallet.Account, attr string, oldestMissingBlockIndex int, currentBlockHeight uint, homomorphicHashingDisabled bool, numWorkers, maxRetries int, debug bool) error {
|
||||
func uploadBlocks(ctx *cli.Context, p poolWrapper, rpc *rpcclient.Client, signer user.Signer, containerID cid.ID, acc *wallet.Account, attr string, oldestMissingBlockIndex int, currentBlockHeight uint, homomorphicHashingDisabled bool, numWorkers, maxRetries int, debug bool) error {
|
||||
if oldestMissingBlockIndex > int(currentBlockHeight) {
|
||||
fmt.Fprintf(ctx.App.Writer, "No new blocks to upload. Need to upload starting from %d, current height %d\n", oldestMissingBlockIndex, currentBlockHeight)
|
||||
return nil
|
||||
|
@ -343,7 +355,7 @@ func uploadBlocks(ctx *cli.Context, p *pool.Pool, rpc *rpcclient.Client, signer
|
|||
}
|
||||
|
||||
// uploadIndexFiles uploads missing index files to the container.
|
||||
func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, account *wallet.Account, signer user.Signer, oldestMissingBlockIndex uint, blockAttributeKey string, homomorphicHashingDisabled bool, maxParallelSearches, maxRetries int, debug bool) error {
|
||||
func uploadIndexFiles(ctx *cli.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, signer user.Signer, oldestMissingBlockIndex uint, blockAttributeKey string, homomorphicHashingDisabled bool, maxParallelSearches, maxRetries int, debug bool) error {
|
||||
var (
|
||||
attributeKey = ctx.String("index-attribute")
|
||||
indexFileSize = ctx.Uint("index-file-size")
|
||||
|
@ -505,7 +517,7 @@ func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
|
|||
// searchObjects searches in parallel for objects with attribute GE startIndex and LT
|
||||
// endIndex. It returns a buffered channel of resulting object IDs and closes it once
|
||||
// OID search is finished. Errors are sent to errCh in a non-blocking way.
|
||||
func searchObjects(ctx context.Context, p *pool.Pool, containerID cid.ID, account *wallet.Account, blockAttributeKey string, startIndex, endIndex uint, maxParallelSearches, maxRetries int, errCh chan error, additionalFilters ...object.SearchFilters) chan oid.ID {
|
||||
func searchObjects(ctx context.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, blockAttributeKey string, startIndex, endIndex uint, maxParallelSearches, maxRetries int, errCh chan error, additionalFilters ...object.SearchFilters) chan oid.ID {
|
||||
var res = make(chan oid.ID, 2*searchBatchSize)
|
||||
go func() {
|
||||
var wg sync.WaitGroup
|
||||
|
@ -567,7 +579,7 @@ func searchObjects(ctx context.Context, p *pool.Pool, containerID cid.ID, accoun
|
|||
}
|
||||
|
||||
// uploadObj uploads object to the container using provided settings.
|
||||
func uploadObj(ctx context.Context, p *pool.Pool, signer user.Signer, owner util.Uint160, containerID cid.ID, objData []byte, attrs []object.Attribute, homomorphicHashingDisabled bool) (oid.ID, error) {
|
||||
func uploadObj(ctx context.Context, p poolWrapper, signer user.Signer, owner util.Uint160, containerID cid.ID, objData []byte, attrs []object.Attribute, homomorphicHashingDisabled bool) (oid.ID, error) {
|
||||
var (
|
||||
ownerID user.ID
|
||||
hdr object.Object
|
||||
|
|
|
@ -58,9 +58,8 @@ parameter.
|
|||
The number of downloading routines can be configured via
|
||||
`DownloaderWorkersCount` parameter. It's up to the user to find the
|
||||
balance between the downloading speed and blocks persist speed for every
|
||||
node that uses NeoFS BlockFetcher. Downloaded blocks are placed into a
|
||||
buffered channel of size `IDBatchSize` with further redirection to the
|
||||
block queue.
|
||||
node that uses NeoFS BlockFetcher. Downloaded blocks are placed to the
|
||||
block queue directly.
|
||||
3. **Block Insertion**:
|
||||
Downloaded blocks are inserted into the blockchain using the same logic
|
||||
as in the P2P synchronisation protocol. The block queue is used to order
|
||||
|
|
|
@ -229,7 +229,7 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
|
|||
s.blockFetcher, err = blockfetcher.New(chain, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock, func() {
|
||||
close(s.blockFetcherFin)
|
||||
})
|
||||
if err != nil && config.NeoFSBlockFetcherCfg.Enabled {
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create NeoFS BlockFetcher: %w", err)
|
||||
}
|
||||
|
||||
|
@ -331,6 +331,7 @@ func (s *Server) Shutdown() {
|
|||
}
|
||||
s.log.Info("shutting down server", zap.Int("peers", s.PeerCount()))
|
||||
if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled {
|
||||
s.bFetcherQueue.Discard()
|
||||
s.blockFetcher.Shutdown()
|
||||
}
|
||||
for _, tr := range s.transports {
|
||||
|
@ -341,7 +342,6 @@ func (s *Server) Shutdown() {
|
|||
}
|
||||
s.bQueue.Discard()
|
||||
s.bSyncQueue.Discard()
|
||||
s.bFetcherQueue.Discard()
|
||||
s.serviceLock.RLock()
|
||||
for _, svc := range s.services {
|
||||
svc.Shutdown()
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
@ -18,8 +19,12 @@ import (
|
|||
"github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs"
|
||||
"github.com/nspcc-dev/neo-go/pkg/wallet"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/client"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/container"
|
||||
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/pool"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/user"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -34,12 +39,43 @@ const (
|
|||
defaultDownloaderWorkersCount = 100
|
||||
)
|
||||
|
||||
// Constants related to NeoFS pool request timeouts. Such big values are used to avoid
|
||||
// NeoFS pool timeouts during block search and download.
|
||||
const (
|
||||
defaultDialTimeout = 10 * time.Minute
|
||||
defaultStreamTimeout = 10 * time.Minute
|
||||
defaultHealthcheckTimeout = 10 * time.Second
|
||||
)
|
||||
|
||||
// Constants related to retry mechanism.
|
||||
const (
|
||||
// maxRetries is the maximum number of retries for a single operation.
|
||||
maxRetries = 5
|
||||
// initialBackoff is the initial backoff duration.
|
||||
initialBackoff = 500 * time.Millisecond
|
||||
// backoffFactor is the factor by which the backoff duration is multiplied.
|
||||
backoffFactor = 2
|
||||
// maxBackoff is the maximum backoff duration.
|
||||
maxBackoff = 20 * time.Second
|
||||
)
|
||||
|
||||
// Ledger is an interface to Blockchain sufficient for Service.
|
||||
type Ledger interface {
|
||||
GetConfig() config.Blockchain
|
||||
BlockHeight() uint32
|
||||
}
|
||||
|
||||
// poolWrapper wraps a NeoFS pool to adapt its Close method to return an error.
|
||||
type poolWrapper struct {
|
||||
*pool.Pool
|
||||
}
|
||||
|
||||
// Close closes the pool and returns nil.
|
||||
func (p poolWrapper) Close() error {
|
||||
p.Pool.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Service is a service that fetches blocks from NeoFS.
|
||||
type Service struct {
|
||||
// isActive denotes whether the service is working or in the process of shutdown.
|
||||
|
@ -49,12 +85,11 @@ type Service struct {
|
|||
stateRootInHeader bool
|
||||
|
||||
chain Ledger
|
||||
client *client.Client
|
||||
pool poolWrapper
|
||||
enqueueBlock func(*block.Block) error
|
||||
account *wallet.Account
|
||||
|
||||
oidsCh chan oid.ID
|
||||
blocksCh chan *block.Block
|
||||
oidsCh chan oid.ID
|
||||
// wg is a wait group for block downloaders.
|
||||
wg sync.WaitGroup
|
||||
|
||||
|
@ -68,7 +103,6 @@ type Service struct {
|
|||
exiterToOIDDownloader chan struct{}
|
||||
exiterToShutdown chan struct{}
|
||||
oidDownloaderToExiter chan struct{}
|
||||
blockQueuerToExiter chan struct{}
|
||||
|
||||
shutdownCallback func()
|
||||
}
|
||||
|
@ -79,11 +113,13 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBloc
|
|||
account *wallet.Account
|
||||
err error
|
||||
)
|
||||
|
||||
if !cfg.Enabled {
|
||||
return &Service{}, nil
|
||||
}
|
||||
if cfg.UnlockWallet.Path != "" {
|
||||
walletFromFile, err := wallet.NewWalletFromFile(cfg.UnlockWallet.Path)
|
||||
if err != nil {
|
||||
return &Service{}, err
|
||||
return nil, err
|
||||
}
|
||||
for _, acc := range walletFromFile.Accounts {
|
||||
if err := acc.Decrypt(cfg.UnlockWallet.Password, walletFromFile.Scrypt); err == nil {
|
||||
|
@ -92,12 +128,12 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBloc
|
|||
}
|
||||
}
|
||||
if account == nil {
|
||||
return &Service{}, errors.New("failed to decrypt any account in the wallet")
|
||||
return nil, errors.New("failed to decrypt any account in the wallet")
|
||||
}
|
||||
} else {
|
||||
account, err = wallet.NewAccount()
|
||||
if err != nil {
|
||||
return &Service{}, err
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if cfg.Timeout <= 0 {
|
||||
|
@ -110,10 +146,20 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBloc
|
|||
cfg.DownloaderWorkersCount = defaultDownloaderWorkersCount
|
||||
}
|
||||
if len(cfg.Addresses) == 0 {
|
||||
return &Service{}, errors.New("no addresses provided")
|
||||
return nil, errors.New("no addresses provided")
|
||||
}
|
||||
|
||||
params := pool.DefaultOptions()
|
||||
params.SetHealthcheckTimeout(defaultHealthcheckTimeout)
|
||||
params.SetNodeDialTimeout(defaultDialTimeout)
|
||||
params.SetNodeStreamTimeout(defaultStreamTimeout)
|
||||
p, err := pool.New(pool.NewFlatNodeParams(cfg.Addresses), user.NewAutoIDSignerRFC6979(account.PrivateKey().PrivateKey), params)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Service{
|
||||
chain: chain,
|
||||
pool: poolWrapper{Pool: p},
|
||||
log: logger,
|
||||
cfg: cfg,
|
||||
|
||||
|
@ -126,17 +172,12 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBloc
|
|||
exiterToOIDDownloader: make(chan struct{}),
|
||||
exiterToShutdown: make(chan struct{}),
|
||||
oidDownloaderToExiter: make(chan struct{}),
|
||||
blockQueuerToExiter: make(chan struct{}),
|
||||
|
||||
// Use buffer of two batch sizes to load OIDs in advance:
|
||||
// * first full block of OIDs is processing by Downloader
|
||||
// * second full block of OIDs is available to be fetched by Downloader immediately
|
||||
// * third half-filled block of OIDs is being collected by OIDsFetcher.
|
||||
oidsCh: make(chan oid.ID, 2*cfg.OIDBatchSize),
|
||||
|
||||
// Use buffer of a single OIDs batch size to provide smooth downloading and
|
||||
// avoid pauses during blockqueue insertion.
|
||||
blocksCh: make(chan *block.Block, cfg.OIDBatchSize),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -146,15 +187,36 @@ func (bfs *Service) Start() error {
|
|||
return nil
|
||||
}
|
||||
bfs.log.Info("starting NeoFS BlockFetcher service")
|
||||
|
||||
var err error
|
||||
var (
|
||||
containerID cid.ID
|
||||
containerObj container.Container
|
||||
err error
|
||||
)
|
||||
bfs.ctx, bfs.ctxCancel = context.WithCancel(context.Background())
|
||||
bfs.client, err = neofs.GetSDKClient(bfs.ctx, bfs.cfg.Addresses[0], 10*time.Minute)
|
||||
if err != nil {
|
||||
if err = bfs.pool.Dial(context.Background()); err != nil {
|
||||
bfs.isActive.CompareAndSwap(true, false)
|
||||
return fmt.Errorf("create SDK client: %w", err)
|
||||
return fmt.Errorf("failed to dial NeoFS pool: %w", err)
|
||||
}
|
||||
|
||||
err = containerID.DecodeString(bfs.cfg.ContainerID)
|
||||
if err != nil {
|
||||
bfs.isActive.CompareAndSwap(true, false)
|
||||
return fmt.Errorf("failed to decode container ID: %w", err)
|
||||
}
|
||||
|
||||
err = bfs.retry(func() error {
|
||||
containerObj, err = bfs.pool.ContainerGet(bfs.ctx, containerID, client.PrmContainerGet{})
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
bfs.isActive.CompareAndSwap(true, false)
|
||||
return fmt.Errorf("failed to get container: %w", err)
|
||||
}
|
||||
containerMagic := containerObj.Attribute("Magic")
|
||||
if containerMagic != strconv.Itoa(int(bfs.chain.GetConfig().Magic)) {
|
||||
bfs.isActive.CompareAndSwap(true, false)
|
||||
return fmt.Errorf("container magic mismatch: expected %d, got %s", bfs.chain.GetConfig().Magic, containerMagic)
|
||||
}
|
||||
// Start routine that manages Service shutdown process.
|
||||
go bfs.exiter()
|
||||
|
||||
|
@ -166,10 +228,6 @@ func (bfs *Service) Start() error {
|
|||
bfs.wg.Add(1)
|
||||
go bfs.blockDownloader()
|
||||
}
|
||||
|
||||
// Start routine that puts blocks into bQueue.
|
||||
go bfs.blockQueuer()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -185,7 +243,9 @@ func (bfs *Service) oidDownloader() {
|
|||
}
|
||||
var force bool
|
||||
if err != nil {
|
||||
bfs.log.Error("NeoFS BlockFetcher service: OID downloading routine failed", zap.Error(err))
|
||||
if !isContextCanceledErr(err) {
|
||||
bfs.log.Error("NeoFS BlockFetcher service: OID downloading routine failed", zap.Error(err))
|
||||
}
|
||||
force = true
|
||||
}
|
||||
// Stop the service since there's nothing to do anymore.
|
||||
|
@ -219,24 +279,11 @@ func (bfs *Service) blockDownloader() {
|
|||
bfs.stopService(true)
|
||||
return
|
||||
}
|
||||
select {
|
||||
case <-bfs.ctx.Done():
|
||||
return
|
||||
case bfs.blocksCh <- b:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// blockQueuer puts the block into the bqueue.
|
||||
func (bfs *Service) blockQueuer() {
|
||||
defer close(bfs.blockQueuerToExiter)
|
||||
|
||||
for b := range bfs.blocksCh {
|
||||
select {
|
||||
case <-bfs.ctx.Done():
|
||||
return
|
||||
default:
|
||||
err := bfs.enqueueBlock(b)
|
||||
err = bfs.enqueueBlock(b)
|
||||
if err != nil {
|
||||
bfs.log.Error("failed to enqueue block", zap.Uint32("index", b.Index), zap.Error(err))
|
||||
bfs.stopService(true)
|
||||
|
@ -260,6 +307,7 @@ func (bfs *Service) fetchOIDsFromIndexFiles() error {
|
|||
prm := client.PrmObjectSearch{}
|
||||
filters := object.NewSearchFilters()
|
||||
filters.AddFilter(bfs.cfg.IndexFileAttribute, fmt.Sprintf("%d", startIndex), object.MatchStringEqual)
|
||||
filters.AddFilter("IndexSize", fmt.Sprintf("%d", bfs.cfg.IndexFileSize), object.MatchStringEqual)
|
||||
prm.SetFilters(filters)
|
||||
|
||||
ctx, cancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout)
|
||||
|
@ -423,6 +471,7 @@ func (bfs *Service) exiter() {
|
|||
zap.Bool("force", force),
|
||||
)
|
||||
|
||||
bfs.isActive.CompareAndSwap(true, false)
|
||||
// Cansel all pending OIDs/blocks downloads in case if shutdown requested by user
|
||||
// or caused by downloading error.
|
||||
if force {
|
||||
|
@ -439,15 +488,10 @@ func (bfs *Service) exiter() {
|
|||
close(bfs.oidsCh)
|
||||
bfs.wg.Wait()
|
||||
|
||||
// Send signal to block putter to finish his work. Wait until it's finished.
|
||||
close(bfs.blocksCh)
|
||||
<-bfs.blockQueuerToExiter
|
||||
|
||||
// Everything is done, release resources, turn off the activity marker and let
|
||||
// the server know about it.
|
||||
_ = bfs.client.Close()
|
||||
_ = bfs.pool.Close()
|
||||
_ = bfs.log.Sync()
|
||||
bfs.isActive.CompareAndSwap(true, false)
|
||||
bfs.shutdownCallback()
|
||||
|
||||
// Notify Shutdown routine in case if it's user-triggered shutdown.
|
||||
|
@ -459,21 +503,64 @@ func (bfs *Service) IsActive() bool {
|
|||
return bfs.isActive.Load()
|
||||
}
|
||||
|
||||
// retry function with exponential backoff.
|
||||
func (bfs *Service) retry(action func() error) error {
|
||||
var (
|
||||
err error
|
||||
backoff = initialBackoff
|
||||
timer = time.NewTimer(0)
|
||||
)
|
||||
defer func() {
|
||||
if !timer.Stop() {
|
||||
<-timer.C
|
||||
}
|
||||
}()
|
||||
|
||||
for i := range maxRetries {
|
||||
if err = action(); err == nil {
|
||||
return nil
|
||||
}
|
||||
if i == maxRetries-1 {
|
||||
break
|
||||
}
|
||||
timer.Reset(backoff)
|
||||
|
||||
select {
|
||||
case <-timer.C:
|
||||
case <-bfs.ctx.Done():
|
||||
return bfs.ctx.Err()
|
||||
}
|
||||
backoff *= time.Duration(backoffFactor)
|
||||
if backoff > maxBackoff {
|
||||
backoff = maxBackoff
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (bfs *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, error) {
|
||||
u, err := url.Parse(fmt.Sprintf("neofs:%s/%s", bfs.cfg.ContainerID, oid))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rc, err := neofs.GetWithClient(ctx, bfs.client, bfs.account.PrivateKey(), u, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return rc, nil
|
||||
var rc io.ReadCloser
|
||||
err = bfs.retry(func() error {
|
||||
rc, err = neofs.GetWithClient(ctx, bfs.pool, bfs.account.PrivateKey(), u, false)
|
||||
return err
|
||||
})
|
||||
return rc, err
|
||||
}
|
||||
|
||||
func (bfs *Service) objectSearch(ctx context.Context, prm client.PrmObjectSearch) ([]oid.ID, error) {
|
||||
return neofs.ObjectSearch(ctx, bfs.client, bfs.account.PrivateKey(), bfs.cfg.ContainerID, prm)
|
||||
var (
|
||||
oids []oid.ID
|
||||
err error
|
||||
)
|
||||
err = bfs.retry(func() error {
|
||||
oids, err = neofs.ObjectSearch(ctx, bfs.pool, bfs.account.PrivateKey(), bfs.cfg.ContainerID, prm)
|
||||
return err
|
||||
})
|
||||
return oids, err
|
||||
}
|
||||
|
||||
// isContextCanceledErr returns whether error is a wrapped [context.Canceled].
|
||||
|
|
|
@ -38,6 +38,9 @@ func TestServiceConstructor(t *testing.T) {
|
|||
|
||||
t.Run("empty configuration", func(t *testing.T) {
|
||||
cfg := config.NeoFSBlockFetcher{
|
||||
InternalService: config.InternalService{
|
||||
Enabled: true,
|
||||
},
|
||||
Timeout: 0,
|
||||
OIDBatchSize: 0,
|
||||
DownloaderWorkersCount: 0,
|
||||
|
@ -48,6 +51,9 @@ func TestServiceConstructor(t *testing.T) {
|
|||
|
||||
t.Run("no addresses", func(t *testing.T) {
|
||||
cfg := config.NeoFSBlockFetcher{
|
||||
InternalService: config.InternalService{
|
||||
Enabled: true,
|
||||
},
|
||||
Addresses: []string{},
|
||||
}
|
||||
_, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback)
|
||||
|
@ -56,7 +62,10 @@ func TestServiceConstructor(t *testing.T) {
|
|||
|
||||
t.Run("default values", func(t *testing.T) {
|
||||
cfg := config.NeoFSBlockFetcher{
|
||||
Addresses: []string{"http://localhost:8080"},
|
||||
InternalService: config.InternalService{
|
||||
Enabled: true,
|
||||
},
|
||||
Addresses: []string{"localhost:8080"},
|
||||
}
|
||||
service, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback)
|
||||
require.NoError(t, err)
|
||||
|
@ -71,13 +80,16 @@ func TestServiceConstructor(t *testing.T) {
|
|||
|
||||
t.Run("SDK client", func(t *testing.T) {
|
||||
cfg := config.NeoFSBlockFetcher{
|
||||
Addresses: []string{"http://localhost:8080"},
|
||||
InternalService: config.InternalService{
|
||||
Enabled: true,
|
||||
},
|
||||
Addresses: []string{"localhost:8080"},
|
||||
}
|
||||
service, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback)
|
||||
require.NoError(t, err)
|
||||
err = service.Start()
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), "create SDK client")
|
||||
require.Contains(t, err.Error(), "failed to dial NeoFS pool:")
|
||||
require.Equal(t, service.IsActive(), false)
|
||||
})
|
||||
|
||||
|
@ -94,5 +106,6 @@ func TestServiceConstructor(t *testing.T) {
|
|||
}
|
||||
_, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback)
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), "open wallet: open invalid/path/to/wallet.json:")
|
||||
})
|
||||
}
|
||||
|
|
|
@ -44,6 +44,16 @@ var (
|
|||
ErrInvalidCommand = errors.New("invalid command")
|
||||
)
|
||||
|
||||
// Client is a NeoFS client interface.
|
||||
type Client interface {
|
||||
ObjectSearchInit(ctx context.Context, containerID cid.ID, s user.Signer, prm client.PrmObjectSearch) (*client.ObjectListReader, error)
|
||||
ObjectGetInit(ctx context.Context, container cid.ID, id oid.ID, s user.Signer, get client.PrmObjectGet) (object.Object, *client.PayloadReader, error)
|
||||
ObjectRangeInit(ctx context.Context, container cid.ID, id oid.ID, offset uint64, length uint64, s user.Signer, objectRange client.PrmObjectRange) (*client.ObjectRangeReader, error)
|
||||
ObjectHead(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectHead) (*object.Object, error)
|
||||
ObjectHash(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectHash) ([][]byte, error)
|
||||
Close() error
|
||||
}
|
||||
|
||||
// Get returns a neofs object from the provided url.
|
||||
// URI scheme is "neofs:<Container-ID>/<Object-ID/<Command>/<Params>".
|
||||
// If Command is not provided, full object is requested.
|
||||
|
@ -59,7 +69,7 @@ func Get(ctx context.Context, priv *keys.PrivateKey, u *url.URL, addr string) (i
|
|||
// URI scheme is "neofs:<Container-ID>/<Object-ID/<Command>/<Params>".
|
||||
// If Command is not provided, full object is requested. If wrapClientCloser is true,
|
||||
// the client will be closed when the returned ReadCloser is closed.
|
||||
func GetWithClient(ctx context.Context, c *client.Client, priv *keys.PrivateKey, u *url.URL, wrapClientCloser bool) (io.ReadCloser, error) {
|
||||
func GetWithClient(ctx context.Context, c Client, priv *keys.PrivateKey, u *url.URL, wrapClientCloser bool) (io.ReadCloser, error) {
|
||||
objectAddr, ps, err := parseNeoFSURL(u)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -94,7 +104,7 @@ func GetWithClient(ctx context.Context, c *client.Client, priv *keys.PrivateKey,
|
|||
|
||||
type clientCloseWrapper struct {
|
||||
io.ReadCloser
|
||||
c *client.Client
|
||||
c Client
|
||||
}
|
||||
|
||||
func (w clientCloseWrapper) Close() error {
|
||||
|
@ -137,7 +147,7 @@ func parseNeoFSURL(u *url.URL) (*oid.Address, []string, error) {
|
|||
return objAddr, ps[2:], nil
|
||||
}
|
||||
|
||||
func getPayload(ctx context.Context, s user.Signer, c *client.Client, addr *oid.Address) (io.ReadCloser, error) {
|
||||
func getPayload(ctx context.Context, s user.Signer, c Client, addr *oid.Address) (io.ReadCloser, error) {
|
||||
var iorc io.ReadCloser
|
||||
_, rc, err := c.ObjectGetInit(ctx, addr.Container(), addr.Object(), s, client.PrmObjectGet{})
|
||||
if rc != nil {
|
||||
|
@ -146,7 +156,7 @@ func getPayload(ctx context.Context, s user.Signer, c *client.Client, addr *oid.
|
|||
return iorc, err
|
||||
}
|
||||
|
||||
func getRange(ctx context.Context, s user.Signer, c *client.Client, addr *oid.Address, ps ...string) (io.ReadCloser, error) {
|
||||
func getRange(ctx context.Context, s user.Signer, c Client, addr *oid.Address, ps ...string) (io.ReadCloser, error) {
|
||||
var iorc io.ReadCloser
|
||||
if len(ps) == 0 {
|
||||
return nil, ErrInvalidRange
|
||||
|
@ -163,11 +173,11 @@ func getRange(ctx context.Context, s user.Signer, c *client.Client, addr *oid.Ad
|
|||
return iorc, err
|
||||
}
|
||||
|
||||
func getObjHeader(ctx context.Context, s user.Signer, c *client.Client, addr *oid.Address) (*object.Object, error) {
|
||||
func getObjHeader(ctx context.Context, s user.Signer, c Client, addr *oid.Address) (*object.Object, error) {
|
||||
return c.ObjectHead(ctx, addr.Container(), addr.Object(), s, client.PrmObjectHead{})
|
||||
}
|
||||
|
||||
func getHeader(ctx context.Context, s user.Signer, c *client.Client, addr *oid.Address) (io.ReadCloser, error) {
|
||||
func getHeader(ctx context.Context, s user.Signer, c Client, addr *oid.Address) (io.ReadCloser, error) {
|
||||
obj, err := getObjHeader(ctx, s, c, addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -179,7 +189,7 @@ func getHeader(ctx context.Context, s user.Signer, c *client.Client, addr *oid.A
|
|||
return io.NopCloser(bytes.NewReader(res)), nil
|
||||
}
|
||||
|
||||
func getHash(ctx context.Context, s user.Signer, c *client.Client, addr *oid.Address, ps ...string) (io.ReadCloser, error) {
|
||||
func getHash(ctx context.Context, s user.Signer, c Client, addr *oid.Address, ps ...string) (io.ReadCloser, error) {
|
||||
if len(ps) == 0 || ps[0] == "" { // hash of the full payload
|
||||
obj, err := getObjHeader(ctx, s, c, addr)
|
||||
if err != nil {
|
||||
|
@ -236,13 +246,8 @@ func parseRange(s string) (*object.Range, error) {
|
|||
return r, nil
|
||||
}
|
||||
|
||||
// ObjectSearchInitter defines the interface for initializing object search.
|
||||
type ObjectSearchInitter interface {
|
||||
ObjectSearchInit(ctx context.Context, containerID cid.ID, s user.Signer, prm client.PrmObjectSearch) (*client.ObjectListReader, error)
|
||||
}
|
||||
|
||||
// ObjectSearch returns a list of object IDs from the provided container.
|
||||
func ObjectSearch(ctx context.Context, initter ObjectSearchInitter, priv *keys.PrivateKey, containerIDStr string, prm client.PrmObjectSearch) ([]oid.ID, error) {
|
||||
func ObjectSearch(ctx context.Context, c Client, priv *keys.PrivateKey, containerIDStr string, prm client.PrmObjectSearch) ([]oid.ID, error) {
|
||||
var (
|
||||
s = user.NewAutoIDSignerRFC6979(priv.PrivateKey)
|
||||
objectIDs []oid.ID
|
||||
|
@ -252,7 +257,7 @@ func ObjectSearch(ctx context.Context, initter ObjectSearchInitter, priv *keys.P
|
|||
if err != nil {
|
||||
return nil, fmt.Errorf("%w: %w", ErrInvalidContainer, err)
|
||||
}
|
||||
reader, err := initter.ObjectSearchInit(ctx, containerID, s, prm)
|
||||
reader, err := c.ObjectSearchInit(ctx, containerID, s, prm)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to initiate object search: %w", err)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue