core: adjust in-memory processed set dynamically

Instead of tick-tocking with sync/async and having an unpredictable data
set we can just try to check for the real amount of keys that be processed
by the underlying DB. Can't be perfect, but still this adds some hard
limit to the amount of in-memory data. It's also adaptive, slower machines
will keep less and faster machines will keep more.

This gives almost perfect 4s cycles for mainnet BoltDB with no tail cutting,
it makes zero sense to process more blocks since we're clearly DB-bound:

2025-01-15T11:35:00.567+0300    INFO    persisted to disk       {"blocks": 1469, "keys": 40579, "headerHeight": 5438141, "blockHeight": 5438140, "velocity": 9912, "took": "4.378939648s"}
2025-01-15T11:35:04.699+0300    INFO    persisted to disk       {"blocks": 1060, "keys": 39976, "headerHeight": 5439201, "blockHeight": 5439200, "velocity": 9888, "took": "4.131985438s"}
2025-01-15T11:35:08.752+0300    INFO    persisted to disk       {"blocks": 1508, "keys": 39658, "headerHeight": 5440709, "blockHeight": 5440708, "velocity": 9877, "took": "4.052347569s"}
2025-01-15T11:35:12.807+0300    INFO    persisted to disk       {"blocks": 1645, "keys": 39565, "headerHeight": 5442354, "blockHeight": 5442353, "velocity": 9864, "took": "4.05547743s"}
2025-01-15T11:35:17.011+0300    INFO    persisted to disk       {"blocks": 1472, "keys": 39519, "headerHeight": 5443826, "blockHeight": 5443825, "velocity": 9817, "took": "4.203258142s"}
2025-01-15T11:35:21.089+0300    INFO    persisted to disk       {"blocks": 1345, "keys": 39529, "headerHeight": 5445171, "blockHeight": 5445170, "velocity": 9804, "took": "4.078297579s"}
2025-01-15T11:35:25.090+0300    INFO    persisted to disk       {"blocks": 1054, "keys": 39326, "headerHeight": 5446225, "blockHeight": 5446224, "velocity": 9806, "took": "4.000524899s"}
2025-01-15T11:35:30.372+0300    INFO    persisted to disk       {"blocks": 1239, "keys": 39349, "headerHeight": 5447464, "blockHeight": 5447463, "velocity": 9744, "took": "4.281444939s"}

2× can be considered, but this calculation isn't perfect for low number of
keys, so somewhat bigger tolerance is preferable for now. Overall it's not
degrading performance, my mainnet/bolt run was even 8% better with this.

Fixes #3249, we don't need any option this way.

Fixes #3783 as well, it no longer OOMs in that scenario. It however can OOM in
case of big GarbageCollectionPeriod (like 400K), but this can't be solved easily.

Signed-off-by: Roman Khimov <roman@nspcc.ru>
This commit is contained in:
Roman Khimov 2025-01-15 11:31:08 +03:00
parent c07c74df41
commit 9513780c45
5 changed files with 66 additions and 11 deletions

View file

@ -70,6 +70,13 @@ const (
// either, the next cycle will still do the job (only transfers need this,
// MPT won't notice at all).
defaultBlockTimesCache = 8
// persistSamples is the number of persist velocity samples to use for
// storeBlock limit.
persistSamples = 10
// persistMinForSampling is the minimal number of keys to take persist
// time into account wrt persist velocity.
persistMinForSampling = 100
)
// stateChangeStage denotes the stage of state modification process.
@ -165,6 +172,14 @@ type Blockchain struct {
// Current persisted block count.
persistedHeight uint32
// keysPerPersist is the average number of persisted keys per persist
// time limit.
keysPerPersist uint32
// persistCond is signaled each time persist cycle ends, this wakes
// storeBlock if needed (when it has too many queued blocks)
persistCond *sync.Cond
// Index->Timestamp cache for garbage collector. Headers can be gone
// by the time it runs, so we use a tiny little cache to sync block
// removal (performed in storeBlock()) with transfer/MPT GC (tryRunGC())
@ -338,6 +353,7 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl
contracts: *native.NewContracts(cfg.ProtocolConfiguration),
}
bc.persistCond = sync.NewCond(&bc.lock)
bc.gcBlockTimes, _ = lru.New[uint32, uint64](defaultBlockTimesCache) // Never errors for positive size
bc.stateRoot = stateroot.NewModule(cfg, bc.VerifyWitness, bc.log, bc.dao.Store)
bc.contracts.Designate.StateRootService = bc.stateRoot
@ -1102,7 +1118,7 @@ func (bc *Blockchain) Run() {
persistTimer := time.NewTimer(persistInterval)
defer func() {
persistTimer.Stop()
if _, err := bc.persist(true); err != nil {
if _, err := bc.persist(); err != nil {
bc.log.Warn("failed to persist", zap.Error(err))
}
if err := bc.dao.Store.Close(); err != nil {
@ -1112,7 +1128,6 @@ func (bc *Blockchain) Run() {
close(bc.runToExitCh)
}()
go bc.notificationDispatcher()
var nextSync bool
for {
select {
case <-bc.stopCh:
@ -1123,14 +1138,13 @@ func (bc *Blockchain) Run() {
if bc.config.Ledger.RemoveUntraceableBlocks {
oldPersisted = atomic.LoadUint32(&bc.persistedHeight)
}
dur, err := bc.persist(nextSync)
dur, err := bc.persist()
if err != nil {
bc.log.Error("failed to persist blockchain", zap.Error(err))
}
if bc.config.Ledger.RemoveUntraceableBlocks {
dur += bc.tryRunGC(oldPersisted)
}
nextSync = dur > persistInterval*2
interval := persistInterval - dur
interval = max(interval, time.Microsecond) // Reset doesn't work with zero or negative value.
persistTimer.Reset(interval)
@ -1797,6 +1811,14 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
}
bc.lock.Lock()
// Wait for a while if we're lagging behind the persistence routine,
// it's too easy to OOM otherwise. Keep in mind that this check can't
// be perfect, so some tolerance (accepting more) is OK to have.
var persistVelocity = atomic.LoadUint32(&bc.keysPerPersist)
for persistVelocity != 0 && uint32(bc.dao.Store.Len()) > persistVelocity*4 {
bc.persistCond.Wait()
}
_, err = aerCache.Persist()
if err != nil {
bc.lock.Unlock()
@ -2148,7 +2170,7 @@ func (bc *Blockchain) LastBatch() *storage.MemBatch {
}
// persist flushes current in-memory Store contents to the persistent storage.
func (bc *Blockchain) persist(isSync bool) (time.Duration, error) {
func (bc *Blockchain) persist() (time.Duration, error) {
var (
start = time.Now()
duration time.Duration
@ -2156,11 +2178,7 @@ func (bc *Blockchain) persist(isSync bool) (time.Duration, error) {
err error
)
if isSync {
persisted, err = bc.dao.PersistSync()
} else {
persisted, err = bc.dao.Persist()
}
persisted, err = bc.dao.Persist()
if err != nil {
return 0, err
}
@ -2177,6 +2195,20 @@ func (bc *Blockchain) persist(isSync bool) (time.Duration, error) {
return 0, err
}
duration = time.Since(start)
// Low number of keys is not representative and duration _can_
// be zero in tests on strange platforms like Windows.
if duration > 0 && persisted > persistMinForSampling {
var (
currentVelocity = uint32(int64(persisted) * int64(persistInterval) / int64(duration))
persistVelocity = atomic.LoadUint32(&bc.keysPerPersist)
)
if persistVelocity != 0 {
currentVelocity = min(currentVelocity, 2*persistVelocity) // Normalize sudden spikes.
currentVelocity = (persistVelocity*(persistSamples-1) + currentVelocity) / persistSamples
} // Otherwise it's the first sample and we take it as is.
atomic.StoreUint32(&bc.keysPerPersist, currentVelocity)
updateEstimatedPersistVelocityMetric(currentVelocity)
}
bc.log.Info("persisted to disk",
zap.Uint32("blocks", diff),
zap.Int("keys", persisted),
@ -2187,6 +2219,7 @@ func (bc *Blockchain) persist(isSync bool) (time.Duration, error) {
// update monitoring metrics.
updatePersistedHeightMetric(bHeight)
}
bc.persistCond.Signal()
return duration, nil
}

View file

@ -63,7 +63,7 @@ func TestAddBlock(t *testing.T) {
assert.Equal(t, lastBlock.Hash(), bc.CurrentHeaderHash())
// This one tests persisting blocks, so it does need to persist()
_, err = bc.persist(false)
_, err = bc.persist()
require.NoError(t, err)
key := make([]byte, 1+util.Uint256Size)

View file

@ -22,6 +22,14 @@ var (
Namespace: "neogo",
},
)
// estimatedPersistVelocity prometheus metric.
estimatedPersistVelocity = prometheus.NewGauge(
prometheus.GaugeOpts{
Help: "Estimation of persist velocity per cycle (1s by default)",
Name: "estimated_persist_velocity",
Namespace: "neogo",
},
)
// headerHeight prometheus metric.
headerHeight = prometheus.NewGauge(
prometheus.GaugeOpts{
@ -44,6 +52,7 @@ func init() {
prometheus.MustRegister(
blockHeight,
persistedHeight,
estimatedPersistVelocity,
headerHeight,
mempoolUnsortedTx,
)
@ -53,6 +62,10 @@ func updatePersistedHeightMetric(pHeight uint32) {
persistedHeight.Set(float64(pHeight))
}
func updateEstimatedPersistVelocityMetric(v uint32) {
estimatedPersistVelocity.Set(float64(v))
}
func updateHeaderHeightMetric(hHeight uint32) {
headerHeight.Set(float64(hHeight))
}

View file

@ -19,12 +19,14 @@ func TestMemCachedPutGetDelete(t *testing.T) {
s.Put(key, value)
require.Equal(t, 1, s.Len())
result, err := s.Get(key)
assert.Nil(t, err)
require.Equal(t, value, result)
s.Delete(key)
require.Equal(t, 1, s.Len()) // deletion marker
_, err = s.Get(key)
assert.NotNil(t, err)
assert.Equal(t, err, ErrKeyNotFound)

View file

@ -66,6 +66,13 @@ func (s *MemoryStore) putChangeSet(puts map[string][]byte, stores map[string][]b
}
}
// Len returns the number of keys stored.
func (s *MemoryStore) Len() int {
s.mut.RLock()
defer s.mut.RUnlock()
return len(s.mem) + len(s.stor)
}
// Seek implements the Store interface.
func (s *MemoryStore) Seek(rng SeekRange, f func(k, v []byte) bool) {
s.seek(rng, f, s.mut.RLock, s.mut.RUnlock)