From 9513780c450632a0703e0216e1948dc88cc9cfb3 Mon Sep 17 00:00:00 2001
From: Roman Khimov
Date: Wed, 15 Jan 2025 11:31:08 +0300
Subject: [PATCH] core: adjust in-memory processed set dynamically
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Instead of tick-tocking with sync/async persists and having an unpredictable
data set we can just check the real number of keys that can be processed by
the underlying DB. This can't be perfect, but it still puts a hard limit on
the amount of in-memory data. It's also adaptive: slower machines will keep
less and faster machines will keep more.

This gives almost perfect 4s cycles for mainnet BoltDB with no tail cutting;
it makes zero sense to process more blocks when we're clearly DB-bound:

2025-01-15T11:35:00.567+0300 INFO persisted to disk {"blocks": 1469, "keys": 40579, "headerHeight": 5438141, "blockHeight": 5438140, "velocity": 9912, "took": "4.378939648s"}
2025-01-15T11:35:04.699+0300 INFO persisted to disk {"blocks": 1060, "keys": 39976, "headerHeight": 5439201, "blockHeight": 5439200, "velocity": 9888, "took": "4.131985438s"}
2025-01-15T11:35:08.752+0300 INFO persisted to disk {"blocks": 1508, "keys": 39658, "headerHeight": 5440709, "blockHeight": 5440708, "velocity": 9877, "took": "4.052347569s"}
2025-01-15T11:35:12.807+0300 INFO persisted to disk {"blocks": 1645, "keys": 39565, "headerHeight": 5442354, "blockHeight": 5442353, "velocity": 9864, "took": "4.05547743s"}
2025-01-15T11:35:17.011+0300 INFO persisted to disk {"blocks": 1472, "keys": 39519, "headerHeight": 5443826, "blockHeight": 5443825, "velocity": 9817, "took": "4.203258142s"}
2025-01-15T11:35:21.089+0300 INFO persisted to disk {"blocks": 1345, "keys": 39529, "headerHeight": 5445171, "blockHeight": 5445170, "velocity": 9804, "took": "4.078297579s"}
2025-01-15T11:35:25.090+0300 INFO persisted to disk {"blocks": 1054, "keys": 39326, "headerHeight": 5446225, "blockHeight": 5446224, "velocity": 9806, "took": "4.000524899s"}
2025-01-15T11:35:30.372+0300 INFO persisted to disk {"blocks": 1239, "keys": 39349, "headerHeight": 5447464, "blockHeight": 5447463, "velocity": 9744, "took": "4.281444939s"}

A 2× tolerance could be used instead of the current 4×, but this calculation
isn't perfect for a low number of keys, so a somewhat bigger tolerance is
preferable for now.

Overall it doesn't degrade performance, my mainnet/bolt run was even 8%
faster with this.

Fixes #3249, no configuration option is needed this way.

Fixes #3783 as well, it no longer OOMs in that scenario. It can still OOM
with a big GarbageCollectionPeriod (like 400K), but that can't be solved
easily.

Signed-off-by: Roman Khimov
---
 pkg/core/blockchain.go                   | 53 +++++++++++++++++++-----
 pkg/core/blockchain_core_test.go         |  2 +-
 pkg/core/prometheus.go                   | 13 ++++++
 pkg/core/storage/memcached_store_test.go |  2 +
 pkg/core/storage/memory_store.go         |  7 ++++
 5 files changed, 66 insertions(+), 11 deletions(-)

diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go
index d14fa9852..85291b9ee 100644
--- a/pkg/core/blockchain.go
+++ b/pkg/core/blockchain.go
@@ -70,6 +70,13 @@ const (
 	// either, the next cycle will still do the job (only transfers need this,
 	// MPT won't notice at all).
 	defaultBlockTimesCache = 8
+
+	// persistSamples is the number of persist velocity samples to use for
+	// storeBlock limit.
+	persistSamples = 10
+	// persistMinForSampling is the minimal number of keys to take persist
+	// time into account wrt persist velocity.
+	persistMinForSampling = 100
 )
 
 // stateChangeStage denotes the stage of state modification process.
@@ -165,6 +172,14 @@ type Blockchain struct {
 	// Current persisted block count.
 	persistedHeight uint32
 
+	// keysPerPersist is the average number of persisted keys per persist
+	// time limit.
+	keysPerPersist uint32
+
+	// persistCond is signaled each time persist cycle ends, this wakes
+	// storeBlock if needed (when it has too many queued blocks)
+	persistCond *sync.Cond
+
 	// Index->Timestamp cache for garbage collector. Headers can be gone
 	// by the time it runs, so we use a tiny little cache to sync block
 	// removal (performed in storeBlock()) with transfer/MPT GC (tryRunGC())
@@ -338,6 +353,7 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl
 		contracts:   *native.NewContracts(cfg.ProtocolConfiguration),
 	}
 
+	bc.persistCond = sync.NewCond(&bc.lock)
 	bc.gcBlockTimes, _ = lru.New[uint32, uint64](defaultBlockTimesCache) // Never errors for positive size
 	bc.stateRoot = stateroot.NewModule(cfg, bc.VerifyWitness, bc.log, bc.dao.Store)
 	bc.contracts.Designate.StateRootService = bc.stateRoot
@@ -1102,7 +1118,7 @@ func (bc *Blockchain) Run() {
 	persistTimer := time.NewTimer(persistInterval)
 	defer func() {
 		persistTimer.Stop()
-		if _, err := bc.persist(true); err != nil {
+		if _, err := bc.persist(); err != nil {
 			bc.log.Warn("failed to persist", zap.Error(err))
 		}
 		if err := bc.dao.Store.Close(); err != nil {
@@ -1112,7 +1128,6 @@ func (bc *Blockchain) Run() {
 		close(bc.runToExitCh)
 	}()
 	go bc.notificationDispatcher()
-	var nextSync bool
 	for {
 		select {
 		case <-bc.stopCh:
@@ -1123,14 +1138,13 @@ func (bc *Blockchain) Run() {
 			if bc.config.Ledger.RemoveUntraceableBlocks {
 				oldPersisted = atomic.LoadUint32(&bc.persistedHeight)
 			}
-			dur, err := bc.persist(nextSync)
+			dur, err := bc.persist()
 			if err != nil {
 				bc.log.Error("failed to persist blockchain", zap.Error(err))
 			}
 			if bc.config.Ledger.RemoveUntraceableBlocks {
 				dur += bc.tryRunGC(oldPersisted)
 			}
-			nextSync = dur > persistInterval*2
 			interval := persistInterval - dur
 			interval = max(interval, time.Microsecond) // Reset doesn't work with zero or negative value.
 			persistTimer.Reset(interval)
@@ -1797,6 +1811,14 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
 	}
 
 	bc.lock.Lock()
+	// Wait for a while if we're lagging behind the persistence routine,
+	// it's too easy to OOM otherwise. Keep in mind that this check can't
+	// be perfect, so some tolerance (accepting more) is OK to have.
+	var persistVelocity = atomic.LoadUint32(&bc.keysPerPersist)
+	for persistVelocity != 0 && uint32(bc.dao.Store.Len()) > persistVelocity*4 {
+		bc.persistCond.Wait()
+	}
+
 	_, err = aerCache.Persist()
 	if err != nil {
 		bc.lock.Unlock()
@@ -2148,7 +2170,7 @@ func (bc *Blockchain) LastBatch() *storage.MemBatch {
 }
 
 // persist flushes current in-memory Store contents to the persistent storage.
-func (bc *Blockchain) persist(isSync bool) (time.Duration, error) {
+func (bc *Blockchain) persist() (time.Duration, error) {
 	var (
 		start    = time.Now()
 		duration time.Duration
@@ -2156,11 +2178,7 @@ func (bc *Blockchain) persist(isSync bool) (time.Duration, error) {
 		err      error
 	)
 
-	if isSync {
-		persisted, err = bc.dao.PersistSync()
-	} else {
-		persisted, err = bc.dao.Persist()
-	}
+	persisted, err = bc.dao.Persist()
 	if err != nil {
 		return 0, err
 	}
@@ -2177,6 +2195,20 @@ func (bc *Blockchain) persist(isSync bool) (time.Duration, error) {
 		return 0, err
 	}
 	duration = time.Since(start)
+	// Low number of keys is not representative and duration _can_
+	// be zero in tests on strange platforms like Windows.
+	if duration > 0 && persisted > persistMinForSampling {
+		var (
+			currentVelocity = uint32(int64(persisted) * int64(persistInterval) / int64(duration))
+			persistVelocity = atomic.LoadUint32(&bc.keysPerPersist)
+		)
+		if persistVelocity != 0 {
+			currentVelocity = min(currentVelocity, 2*persistVelocity) // Normalize sudden spikes.
+			currentVelocity = (persistVelocity*(persistSamples-1) + currentVelocity) / persistSamples
+		} // Otherwise it's the first sample and we take it as is.
+		atomic.StoreUint32(&bc.keysPerPersist, currentVelocity)
+		updateEstimatedPersistVelocityMetric(currentVelocity)
+	}
 	bc.log.Info("persisted to disk",
 		zap.Uint32("blocks", diff),
 		zap.Int("keys", persisted),
@@ -2187,6 +2219,7 @@ func (bc *Blockchain) persist(isSync bool) (time.Duration, error) {
 		// update monitoring metrics.
 		updatePersistedHeightMetric(bHeight)
 	}
+	bc.persistCond.Signal()
 
 	return duration, nil
 }
diff --git a/pkg/core/blockchain_core_test.go b/pkg/core/blockchain_core_test.go
index 7f71a5790..f99fa5ad2 100644
--- a/pkg/core/blockchain_core_test.go
+++ b/pkg/core/blockchain_core_test.go
@@ -63,7 +63,7 @@ func TestAddBlock(t *testing.T) {
 	assert.Equal(t, lastBlock.Hash(), bc.CurrentHeaderHash())
 
 	// This one tests persisting blocks, so it does need to persist()
-	_, err = bc.persist(false)
+	_, err = bc.persist()
 	require.NoError(t, err)
 
 	key := make([]byte, 1+util.Uint256Size)
diff --git a/pkg/core/prometheus.go b/pkg/core/prometheus.go
index 8e429d044..b649cd26a 100644
--- a/pkg/core/prometheus.go
+++ b/pkg/core/prometheus.go
@@ -22,6 +22,14 @@ var (
 			Namespace: "neogo",
 		},
 	)
+	// estimatedPersistVelocity prometheus metric.
+	estimatedPersistVelocity = prometheus.NewGauge(
+		prometheus.GaugeOpts{
+			Help:      "Estimation of persist velocity per cycle (1s by default)",
+			Name:      "estimated_persist_velocity",
+			Namespace: "neogo",
+		},
+	)
 	// headerHeight prometheus metric.
 	headerHeight = prometheus.NewGauge(
 		prometheus.GaugeOpts{
@@ -44,6 +52,7 @@ func init() {
 	prometheus.MustRegister(
 		blockHeight,
 		persistedHeight,
+		estimatedPersistVelocity,
 		headerHeight,
 		mempoolUnsortedTx,
 	)
@@ -53,6 +62,10 @@ func updatePersistedHeightMetric(pHeight uint32) {
 	persistedHeight.Set(float64(pHeight))
 }
 
+func updateEstimatedPersistVelocityMetric(v uint32) {
+	estimatedPersistVelocity.Set(float64(v))
+}
+
 func updateHeaderHeightMetric(hHeight uint32) {
 	headerHeight.Set(float64(hHeight))
 }
diff --git a/pkg/core/storage/memcached_store_test.go b/pkg/core/storage/memcached_store_test.go
index 586e5e06e..cc041b421 100644
--- a/pkg/core/storage/memcached_store_test.go
+++ b/pkg/core/storage/memcached_store_test.go
@@ -19,12 +19,14 @@ func TestMemCachedPutGetDelete(t *testing.T) {
 
 	s.Put(key, value)
 
+	require.Equal(t, 1, s.Len())
 	result, err := s.Get(key)
 	assert.Nil(t, err)
 	require.Equal(t, value, result)
 
 	s.Delete(key)
 
+	require.Equal(t, 1, s.Len()) // deletion marker
 	_, err = s.Get(key)
 	assert.NotNil(t, err)
 	assert.Equal(t, err, ErrKeyNotFound)
diff --git a/pkg/core/storage/memory_store.go b/pkg/core/storage/memory_store.go
index e4e8c20d6..dd962cb87 100644
--- a/pkg/core/storage/memory_store.go
+++ b/pkg/core/storage/memory_store.go
@@ -66,6 +66,13 @@ func (s *MemoryStore) putChangeSet(puts map[string][]byte, stores map[string][]b
 	}
 }
 
+// Len returns the number of keys stored.
+func (s *MemoryStore) Len() int {
+	s.mut.RLock()
+	defer s.mut.RUnlock()
+	return len(s.mem) + len(s.stor)
+}
+
 // Seek implements the Store interface.
 func (s *MemoryStore) Seek(rng SeekRange, f func(k, v []byte) bool) {
 	s.seek(rng, f, s.mut.RLock, s.mut.RUnlock)
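
Editorial note (not part of the patch): the sketch below illustrates how the velocity estimate computed in persist() and the storeBlock() backpressure threshold evolve over a few cycles. It assumes the default 1s persistInterval mentioned in the metric help text above and reuses persistSamples/persistMinForSampling from the diff; the updateVelocity helper and the synthetic cycle data are hypothetical and exist only for illustration.

// velocity_sketch.go -- standalone illustration, not part of the patch.
package main

import (
	"fmt"
	"time"
)

const (
	persistInterval       = time.Second // default persist cycle length (assumed)
	persistSamples        = 10          // samples in the moving average
	persistMinForSampling = 100         // ignore unrepresentative tiny persists
)

// updateVelocity folds one persist cycle into the keys-per-interval estimate:
// sudden spikes are clamped to 2x the previous value, then averaged over
// persistSamples cycles, mirroring the logic added to persist() above.
func updateVelocity(prev uint32, persisted int, took time.Duration) uint32 {
	if took <= 0 || persisted <= persistMinForSampling {
		return prev // not representative, keep the old estimate
	}
	current := uint32(int64(persisted) * int64(persistInterval) / int64(took))
	if prev != 0 {
		current = min(current, 2*prev)
		current = (prev*(persistSamples-1) + current) / persistSamples
	}
	return current
}

func main() {
	var velocity uint32
	cycles := []struct {
		keys int
		took time.Duration
	}{
		{40579, 4378939648},       // roughly the first log line above
		{39976, 4131985438},       // roughly the second one
		{120000, 2 * time.Second}, // artificial spike, gets clamped
	}
	for _, c := range cycles {
		velocity = updateVelocity(velocity, c.keys, c.took)
		// storeBlock() would block while the in-memory set holds more than
		// velocity*4 keys, waking up on the persist cycle's cond signal.
		fmt.Printf("%d keys in %v -> velocity %d, storeBlock limit %d\n",
			c.keys, c.took, velocity, velocity*4)
	}
}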