blockfetcher: use pool for GET and SEARCH operations

Use NeoFS storage nodes pool during blocks fetching to spread the load.

Close #3568

Signed-off-by: Ekaterina Pavlova <ekt@morphbits.io>
This commit is contained in:
Ekaterina Pavlova 2024-11-26 18:12:07 +03:00
parent 04516e7d26
commit c321eed8ee
2 changed files with 40 additions and 10 deletions

View file

@ -20,6 +20,8 @@ import (
"github.com/nspcc-dev/neofs-sdk-go/client"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/pool"
"github.com/nspcc-dev/neofs-sdk-go/user"
"go.uber.org/zap"
)
@ -34,12 +36,31 @@ const (
defaultDownloaderWorkersCount = 100
)
// Constants related to NeoFS pool request timeouts. Such big values are used to avoid
// NeoFS pool timeouts during block search and download.
const (
defaultDialTimeout = 10 * time.Minute
defaultStreamTimeout = 10 * time.Minute
defaultHealthcheckTimeout = 10 * time.Second
)
// Ledger is an interface to Blockchain sufficient for Service.
type Ledger interface {
GetConfig() config.Blockchain
BlockHeight() uint32
}
// poolWrapper wraps a NeoFS pool to adapt its Close method to return an error.
type poolWrapper struct {
*pool.Pool
}
// Close closes the pool and returns nil.
func (p poolWrapper) Close() error {
p.Pool.Close()
return nil
}
// Service is a service that fetches blocks from NeoFS.
type Service struct {
// isActive denotes whether the service is working or in the process of shutdown.
@ -49,7 +70,7 @@ type Service struct {
stateRootInHeader bool
chain Ledger
client *client.Client
pool poolWrapper
enqueueBlock func(*block.Block) error
account *wallet.Account
@ -112,8 +133,18 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBloc
if len(cfg.Addresses) == 0 {
return &Service{}, errors.New("no addresses provided")
}
params := pool.DefaultOptions()
params.SetHealthcheckTimeout(defaultHealthcheckTimeout)
params.SetNodeDialTimeout(defaultDialTimeout)
params.SetNodeStreamTimeout(defaultStreamTimeout)
p, err := pool.New(pool.NewFlatNodeParams(cfg.Addresses), user.NewAutoIDSignerRFC6979(account.PrivateKey().PrivateKey), params)
if err != nil {
return &Service{}, err
}
return &Service{
chain: chain,
pool: poolWrapper{Pool: p},
log: logger,
cfg: cfg,
@ -149,10 +180,9 @@ func (bfs *Service) Start() error {
var err error
bfs.ctx, bfs.ctxCancel = context.WithCancel(context.Background())
bfs.client, err = neofs.GetSDKClient(bfs.ctx, bfs.cfg.Addresses[0], 10*time.Minute)
if err != nil {
if err = bfs.pool.Dial(context.Background()); err != nil {
bfs.isActive.CompareAndSwap(true, false)
return fmt.Errorf("create SDK client: %w", err)
return fmt.Errorf("failed to dial NeoFS pool: %w", err)
}
// Start routine that manages Service shutdown process.
@ -445,7 +475,7 @@ func (bfs *Service) exiter() {
// Everything is done, release resources, turn off the activity marker and let
// the server know about it.
_ = bfs.client.Close()
_ = bfs.pool.Close()
_ = bfs.log.Sync()
bfs.isActive.CompareAndSwap(true, false)
bfs.shutdownCallback()
@ -464,7 +494,7 @@ func (bfs *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, e
if err != nil {
return nil, err
}
rc, err := neofs.GetWithClient(ctx, bfs.client, bfs.account.PrivateKey(), u, false)
rc, err := neofs.GetWithClient(ctx, bfs.pool, bfs.account.PrivateKey(), u, false)
if err != nil {
return nil, err
}
@ -473,7 +503,7 @@ func (bfs *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, e
}
func (bfs *Service) objectSearch(ctx context.Context, prm client.PrmObjectSearch) ([]oid.ID, error) {
return neofs.ObjectSearch(ctx, bfs.client, bfs.account.PrivateKey(), bfs.cfg.ContainerID, prm)
return neofs.ObjectSearch(ctx, bfs.pool, bfs.account.PrivateKey(), bfs.cfg.ContainerID, prm)
}
// isContextCanceledErr returns whether error is a wrapped [context.Canceled].

View file

@ -56,7 +56,7 @@ func TestServiceConstructor(t *testing.T) {
t.Run("default values", func(t *testing.T) {
cfg := config.NeoFSBlockFetcher{
Addresses: []string{"http://localhost:8080"},
Addresses: []string{"localhost:8080"},
}
service, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback)
require.NoError(t, err)
@ -71,13 +71,13 @@ func TestServiceConstructor(t *testing.T) {
t.Run("SDK client", func(t *testing.T) {
cfg := config.NeoFSBlockFetcher{
Addresses: []string{"http://localhost:8080"},
Addresses: []string{"localhost:8080"},
}
service, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback)
require.NoError(t, err)
err = service.Start()
require.Error(t, err)
require.Contains(t, err.Error(), "create SDK client")
require.Contains(t, err.Error(), "failed to dial NeoFS pool:")
require.Equal(t, service.IsActive(), false)
})