From 18682f240993633bdd05ac1e2b6cfcc6eeb1bf53 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Wed, 11 Aug 2021 18:53:18 +0300 Subject: [PATCH 1/5] storage: don't use locks for memory batches They're inherently single-threaded, so locking makes no sense for them. --- pkg/core/storage/memory_store.go | 4 ++-- pkg/core/storage/store.go | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/core/storage/memory_store.go b/pkg/core/storage/memory_store.go index 21ded2220..5fe843b19 100644 --- a/pkg/core/storage/memory_store.go +++ b/pkg/core/storage/memory_store.go @@ -23,12 +23,12 @@ type MemoryBatch struct { // Put implements the Batch interface. func (b *MemoryBatch) Put(k, v []byte) { - _ = b.MemoryStore.Put(k, v) + b.MemoryStore.put(string(k), slice.Copy(v)) } // Delete implements Batch interface. func (b *MemoryBatch) Delete(k []byte) { - _ = b.MemoryStore.Delete(k) + b.MemoryStore.drop(string(k)) } // NewMemoryStore creates a new MemoryStore object. diff --git a/pkg/core/storage/store.go b/pkg/core/storage/store.go index 540fdcc39..a90466a02 100644 --- a/pkg/core/storage/store.go +++ b/pkg/core/storage/store.go @@ -52,7 +52,8 @@ type ( // Batch represents an abstraction on top of batch operations. // Each Store implementation is responsible of casting a Batch - // to its appropriate type. + // to its appropriate type. Batches can only be used in a single + // thread. Batch interface { Delete(k []byte) Put(k, v []byte) From 50ee1a1f9135c78506c027f17c3a2ccd33947eac Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Wed, 11 Aug 2021 21:02:50 +0300 Subject: [PATCH 2/5] *: don't use dao.Cached in tests There is no need to use it. --- pkg/core/interop/crypto/ecdsa_test.go | 2 +- pkg/core/interop_system_test.go | 6 +++--- pkg/core/native/management_test.go | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/core/interop/crypto/ecdsa_test.go b/pkg/core/interop/crypto/ecdsa_test.go index c5ca3ab32..4684e0cc9 100644 --- a/pkg/core/interop/crypto/ecdsa_test.go +++ b/pkg/core/interop/crypto/ecdsa_test.go @@ -174,7 +174,7 @@ func TestCheckSig(t *testing.T) { verifyFunc := ECDSASecp256r1CheckSig d := dao.NewSimple(storage.NewMemoryStore(), false) - ic := &interop.Context{Network: uint32(netmode.UnitTestNet), DAO: dao.NewCached(d)} + ic := &interop.Context{Network: uint32(netmode.UnitTestNet), DAO: d} runCase := func(t *testing.T, isErr bool, result interface{}, args ...interface{}) { ic.SpawnVM() for i := range args { diff --git a/pkg/core/interop_system_test.go b/pkg/core/interop_system_test.go index 6edaca809..62e0ab5f6 100644 --- a/pkg/core/interop_system_test.go +++ b/pkg/core/interop_system_test.go @@ -45,7 +45,7 @@ func TestRuntimeGetRandomCompatibility(t *testing.T) { b := getSharpTestGenesis(t) tx := getSharpTestTx(util.Uint160{}) - ic := bc.newInteropContext(trigger.Application, dao.NewCached(bc.dao), b, tx) + ic := bc.newInteropContext(trigger.Application, bc.dao.GetWrapped(), b, tx) ic.Network = 5195086 // Old mainnet magic used by C# tests. ic.VM = vm.New() @@ -72,12 +72,12 @@ func TestRuntimeGetRandomDifferentTransactions(t *testing.T) { b, _ := bc.GetBlock(bc.GetHeaderHash(0)) tx1 := transaction.New([]byte{byte(opcode.PUSH1)}, 0) - ic1 := bc.newInteropContext(trigger.Application, dao.NewCached(bc.dao), b, tx1) + ic1 := bc.newInteropContext(trigger.Application, bc.dao.GetWrapped(), b, tx1) ic1.VM = vm.New() ic1.VM.LoadScript(tx1.Script) tx2 := transaction.New([]byte{byte(opcode.PUSH2)}, 0) - ic2 := bc.newInteropContext(trigger.Application, dao.NewCached(bc.dao), b, tx2) + ic2 := bc.newInteropContext(trigger.Application, bc.dao.GetWrapped(), b, tx2) ic2.VM = vm.New() ic2.VM.LoadScript(tx2.Script) diff --git a/pkg/core/native/management_test.go b/pkg/core/native/management_test.go index ca71e4488..91f72bdf6 100644 --- a/pkg/core/native/management_test.go +++ b/pkg/core/native/management_test.go @@ -17,7 +17,7 @@ import ( func TestDeployGetUpdateDestroyContract(t *testing.T) { mgmt := newManagement() - d := dao.NewCached(dao.NewSimple(storage.NewMemoryStore(), false)) + d := dao.NewSimple(storage.NewMemoryStore(), false) err := mgmt.Initialize(&interop.Context{DAO: d}) require.NoError(t, err) script := []byte{byte(opcode.RET)} @@ -86,7 +86,7 @@ func TestManagement_Initialize(t *testing.T) { func TestManagement_GetNEP17Contracts(t *testing.T) { mgmt := newManagement() - d := dao.NewCached(dao.NewSimple(storage.NewMemoryStore(), false)) + d := dao.NewSimple(storage.NewMemoryStore(), false) err := mgmt.Initialize(&interop.Context{DAO: d}) require.NoError(t, err) From 3e60771175525a06621292dcd8422cda9a01f3e2 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Wed, 11 Aug 2021 22:36:26 +0300 Subject: [PATCH 3/5] core: deduplicate and simplify processNEP17Transfer a bit Just refactoring, no functional changes. --- pkg/core/blockchain.go | 54 ++++++++++++++++-------------------------- 1 file changed, 21 insertions(+), 33 deletions(-) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index f9e9f3072..f2b08c83a 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -1066,46 +1066,34 @@ func (bc *Blockchain) processNEP17Transfer(cache *dao.Cached, h util.Uint256, b Tx: h, } if !fromAddr.Equals(util.Uint160{}) { - balances, err := cache.GetNEP17TransferInfo(fromAddr) - if err != nil { - return - } - balances.LastUpdated[id] = b.Index - transfer.Amount = *new(big.Int).Sub(&transfer.Amount, amount) - balances.NewBatch, err = cache.AppendNEP17Transfer(fromAddr, - balances.NextTransferBatch, balances.NewBatch, transfer) - if err != nil { - return - } - if balances.NewBatch { - balances.NextTransferBatch++ - } - if err := cache.PutNEP17TransferInfo(fromAddr, balances); err != nil { + _ = transfer.Amount.Neg(amount) // We already have the Int. + if appendNEP17Transfer(cache, fromAddr, transfer) != nil { return } } if !toAddr.Equals(util.Uint160{}) { - balances, err := cache.GetNEP17TransferInfo(toAddr) - if err != nil { - return - } - balances.LastUpdated[id] = b.Index - - transfer.Amount = *amount - balances.NewBatch, err = cache.AppendNEP17Transfer(toAddr, - balances.NextTransferBatch, balances.NewBatch, transfer) - if err != nil { - return - } - if balances.NewBatch { - balances.NextTransferBatch++ - } - if err := cache.PutNEP17TransferInfo(toAddr, balances); err != nil { - return - } + _ = transfer.Amount.Set(amount) // We already have the Int. + _ = appendNEP17Transfer(cache, toAddr, transfer) // Nothing useful we can do. } } +func appendNEP17Transfer(cache *dao.Cached, addr util.Uint160, transfer *state.NEP17Transfer) error { + balances, err := cache.GetNEP17TransferInfo(addr) + if err != nil { + return err + } + balances.LastUpdated[transfer.Asset] = transfer.Block + balances.NewBatch, err = cache.AppendNEP17Transfer(addr, + balances.NextTransferBatch, balances.NewBatch, transfer) + if err != nil { + return err + } + if balances.NewBatch { + balances.NextTransferBatch++ + } + return cache.PutNEP17TransferInfo(addr, balances) +} + // ForEachNEP17Transfer executes f for each nep17 transfer in log. func (bc *Blockchain) ForEachNEP17Transfer(acc util.Uint160, f func(*state.NEP17Transfer) (bool, error)) error { balances, err := bc.dao.GetNEP17TransferInfo(acc) From 47f0f4c45fabdf07c7043046fd2e6adc74d3512e Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Wed, 11 Aug 2021 23:06:17 +0300 Subject: [PATCH 4/5] dao: completely drop Cached It was very useful in 2.0 days, but today it only serves one purpose that could easily (and more effectively!) be solved in another way. --- pkg/core/blockchain.go | 77 +++++++++++++++----- pkg/core/dao/cacheddao.go | 126 --------------------------------- pkg/core/dao/cacheddao_test.go | 57 --------------- 3 files changed, 58 insertions(+), 202 deletions(-) delete mode 100644 pkg/core/dao/cacheddao.go delete mode 100644 pkg/core/dao/cacheddao_test.go diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index f2b08c83a..1f53a550a 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -164,6 +164,12 @@ type bcEvent struct { appExecResults []*state.AppExecResult } +// transferData is used for transfer caching during storeBlock. +type transferData struct { + Info state.NEP17TransferInfo + Log state.NEP17TransferLog +} + // NewBlockchain returns a new blockchain object the will use the // given Store as its underlying storage. For it to work correctly you need // to spawn a goroutine for its Run method after this initialization. @@ -748,10 +754,11 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error }() go func() { var ( - kvcache = dao.NewCached(cache) + kvcache = cache.GetWrapped() writeBuf = io.NewBufBinWriter() err error appendBlock bool + transCache = make(map[util.Uint160]transferData) ) for aer := range aerchan { if aer.Container == block.Hash() && appendBlock { @@ -768,7 +775,7 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error } if aer.Execution.VMState == vm.HaltState { for j := range aer.Execution.Events { - bc.handleNotification(&aer.Execution.Events[j], kvcache, block, aer.Container) + bc.handleNotification(&aer.Execution.Events[j], kvcache, transCache, block, aer.Container) } } writeBuf.Reset() @@ -777,6 +784,19 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error aerdone <- err return } + for acc, trData := range transCache { + err = kvcache.PutNEP17TransferInfo(acc, &trData.Info) + if err != nil { + aerdone <- err + return + } + err = kvcache.PutNEP17TransferLog(acc, trData.Info.NextTransferBatch, &trData.Log) + if err != nil { + aerdone <- err + return + } + } + _, err = kvcache.Persist() if err != nil { aerdone <- err @@ -996,7 +1016,8 @@ func (bc *Blockchain) runPersist(script []byte, block *block.Block, cache dao.DA }, nil } -func (bc *Blockchain) handleNotification(note *state.NotificationEvent, d *dao.Cached, b *block.Block, h util.Uint256) { +func (bc *Blockchain) handleNotification(note *state.NotificationEvent, d dao.DAO, + transCache map[util.Uint160]transferData, b *block.Block, h util.Uint256) { if note.Name != "Transfer" { return } @@ -1033,7 +1054,7 @@ func (bc *Blockchain) handleNotification(note *state.NotificationEvent, d *dao.C } amount = bigint.FromBytes(bs) } - bc.processNEP17Transfer(d, h, b, note.ScriptHash, from, to, amount) + bc.processNEP17Transfer(d, transCache, h, b, note.ScriptHash, from, to, amount) } func parseUint160(addr []byte) util.Uint160 { @@ -1043,7 +1064,8 @@ func parseUint160(addr []byte) util.Uint160 { return util.Uint160{} } -func (bc *Blockchain) processNEP17Transfer(cache *dao.Cached, h util.Uint256, b *block.Block, sc util.Uint160, from, to []byte, amount *big.Int) { +func (bc *Blockchain) processNEP17Transfer(cache dao.DAO, transCache map[util.Uint160]transferData, + h util.Uint256, b *block.Block, sc util.Uint160, from, to []byte, amount *big.Int) { toAddr := parseUint160(to) fromAddr := parseUint160(from) var id int32 @@ -1067,31 +1089,48 @@ func (bc *Blockchain) processNEP17Transfer(cache *dao.Cached, h util.Uint256, b } if !fromAddr.Equals(util.Uint160{}) { _ = transfer.Amount.Neg(amount) // We already have the Int. - if appendNEP17Transfer(cache, fromAddr, transfer) != nil { + if appendNEP17Transfer(cache, transCache, fromAddr, transfer) != nil { return } } if !toAddr.Equals(util.Uint160{}) { - _ = transfer.Amount.Set(amount) // We already have the Int. - _ = appendNEP17Transfer(cache, toAddr, transfer) // Nothing useful we can do. + _ = transfer.Amount.Set(amount) // We already have the Int. + _ = appendNEP17Transfer(cache, transCache, toAddr, transfer) // Nothing useful we can do. } } -func appendNEP17Transfer(cache *dao.Cached, addr util.Uint160, transfer *state.NEP17Transfer) error { - balances, err := cache.GetNEP17TransferInfo(addr) +func appendNEP17Transfer(cache dao.DAO, transCache map[util.Uint160]transferData, addr util.Uint160, transfer *state.NEP17Transfer) error { + transferData, ok := transCache[addr] + if !ok { + balances, err := cache.GetNEP17TransferInfo(addr) + if err != nil { + return err + } + if !balances.NewBatch { + trLog, err := cache.GetNEP17TransferLog(addr, balances.NextTransferBatch) + if err != nil { + return err + } + transferData.Log = *trLog + } + transferData.Info = *balances + } + err := transferData.Log.Append(transfer) if err != nil { return err } - balances.LastUpdated[transfer.Asset] = transfer.Block - balances.NewBatch, err = cache.AppendNEP17Transfer(addr, - balances.NextTransferBatch, balances.NewBatch, transfer) - if err != nil { - return err + transferData.Info.LastUpdated[transfer.Asset] = transfer.Block + transferData.Info.NewBatch = transferData.Log.Size() >= state.NEP17TransferBatchSize + if transferData.Info.NewBatch { + err = cache.PutNEP17TransferLog(addr, transferData.Info.NextTransferBatch, &transferData.Log) + if err != nil { + return err + } + transferData.Info.NextTransferBatch++ + transferData.Log = state.NEP17TransferLog{} } - if balances.NewBatch { - balances.NextTransferBatch++ - } - return cache.PutNEP17TransferInfo(addr, balances) + transCache[addr] = transferData + return nil } // ForEachNEP17Transfer executes f for each nep17 transfer in log. diff --git a/pkg/core/dao/cacheddao.go b/pkg/core/dao/cacheddao.go deleted file mode 100644 index f147deb85..000000000 --- a/pkg/core/dao/cacheddao.go +++ /dev/null @@ -1,126 +0,0 @@ -package dao - -import ( - "errors" - - "github.com/nspcc-dev/neo-go/pkg/core/state" - "github.com/nspcc-dev/neo-go/pkg/io" - "github.com/nspcc-dev/neo-go/pkg/util" -) - -// Cached is a data access object that mimics DAO, but has a write cache -// for accounts and NEP17 transfer data. These are the most frequently used -// objects in the storeBlock(). -type Cached struct { - DAO - balances map[util.Uint160]*state.NEP17TransferInfo - transfers map[util.Uint160]map[uint32]*state.NEP17TransferLog -} - -// NewCached returns new Cached wrapping around given backing store. -func NewCached(d DAO) *Cached { - balances := make(map[util.Uint160]*state.NEP17TransferInfo) - transfers := make(map[util.Uint160]map[uint32]*state.NEP17TransferLog) - return &Cached{d.GetWrapped(), balances, transfers} -} - -// GetNEP17TransferInfo retrieves NEP17TransferInfo for the acc. -func (cd *Cached) GetNEP17TransferInfo(acc util.Uint160) (*state.NEP17TransferInfo, error) { - if bs := cd.balances[acc]; bs != nil { - return bs, nil - } - return cd.DAO.GetNEP17TransferInfo(acc) -} - -// PutNEP17TransferInfo saves NEP17TransferInfo for the acc. -func (cd *Cached) PutNEP17TransferInfo(acc util.Uint160, bs *state.NEP17TransferInfo) error { - cd.balances[acc] = bs - return nil -} - -// GetNEP17TransferLog retrieves NEP17TransferLog for the acc. -func (cd *Cached) GetNEP17TransferLog(acc util.Uint160, index uint32) (*state.NEP17TransferLog, error) { - ts := cd.transfers[acc] - if ts != nil && ts[index] != nil { - return ts[index], nil - } - return cd.DAO.GetNEP17TransferLog(acc, index) -} - -// PutNEP17TransferLog saves NEP17TransferLog for the acc. -func (cd *Cached) PutNEP17TransferLog(acc util.Uint160, index uint32, bs *state.NEP17TransferLog) error { - ts := cd.transfers[acc] - if ts == nil { - ts = make(map[uint32]*state.NEP17TransferLog, 2) - cd.transfers[acc] = ts - } - ts[index] = bs - return nil -} - -// AppendNEP17Transfer appends new transfer to a transfer event log. -func (cd *Cached) AppendNEP17Transfer(acc util.Uint160, index uint32, isNew bool, tr *state.NEP17Transfer) (bool, error) { - var lg *state.NEP17TransferLog - if isNew { - lg = new(state.NEP17TransferLog) - } else { - var err error - lg, err = cd.GetNEP17TransferLog(acc, index) - if err != nil { - return false, err - } - } - if err := lg.Append(tr); err != nil { - return false, err - } - return lg.Size() >= state.NEP17TransferBatchSize, cd.PutNEP17TransferLog(acc, index, lg) -} - -// Persist flushes all the changes made into the (supposedly) persistent -// underlying store. -func (cd *Cached) Persist() (int, error) { - lowerCache, ok := cd.DAO.(*Cached) - // If the lower DAO is Cached, we only need to flush the MemCached DB. - // This actually breaks DAO interface incapsulation, but for our current - // usage scenario it should be good enough if cd doesn't modify object - // caches (accounts/transfer data) in any way. - if ok { - var simpleCache *Simple - for simpleCache == nil { - simpleCache, ok = lowerCache.DAO.(*Simple) - if !ok { - lowerCache, ok = cd.DAO.(*Cached) - if !ok { - return 0, errors.New("unsupported lower DAO") - } - } - } - return simpleCache.Persist() - } - buf := io.NewBufBinWriter() - - for acc, bs := range cd.balances { - err := cd.DAO.putNEP17TransferInfo(acc, bs, buf) - if err != nil { - return 0, err - } - buf.Reset() - } - for acc, ts := range cd.transfers { - for ind, lg := range ts { - err := cd.DAO.PutNEP17TransferLog(acc, ind, lg) - if err != nil { - return 0, err - } - } - } - return cd.DAO.Persist() -} - -// GetWrapped implements DAO interface. -func (cd *Cached) GetWrapped() DAO { - return &Cached{cd.DAO.GetWrapped(), - cd.balances, - cd.transfers, - } -} diff --git a/pkg/core/dao/cacheddao_test.go b/pkg/core/dao/cacheddao_test.go deleted file mode 100644 index d1c99bd71..000000000 --- a/pkg/core/dao/cacheddao_test.go +++ /dev/null @@ -1,57 +0,0 @@ -package dao - -import ( - "testing" - - "github.com/nspcc-dev/neo-go/internal/random" - "github.com/nspcc-dev/neo-go/pkg/core/state" - "github.com/nspcc-dev/neo-go/pkg/core/storage" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestCachedCachedDao(t *testing.T) { - store := storage.NewMemoryStore() - // Persistent DAO to check for backing storage. - pdao := NewSimple(store, false) - assert.NotEqual(t, store, pdao.Store) - // Cached DAO. - cdao := NewCached(pdao) - cdaoDao := cdao.DAO.(*Simple) - assert.NotEqual(t, store, cdaoDao.Store) - assert.NotEqual(t, pdao.Store, cdaoDao.Store) - - // Cached cached DAO. - ccdao := NewCached(cdao) - ccdaoDao := ccdao.DAO.(*Cached) - intDao := ccdaoDao.DAO.(*Simple) - assert.NotEqual(t, store, intDao.Store) - assert.NotEqual(t, pdao.Store, intDao.Store) - assert.NotEqual(t, cdaoDao.Store, intDao.Store) - - id := int32(random.Int(0, 1024)) - key := []byte("qwerty") - si := state.StorageItem("poiuyt") - require.NoError(t, ccdao.PutStorageItem(id, key, si)) - resi := ccdao.GetStorageItem(id, key) - assert.Equal(t, si, resi) - - resi = cdao.GetStorageItem(id, key) - assert.Equal(t, state.StorageItem(nil), resi) - resi = pdao.GetStorageItem(id, key) - assert.Equal(t, state.StorageItem(nil), resi) - - cnt, err := ccdao.Persist() - assert.NoError(t, err) - assert.Equal(t, 1, cnt) - resi = cdao.GetStorageItem(id, key) - assert.Equal(t, si, resi) - resi = pdao.GetStorageItem(id, key) - assert.Equal(t, state.StorageItem(nil), resi) - - cnt, err = cdao.Persist() - assert.NoError(t, err) - assert.Equal(t, 1, cnt) - resi = pdao.GetStorageItem(id, key) - assert.Equal(t, si, resi) -} From ae071d45428f088166fb2eceb198243940611ed9 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Thu, 12 Aug 2021 13:35:09 +0300 Subject: [PATCH 5/5] storage: introduce PutChangeSet and use it for Persist MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We're using batches in wrong way during persist, we already have all changes accumulated in two maps and then we move them to batch and then this is applied. For some DBs like BoltDB this batch is just another MemoryStore, so we essentially just shuffle the changeset from one map to another, for others like LevelDB batch is just a serialized set of KV pairs, it doesn't help much on subsequent PutBatch, we just duplicate the changeset again. So introduce PutChangeSet that allows to take two maps with sets and deletes directly. It also allows to simplify MemCachedStore logic. neo-bench for single node with 10 workers, LevelDB: Reference: RPS 30189.132 30556.448 30390.482 ≈ 30379 ± 0.61% TPS 29427.344 29418.687 29434.273 ≈ 29427 ± 0.03% CPU % 33.304 27.179 33.860 ≈ 31.45 ± 11.79% Mem MB 800.677 798.389 715.042 ≈ 771 ± 6.33% Patched: RPS 30264.326 30386.364 30166.231 ≈ 30272 ± 0.36% ⇅ TPS 29444.673 29407.440 29452.478 ≈ 29435 ± 0.08% ⇅ CPU % 34.012 32.597 33.467 ≈ 33.36 ± 2.14% ⇅ Mem MB 549.126 523.656 517.684 ≈ 530 ± 3.15% ↓ 31.26% BoltDB: Reference: RPS 31937.647 31551.684 31850.408 ≈ 31780 ± 0.64% TPS 31292.049 30368.368 31307.724 ≈ 30989 ± 1.74% CPU % 33.792 22.339 35.887 ≈ 30.67 ± 23.78% Mem MB 1271.687 1254.472 1215.639 ≈ 1247 ± 2.30% Patched: RPS 31746.818 30859.485 31689.761 ≈ 31432 ± 1.58% ⇅ TPS 31271.499 30340.726 30342.568 ≈ 30652 ± 1.75% ⇅ CPU % 34.611 34.414 31.553 ≈ 33.53 ± 5.11% ⇅ Mem MB 1262.960 1231.389 1335.569 ≈ 1277 ± 4.18% ⇅ --- pkg/core/storage/badgerdb_store.go | 19 ++++++++++++++++ pkg/core/storage/boltdb_store.go | 10 +++++++-- pkg/core/storage/leveldb_store.go | 23 +++++++++++++++++++ pkg/core/storage/memcached_store.go | 28 ++---------------------- pkg/core/storage/memcached_store_test.go | 3 +++ pkg/core/storage/memory_store.go | 15 ++++++++----- pkg/core/storage/redis_store.go | 10 +++++++-- pkg/core/storage/store.go | 2 ++ 8 files changed, 75 insertions(+), 35 deletions(-) diff --git a/pkg/core/storage/badgerdb_store.go b/pkg/core/storage/badgerdb_store.go index 6f5e268e1..6cae62849 100644 --- a/pkg/core/storage/badgerdb_store.go +++ b/pkg/core/storage/badgerdb_store.go @@ -102,6 +102,25 @@ func (b *BadgerDBStore) PutBatch(batch Batch) error { return batch.(*BadgerDBBatch).batch.Flush() } +// PutChangeSet implements the Store interface. +func (b *BadgerDBStore) PutChangeSet(puts map[string][]byte, dels map[string]bool) error { + return b.db.Update(func(txn *badger.Txn) error { + for k, v := range puts { + err := txn.Set([]byte(k), v) + if err != nil { + return err + } + } + for k := range dels { + err := txn.Delete([]byte(k)) + if err != nil { + return err + } + } + return nil + }) +} + // Seek implements the Store interface. func (b *BadgerDBStore) Seek(key []byte, f func(k, v []byte)) { err := b.db.View(func(txn *badger.Txn) error { diff --git a/pkg/core/storage/boltdb_store.go b/pkg/core/storage/boltdb_store.go index 23868c3f8..4112ed950 100644 --- a/pkg/core/storage/boltdb_store.go +++ b/pkg/core/storage/boltdb_store.go @@ -84,15 +84,21 @@ func (s *BoltDBStore) Delete(key []byte) error { // PutBatch implements the Store interface. func (s *BoltDBStore) PutBatch(batch Batch) error { + memBatch := batch.(*MemoryBatch) + return s.PutChangeSet(memBatch.mem, memBatch.del) +} + +// PutChangeSet implements the Store interface. +func (s *BoltDBStore) PutChangeSet(puts map[string][]byte, dels map[string]bool) error { return s.db.Batch(func(tx *bbolt.Tx) error { b := tx.Bucket(Bucket) - for k, v := range batch.(*MemoryBatch).mem { + for k, v := range puts { err := b.Put([]byte(k), v) if err != nil { return err } } - for k := range batch.(*MemoryBatch).del { + for k := range dels { err := b.Delete([]byte(k)) if err != nil { return err diff --git a/pkg/core/storage/leveldb_store.go b/pkg/core/storage/leveldb_store.go index e3941f0b8..e348c3c2e 100644 --- a/pkg/core/storage/leveldb_store.go +++ b/pkg/core/storage/leveldb_store.go @@ -61,6 +61,29 @@ func (s *LevelDBStore) PutBatch(batch Batch) error { return s.db.Write(lvldbBatch, nil) } +// PutChangeSet implements the Store interface. +func (s *LevelDBStore) PutChangeSet(puts map[string][]byte, dels map[string]bool) error { + tx, err := s.db.OpenTransaction() + if err != nil { + return err + } + for k := range puts { + err = tx.Put([]byte(k), puts[k], nil) + if err != nil { + tx.Discard() + return err + } + } + for k := range dels { + err = tx.Delete([]byte(k), nil) + if err != nil { + tx.Discard() + return err + } + } + return tx.Commit() +} + // Seek implements the Store interface. func (s *LevelDBStore) Seek(key []byte, f func(k, v []byte)) { iter := s.db.NewIterator(util.BytesPrefix(key), nil) diff --git a/pkg/core/storage/memcached_store.go b/pkg/core/storage/memcached_store.go index 4be37f44c..b21d3edaa 100644 --- a/pkg/core/storage/memcached_store.go +++ b/pkg/core/storage/memcached_store.go @@ -121,32 +121,8 @@ func (s *MemCachedStore) Persist() (int, error) { s.del = make(map[string]bool) s.mut.Unlock() - memStore, ok := tempstore.ps.(*MemoryStore) - if !ok { - memCachedStore, ok := tempstore.ps.(*MemCachedStore) - if ok { - memStore = &memCachedStore.MemoryStore - } - } - if memStore != nil { - memStore.mut.Lock() - for k := range tempstore.mem { - memStore.put(k, tempstore.mem[k]) - } - for k := range tempstore.del { - memStore.drop(k) - } - memStore.mut.Unlock() - } else { - batch := tempstore.ps.Batch() - for k := range tempstore.mem { - batch.Put([]byte(k), tempstore.mem[k]) - } - for k := range tempstore.del { - batch.Delete([]byte(k)) - } - err = tempstore.ps.PutBatch(batch) - } + err = tempstore.ps.PutChangeSet(tempstore.mem, tempstore.del) + s.mut.Lock() if err == nil { // tempstore.mem and tempstore.del are completely flushed now diff --git a/pkg/core/storage/memcached_store_test.go b/pkg/core/storage/memcached_store_test.go index 7dfee3027..fe811f61d 100644 --- a/pkg/core/storage/memcached_store_test.go +++ b/pkg/core/storage/memcached_store_test.go @@ -200,6 +200,9 @@ func (b *BadStore) Put(k, v []byte) error { return nil } func (b *BadStore) PutBatch(Batch) error { + return nil +} +func (b *BadStore) PutChangeSet(_ map[string][]byte, _ map[string]bool) error { b.onPutBatch() return ErrKeyNotFound } diff --git a/pkg/core/storage/memory_store.go b/pkg/core/storage/memory_store.go index 5fe843b19..45c4ae9fd 100644 --- a/pkg/core/storage/memory_store.go +++ b/pkg/core/storage/memory_store.go @@ -85,14 +85,19 @@ func (s *MemoryStore) Delete(key []byte) error { // PutBatch implements the Store interface. Never returns an error. func (s *MemoryStore) PutBatch(batch Batch) error { b := batch.(*MemoryBatch) + return s.PutChangeSet(b.mem, b.del) +} + +// PutChangeSet implements the Store interface. Never returns an error. +func (s *MemoryStore) PutChangeSet(puts map[string][]byte, dels map[string]bool) error { s.mut.Lock() - defer s.mut.Unlock() - for k := range b.del { + for k := range puts { + s.put(k, puts[k]) + } + for k := range dels { s.drop(k) } - for k, v := range b.mem { - s.put(k, v) - } + s.mut.Unlock() return nil } diff --git a/pkg/core/storage/redis_store.go b/pkg/core/storage/redis_store.go index ae24b8c0c..b1631068b 100644 --- a/pkg/core/storage/redis_store.go +++ b/pkg/core/storage/redis_store.go @@ -62,11 +62,17 @@ func (s *RedisStore) Put(k, v []byte) error { // PutBatch implements the Store interface. func (s *RedisStore) PutBatch(b Batch) error { + memBatch := b.(*MemoryBatch) + return s.PutChangeSet(memBatch.mem, memBatch.del) +} + +// PutChangeSet implements the Store interface. +func (s *RedisStore) PutChangeSet(puts map[string][]byte, dels map[string]bool) error { pipe := s.client.Pipeline() - for k, v := range b.(*MemoryBatch).mem { + for k, v := range puts { pipe.Set(k, v, 0) } - for k := range b.(*MemoryBatch).del { + for k := range dels { pipe.Del(k) } _, err := pipe.Exec() diff --git a/pkg/core/storage/store.go b/pkg/core/storage/store.go index a90466a02..d5223c960 100644 --- a/pkg/core/storage/store.go +++ b/pkg/core/storage/store.go @@ -44,6 +44,8 @@ type ( Get([]byte) ([]byte, error) Put(k, v []byte) error PutBatch(Batch) error + // PutChangeSet allows to push prepared changeset to the Store. + PutChangeSet(puts map[string][]byte, dels map[string]bool) error // Seek can guarantee that provided key (k) and value (v) are the only valid until the next call to f. // Key and value slices should not be modified. Seek(k []byte, f func(k, v []byte))