services: add new service for fetching blocks from NeoFS

Close #3496

Co-authored-by: Anna Shaleva <shaleva.ann@nspcc.ru>
Signed-off-by: Ekaterina Pavlova <ekt@morphbits.io>
This commit is contained in:
Ekaterina Pavlova 2024-09-05 15:23:15 +04:00
parent 5dff3fc13d
commit 0b31a29f39
11 changed files with 970 additions and 47 deletions

View file

@ -100,3 +100,16 @@ ApplicationConfiguration:
Enabled: false
Addresses:
- ":2113"
NeoFSBlockFetcher:
Enabled: false
Addresses:
- st1.storage.fs.neo.org:8080
Timeout: 10m
DownloaderWorkersCount: 500
OIDBatchSize: 8000
BQueueSize: 16000 # must be larger than OIDBatchSize; recommended to be 2*OIDBatchSize or 3*OIDBatchSize
SkipIndexFilesSearch: false
IndexFileSize: 128000
ContainerID: "EPGuD26wYgQJbmDdVBoYoNZiMKHwFMJT3A5WqPjdUHxH"
BlockAttribute: "block"
IndexFileAttribute: "oid"

View file

@ -0,0 +1,74 @@
# NeoFS block storage
Using NeoFS to store chain's blocks and snapshots was proposed in
[#3463](https://github.com/neo-project/neo/issues/3463). NeoGo contains several
extensions utilizing NeoFS block storage aimed to improve node synchronization
efficiency and reduce node storage size.
## Components and functionality
### Block storage schema
A single NeoFS container is used to store blocks and index files. Each block
is stored in a binary form as a separate object with a unique OID and a set of
attributes:
- block object identifier with block index value (`block:1`)
- primary node index (`primary:0`)
- block hash in the LE form (`hash:5412a781caf278c0736556c0e544c7cfdbb6e3c62ae221ef53646be89364566b`)
- previous block hash in the LE form (`prevHash:3654a054d82a8178c7dfacecc2c57282e23468a42ee407f14506368afe22d929`)
- millisecond-precision block timestamp (`time:1627894840919`)
Each index file is an object containing a constant-sized batch of raw block object
IDs in binary form ordered by block index. Each index file is marked with the
following attributes:
- index file identifier with consecutive file index value (`oid:0`)
- the number of OIDs included into index file (`size:128000`)
### NeoFS BlockFetcher
NeoFS BlockFetcher service is designed as an alternative to P2P synchronisation
protocol. It allows to download blocks from a trusted container in the NeoFS network
and persist them to database using standard verification flow. NeoFS BlockFetcher
service primarily used during the node's bootstrap, providing a fast alternative to
P2P blocks synchronisation.
NeoFS BlockFetcher service has two modes of operation:
- Index File Search: Search for index files, which contain batches of block object
IDs and fetch blocks from NeoFS by retrieved OIDs.
- Direct Block Search: Search and fetch blocks directly from NeoFS container via
built-in NeoFS object search mechanism.
Operation mode of BlockFetcher can be configured via `SkipIndexFilesSearch`
parameter.
#### Operation flow
1. **OID Fetching**:
Depending on the mode, the service either:
- Searches for index files by index file attribute and reads block OIDs from index
file object-by-object.
- Searches batches of blocks directly by block attribute (the batch size is
configured via `OIDBatchSize` parameter).
Once the OIDs are retrieved, they are immediately redirected to the
block downloading routines for further processing. The channel that
is used to redirect block OIDs to downloading routines is buffered
to provide smooth OIDs delivery without delays. The size of this channel
can be configured via `OIDBatchSize` parameter and equals to `2*OIDBatchSize`.
2. **Parallel Block Downloading**:
The number of downloading routines can be configured via
`DownloaderWorkersCount` parameter. It's up to the user to find the
balance between the downloading speed and blocks persist speed for every
node that uses NeoFS BlockFetcher. Downloaded blocks are placed into a
buffered channel of size `IDBatchSize` with further redirection to the
block queue.
3. **Block Insertion**:
Downloaded blocks are inserted into the blockchain using the same logic
as in the P2P synchronisation protocol. The block queue is used to order
downloaded blocks before they are inserted into the blockchain. The
size of the queue can be configured via the `BQueueSize` parameter
and should be larger than the `OIDBatchSize` parameter to avoid blocking
the downloading routines.
Once all blocks available in the NeoFS container are processed, the service
shuts down automatically.

View file

@ -21,6 +21,7 @@ node-related settings described in the table below.
| GarbageCollectionPeriod | `uint32` | 10000 | Controls MPT garbage collection interval (in blocks) for configurations with `RemoveUntraceableBlocks` enabled and `KeepOnlyLatestState` disabled. In this mode the node stores a number of MPT trees (corresponding to `MaxTraceableBlocks` and `StateSyncInterval`), but the DB needs to be clean from old entries from time to time. Doing it too often will cause too much processing overhead, doing it too rarely will leave more useless data in the DB. |
| KeepOnlyLatestState | `bool` | `false` | Specifies if MPT should only store the latest state (or a set of latest states, see `P2PStateExchangeExtensions` section in the ProtocolConfiguration for details). If true, DB size will be smaller, but older roots won't be accessible. This value should remain the same for the same database. | |
| LogPath | `string` | "", so only console logging | File path where to store node logs. |
| NeoFSBlockFetcher | [NeoFS BlockFetcher Configuration](#NeoFS-BlockFetcher-Configuration) | | NeoFS BlockFetcher module configuration. See the [NeoFS BlockFetcher Configuration](#NeoFS-BlockFetcher-Configuration) section for details. |
| Oracle | [Oracle Configuration](#Oracle-Configuration) | | Oracle module configuration. See the [Oracle Configuration](#Oracle-Configuration) section for details. |
| P2P | [P2P Configuration](#P2P-Configuration) | | Configuration values for P2P network interaction. See the [P2P Configuration](#P2P-Configuration) section for details. |
| P2PNotary | [P2P Notary Configuration](#P2P-Notary-Configuration) | | P2P Notary module configuration. See the [P2P Notary Configuration](#P2P-Notary-Configuration) section for details. |
@ -153,6 +154,55 @@ where:
Please, refer to the [Notary module documentation](./notary.md#Notary node module) for
details on module features.
### NeoFS BlockFetcher Configuration
`NeoFSBlockFetcher` configuration section contains settings for NeoFS
BlockFetcher module and has the following structure:
```
NeoFSBlockFetcher:
Enabled: true
UnlockWallet:
Path: "./wallet.json"
Password: "pass"
Addresses:
- st1.storage.fs.neo.org:8080
Timeout: 10m
DownloaderWorkersCount: 500
OIDBatchSize: 8000
BQueueSize: 16000
SkipIndexFilesSearch: false
ContainerID: "EPGuD26wYgQJbmDdVBoYoNZiMKHwFMJT3A5WqPjdUHxH"
BlockAttribute: "block"
IndexFileAttribute: "oid"
IndexFileSize: 128000
```
where:
- `Enabled` enables NeoFS BlockFetcher module.
- `UnlockWallet` contains wallet settings to retrieve account to sign requests to
NeoFS. Without this setting, the module will use randomly generated private key.
For configuration details see [Unlock Wallet Configuration](#Unlock-Wallet-Configuration)
- `Addresses` is a list of NeoFS storage nodes addresses.
- `Timeout` is a timeout for a single request to NeoFS storage node.
- `ContainerID` is a container ID to fetch blocks from.
- `BlockAttribute` is an attribute name of NeoFS object that contains block
data.
- `IndexFileAttribute` is an attribute name of NeoFS index object that contains block
object IDs.
- `DownloaderWorkersCount` is a number of workers that download blocks from
NeoFS in parallel.
- `OIDBatchSize` is the number of blocks to search per a single request to NeoFS
in case of disabled index files search. Also, for both modes of BlockFetcher
operation this setting manages the buffer size of OIDs and blocks transferring
channels.
- `BQueueSize` is a size of the block queue used to manage consecutive blocks
addition to the chain. It must be larger than `OIDBatchSize` and highly recommended
to be `2*OIDBatchSize` or `3*OIDBatchSize`.
- `SkipIndexFilesSearch` is a flag that allows to skip index files search and search
for blocks directly. It is set to `false` by default.
- `IndexFileSize` is the number of OID objects stored in the index files. This
setting depends on the NeoFS block storage configuration and is applicable only if
`SkipIndexFilesSearch` is set to `false`.
### Metrics Services Configuration
Metrics services configuration describes options for metrics services (pprof,

View file

@ -23,12 +23,13 @@ type ApplicationConfiguration struct {
Pprof BasicService `yaml:"Pprof"`
Prometheus BasicService `yaml:"Prometheus"`
Relay bool `yaml:"Relay"`
Consensus Consensus `yaml:"Consensus"`
RPC RPC `yaml:"RPC"`
Oracle OracleConfiguration `yaml:"Oracle"`
P2PNotary P2PNotary `yaml:"P2PNotary"`
StateRoot StateRoot `yaml:"StateRoot"`
Relay bool `yaml:"Relay"`
Consensus Consensus `yaml:"Consensus"`
RPC RPC `yaml:"RPC"`
Oracle OracleConfiguration `yaml:"Oracle"`
P2PNotary P2PNotary `yaml:"P2PNotary"`
StateRoot StateRoot `yaml:"StateRoot"`
NeoFSBlockFetcher NeoFSBlockFetcher `yaml:"NeoFSBlockFetcher"`
}
// EqualsButServices returns true when the o is the same as a except for services
@ -141,3 +142,13 @@ func (a *ApplicationConfiguration) GetAddresses() ([]AnnounceableAddress, error)
}
return addrs, nil
}
// Validate checks ApplicationConfiguration for internal consistency and returns
// an error if any invalid settings are found. This ensures that the application
// configuration is valid and safe to use for further operations.
func (a *ApplicationConfiguration) Validate() error {
if err := a.NeoFSBlockFetcher.Validate(); err != nil {
return fmt.Errorf("invalid NeoFSBlockFetcher config: %w", err)
}
return nil
}

View file

@ -3,6 +3,7 @@ package config
import (
"path/filepath"
"testing"
"time"
"github.com/stretchr/testify/require"
)
@ -133,3 +134,101 @@ func TestGetAddresses(t *testing.T) {
}
}
}
func TestNeoFSBlockFetcherValidation(t *testing.T) {
type testcase struct {
cfg NeoFSBlockFetcher
shouldFail bool
errMsg string
}
validContainerID := "9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG"
invalidContainerID := "invalid-container-id"
cases := []testcase{
{
cfg: NeoFSBlockFetcher{
InternalService: InternalService{Enabled: true},
Timeout: time.Second,
ContainerID: validContainerID,
Addresses: []string{"127.0.0.1"},
OIDBatchSize: 10,
BQueueSize: 20,
SkipIndexFilesSearch: true,
DownloaderWorkersCount: 4,
},
shouldFail: false,
},
{
cfg: NeoFSBlockFetcher{
InternalService: InternalService{Enabled: true},
Timeout: time.Second,
ContainerID: "",
Addresses: []string{"127.0.0.1"},
OIDBatchSize: 10,
BQueueSize: 20,
},
shouldFail: true,
errMsg: "container ID is not set",
},
{
cfg: NeoFSBlockFetcher{
InternalService: InternalService{Enabled: true},
Timeout: time.Second,
ContainerID: invalidContainerID,
Addresses: []string{"127.0.0.1"},
OIDBatchSize: 10,
BQueueSize: 20,
},
shouldFail: true,
errMsg: "invalid container ID",
},
{
cfg: NeoFSBlockFetcher{
InternalService: InternalService{Enabled: true},
Timeout: time.Second,
ContainerID: validContainerID,
Addresses: []string{},
OIDBatchSize: 10,
BQueueSize: 20,
},
shouldFail: true,
errMsg: "addresses are not set",
},
{
cfg: NeoFSBlockFetcher{
InternalService: InternalService{Enabled: true},
Timeout: time.Second,
ContainerID: validContainerID,
Addresses: []string{"127.0.0.1"},
OIDBatchSize: 10,
BQueueSize: 5,
},
shouldFail: true,
errMsg: "BQueueSize (5) is lower than OIDBatchSize (10)",
},
{
cfg: NeoFSBlockFetcher{
InternalService: InternalService{Enabled: true},
Timeout: time.Second,
ContainerID: validContainerID,
Addresses: []string{"127.0.0.1"},
OIDBatchSize: 10,
BQueueSize: 20,
SkipIndexFilesSearch: false,
IndexFileSize: 0,
},
shouldFail: true,
errMsg: "IndexFileSize is not set",
},
}
for _, c := range cases {
err := c.cfg.Validate()
if c.shouldFail {
require.Error(t, err)
require.Contains(t, err.Error(), c.errMsg)
} else {
require.NoError(t, err)
}
}
}

View file

@ -0,0 +1,51 @@
package config
import (
"errors"
"fmt"
"time"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
)
// NeoFSBlockFetcher represents the configuration for the NeoFS BlockFetcher service.
type NeoFSBlockFetcher struct {
InternalService `yaml:",inline"`
Timeout time.Duration `yaml:"Timeout"`
ContainerID string `yaml:"ContainerID"`
Addresses []string `yaml:"Addresses"`
OIDBatchSize int `yaml:"OIDBatchSize"`
BlockAttribute string `yaml:"BlockAttribute"`
IndexFileAttribute string `yaml:"IndexFileAttribute"`
DownloaderWorkersCount int `yaml:"DownloaderWorkersCount"`
BQueueSize int `yaml:"BQueueSize"`
SkipIndexFilesSearch bool `yaml:"SkipIndexFilesSearch"`
IndexFileSize uint32 `yaml:"IndexFileSize"`
}
// Validate checks NeoFSBlockFetcher for internal consistency and ensures
// that all required fields are properly set. It returns an error if the
// configuration is invalid or if the ContainerID cannot be properly decoded.
func (cfg *NeoFSBlockFetcher) Validate() error {
if !cfg.Enabled {
return nil
}
if cfg.ContainerID == "" {
return errors.New("container ID is not set")
}
var containerID cid.ID
err := containerID.DecodeString(cfg.ContainerID)
if err != nil {
return fmt.Errorf("invalid container ID: %w", err)
}
if cfg.BQueueSize < cfg.OIDBatchSize {
return fmt.Errorf("BQueueSize (%d) is lower than OIDBatchSize (%d)", cfg.BQueueSize, cfg.OIDBatchSize)
}
if len(cfg.Addresses) == 0 {
return errors.New("addresses are not set")
}
if !cfg.SkipIndexFilesSearch && cfg.IndexFileSize == 0 {
return errors.New("IndexFileSize is not set")
}
return nil
}

View file

@ -116,6 +116,10 @@ func LoadFile(configPath string, relativePath ...string) (Config, error) {
if err != nil {
return Config{}, err
}
err = config.ApplicationConfiguration.Validate()
if err != nil {
return Config{}, err
}
return config, nil
}

View file

@ -28,6 +28,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/network/capability"
"github.com/nspcc-dev/neo-go/pkg/network/extpool"
"github.com/nspcc-dev/neo-go/pkg/network/payload"
"github.com/nspcc-dev/neo-go/pkg/services/blockfetcher"
"github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/zap"
)
@ -103,10 +104,12 @@ type (
chain Ledger
bQueue *bqueue.Queue
bSyncQueue *bqueue.Queue
bFetcherQueue *bqueue.Queue
mempool *mempool.Pool
notaryRequestPool *mempool.Pool
extensiblePool *extpool.Pool
notaryFeer NotaryFeer
blockFetcher *blockfetcher.Service
serviceLock sync.RWMutex
services map[string]Service
@ -133,6 +136,7 @@ type (
runFin chan struct{}
broadcastTxFin chan struct{}
runProtoFin chan struct{}
blockFetcherFin chan struct{}
transactions chan *transaction.Transaction
@ -182,28 +186,29 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
}
s := &Server{
ServerConfig: config,
chain: chain,
id: randomID(),
config: chain.GetConfig().ProtocolConfiguration,
quit: make(chan struct{}),
relayFin: make(chan struct{}),
runFin: make(chan struct{}),
broadcastTxFin: make(chan struct{}),
runProtoFin: make(chan struct{}),
register: make(chan Peer),
unregister: make(chan peerDrop),
handshake: make(chan Peer),
txInMap: make(map[util.Uint256]struct{}),
peers: make(map[Peer]bool),
mempool: chain.GetMemPool(),
extensiblePool: extpool.New(chain, config.ExtensiblePoolSize),
log: log,
txin: make(chan *transaction.Transaction, 64),
transactions: make(chan *transaction.Transaction, 64),
services: make(map[string]Service),
extensHandlers: make(map[string]func(*payload.Extensible) error),
stateSync: stSync,
ServerConfig: config,
chain: chain,
id: randomID(),
config: chain.GetConfig().ProtocolConfiguration,
quit: make(chan struct{}),
relayFin: make(chan struct{}),
runFin: make(chan struct{}),
broadcastTxFin: make(chan struct{}),
runProtoFin: make(chan struct{}),
blockFetcherFin: make(chan struct{}),
register: make(chan Peer),
unregister: make(chan peerDrop),
handshake: make(chan Peer),
txInMap: make(map[util.Uint256]struct{}),
peers: make(map[Peer]bool),
mempool: chain.GetMemPool(),
extensiblePool: extpool.New(chain, config.ExtensiblePoolSize),
log: log,
txin: make(chan *transaction.Transaction, 64),
transactions: make(chan *transaction.Transaction, 64),
services: make(map[string]Service),
extensHandlers: make(map[string]func(*payload.Extensible) error),
stateSync: stSync,
}
if chain.P2PSigExtensionsEnabled() {
s.notaryFeer = NewNotaryFeer(chain)
@ -219,6 +224,14 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
}, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking)
s.bSyncQueue = bqueue.New(s.stateSync, log, nil, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking)
s.bFetcherQueue = bqueue.New(chain, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking)
var err error
s.blockFetcher, err = blockfetcher.New(chain, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock, func() {
close(s.blockFetcherFin)
})
if err != nil && config.NeoFSBlockFetcherCfg.Enabled {
return nil, fmt.Errorf("failed to create NeoFS BlockFetcher: %w", err)
}
if s.MinPeers < 0 {
s.log.Info("bad MinPeers configured, using the default value",
@ -295,6 +308,13 @@ func (s *Server) Start() {
go s.relayBlocksLoop()
go s.bQueue.Run()
go s.bSyncQueue.Run()
go s.bFetcherQueue.Run()
if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled {
err := s.blockFetcher.Start()
if err != nil {
s.log.Error("skipping NeoFS BlockFetcher", zap.Error(err))
}
}
for _, tr := range s.transports {
go tr.Accept()
}
@ -311,6 +331,9 @@ func (s *Server) Shutdown() {
return
}
s.log.Info("shutting down server", zap.Int("peers", s.PeerCount()))
if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled {
s.blockFetcher.Shutdown()
}
for _, tr := range s.transports {
tr.Close()
}
@ -319,6 +342,7 @@ func (s *Server) Shutdown() {
}
s.bQueue.Discard()
s.bSyncQueue.Discard()
s.bFetcherQueue.Discard()
s.serviceLock.RLock()
for _, svc := range s.services {
svc.Shutdown()
@ -548,6 +572,11 @@ func (s *Server) run() {
s.tryInitStateSync()
s.tryStartServices()
case <-s.blockFetcherFin:
if s.started.Load() {
s.tryInitStateSync()
s.tryStartServices()
}
}
}
}
@ -702,7 +731,7 @@ func (s *Server) IsInSync() bool {
var peersNumber int
var notHigher int
if s.stateSync.IsActive() {
if s.stateSync.IsActive() || s.blockFetcher.IsActive() {
return false
}
@ -762,6 +791,9 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error {
// handleBlockCmd processes the block received from its peer.
func (s *Server) handleBlockCmd(p Peer, block *block.Block) error {
if s.blockFetcher.IsActive() {
return nil
}
if s.stateSync.IsActive() {
return s.bSyncQueue.PutBlock(block)
}
@ -782,6 +814,9 @@ func (s *Server) handlePing(p Peer, ping *payload.Ping) error {
}
func (s *Server) requestBlocksOrHeaders(p Peer) error {
if s.blockFetcher.IsActive() {
return nil
}
if s.stateSync.NeedHeaders() {
if s.chain.HeaderHeight() < p.LastBlockIndex() {
return s.requestHeaders(p)
@ -1100,6 +1135,9 @@ func (s *Server) handleGetHeadersCmd(p Peer, gh *payload.GetBlockByIndex) error
// handleHeadersCmd processes headers payload.
func (s *Server) handleHeadersCmd(p Peer, h *payload.Headers) error {
if s.blockFetcher.IsActive() {
return nil
}
return s.stateSync.AddHeaders(h.Hdrs...)
}
@ -1428,6 +1466,9 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error {
}
func (s *Server) tryInitStateSync() {
if s.blockFetcher.IsActive() {
return
}
if !s.stateSync.IsActive() {
s.bSyncQueue.Discard()
return

View file

@ -76,6 +76,8 @@ type (
// BroadcastFactor is the factor (0-100) for fan-out optimization.
BroadcastFactor int
NeoFSBlockFetcherCfg config.NeoFSBlockFetcher
}
)
@ -89,24 +91,25 @@ func NewServerConfig(cfg config.Config) (ServerConfig, error) {
return ServerConfig{}, fmt.Errorf("failed to parse addresses: %w", err)
}
c := ServerConfig{
UserAgent: cfg.GenerateUserAgent(),
Addresses: addrs,
Net: protoConfig.Magic,
Relay: appConfig.Relay,
Seeds: protoConfig.SeedList,
DialTimeout: appConfig.P2P.DialTimeout,
ProtoTickInterval: appConfig.P2P.ProtoTickInterval,
PingInterval: appConfig.P2P.PingInterval,
PingTimeout: appConfig.P2P.PingTimeout,
MaxPeers: appConfig.P2P.MaxPeers,
AttemptConnPeers: appConfig.P2P.AttemptConnPeers,
MinPeers: appConfig.P2P.MinPeers,
TimePerBlock: protoConfig.TimePerBlock,
OracleCfg: appConfig.Oracle,
P2PNotaryCfg: appConfig.P2PNotary,
StateRootCfg: appConfig.StateRoot,
ExtensiblePoolSize: appConfig.P2P.ExtensiblePoolSize,
BroadcastFactor: appConfig.P2P.BroadcastFactor,
UserAgent: cfg.GenerateUserAgent(),
Addresses: addrs,
Net: protoConfig.Magic,
Relay: appConfig.Relay,
Seeds: protoConfig.SeedList,
DialTimeout: appConfig.P2P.DialTimeout,
ProtoTickInterval: appConfig.P2P.ProtoTickInterval,
PingInterval: appConfig.P2P.PingInterval,
PingTimeout: appConfig.P2P.PingTimeout,
MaxPeers: appConfig.P2P.MaxPeers,
AttemptConnPeers: appConfig.P2P.AttemptConnPeers,
MinPeers: appConfig.P2P.MinPeers,
TimePerBlock: protoConfig.TimePerBlock,
OracleCfg: appConfig.Oracle,
P2PNotaryCfg: appConfig.P2PNotary,
StateRootCfg: appConfig.StateRoot,
ExtensiblePoolSize: appConfig.P2P.ExtensiblePoolSize,
BroadcastFactor: appConfig.P2P.BroadcastFactor,
NeoFSBlockFetcherCfg: appConfig.NeoFSBlockFetcher,
}
return c, nil
}

View file

@ -0,0 +1,479 @@
package blockfetcher
import (
"context"
"crypto/sha256"
"errors"
"fmt"
"io"
"net/url"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/core/block"
gio "github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs"
"github.com/nspcc-dev/neo-go/pkg/wallet"
"github.com/nspcc-dev/neofs-sdk-go/client"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.uber.org/zap"
)
const (
// oidSize is the size of the object ID in NeoFS.
oidSize = sha256.Size
// defaultTimeout is the default timeout for NeoFS requests.
defaultTimeout = 5 * time.Minute
// defaultOIDBatchSize is the default number of OIDs to search and fetch at once.
defaultOIDBatchSize = 8000
// defaultDownloaderWorkersCount is the default number of workers downloading blocks.
defaultDownloaderWorkersCount = 100
)
// Ledger is an interface to Blockchain sufficient for Service.
type Ledger interface {
GetConfig() config.Blockchain
BlockHeight() uint32
}
// Service is a service that fetches blocks from NeoFS.
type Service struct {
// isActive denotes whether the service is working or in the process of shutdown.
isActive atomic.Bool
log *zap.Logger
cfg config.NeoFSBlockFetcher
stateRootInHeader bool
chain Ledger
client *client.Client
enqueueBlock func(*block.Block) error
account *wallet.Account
oidsCh chan oid.ID
blocksCh chan *block.Block
// wg is a wait group for block downloaders.
wg sync.WaitGroup
// Global context for download operations cancellation.
ctx context.Context
ctxCancel context.CancelFunc
// A set of routines managing graceful Service shutdown.
quit chan bool
quitOnce sync.Once
exiterToOIDDownloader chan struct{}
exiterToShutdown chan struct{}
oidDownloaderToExiter chan struct{}
blockQueuerToExiter chan struct{}
shutdownCallback func()
}
// New creates a new BlockFetcher Service.
func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBlock func(*block.Block) error, shutdownCallback func()) (*Service, error) {
var (
account *wallet.Account
err error
)
if cfg.UnlockWallet.Path != "" {
walletFromFile, err := wallet.NewWalletFromFile(cfg.UnlockWallet.Path)
if err != nil {
return &Service{}, err
}
for _, acc := range walletFromFile.Accounts {
if err := acc.Decrypt(cfg.UnlockWallet.Password, walletFromFile.Scrypt); err == nil {
account = acc
break
}
}
if account == nil {
return &Service{}, errors.New("failed to decrypt any account in the wallet")
}
} else {
account, err = wallet.NewAccount()
if err != nil {
return &Service{}, err
}
}
if cfg.Timeout <= 0 {
cfg.Timeout = defaultTimeout
}
if cfg.OIDBatchSize <= 0 {
cfg.OIDBatchSize = defaultOIDBatchSize
}
if cfg.DownloaderWorkersCount <= 0 {
cfg.DownloaderWorkersCount = defaultDownloaderWorkersCount
}
if len(cfg.Addresses) == 0 {
return &Service{}, errors.New("no addresses provided")
}
return &Service{
chain: chain,
log: logger,
cfg: cfg,
enqueueBlock: putBlock,
account: account,
stateRootInHeader: chain.GetConfig().StateRootInHeader,
shutdownCallback: shutdownCallback,
quit: make(chan bool),
exiterToOIDDownloader: make(chan struct{}),
exiterToShutdown: make(chan struct{}),
oidDownloaderToExiter: make(chan struct{}),
blockQueuerToExiter: make(chan struct{}),
// Use buffer of two batch sizes to load OIDs in advance:
// * first full block of OIDs is processing by Downloader
// * second full block of OIDs is available to be fetched by Downloader immediately
// * third half-filled block of OIDs is being collected by OIDsFetcher.
oidsCh: make(chan oid.ID, 2*cfg.OIDBatchSize),
// Use buffer of a single OIDs batch size to provide smooth downloading and
// avoid pauses during blockqueue insertion.
blocksCh: make(chan *block.Block, cfg.OIDBatchSize),
}, nil
}
// Start runs the NeoFS BlockFetcher service.
func (bfs *Service) Start() error {
if !bfs.isActive.CompareAndSwap(false, true) {
return nil
}
bfs.log.Info("starting NeoFS BlockFetcher service")
var err error
bfs.ctx, bfs.ctxCancel = context.WithCancel(context.Background())
bfs.client, err = neofs.GetSDKClient(bfs.ctx, bfs.cfg.Addresses[0], 10*time.Minute)
if err != nil {
bfs.isActive.CompareAndSwap(true, false)
return fmt.Errorf("create SDK client: %w", err)
}
// Start routine that manages Service shutdown process.
go bfs.exiter()
// Start OIDs downloader routine.
go bfs.oidDownloader()
// Start the set of blocks downloading routines.
for range bfs.cfg.DownloaderWorkersCount {
bfs.wg.Add(1)
go bfs.blockDownloader()
}
// Start routine that puts blocks into bQueue.
go bfs.blockQueuer()
return nil
}
// oidDownloader runs the appropriate blocks OID fetching method based on the configuration.
func (bfs *Service) oidDownloader() {
defer close(bfs.oidDownloaderToExiter)
var err error
if bfs.cfg.SkipIndexFilesSearch {
err = bfs.fetchOIDsBySearch()
} else {
err = bfs.fetchOIDsFromIndexFiles()
}
var force bool
if err != nil {
bfs.log.Error("NeoFS BlockFetcher service: OID downloading routine failed", zap.Error(err))
force = true
}
// Stop the service since there's nothing to do anymore.
bfs.stopService(force)
}
// blockDownloader downloads the block from NeoFS and sends it to the blocks channel.
func (bfs *Service) blockDownloader() {
defer bfs.wg.Done()
for blkOid := range bfs.oidsCh {
ctx, cancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout)
defer cancel()
rc, err := bfs.objectGet(ctx, blkOid.String())
if err != nil {
if isContextCanceledErr(err) {
return
}
bfs.log.Error("failed to objectGet block", zap.Error(err))
bfs.stopService(true)
return
}
b, err := bfs.readBlock(rc)
if err != nil {
if isContextCanceledErr(err) {
return
}
bfs.log.Error("failed to read block", zap.Error(err))
bfs.stopService(true)
return
}
select {
case <-bfs.ctx.Done():
return
case bfs.blocksCh <- b:
}
}
}
// blockQueuer puts the block into the bqueue.
func (bfs *Service) blockQueuer() {
defer close(bfs.blockQueuerToExiter)
for b := range bfs.blocksCh {
select {
case <-bfs.ctx.Done():
return
default:
err := bfs.enqueueBlock(b)
if err != nil {
bfs.log.Error("failed to enqueue block", zap.Error(err))
bfs.stopService(true)
return
}
}
}
}
// fetchOIDsFromIndexFiles fetches block OIDs from NeoFS by searching index files first.
func (bfs *Service) fetchOIDsFromIndexFiles() error {
h := bfs.chain.BlockHeight()
startIndex := h/bfs.cfg.IndexFileSize + 1
skip := h % bfs.cfg.IndexFileSize
for {
select {
case <-bfs.exiterToOIDDownloader:
return nil
default:
prm := client.PrmObjectSearch{}
filters := object.NewSearchFilters()
filters.AddFilter(bfs.cfg.IndexFileAttribute, fmt.Sprintf("%d", startIndex), object.MatchStringEqual)
prm.SetFilters(filters)
ctx, cancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout)
blockOidsObject, err := bfs.objectSearch(ctx, prm)
cancel()
if err != nil {
if isContextCanceledErr(err) {
return nil
}
return fmt.Errorf("failed to find '%s' object with index %d: %w", bfs.cfg.IndexFileAttribute, startIndex, err)
}
if len(blockOidsObject) == 0 {
bfs.log.Info(fmt.Sprintf("NeoFS BlockFetcher service: no '%s' object found with index %d, stopping", bfs.cfg.IndexFileAttribute, startIndex))
return nil
}
blockCtx, blockCancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout)
defer blockCancel()
oidsRC, err := bfs.objectGet(blockCtx, blockOidsObject[0].String())
if err != nil {
if isContextCanceledErr(err) {
return nil
}
return fmt.Errorf("failed to fetch '%s' object with index %d: %w", bfs.cfg.IndexFileAttribute, startIndex, err)
}
err = bfs.streamBlockOIDs(oidsRC, int(skip))
if err != nil {
if isContextCanceledErr(err) {
return nil
}
return fmt.Errorf("failed to stream block OIDs with index %d: %w", startIndex, err)
}
startIndex++
skip = 0
}
}
}
// streamBlockOIDs reads block OIDs from the read closer and sends them to the OIDs channel.
func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error {
defer rc.Close()
oidBytes := make([]byte, oidSize)
oidsProcessed := 0
for {
_, err := io.ReadFull(rc, oidBytes)
if err == io.EOF {
break
}
if err != nil {
return fmt.Errorf("failed to read OID: %w", err)
}
if oidsProcessed < skip {
oidsProcessed++
continue
}
var oidBlock oid.ID
if err := oidBlock.Decode(oidBytes); err != nil {
return fmt.Errorf("failed to decode OID: %w", err)
}
select {
case <-bfs.exiterToOIDDownloader:
return nil
case bfs.oidsCh <- oidBlock:
}
oidsProcessed++
}
if oidsProcessed != int(bfs.cfg.IndexFileSize) {
return fmt.Errorf("block OIDs count mismatch: expected %d, processed %d", bfs.cfg.IndexFileSize, oidsProcessed)
}
return nil
}
// fetchOIDsBySearch fetches block OIDs from NeoFS by searching through the Block objects.
func (bfs *Service) fetchOIDsBySearch() error {
startIndex := bfs.chain.BlockHeight()
batchSize := uint32(bfs.cfg.OIDBatchSize)
for {
select {
case <-bfs.exiterToOIDDownloader:
return nil
default:
prm := client.PrmObjectSearch{}
filters := object.NewSearchFilters()
filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex), object.MatchNumGE)
filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex+batchSize-1), object.MatchNumLE)
prm.SetFilters(filters)
ctx, cancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout)
blockOids, err := bfs.objectSearch(ctx, prm)
cancel()
if err != nil {
if isContextCanceledErr(err) {
return nil
}
return err
}
if len(blockOids) == 0 {
bfs.log.Info(fmt.Sprintf("NeoFS BlockFetcher service: no block found with index %d, stopping", startIndex))
return nil
}
for _, oid := range blockOids {
select {
case <-bfs.exiterToOIDDownloader:
return nil
case bfs.oidsCh <- oid:
}
}
startIndex += batchSize
}
}
}
// readBlock decodes the block from the read closer and prepares it for adding to the blockchain.
func (bfs *Service) readBlock(rc io.ReadCloser) (*block.Block, error) {
b := block.New(bfs.stateRootInHeader)
r := gio.NewBinReaderFromIO(rc)
b.DecodeBinary(r)
rc.Close()
return b, r.Err
}
// Shutdown stops the NeoFS BlockFetcher service. It prevents service from new
// block OIDs search, cancels all in-progress downloading operations and waits
// until all service routines finish their work.
func (bfs *Service) Shutdown() {
if !bfs.IsActive() {
return
}
bfs.stopService(true)
<-bfs.exiterToShutdown
}
// stopService close quitting goroutine once. It's the only entrypoint to shutdown
// procedure.
func (bfs *Service) stopService(force bool) {
bfs.quitOnce.Do(func() {
bfs.quit <- force
close(bfs.quit)
})
}
// exiter is a routine that is listening to a quitting signal and manages graceful
// Service shutdown process.
func (bfs *Service) exiter() {
// Closing signal may come from anyone, but only once.
force := <-bfs.quit
bfs.log.Info("shutting down NeoFS BlockFetcher service",
zap.Bool("force", force),
)
// Cansel all pending OIDs/blocks downloads in case if shutdown requested by user
// or caused by downloading error.
if force {
bfs.ctxCancel()
}
// Send signal to OID downloader to stop. Wait until OID downloader finishes his
// work.
close(bfs.exiterToOIDDownloader)
<-bfs.oidDownloaderToExiter
// Close OIDs channel to let block downloaders know that there are no more OIDs
// expected. Wait until all downloaders finish their work.
close(bfs.oidsCh)
bfs.wg.Wait()
// Send signal to block putter to finish his work. Wait until it's finished.
close(bfs.blocksCh)
<-bfs.blockQueuerToExiter
// Everything is done, release resources, turn off the activity marker and let
// the server know about it.
_ = bfs.client.Close()
_ = bfs.log.Sync()
bfs.isActive.CompareAndSwap(true, false)
bfs.shutdownCallback()
// Notify Shutdown routine in case if it's user-triggered shutdown.
close(bfs.exiterToShutdown)
}
// IsActive returns true if the NeoFS BlockFetcher service is running.
func (bfs *Service) IsActive() bool {
return bfs.isActive.Load()
}
func (bfs *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, error) {
u, err := url.Parse(fmt.Sprintf("neofs:%s/%s", bfs.cfg.ContainerID, oid))
if err != nil {
return nil, err
}
rc, err := neofs.GetWithClient(ctx, bfs.client, bfs.account.PrivateKey(), u, false)
if err != nil {
return nil, err
}
return rc, nil
}
func (bfs *Service) objectSearch(ctx context.Context, prm client.PrmObjectSearch) ([]oid.ID, error) {
return neofs.ObjectSearch(ctx, bfs.client, bfs.account.PrivateKey(), bfs.cfg.ContainerID, prm)
}
// isContextCanceledErr returns whether error is a wrapped [context.Canceled].
// Ref. https://github.com/nspcc-dev/neofs-sdk-go/issues/624.
func isContextCanceledErr(err error) bool {
return errors.Is(err, context.Canceled) ||
strings.Contains(err.Error(), "context canceled")
}

View file

@ -0,0 +1,98 @@
package blockfetcher
import (
"testing"
"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
type mockLedger struct {
height uint32
}
func (m *mockLedger) GetConfig() config.Blockchain {
return config.Blockchain{}
}
func (m *mockLedger) BlockHeight() uint32 {
return m.height
}
type mockPutBlockFunc struct {
putCalled bool
}
func (m *mockPutBlockFunc) putBlock(b *block.Block) error {
m.putCalled = true
return nil
}
func TestServiceConstructor(t *testing.T) {
logger := zap.NewNop()
ledger := &mockLedger{height: 10}
mockPut := &mockPutBlockFunc{}
shutdownCallback := func() {}
t.Run("empty configuration", func(t *testing.T) {
cfg := config.NeoFSBlockFetcher{
Timeout: 0,
OIDBatchSize: 0,
DownloaderWorkersCount: 0,
}
_, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback)
require.Error(t, err)
})
t.Run("no addresses", func(t *testing.T) {
cfg := config.NeoFSBlockFetcher{
Addresses: []string{},
}
_, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback)
require.Error(t, err)
})
t.Run("default values", func(t *testing.T) {
cfg := config.NeoFSBlockFetcher{
Addresses: []string{"http://localhost:8080"},
}
service, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback)
require.NoError(t, err)
require.NotNil(t, service)
require.Equal(t, service.IsActive(), false)
require.Equal(t, service.cfg.Timeout, defaultTimeout)
require.Equal(t, service.cfg.OIDBatchSize, defaultOIDBatchSize)
require.Equal(t, service.cfg.DownloaderWorkersCount, defaultDownloaderWorkersCount)
require.Equal(t, service.IsActive(), false)
})
t.Run("SDK client", func(t *testing.T) {
cfg := config.NeoFSBlockFetcher{
Addresses: []string{"http://localhost:8080"},
}
service, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback)
require.NoError(t, err)
err = service.Start()
require.Error(t, err)
require.Contains(t, err.Error(), "create SDK client")
require.Equal(t, service.IsActive(), false)
})
t.Run("invalid wallet", func(t *testing.T) {
cfg := config.NeoFSBlockFetcher{
Addresses: []string{"http://localhost:8080"},
InternalService: config.InternalService{
Enabled: true,
UnlockWallet: config.Wallet{
Path: "invalid/path/to/wallet.json",
Password: "wrong-password",
},
},
}
_, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback)
require.Error(t, err)
})
}