Merge pull request #3515 from nspcc-dev/block-fetcher

services: add new block fetching from NeoFS service
This commit is contained in:
Anna Shaleva 2024-09-12 11:45:31 +05:00 committed by GitHub
commit e1d5ac8557
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
18 changed files with 1315 additions and 97 deletions

87
cli/server/dump_bin.go Normal file
View file

@ -0,0 +1,87 @@
package server
import (
"fmt"
"os"
"path/filepath"
"github.com/nspcc-dev/neo-go/cli/cmdargs"
"github.com/nspcc-dev/neo-go/cli/options"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/urfave/cli/v2"
)
func dumpBin(ctx *cli.Context) error {
if err := cmdargs.EnsureNone(ctx); err != nil {
return err
}
cfg, err := options.GetConfigFromContext(ctx)
if err != nil {
return cli.Exit(err, 1)
}
log, _, logCloser, err := options.HandleLoggingParams(ctx.Bool("debug"), cfg.ApplicationConfiguration)
if err != nil {
return cli.Exit(err, 1)
}
if logCloser != nil {
defer func() { _ = logCloser() }()
}
count := uint32(ctx.Uint("count"))
start := uint32(ctx.Uint("start"))
chain, prometheus, pprof, err := initBCWithMetrics(cfg, log)
if err != nil {
return err
}
defer func() {
pprof.ShutDown()
prometheus.ShutDown()
chain.Close()
}()
blocksCount := chain.BlockHeight() + 1
if start+count > blocksCount {
return cli.Exit(fmt.Errorf("chain is not that high (%d) to dump %d blocks starting from %d", blocksCount-1, count, start), 1)
}
if count == 0 {
count = blocksCount - start
}
out := ctx.String("out")
if out == "" {
return cli.Exit("output directory is not specified", 1)
}
if _, err = os.Stat(out); os.IsNotExist(err) {
if err = os.MkdirAll(out, os.ModePerm); err != nil {
return cli.Exit(fmt.Sprintf("failed to create directory %s: %s", out, err), 1)
}
}
if err != nil {
return cli.Exit(fmt.Sprintf("failed to check directory %s: %s", out, err), 1)
}
for i := start; i < start+count; i++ {
blk, err := chain.GetBlock(chain.GetHeaderHash(i))
if err != nil {
return cli.Exit(fmt.Sprintf("failed to get block %d: %s", i, err), 1)
}
filePath := filepath.Join(out, fmt.Sprintf("block-%d.bin", i))
if err = saveBlockToFile(blk, filePath); err != nil {
return cli.Exit(fmt.Sprintf("failed to save block %d to file %s: %s", i, filePath, err), 1)
}
}
return nil
}
func saveBlockToFile(blk *block.Block, filePath string) error {
file, err := os.Create(filePath)
if err != nil {
return err
}
defer file.Close()
writer := io.NewBinWriterFromIO(file)
blk.EncodeBinary(writer)
return writer.Err
}

View file

@ -0,0 +1,91 @@
package server_test
import (
"os"
"path/filepath"
"strconv"
"testing"
"github.com/nspcc-dev/neo-go/internal/testcli"
"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/core/storage/dbconfig"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"
)
func TestDumpBin(t *testing.T) {
tmpDir := t.TempDir()
loadConfig := func(t *testing.T) config.Config {
chainPath := filepath.Join(tmpDir, "neogotestchain")
cfg, err := config.LoadFile(filepath.Join("..", "..", "config", "protocol.unit_testnet.yml"))
require.NoError(t, err, "could not load config")
cfg.ApplicationConfiguration.DBConfiguration.Type = dbconfig.LevelDB
cfg.ApplicationConfiguration.DBConfiguration.LevelDBOptions.DataDirectoryPath = chainPath
return cfg
}
cfg := loadConfig(t)
out, err := yaml.Marshal(cfg)
require.NoError(t, err)
cfgPath := filepath.Join(tmpDir, "protocol.unit_testnet.yml")
require.NoError(t, os.WriteFile(cfgPath, out, os.ModePerm))
e := testcli.NewExecutor(t, false)
restoreArgs := []string{"neo-go", "db", "restore",
"--config-file", cfgPath, "--in", inDump}
e.Run(t, restoreArgs...)
t.Run("missing output directory", func(t *testing.T) {
args := []string{"neo-go", "db", "dump-bin",
"--config-file", cfgPath, "--out", ""}
e.RunWithErrorCheck(t, "output directory is not specified", args...)
})
t.Run("successful dump", func(t *testing.T) {
outDir := filepath.Join(tmpDir, "blocks")
args := []string{"neo-go", "db", "dump-bin",
"--config-file", cfgPath, "--out", outDir, "--count", "5", "--start", "0"}
e.Run(t, args...)
require.DirExists(t, outDir)
for i := range 5 {
blockFile := filepath.Join(outDir, "block-"+strconv.Itoa(i)+".bin")
require.FileExists(t, blockFile)
}
})
t.Run("invalid block range", func(t *testing.T) {
outDir := filepath.Join(tmpDir, "invalid-blocks")
args := []string{"neo-go", "db", "dump-bin",
"--config-file", cfgPath, "--out", outDir, "--count", "1000", "--start", "0"}
e.RunWithError(t, args...)
})
t.Run("zero blocks (full chain dump)", func(t *testing.T) {
outDir := filepath.Join(tmpDir, "full-dump")
args := []string{"neo-go", "db", "dump-bin",
"--config-file", cfgPath, "--out", outDir}
e.Run(t, args...)
require.DirExists(t, outDir)
for i := range 5 {
blockFile := filepath.Join(outDir, "block-"+strconv.Itoa(i)+".bin")
require.FileExists(t, blockFile)
}
})
t.Run("invalid config file", func(t *testing.T) {
outDir := filepath.Join(tmpDir, "blocks")
args := []string{"neo-go", "db", "dump-bin",
"--config-file", "invalid-config-path", "--out", outDir}
e.RunWithError(t, args...)
})
}

View file

@ -102,6 +102,13 @@ func NewCommands() []*cli.Command {
Action: dumpDB,
Flags: cfgCountOutFlags,
},
{
Name: "dump-bin",
Usage: "Dump blocks (starting with the genesis or specified block) to the directory in binary format",
UsageText: "neo-go db dump-bin -o directory [-s start] [-c count] [--config-path path] [-p/-m/-t] [--config-file file]",
Action: dumpBin,
Flags: cfgCountOutFlags,
},
{
Name: "restore",
Usage: "Restore blocks from the file",

View file

@ -100,3 +100,16 @@ ApplicationConfiguration:
Enabled: false
Addresses:
- ":2113"
NeoFSBlockFetcher:
Enabled: false
Addresses:
- st1.storage.fs.neo.org:8080
Timeout: 10m
DownloaderWorkersCount: 500
OIDBatchSize: 8000
BQueueSize: 16000 # must be larger than OIDBatchSize; recommended to be 2*OIDBatchSize or 3*OIDBatchSize
SkipIndexFilesSearch: false
IndexFileSize: 128000
ContainerID: "EPGuD26wYgQJbmDdVBoYoNZiMKHwFMJT3A5WqPjdUHxH"
BlockAttribute: "block"
IndexFileAttribute: "oid"

View file

@ -0,0 +1,74 @@
# NeoFS block storage
Using NeoFS to store chain's blocks and snapshots was proposed in
[#3463](https://github.com/neo-project/neo/issues/3463). NeoGo contains several
extensions utilizing NeoFS block storage aimed to improve node synchronization
efficiency and reduce node storage size.
## Components and functionality
### Block storage schema
A single NeoFS container is used to store blocks and index files. Each block
is stored in a binary form as a separate object with a unique OID and a set of
attributes:
- block object identifier with block index value (`block:1`)
- primary node index (`primary:0`)
- block hash in the LE form (`hash:5412a781caf278c0736556c0e544c7cfdbb6e3c62ae221ef53646be89364566b`)
- previous block hash in the LE form (`prevHash:3654a054d82a8178c7dfacecc2c57282e23468a42ee407f14506368afe22d929`)
- millisecond-precision block timestamp (`time:1627894840919`)
Each index file is an object containing a constant-sized batch of raw block object
IDs in binary form ordered by block index. Each index file is marked with the
following attributes:
- index file identifier with consecutive file index value (`oid:0`)
- the number of OIDs included into index file (`size:128000`)
### NeoFS BlockFetcher
NeoFS BlockFetcher service is designed as an alternative to P2P synchronisation
protocol. It allows to download blocks from a trusted container in the NeoFS network
and persist them to database using standard verification flow. NeoFS BlockFetcher
service primarily used during the node's bootstrap, providing a fast alternative to
P2P blocks synchronisation.
NeoFS BlockFetcher service has two modes of operation:
- Index File Search: Search for index files, which contain batches of block object
IDs and fetch blocks from NeoFS by retrieved OIDs.
- Direct Block Search: Search and fetch blocks directly from NeoFS container via
built-in NeoFS object search mechanism.
Operation mode of BlockFetcher can be configured via `SkipIndexFilesSearch`
parameter.
#### Operation flow
1. **OID Fetching**:
Depending on the mode, the service either:
- Searches for index files by index file attribute and reads block OIDs from index
file object-by-object.
- Searches batches of blocks directly by block attribute (the batch size is
configured via `OIDBatchSize` parameter).
Once the OIDs are retrieved, they are immediately redirected to the
block downloading routines for further processing. The channel that
is used to redirect block OIDs to downloading routines is buffered
to provide smooth OIDs delivery without delays. The size of this channel
can be configured via `OIDBatchSize` parameter and equals to `2*OIDBatchSize`.
2. **Parallel Block Downloading**:
The number of downloading routines can be configured via
`DownloaderWorkersCount` parameter. It's up to the user to find the
balance between the downloading speed and blocks persist speed for every
node that uses NeoFS BlockFetcher. Downloaded blocks are placed into a
buffered channel of size `IDBatchSize` with further redirection to the
block queue.
3. **Block Insertion**:
Downloaded blocks are inserted into the blockchain using the same logic
as in the P2P synchronisation protocol. The block queue is used to order
downloaded blocks before they are inserted into the blockchain. The
size of the queue can be configured via the `BQueueSize` parameter
and should be larger than the `OIDBatchSize` parameter to avoid blocking
the downloading routines.
Once all blocks available in the NeoFS container are processed, the service
shuts down automatically.

View file

@ -21,6 +21,7 @@ node-related settings described in the table below.
| GarbageCollectionPeriod | `uint32` | 10000 | Controls MPT garbage collection interval (in blocks) for configurations with `RemoveUntraceableBlocks` enabled and `KeepOnlyLatestState` disabled. In this mode the node stores a number of MPT trees (corresponding to `MaxTraceableBlocks` and `StateSyncInterval`), but the DB needs to be clean from old entries from time to time. Doing it too often will cause too much processing overhead, doing it too rarely will leave more useless data in the DB. |
| KeepOnlyLatestState | `bool` | `false` | Specifies if MPT should only store the latest state (or a set of latest states, see `P2PStateExchangeExtensions` section in the ProtocolConfiguration for details). If true, DB size will be smaller, but older roots won't be accessible. This value should remain the same for the same database. | |
| LogPath | `string` | "", so only console logging | File path where to store node logs. |
| NeoFSBlockFetcher | [NeoFS BlockFetcher Configuration](#NeoFS-BlockFetcher-Configuration) | | NeoFS BlockFetcher module configuration. See the [NeoFS BlockFetcher Configuration](#NeoFS-BlockFetcher-Configuration) section for details. |
| Oracle | [Oracle Configuration](#Oracle-Configuration) | | Oracle module configuration. See the [Oracle Configuration](#Oracle-Configuration) section for details. |
| P2P | [P2P Configuration](#P2P-Configuration) | | Configuration values for P2P network interaction. See the [P2P Configuration](#P2P-Configuration) section for details. |
| P2PNotary | [P2P Notary Configuration](#P2P-Notary-Configuration) | | P2P Notary module configuration. See the [P2P Notary Configuration](#P2P-Notary-Configuration) section for details. |
@ -153,6 +154,55 @@ where:
Please, refer to the [Notary module documentation](./notary.md#Notary node module) for
details on module features.
### NeoFS BlockFetcher Configuration
`NeoFSBlockFetcher` configuration section contains settings for NeoFS
BlockFetcher module and has the following structure:
```
NeoFSBlockFetcher:
Enabled: true
UnlockWallet:
Path: "./wallet.json"
Password: "pass"
Addresses:
- st1.storage.fs.neo.org:8080
Timeout: 10m
DownloaderWorkersCount: 500
OIDBatchSize: 8000
BQueueSize: 16000
SkipIndexFilesSearch: false
ContainerID: "EPGuD26wYgQJbmDdVBoYoNZiMKHwFMJT3A5WqPjdUHxH"
BlockAttribute: "block"
IndexFileAttribute: "oid"
IndexFileSize: 128000
```
where:
- `Enabled` enables NeoFS BlockFetcher module.
- `UnlockWallet` contains wallet settings to retrieve account to sign requests to
NeoFS. Without this setting, the module will use randomly generated private key.
For configuration details see [Unlock Wallet Configuration](#Unlock-Wallet-Configuration)
- `Addresses` is a list of NeoFS storage nodes addresses.
- `Timeout` is a timeout for a single request to NeoFS storage node.
- `ContainerID` is a container ID to fetch blocks from.
- `BlockAttribute` is an attribute name of NeoFS object that contains block
data.
- `IndexFileAttribute` is an attribute name of NeoFS index object that contains block
object IDs.
- `DownloaderWorkersCount` is a number of workers that download blocks from
NeoFS in parallel.
- `OIDBatchSize` is the number of blocks to search per a single request to NeoFS
in case of disabled index files search. Also, for both modes of BlockFetcher
operation this setting manages the buffer size of OIDs and blocks transferring
channels.
- `BQueueSize` is a size of the block queue used to manage consecutive blocks
addition to the chain. It must be larger than `OIDBatchSize` and highly recommended
to be `2*OIDBatchSize` or `3*OIDBatchSize`.
- `SkipIndexFilesSearch` is a flag that allows to skip index files search and search
for blocks directly. It is set to `false` by default.
- `IndexFileSize` is the number of OID objects stored in the index files. This
setting depends on the NeoFS block storage configuration and is applicable only if
`SkipIndexFilesSearch` is set to `false`.
### Metrics Services Configuration
Metrics services configuration describes options for metrics services (pprof,

2
go.mod
View file

@ -30,6 +30,7 @@ require (
golang.org/x/term v0.23.0
golang.org/x/text v0.17.0
golang.org/x/tools v0.24.0
google.golang.org/grpc v1.62.0
gopkg.in/yaml.v3 v3.0.1
)
@ -73,7 +74,6 @@ require (
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.24.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240221002015-b0ce06bbee7c // indirect
google.golang.org/grpc v1.62.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
rsc.io/tmplfunc v0.0.3 // indirect
)

View file

@ -23,12 +23,13 @@ type ApplicationConfiguration struct {
Pprof BasicService `yaml:"Pprof"`
Prometheus BasicService `yaml:"Prometheus"`
Relay bool `yaml:"Relay"`
Consensus Consensus `yaml:"Consensus"`
RPC RPC `yaml:"RPC"`
Oracle OracleConfiguration `yaml:"Oracle"`
P2PNotary P2PNotary `yaml:"P2PNotary"`
StateRoot StateRoot `yaml:"StateRoot"`
Relay bool `yaml:"Relay"`
Consensus Consensus `yaml:"Consensus"`
RPC RPC `yaml:"RPC"`
Oracle OracleConfiguration `yaml:"Oracle"`
P2PNotary P2PNotary `yaml:"P2PNotary"`
StateRoot StateRoot `yaml:"StateRoot"`
NeoFSBlockFetcher NeoFSBlockFetcher `yaml:"NeoFSBlockFetcher"`
}
// EqualsButServices returns true when the o is the same as a except for services
@ -141,3 +142,13 @@ func (a *ApplicationConfiguration) GetAddresses() ([]AnnounceableAddress, error)
}
return addrs, nil
}
// Validate checks ApplicationConfiguration for internal consistency and returns
// an error if any invalid settings are found. This ensures that the application
// configuration is valid and safe to use for further operations.
func (a *ApplicationConfiguration) Validate() error {
if err := a.NeoFSBlockFetcher.Validate(); err != nil {
return fmt.Errorf("invalid NeoFSBlockFetcher config: %w", err)
}
return nil
}

View file

@ -3,6 +3,7 @@ package config
import (
"path/filepath"
"testing"
"time"
"github.com/stretchr/testify/require"
)
@ -133,3 +134,101 @@ func TestGetAddresses(t *testing.T) {
}
}
}
func TestNeoFSBlockFetcherValidation(t *testing.T) {
type testcase struct {
cfg NeoFSBlockFetcher
shouldFail bool
errMsg string
}
validContainerID := "9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG"
invalidContainerID := "invalid-container-id"
cases := []testcase{
{
cfg: NeoFSBlockFetcher{
InternalService: InternalService{Enabled: true},
Timeout: time.Second,
ContainerID: validContainerID,
Addresses: []string{"127.0.0.1"},
OIDBatchSize: 10,
BQueueSize: 20,
SkipIndexFilesSearch: true,
DownloaderWorkersCount: 4,
},
shouldFail: false,
},
{
cfg: NeoFSBlockFetcher{
InternalService: InternalService{Enabled: true},
Timeout: time.Second,
ContainerID: "",
Addresses: []string{"127.0.0.1"},
OIDBatchSize: 10,
BQueueSize: 20,
},
shouldFail: true,
errMsg: "container ID is not set",
},
{
cfg: NeoFSBlockFetcher{
InternalService: InternalService{Enabled: true},
Timeout: time.Second,
ContainerID: invalidContainerID,
Addresses: []string{"127.0.0.1"},
OIDBatchSize: 10,
BQueueSize: 20,
},
shouldFail: true,
errMsg: "invalid container ID",
},
{
cfg: NeoFSBlockFetcher{
InternalService: InternalService{Enabled: true},
Timeout: time.Second,
ContainerID: validContainerID,
Addresses: []string{},
OIDBatchSize: 10,
BQueueSize: 20,
},
shouldFail: true,
errMsg: "addresses are not set",
},
{
cfg: NeoFSBlockFetcher{
InternalService: InternalService{Enabled: true},
Timeout: time.Second,
ContainerID: validContainerID,
Addresses: []string{"127.0.0.1"},
OIDBatchSize: 10,
BQueueSize: 5,
},
shouldFail: true,
errMsg: "BQueueSize (5) is lower than OIDBatchSize (10)",
},
{
cfg: NeoFSBlockFetcher{
InternalService: InternalService{Enabled: true},
Timeout: time.Second,
ContainerID: validContainerID,
Addresses: []string{"127.0.0.1"},
OIDBatchSize: 10,
BQueueSize: 20,
SkipIndexFilesSearch: false,
IndexFileSize: 0,
},
shouldFail: true,
errMsg: "IndexFileSize is not set",
},
}
for _, c := range cases {
err := c.cfg.Validate()
if c.shouldFail {
require.Error(t, err)
require.Contains(t, err.Error(), c.errMsg)
} else {
require.NoError(t, err)
}
}
}

View file

@ -0,0 +1,51 @@
package config
import (
"errors"
"fmt"
"time"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
)
// NeoFSBlockFetcher represents the configuration for the NeoFS BlockFetcher service.
type NeoFSBlockFetcher struct {
InternalService `yaml:",inline"`
Timeout time.Duration `yaml:"Timeout"`
ContainerID string `yaml:"ContainerID"`
Addresses []string `yaml:"Addresses"`
OIDBatchSize int `yaml:"OIDBatchSize"`
BlockAttribute string `yaml:"BlockAttribute"`
IndexFileAttribute string `yaml:"IndexFileAttribute"`
DownloaderWorkersCount int `yaml:"DownloaderWorkersCount"`
BQueueSize int `yaml:"BQueueSize"`
SkipIndexFilesSearch bool `yaml:"SkipIndexFilesSearch"`
IndexFileSize uint32 `yaml:"IndexFileSize"`
}
// Validate checks NeoFSBlockFetcher for internal consistency and ensures
// that all required fields are properly set. It returns an error if the
// configuration is invalid or if the ContainerID cannot be properly decoded.
func (cfg *NeoFSBlockFetcher) Validate() error {
if !cfg.Enabled {
return nil
}
if cfg.ContainerID == "" {
return errors.New("container ID is not set")
}
var containerID cid.ID
err := containerID.DecodeString(cfg.ContainerID)
if err != nil {
return fmt.Errorf("invalid container ID: %w", err)
}
if cfg.BQueueSize < cfg.OIDBatchSize {
return fmt.Errorf("BQueueSize (%d) is lower than OIDBatchSize (%d)", cfg.BQueueSize, cfg.OIDBatchSize)
}
if len(cfg.Addresses) == 0 {
return errors.New("addresses are not set")
}
if !cfg.SkipIndexFilesSearch && cfg.IndexFileSize == 0 {
return errors.New("IndexFileSize is not set")
}
return nil
}

View file

@ -116,6 +116,10 @@ func LoadFile(configPath string, relativePath ...string) (Config, error) {
if err != nil {
return Config{}, err
}
err = config.ApplicationConfiguration.Validate()
if err != nil {
return Config{}, err
}
return config, nil
}

View file

@ -3,6 +3,7 @@ package bqueue
import (
"sync"
"sync/atomic"
"time"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"go.uber.org/zap"
@ -15,6 +16,17 @@ type Blockqueuer interface {
BlockHeight() uint32
}
// OperationMode is the mode of operation for the block queue.
// Could be either Blocking or NonBlocking.
type OperationMode byte
const (
// NonBlocking means that PutBlock will return immediately if the queue is full.
NonBlocking OperationMode = 0
// Blocking means that PutBlock will wait until there is enough space in the queue.
Blocking OperationMode = 1
)
// Queue is the block queue.
type Queue struct {
log *zap.Logger
@ -27,29 +39,36 @@ type Queue struct {
discarded atomic.Bool
len int
lenUpdateF func(int)
cacheSize int
mode OperationMode
}
// CacheSize is the amount of blocks above the current height
// DefaultCacheSize is the default amount of blocks above the current height
// which are stored in the queue.
const CacheSize = 2000
const DefaultCacheSize = 2000
func indexToPosition(i uint32) int {
return int(i) % CacheSize
func (bq *Queue) indexToPosition(i uint32) int {
return int(i) % bq.cacheSize
}
// New creates an instance of BlockQueue.
func New(bc Blockqueuer, log *zap.Logger, relayer func(*block.Block), lenMetricsUpdater func(l int)) *Queue {
func New(bc Blockqueuer, log *zap.Logger, relayer func(*block.Block), cacheSize int, lenMetricsUpdater func(l int), mode OperationMode) *Queue {
if log == nil {
return nil
}
if cacheSize <= 0 {
cacheSize = DefaultCacheSize
}
return &Queue{
log: log,
queue: make([]*block.Block, CacheSize),
queue: make([]*block.Block, cacheSize),
checkBlocks: make(chan struct{}, 1),
chain: bc,
relayF: relayer,
lenUpdateF: lenMetricsUpdater,
cacheSize: cacheSize,
mode: mode,
}
}
@ -63,12 +82,12 @@ func (bq *Queue) Run() {
}
for {
h := bq.chain.BlockHeight()
pos := indexToPosition(h + 1)
pos := bq.indexToPosition(h + 1)
bq.queueLock.Lock()
b := bq.queue[pos]
// The chain moved forward using blocks from other sources (consensus).
for i := lastHeight; i < h; i++ {
old := indexToPosition(i + 1)
old := bq.indexToPosition(i + 1)
if bq.queue[old] != nil && bq.queue[old].Index == i {
bq.len--
bq.queue[old] = nil
@ -114,17 +133,38 @@ func (bq *Queue) PutBlock(block *block.Block) error {
if bq.discarded.Load() {
return nil
}
if block.Index <= h || h+CacheSize < block.Index {
// can easily happen when fetching the same blocks from
// different peers, thus not considered as error
// Can easily happen when fetching the same blocks from
// different peers, thus not considered as error.
if block.Index <= h {
return nil
}
pos := indexToPosition(block.Index)
if h+uint32(bq.cacheSize) < block.Index {
switch bq.mode {
case NonBlocking:
return nil
case Blocking:
bq.queueLock.Unlock()
t := time.NewTicker(time.Second)
defer t.Stop()
for range t.C {
if bq.discarded.Load() {
bq.queueLock.Lock()
return nil
}
h = bq.chain.BlockHeight()
if h+uint32(bq.cacheSize) >= block.Index {
bq.queueLock.Lock()
break
}
}
}
}
pos := bq.indexToPosition(block.Index)
// If we already have it, keep the old block, throw away the new one.
if bq.queue[pos] == nil || bq.queue[pos].Index < block.Index {
bq.len++
bq.queue[pos] = block
for pos < CacheSize && bq.queue[pos] != nil && bq.lastQ+1 == bq.queue[pos].Index {
for pos < bq.cacheSize && bq.queue[pos] != nil && bq.lastQ+1 == bq.queue[pos].Index {
bq.lastQ = bq.queue[pos].Index
pos++
}
@ -147,7 +187,7 @@ func (bq *Queue) PutBlock(block *block.Block) error {
func (bq *Queue) LastQueued() (uint32, int) {
bq.queueLock.RLock()
defer bq.queueLock.RUnlock()
return bq.lastQ, CacheSize - bq.len
return bq.lastQ, bq.cacheSize - bq.len
}
// Discard stops the queue and prevents it from accepting more blocks to enqueue.

View file

@ -13,7 +13,7 @@ import (
func TestBlockQueue(t *testing.T) {
chain := fakechain.NewFakeChain()
// notice, it's not yet running
bq := New(chain, zaptest.NewLogger(t), nil, nil)
bq := New(chain, zaptest.NewLogger(t), nil, 0, nil, NonBlocking)
blocks := make([]*block.Block, 11)
for i := 1; i < 11; i++ {
blocks[i] = &block.Block{Header: block.Header{Index: uint32(i)}}
@ -24,7 +24,7 @@ func TestBlockQueue(t *testing.T) {
}
last, capLeft := bq.LastQueued()
assert.Equal(t, uint32(0), last)
assert.Equal(t, CacheSize-2, capLeft)
assert.Equal(t, DefaultCacheSize-2, capLeft)
// nothing should be put into the blockchain
assert.Equal(t, uint32(0), chain.BlockHeight())
assert.Equal(t, 2, bq.length())
@ -35,18 +35,18 @@ func TestBlockQueue(t *testing.T) {
// but they're still not put into the blockchain, because bq isn't running
last, capLeft = bq.LastQueued()
assert.Equal(t, uint32(4), last)
assert.Equal(t, CacheSize-4, capLeft)
assert.Equal(t, DefaultCacheSize-4, capLeft)
assert.Equal(t, uint32(0), chain.BlockHeight())
assert.Equal(t, 4, bq.length())
// block with too big index is dropped
assert.NoError(t, bq.PutBlock(&block.Block{Header: block.Header{Index: bq.chain.BlockHeight() + CacheSize + 1}}))
assert.NoError(t, bq.PutBlock(&block.Block{Header: block.Header{Index: bq.chain.BlockHeight() + DefaultCacheSize + 1}}))
assert.Equal(t, 4, bq.length())
go bq.Run()
// run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one
assert.Eventually(t, func() bool { return chain.BlockHeight() == 4 }, 4*time.Second, 100*time.Millisecond)
last, capLeft = bq.LastQueued()
assert.Equal(t, uint32(4), last)
assert.Equal(t, CacheSize, capLeft)
assert.Equal(t, DefaultCacheSize, capLeft)
assert.Equal(t, 0, bq.length())
assert.Equal(t, uint32(4), chain.BlockHeight())
// put some old blocks
@ -55,7 +55,7 @@ func TestBlockQueue(t *testing.T) {
}
last, capLeft = bq.LastQueued()
assert.Equal(t, uint32(4), last)
assert.Equal(t, CacheSize, capLeft)
assert.Equal(t, DefaultCacheSize, capLeft)
assert.Equal(t, 0, bq.length())
assert.Equal(t, uint32(4), chain.BlockHeight())
// unexpected blocks with run() active
@ -75,7 +75,7 @@ func TestBlockQueue(t *testing.T) {
assert.Eventually(t, func() bool { return chain.BlockHeight() == 8 }, 4*time.Second, 100*time.Millisecond)
last, capLeft = bq.LastQueued()
assert.Equal(t, uint32(8), last)
assert.Equal(t, CacheSize-1, capLeft)
assert.Equal(t, DefaultCacheSize-1, capLeft)
assert.Equal(t, 1, bq.length())
assert.Equal(t, uint32(8), chain.BlockHeight())
bq.Discard()

View file

@ -28,6 +28,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/network/capability"
"github.com/nspcc-dev/neo-go/pkg/network/extpool"
"github.com/nspcc-dev/neo-go/pkg/network/payload"
"github.com/nspcc-dev/neo-go/pkg/services/blockfetcher"
"github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/zap"
)
@ -103,10 +104,12 @@ type (
chain Ledger
bQueue *bqueue.Queue
bSyncQueue *bqueue.Queue
bFetcherQueue *bqueue.Queue
mempool *mempool.Pool
notaryRequestPool *mempool.Pool
extensiblePool *extpool.Pool
notaryFeer NotaryFeer
blockFetcher *blockfetcher.Service
serviceLock sync.RWMutex
services map[string]Service
@ -133,6 +136,7 @@ type (
runFin chan struct{}
broadcastTxFin chan struct{}
runProtoFin chan struct{}
blockFetcherFin chan struct{}
transactions chan *transaction.Transaction
@ -182,28 +186,29 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
}
s := &Server{
ServerConfig: config,
chain: chain,
id: randomID(),
config: chain.GetConfig().ProtocolConfiguration,
quit: make(chan struct{}),
relayFin: make(chan struct{}),
runFin: make(chan struct{}),
broadcastTxFin: make(chan struct{}),
runProtoFin: make(chan struct{}),
register: make(chan Peer),
unregister: make(chan peerDrop),
handshake: make(chan Peer),
txInMap: make(map[util.Uint256]struct{}),
peers: make(map[Peer]bool),
mempool: chain.GetMemPool(),
extensiblePool: extpool.New(chain, config.ExtensiblePoolSize),
log: log,
txin: make(chan *transaction.Transaction, 64),
transactions: make(chan *transaction.Transaction, 64),
services: make(map[string]Service),
extensHandlers: make(map[string]func(*payload.Extensible) error),
stateSync: stSync,
ServerConfig: config,
chain: chain,
id: randomID(),
config: chain.GetConfig().ProtocolConfiguration,
quit: make(chan struct{}),
relayFin: make(chan struct{}),
runFin: make(chan struct{}),
broadcastTxFin: make(chan struct{}),
runProtoFin: make(chan struct{}),
blockFetcherFin: make(chan struct{}),
register: make(chan Peer),
unregister: make(chan peerDrop),
handshake: make(chan Peer),
txInMap: make(map[util.Uint256]struct{}),
peers: make(map[Peer]bool),
mempool: chain.GetMemPool(),
extensiblePool: extpool.New(chain, config.ExtensiblePoolSize),
log: log,
txin: make(chan *transaction.Transaction, 64),
transactions: make(chan *transaction.Transaction, 64),
services: make(map[string]Service),
extensHandlers: make(map[string]func(*payload.Extensible) error),
stateSync: stSync,
}
if chain.P2PSigExtensionsEnabled() {
s.notaryFeer = NewNotaryFeer(chain)
@ -216,9 +221,17 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
}
s.bQueue = bqueue.New(chain, log, func(b *block.Block) {
s.tryStartServices()
}, updateBlockQueueLenMetric)
}, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking)
s.bSyncQueue = bqueue.New(s.stateSync, log, nil, updateBlockQueueLenMetric)
s.bSyncQueue = bqueue.New(s.stateSync, log, nil, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking)
s.bFetcherQueue = bqueue.New(chain, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking)
var err error
s.blockFetcher, err = blockfetcher.New(chain, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock, func() {
close(s.blockFetcherFin)
})
if err != nil && config.NeoFSBlockFetcherCfg.Enabled {
return nil, fmt.Errorf("failed to create NeoFS BlockFetcher: %w", err)
}
if s.MinPeers < 0 {
s.log.Info("bad MinPeers configured, using the default value",
@ -295,6 +308,13 @@ func (s *Server) Start() {
go s.relayBlocksLoop()
go s.bQueue.Run()
go s.bSyncQueue.Run()
go s.bFetcherQueue.Run()
if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled {
err := s.blockFetcher.Start()
if err != nil {
s.log.Error("skipping NeoFS BlockFetcher", zap.Error(err))
}
}
for _, tr := range s.transports {
go tr.Accept()
}
@ -311,6 +331,9 @@ func (s *Server) Shutdown() {
return
}
s.log.Info("shutting down server", zap.Int("peers", s.PeerCount()))
if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled {
s.blockFetcher.Shutdown()
}
for _, tr := range s.transports {
tr.Close()
}
@ -319,6 +342,7 @@ func (s *Server) Shutdown() {
}
s.bQueue.Discard()
s.bSyncQueue.Discard()
s.bFetcherQueue.Discard()
s.serviceLock.RLock()
for _, svc := range s.services {
svc.Shutdown()
@ -548,6 +572,11 @@ func (s *Server) run() {
s.tryInitStateSync()
s.tryStartServices()
case <-s.blockFetcherFin:
if s.started.Load() {
s.tryInitStateSync()
s.tryStartServices()
}
}
}
}
@ -702,7 +731,7 @@ func (s *Server) IsInSync() bool {
var peersNumber int
var notHigher int
if s.stateSync.IsActive() {
if s.stateSync.IsActive() || s.blockFetcher.IsActive() {
return false
}
@ -762,6 +791,9 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error {
// handleBlockCmd processes the block received from its peer.
func (s *Server) handleBlockCmd(p Peer, block *block.Block) error {
if s.blockFetcher.IsActive() {
return nil
}
if s.stateSync.IsActive() {
return s.bSyncQueue.PutBlock(block)
}
@ -782,6 +814,9 @@ func (s *Server) handlePing(p Peer, ping *payload.Ping) error {
}
func (s *Server) requestBlocksOrHeaders(p Peer) error {
if s.blockFetcher.IsActive() {
return nil
}
if s.stateSync.NeedHeaders() {
if s.chain.HeaderHeight() < p.LastBlockIndex() {
return s.requestHeaders(p)
@ -1100,6 +1135,9 @@ func (s *Server) handleGetHeadersCmd(p Peer, gh *payload.GetBlockByIndex) error
// handleHeadersCmd processes headers payload.
func (s *Server) handleHeadersCmd(p Peer, h *payload.Headers) error {
if s.blockFetcher.IsActive() {
return nil
}
return s.stateSync.AddHeaders(h.Hdrs...)
}
@ -1322,7 +1360,7 @@ func getRequestBlocksPayload(p Peer, currHeight uint32, lastRequestedHeight *ato
if !lastRequestedHeight.CompareAndSwap(old, needHeight) {
continue
}
} else if old < currHeight+(bqueue.CacheSize-payload.MaxHashesCount) {
} else if old < currHeight+(bqueue.DefaultCacheSize-payload.MaxHashesCount) {
needHeight = currHeight + 1
if peerHeight > old+payload.MaxHashesCount {
needHeight = old + payload.MaxHashesCount
@ -1331,7 +1369,7 @@ func getRequestBlocksPayload(p Peer, currHeight uint32, lastRequestedHeight *ato
}
}
} else {
index := mrand.IntN(bqueue.CacheSize / payload.MaxHashesCount)
index := mrand.IntN(bqueue.DefaultCacheSize / payload.MaxHashesCount)
needHeight = currHeight + 1 + uint32(index*payload.MaxHashesCount)
}
break
@ -1428,6 +1466,9 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error {
}
func (s *Server) tryInitStateSync() {
if s.blockFetcher.IsActive() {
return
}
if !s.stateSync.IsActive() {
s.bSyncQueue.Discard()
return

View file

@ -76,6 +76,8 @@ type (
// BroadcastFactor is the factor (0-100) for fan-out optimization.
BroadcastFactor int
NeoFSBlockFetcherCfg config.NeoFSBlockFetcher
}
)
@ -89,24 +91,25 @@ func NewServerConfig(cfg config.Config) (ServerConfig, error) {
return ServerConfig{}, fmt.Errorf("failed to parse addresses: %w", err)
}
c := ServerConfig{
UserAgent: cfg.GenerateUserAgent(),
Addresses: addrs,
Net: protoConfig.Magic,
Relay: appConfig.Relay,
Seeds: protoConfig.SeedList,
DialTimeout: appConfig.P2P.DialTimeout,
ProtoTickInterval: appConfig.P2P.ProtoTickInterval,
PingInterval: appConfig.P2P.PingInterval,
PingTimeout: appConfig.P2P.PingTimeout,
MaxPeers: appConfig.P2P.MaxPeers,
AttemptConnPeers: appConfig.P2P.AttemptConnPeers,
MinPeers: appConfig.P2P.MinPeers,
TimePerBlock: protoConfig.TimePerBlock,
OracleCfg: appConfig.Oracle,
P2PNotaryCfg: appConfig.P2PNotary,
StateRootCfg: appConfig.StateRoot,
ExtensiblePoolSize: appConfig.P2P.ExtensiblePoolSize,
BroadcastFactor: appConfig.P2P.BroadcastFactor,
UserAgent: cfg.GenerateUserAgent(),
Addresses: addrs,
Net: protoConfig.Magic,
Relay: appConfig.Relay,
Seeds: protoConfig.SeedList,
DialTimeout: appConfig.P2P.DialTimeout,
ProtoTickInterval: appConfig.P2P.ProtoTickInterval,
PingInterval: appConfig.P2P.PingInterval,
PingTimeout: appConfig.P2P.PingTimeout,
MaxPeers: appConfig.P2P.MaxPeers,
AttemptConnPeers: appConfig.P2P.AttemptConnPeers,
MinPeers: appConfig.P2P.MinPeers,
TimePerBlock: protoConfig.TimePerBlock,
OracleCfg: appConfig.Oracle,
P2PNotaryCfg: appConfig.P2PNotary,
StateRootCfg: appConfig.StateRoot,
ExtensiblePoolSize: appConfig.P2P.ExtensiblePoolSize,
BroadcastFactor: appConfig.P2P.BroadcastFactor,
NeoFSBlockFetcherCfg: appConfig.NeoFSBlockFetcher,
}
return c, nil
}

View file

@ -0,0 +1,479 @@
package blockfetcher
import (
"context"
"crypto/sha256"
"errors"
"fmt"
"io"
"net/url"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/core/block"
gio "github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs"
"github.com/nspcc-dev/neo-go/pkg/wallet"
"github.com/nspcc-dev/neofs-sdk-go/client"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.uber.org/zap"
)
const (
// oidSize is the size of the object ID in NeoFS.
oidSize = sha256.Size
// defaultTimeout is the default timeout for NeoFS requests.
defaultTimeout = 5 * time.Minute
// defaultOIDBatchSize is the default number of OIDs to search and fetch at once.
defaultOIDBatchSize = 8000
// defaultDownloaderWorkersCount is the default number of workers downloading blocks.
defaultDownloaderWorkersCount = 100
)
// Ledger is an interface to Blockchain sufficient for Service.
type Ledger interface {
GetConfig() config.Blockchain
BlockHeight() uint32
}
// Service is a service that fetches blocks from NeoFS.
type Service struct {
// isActive denotes whether the service is working or in the process of shutdown.
isActive atomic.Bool
log *zap.Logger
cfg config.NeoFSBlockFetcher
stateRootInHeader bool
chain Ledger
client *client.Client
enqueueBlock func(*block.Block) error
account *wallet.Account
oidsCh chan oid.ID
blocksCh chan *block.Block
// wg is a wait group for block downloaders.
wg sync.WaitGroup
// Global context for download operations cancellation.
ctx context.Context
ctxCancel context.CancelFunc
// A set of routines managing graceful Service shutdown.
quit chan bool
quitOnce sync.Once
exiterToOIDDownloader chan struct{}
exiterToShutdown chan struct{}
oidDownloaderToExiter chan struct{}
blockQueuerToExiter chan struct{}
shutdownCallback func()
}
// New creates a new BlockFetcher Service.
func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBlock func(*block.Block) error, shutdownCallback func()) (*Service, error) {
var (
account *wallet.Account
err error
)
if cfg.UnlockWallet.Path != "" {
walletFromFile, err := wallet.NewWalletFromFile(cfg.UnlockWallet.Path)
if err != nil {
return &Service{}, err
}
for _, acc := range walletFromFile.Accounts {
if err := acc.Decrypt(cfg.UnlockWallet.Password, walletFromFile.Scrypt); err == nil {
account = acc
break
}
}
if account == nil {
return &Service{}, errors.New("failed to decrypt any account in the wallet")
}
} else {
account, err = wallet.NewAccount()
if err != nil {
return &Service{}, err
}
}
if cfg.Timeout <= 0 {
cfg.Timeout = defaultTimeout
}
if cfg.OIDBatchSize <= 0 {
cfg.OIDBatchSize = defaultOIDBatchSize
}
if cfg.DownloaderWorkersCount <= 0 {
cfg.DownloaderWorkersCount = defaultDownloaderWorkersCount
}
if len(cfg.Addresses) == 0 {
return &Service{}, errors.New("no addresses provided")
}
return &Service{
chain: chain,
log: logger,
cfg: cfg,
enqueueBlock: putBlock,
account: account,
stateRootInHeader: chain.GetConfig().StateRootInHeader,
shutdownCallback: shutdownCallback,
quit: make(chan bool),
exiterToOIDDownloader: make(chan struct{}),
exiterToShutdown: make(chan struct{}),
oidDownloaderToExiter: make(chan struct{}),
blockQueuerToExiter: make(chan struct{}),
// Use buffer of two batch sizes to load OIDs in advance:
// * first full block of OIDs is processing by Downloader
// * second full block of OIDs is available to be fetched by Downloader immediately
// * third half-filled block of OIDs is being collected by OIDsFetcher.
oidsCh: make(chan oid.ID, 2*cfg.OIDBatchSize),
// Use buffer of a single OIDs batch size to provide smooth downloading and
// avoid pauses during blockqueue insertion.
blocksCh: make(chan *block.Block, cfg.OIDBatchSize),
}, nil
}
// Start runs the NeoFS BlockFetcher service.
func (bfs *Service) Start() error {
if !bfs.isActive.CompareAndSwap(false, true) {
return nil
}
bfs.log.Info("starting NeoFS BlockFetcher service")
var err error
bfs.ctx, bfs.ctxCancel = context.WithCancel(context.Background())
bfs.client, err = neofs.GetSDKClient(bfs.ctx, bfs.cfg.Addresses[0], 10*time.Minute)
if err != nil {
bfs.isActive.CompareAndSwap(true, false)
return fmt.Errorf("create SDK client: %w", err)
}
// Start routine that manages Service shutdown process.
go bfs.exiter()
// Start OIDs downloader routine.
go bfs.oidDownloader()
// Start the set of blocks downloading routines.
for range bfs.cfg.DownloaderWorkersCount {
bfs.wg.Add(1)
go bfs.blockDownloader()
}
// Start routine that puts blocks into bQueue.
go bfs.blockQueuer()
return nil
}
// oidDownloader runs the appropriate blocks OID fetching method based on the configuration.
func (bfs *Service) oidDownloader() {
defer close(bfs.oidDownloaderToExiter)
var err error
if bfs.cfg.SkipIndexFilesSearch {
err = bfs.fetchOIDsBySearch()
} else {
err = bfs.fetchOIDsFromIndexFiles()
}
var force bool
if err != nil {
bfs.log.Error("NeoFS BlockFetcher service: OID downloading routine failed", zap.Error(err))
force = true
}
// Stop the service since there's nothing to do anymore.
bfs.stopService(force)
}
// blockDownloader downloads the block from NeoFS and sends it to the blocks channel.
func (bfs *Service) blockDownloader() {
defer bfs.wg.Done()
for blkOid := range bfs.oidsCh {
ctx, cancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout)
defer cancel()
rc, err := bfs.objectGet(ctx, blkOid.String())
if err != nil {
if isContextCanceledErr(err) {
return
}
bfs.log.Error("failed to objectGet block", zap.Error(err))
bfs.stopService(true)
return
}
b, err := bfs.readBlock(rc)
if err != nil {
if isContextCanceledErr(err) {
return
}
bfs.log.Error("failed to read block", zap.Error(err))
bfs.stopService(true)
return
}
select {
case <-bfs.ctx.Done():
return
case bfs.blocksCh <- b:
}
}
}
// blockQueuer puts the block into the bqueue.
func (bfs *Service) blockQueuer() {
defer close(bfs.blockQueuerToExiter)
for b := range bfs.blocksCh {
select {
case <-bfs.ctx.Done():
return
default:
err := bfs.enqueueBlock(b)
if err != nil {
bfs.log.Error("failed to enqueue block", zap.Error(err))
bfs.stopService(true)
return
}
}
}
}
// fetchOIDsFromIndexFiles fetches block OIDs from NeoFS by searching index files first.
func (bfs *Service) fetchOIDsFromIndexFiles() error {
h := bfs.chain.BlockHeight()
startIndex := h/bfs.cfg.IndexFileSize + 1
skip := h % bfs.cfg.IndexFileSize
for {
select {
case <-bfs.exiterToOIDDownloader:
return nil
default:
prm := client.PrmObjectSearch{}
filters := object.NewSearchFilters()
filters.AddFilter(bfs.cfg.IndexFileAttribute, fmt.Sprintf("%d", startIndex), object.MatchStringEqual)
prm.SetFilters(filters)
ctx, cancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout)
blockOidsObject, err := bfs.objectSearch(ctx, prm)
cancel()
if err != nil {
if isContextCanceledErr(err) {
return nil
}
return fmt.Errorf("failed to find '%s' object with index %d: %w", bfs.cfg.IndexFileAttribute, startIndex, err)
}
if len(blockOidsObject) == 0 {
bfs.log.Info(fmt.Sprintf("NeoFS BlockFetcher service: no '%s' object found with index %d, stopping", bfs.cfg.IndexFileAttribute, startIndex))
return nil
}
blockCtx, blockCancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout)
defer blockCancel()
oidsRC, err := bfs.objectGet(blockCtx, blockOidsObject[0].String())
if err != nil {
if isContextCanceledErr(err) {
return nil
}
return fmt.Errorf("failed to fetch '%s' object with index %d: %w", bfs.cfg.IndexFileAttribute, startIndex, err)
}
err = bfs.streamBlockOIDs(oidsRC, int(skip))
if err != nil {
if isContextCanceledErr(err) {
return nil
}
return fmt.Errorf("failed to stream block OIDs with index %d: %w", startIndex, err)
}
startIndex++
skip = 0
}
}
}
// streamBlockOIDs reads block OIDs from the read closer and sends them to the OIDs channel.
func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error {
defer rc.Close()
oidBytes := make([]byte, oidSize)
oidsProcessed := 0
for {
_, err := io.ReadFull(rc, oidBytes)
if err == io.EOF {
break
}
if err != nil {
return fmt.Errorf("failed to read OID: %w", err)
}
if oidsProcessed < skip {
oidsProcessed++
continue
}
var oidBlock oid.ID
if err := oidBlock.Decode(oidBytes); err != nil {
return fmt.Errorf("failed to decode OID: %w", err)
}
select {
case <-bfs.exiterToOIDDownloader:
return nil
case bfs.oidsCh <- oidBlock:
}
oidsProcessed++
}
if oidsProcessed != int(bfs.cfg.IndexFileSize) {
return fmt.Errorf("block OIDs count mismatch: expected %d, processed %d", bfs.cfg.IndexFileSize, oidsProcessed)
}
return nil
}
// fetchOIDsBySearch fetches block OIDs from NeoFS by searching through the Block objects.
func (bfs *Service) fetchOIDsBySearch() error {
startIndex := bfs.chain.BlockHeight()
batchSize := uint32(bfs.cfg.OIDBatchSize)
for {
select {
case <-bfs.exiterToOIDDownloader:
return nil
default:
prm := client.PrmObjectSearch{}
filters := object.NewSearchFilters()
filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex), object.MatchNumGE)
filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex+batchSize-1), object.MatchNumLE)
prm.SetFilters(filters)
ctx, cancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout)
blockOids, err := bfs.objectSearch(ctx, prm)
cancel()
if err != nil {
if isContextCanceledErr(err) {
return nil
}
return err
}
if len(blockOids) == 0 {
bfs.log.Info(fmt.Sprintf("NeoFS BlockFetcher service: no block found with index %d, stopping", startIndex))
return nil
}
for _, oid := range blockOids {
select {
case <-bfs.exiterToOIDDownloader:
return nil
case bfs.oidsCh <- oid:
}
}
startIndex += batchSize
}
}
}
// readBlock decodes the block from the read closer and prepares it for adding to the blockchain.
func (bfs *Service) readBlock(rc io.ReadCloser) (*block.Block, error) {
b := block.New(bfs.stateRootInHeader)
r := gio.NewBinReaderFromIO(rc)
b.DecodeBinary(r)
rc.Close()
return b, r.Err
}
// Shutdown stops the NeoFS BlockFetcher service. It prevents service from new
// block OIDs search, cancels all in-progress downloading operations and waits
// until all service routines finish their work.
func (bfs *Service) Shutdown() {
if !bfs.IsActive() {
return
}
bfs.stopService(true)
<-bfs.exiterToShutdown
}
// stopService close quitting goroutine once. It's the only entrypoint to shutdown
// procedure.
func (bfs *Service) stopService(force bool) {
bfs.quitOnce.Do(func() {
bfs.quit <- force
close(bfs.quit)
})
}
// exiter is a routine that is listening to a quitting signal and manages graceful
// Service shutdown process.
func (bfs *Service) exiter() {
// Closing signal may come from anyone, but only once.
force := <-bfs.quit
bfs.log.Info("shutting down NeoFS BlockFetcher service",
zap.Bool("force", force),
)
// Cansel all pending OIDs/blocks downloads in case if shutdown requested by user
// or caused by downloading error.
if force {
bfs.ctxCancel()
}
// Send signal to OID downloader to stop. Wait until OID downloader finishes his
// work.
close(bfs.exiterToOIDDownloader)
<-bfs.oidDownloaderToExiter
// Close OIDs channel to let block downloaders know that there are no more OIDs
// expected. Wait until all downloaders finish their work.
close(bfs.oidsCh)
bfs.wg.Wait()
// Send signal to block putter to finish his work. Wait until it's finished.
close(bfs.blocksCh)
<-bfs.blockQueuerToExiter
// Everything is done, release resources, turn off the activity marker and let
// the server know about it.
_ = bfs.client.Close()
_ = bfs.log.Sync()
bfs.isActive.CompareAndSwap(true, false)
bfs.shutdownCallback()
// Notify Shutdown routine in case if it's user-triggered shutdown.
close(bfs.exiterToShutdown)
}
// IsActive returns true if the NeoFS BlockFetcher service is running.
func (bfs *Service) IsActive() bool {
return bfs.isActive.Load()
}
func (bfs *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, error) {
u, err := url.Parse(fmt.Sprintf("neofs:%s/%s", bfs.cfg.ContainerID, oid))
if err != nil {
return nil, err
}
rc, err := neofs.GetWithClient(ctx, bfs.client, bfs.account.PrivateKey(), u, false)
if err != nil {
return nil, err
}
return rc, nil
}
func (bfs *Service) objectSearch(ctx context.Context, prm client.PrmObjectSearch) ([]oid.ID, error) {
return neofs.ObjectSearch(ctx, bfs.client, bfs.account.PrivateKey(), bfs.cfg.ContainerID, prm)
}
// isContextCanceledErr returns whether error is a wrapped [context.Canceled].
// Ref. https://github.com/nspcc-dev/neofs-sdk-go/issues/624.
func isContextCanceledErr(err error) bool {
return errors.Is(err, context.Canceled) ||
strings.Contains(err.Error(), "context canceled")
}

View file

@ -0,0 +1,98 @@
package blockfetcher
import (
"testing"
"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
type mockLedger struct {
height uint32
}
func (m *mockLedger) GetConfig() config.Blockchain {
return config.Blockchain{}
}
func (m *mockLedger) BlockHeight() uint32 {
return m.height
}
type mockPutBlockFunc struct {
putCalled bool
}
func (m *mockPutBlockFunc) putBlock(b *block.Block) error {
m.putCalled = true
return nil
}
func TestServiceConstructor(t *testing.T) {
logger := zap.NewNop()
ledger := &mockLedger{height: 10}
mockPut := &mockPutBlockFunc{}
shutdownCallback := func() {}
t.Run("empty configuration", func(t *testing.T) {
cfg := config.NeoFSBlockFetcher{
Timeout: 0,
OIDBatchSize: 0,
DownloaderWorkersCount: 0,
}
_, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback)
require.Error(t, err)
})
t.Run("no addresses", func(t *testing.T) {
cfg := config.NeoFSBlockFetcher{
Addresses: []string{},
}
_, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback)
require.Error(t, err)
})
t.Run("default values", func(t *testing.T) {
cfg := config.NeoFSBlockFetcher{
Addresses: []string{"http://localhost:8080"},
}
service, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback)
require.NoError(t, err)
require.NotNil(t, service)
require.Equal(t, service.IsActive(), false)
require.Equal(t, service.cfg.Timeout, defaultTimeout)
require.Equal(t, service.cfg.OIDBatchSize, defaultOIDBatchSize)
require.Equal(t, service.cfg.DownloaderWorkersCount, defaultDownloaderWorkersCount)
require.Equal(t, service.IsActive(), false)
})
t.Run("SDK client", func(t *testing.T) {
cfg := config.NeoFSBlockFetcher{
Addresses: []string{"http://localhost:8080"},
}
service, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback)
require.NoError(t, err)
err = service.Start()
require.Error(t, err)
require.Contains(t, err.Error(), "create SDK client")
require.Equal(t, service.IsActive(), false)
})
t.Run("invalid wallet", func(t *testing.T) {
cfg := config.NeoFSBlockFetcher{
Addresses: []string{"http://localhost:8080"},
InternalService: config.InternalService{
Enabled: true,
UnlockWallet: config.Wallet{
Path: "invalid/path/to/wallet.json",
Password: "wrong-password",
},
},
}
_, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback)
require.Error(t, err)
})
}

View file

@ -9,6 +9,7 @@ import (
"net/url"
"strconv"
"strings"
"time"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/util"
@ -17,6 +18,8 @@ import (
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/user"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
const (
@ -45,41 +48,48 @@ var (
// URI scheme is "neofs:<Container-ID>/<Object-ID/<Command>/<Params>".
// If Command is not provided, full object is requested.
func Get(ctx context.Context, priv *keys.PrivateKey, u *url.URL, addr string) (io.ReadCloser, error) {
c, err := GetSDKClient(ctx, addr, 0)
if err != nil {
return clientCloseWrapper{c: c}, fmt.Errorf("failed to create client: %w", err)
}
return GetWithClient(ctx, c, priv, u, true)
}
// GetWithClient returns a neofs object from the provided url using the provided client.
// URI scheme is "neofs:<Container-ID>/<Object-ID/<Command>/<Params>".
// If Command is not provided, full object is requested. If wrapClientCloser is true,
// the client will be closed when the returned ReadCloser is closed.
func GetWithClient(ctx context.Context, c *client.Client, priv *keys.PrivateKey, u *url.URL, wrapClientCloser bool) (io.ReadCloser, error) {
objectAddr, ps, err := parseNeoFSURL(u)
if err != nil {
return nil, err
}
c, err := client.New(client.PrmInit{})
if err != nil {
return nil, fmt.Errorf("failed to create client: %w", err)
}
var (
res = clientCloseWrapper{c: c}
prmd client.PrmDial
res io.ReadCloser
s = user.NewAutoIDSignerRFC6979(priv.PrivateKey)
)
prmd.SetServerURI(addr)
prmd.SetContext(ctx)
err = c.Dial(prmd) //nolint:contextcheck // contextcheck: Function `Dial->Balance->SendUnary->Init->setNeoFSAPIServer` should pass the context parameter
if err != nil {
return res, err
}
var s = user.NewAutoIDSignerRFC6979(priv.PrivateKey)
switch {
case len(ps) == 0 || ps[0] == "": // Get request
res.ReadCloser, err = getPayload(ctx, s, c, objectAddr)
case len(ps) == 0 || ps[0] == "":
res, err = getPayload(ctx, s, c, objectAddr)
case ps[0] == rangeCmd:
res.ReadCloser, err = getRange(ctx, s, c, objectAddr, ps[1:]...)
res, err = getRange(ctx, s, c, objectAddr, ps[1:]...)
case ps[0] == headerCmd:
res.ReadCloser, err = getHeader(ctx, s, c, objectAddr)
res, err = getHeader(ctx, s, c, objectAddr)
case ps[0] == hashCmd:
res.ReadCloser, err = getHash(ctx, s, c, objectAddr, ps[1:]...)
res, err = getHash(ctx, s, c, objectAddr, ps[1:]...)
default:
err = ErrInvalidCommand
return nil, ErrInvalidCommand
}
return res, err
if err != nil {
return nil, err
}
if wrapClientCloser {
return clientCloseWrapper{
c: c,
ReadCloser: res,
}, nil
}
return res, nil
}
type clientCloseWrapper struct {
@ -92,7 +102,12 @@ func (w clientCloseWrapper) Close() error {
if w.ReadCloser != nil {
res = w.ReadCloser.Close()
}
w.c.Close()
if w.c != nil {
closeErr := w.c.Close()
if closeErr != nil && res == nil {
res = closeErr
}
}
return res
}
@ -220,3 +235,58 @@ func parseRange(s string) (*object.Range, error) {
r.SetLength(length)
return r, nil
}
// ObjectSearch returns a list of object IDs from the provided container.
func ObjectSearch(ctx context.Context, c *client.Client, priv *keys.PrivateKey, containerIDStr string, prm client.PrmObjectSearch) ([]oid.ID, error) {
var (
s = user.NewAutoIDSignerRFC6979(priv.PrivateKey)
objectIDs []oid.ID
containerID cid.ID
)
err := containerID.DecodeString(containerIDStr)
if err != nil {
return nil, fmt.Errorf("%w: %w", ErrInvalidContainer, err)
}
reader, err := c.ObjectSearchInit(ctx, containerID, s, prm)
if err != nil {
return nil, fmt.Errorf("failed to initiate object search: %w", err)
}
defer reader.Close()
err = reader.Iterate(func(oid oid.ID) bool {
objectIDs = append(objectIDs, oid)
return false
})
if err != nil {
return nil, fmt.Errorf("error during object IDs iteration: %w", err)
}
return objectIDs, nil
}
// GetSDKClient returns a NeoFS SDK client configured with the specified address and context.
// If timeout is 0, the default timeout will be used.
func GetSDKClient(ctx context.Context, addr string, timeout time.Duration) (*client.Client, error) {
var prmDial client.PrmDial
if addr == "" {
return nil, errors.New("address is empty")
}
prmDial.SetServerURI(addr)
prmDial.SetContext(ctx)
if timeout != 0 {
prmDial.SetTimeout(timeout)
prmDial.SetStreamTimeout(timeout)
}
c, err := client.New(client.PrmInit{})
if err != nil {
return nil, fmt.Errorf("can't create SDK client: %w", err)
}
if err := c.Dial(prmDial); err != nil {
if status.Code(err) == codes.Unimplemented {
return c, nil
}
return nil, fmt.Errorf("can't init SDK client: %w", err)
}
return c, nil
}