blockfetcher: fix block enqueue logic

Previously, the `blockQueuer` routine, which enqueues blocks into
`bQueue`, could block while enqueuing newer blocks if the download of
older blocks was delayed by NeoFS.

The `blocksCh` channel, acting as a queue ordered by download speed,
conflicted with the BQueue requirement for strict sequential enqueuing
(expecting an exact range of blocks), resulting in a deadlock that
stalled the process.

Before, with default config settings:
```
2024-11-27T17:12:19.348+0300	INFO	persisted to disk	{"blocks":
 0, "keys": 116, "headerHeight": 0, "blockHeight": 0, "took": "15
 .509083ms"}
2024-11-27T17:19:39.574+0300	INFO	persisted to disk	{"blocks":
 16, "keys": 11107, "headerHeight": 216768, "blockHeight": 216768,
 "took": "62.762041ms"}
```
Average block persistence speed: 492.40 block/s
Average blocks number for each persist log: 584.28

After:

```
2024-11-27T17:29:03.362+0300	INFO	persisted to disk	{"blocks":
 0, "keys": 116, "headerHeight": 0, "blockHeight": 0, "took": "19
 .485084ms"}
2024-11-27T17:34:58.527+0300	INFO	persisted to disk	{"blocks":
 16, "keys": 11109, "headerHeight": 216770, "blockHeight": 216769,
 "took": "52.43925ms"}
```
Average block persistence speed: 610.33 block/s
Average blocks number for each persist log: 752.61

Close #3699

Signed-off-by: Ekaterina Pavlova <ekt@morphbits.io>
This commit is contained in:
Ekaterina Pavlova 2024-11-27 18:23:38 +03:00
parent c1ce6904c4
commit 119ca27994
2 changed files with 4 additions and 33 deletions

View file

@ -58,9 +58,8 @@ parameter.
The number of downloading routines can be configured via
`DownloaderWorkersCount` parameter. It's up to the user to find the
balance between the downloading speed and blocks persist speed for every
node that uses NeoFS BlockFetcher. Downloaded blocks are placed into a
buffered channel of size `IDBatchSize` with further redirection to the
block queue.
node that uses NeoFS BlockFetcher. Downloaded blocks are placed directly
into the block queue.
3. **Block Insertion**:
Downloaded blocks are inserted into the blockchain using the same logic
as in the P2P synchronisation protocol. The block queue is used to order

View file

@ -89,8 +89,7 @@ type Service struct {
enqueueBlock func(*block.Block) error
account *wallet.Account
oidsCh chan oid.ID
blocksCh chan *block.Block
oidsCh chan oid.ID
// wg is a wait group for block downloaders.
wg sync.WaitGroup
@ -104,7 +103,6 @@ type Service struct {
exiterToOIDDownloader chan struct{}
exiterToShutdown chan struct{}
oidDownloaderToExiter chan struct{}
blockQueuerToExiter chan struct{}
shutdownCallback func()
}
@ -172,17 +170,12 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBloc
exiterToOIDDownloader: make(chan struct{}),
exiterToShutdown: make(chan struct{}),
oidDownloaderToExiter: make(chan struct{}),
blockQueuerToExiter: make(chan struct{}),
// Use buffer of two batch sizes to load OIDs in advance:
// * first full block of OIDs is being processed by Downloader
// * second full block of OIDs is available to be fetched by Downloader immediately
// * third half-filled block of OIDs is being collected by OIDsFetcher.
oidsCh: make(chan oid.ID, 2*cfg.OIDBatchSize),
// Use buffer of a single OIDs batch size to provide smooth downloading and
// avoid pauses during blockqueue insertion.
blocksCh: make(chan *block.Block, cfg.OIDBatchSize),
}, nil
}
@ -233,10 +226,6 @@ func (bfs *Service) Start() error {
bfs.wg.Add(1)
go bfs.blockDownloader()
}
// Start routine that puts blocks into bQueue.
go bfs.blockQueuer()
return nil
}
@ -288,24 +277,11 @@ func (bfs *Service) blockDownloader() {
bfs.stopService(true)
return
}
select {
case <-bfs.ctx.Done():
return
case bfs.blocksCh <- b:
}
}
}
// blockQueuer puts the block into the bqueue.
func (bfs *Service) blockQueuer() {
defer close(bfs.blockQueuerToExiter)
for b := range bfs.blocksCh {
select {
case <-bfs.ctx.Done():
return
default:
err := bfs.enqueueBlock(b)
err = bfs.enqueueBlock(b)
if err != nil {
bfs.log.Error("failed to enqueue block", zap.Uint32("index", b.Index), zap.Error(err))
bfs.stopService(true)
@ -509,10 +485,6 @@ func (bfs *Service) exiter() {
close(bfs.oidsCh)
bfs.wg.Wait()
// Send signal to block putter to finish his work. Wait until it's finished.
close(bfs.blocksCh)
<-bfs.blockQueuerToExiter
// Everything is done, release resources, turn off the activity marker and let
// the server know about it.
_ = bfs.pool.Close()