Merge pull request #2123 from nspcc-dev/store-better

Store better
This commit is contained in:
Roman Khimov 2021-08-13 12:50:24 +03:00 committed by GitHub
commit adc660c3e0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 150 additions and 265 deletions

View file

@ -165,6 +165,12 @@ type bcEvent struct {
appExecResults []*state.AppExecResult appExecResults []*state.AppExecResult
} }
// transferData is used for transfer caching during storeBlock.
type transferData struct {
Info state.NEP17TransferInfo
Log state.NEP17TransferLog
}
// NewBlockchain returns a new blockchain object the will use the // NewBlockchain returns a new blockchain object the will use the
// given Store as its underlying storage. For it to work correctly you need // given Store as its underlying storage. For it to work correctly you need
// to spawn a goroutine for its Run method after this initialization. // to spawn a goroutine for its Run method after this initialization.
@ -768,10 +774,11 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
}() }()
go func() { go func() {
var ( var (
kvcache = dao.NewCached(cache) kvcache = cache.GetWrapped()
writeBuf = io.NewBufBinWriter() writeBuf = io.NewBufBinWriter()
err error err error
appendBlock bool appendBlock bool
transCache = make(map[util.Uint160]transferData)
) )
for aer := range aerchan { for aer := range aerchan {
if aer.Container == block.Hash() && appendBlock { if aer.Container == block.Hash() && appendBlock {
@ -788,7 +795,7 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
} }
if aer.Execution.VMState == vm.HaltState { if aer.Execution.VMState == vm.HaltState {
for j := range aer.Execution.Events { for j := range aer.Execution.Events {
bc.handleNotification(&aer.Execution.Events[j], kvcache, block, aer.Container) bc.handleNotification(&aer.Execution.Events[j], kvcache, transCache, block, aer.Container)
} }
} }
writeBuf.Reset() writeBuf.Reset()
@ -797,6 +804,19 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
aerdone <- err aerdone <- err
return return
} }
for acc, trData := range transCache {
err = kvcache.PutNEP17TransferInfo(acc, &trData.Info)
if err != nil {
aerdone <- err
return
}
err = kvcache.PutNEP17TransferLog(acc, trData.Info.NextTransferBatch, &trData.Log)
if err != nil {
aerdone <- err
return
}
}
_, err = kvcache.Persist() _, err = kvcache.Persist()
if err != nil { if err != nil {
aerdone <- err aerdone <- err
@ -1016,7 +1036,8 @@ func (bc *Blockchain) runPersist(script []byte, block *block.Block, cache dao.DA
}, nil }, nil
} }
func (bc *Blockchain) handleNotification(note *state.NotificationEvent, d *dao.Cached, b *block.Block, h util.Uint256) { func (bc *Blockchain) handleNotification(note *state.NotificationEvent, d dao.DAO,
transCache map[util.Uint160]transferData, b *block.Block, h util.Uint256) {
if note.Name != "Transfer" { if note.Name != "Transfer" {
return return
} }
@ -1053,7 +1074,7 @@ func (bc *Blockchain) handleNotification(note *state.NotificationEvent, d *dao.C
} }
amount = bigint.FromBytes(bs) amount = bigint.FromBytes(bs)
} }
bc.processNEP17Transfer(d, h, b, note.ScriptHash, from, to, amount) bc.processNEP17Transfer(d, transCache, h, b, note.ScriptHash, from, to, amount)
} }
func parseUint160(addr []byte) util.Uint160 { func parseUint160(addr []byte) util.Uint160 {
@ -1063,7 +1084,8 @@ func parseUint160(addr []byte) util.Uint160 {
return util.Uint160{} return util.Uint160{}
} }
func (bc *Blockchain) processNEP17Transfer(cache *dao.Cached, h util.Uint256, b *block.Block, sc util.Uint160, from, to []byte, amount *big.Int) { func (bc *Blockchain) processNEP17Transfer(cache dao.DAO, transCache map[util.Uint160]transferData,
h util.Uint256, b *block.Block, sc util.Uint160, from, to []byte, amount *big.Int) {
toAddr := parseUint160(to) toAddr := parseUint160(to)
fromAddr := parseUint160(from) fromAddr := parseUint160(from)
var id int32 var id int32
@ -1086,46 +1108,51 @@ func (bc *Blockchain) processNEP17Transfer(cache *dao.Cached, h util.Uint256, b
Tx: h, Tx: h,
} }
if !fromAddr.Equals(util.Uint160{}) { if !fromAddr.Equals(util.Uint160{}) {
balances, err := cache.GetNEP17TransferInfo(fromAddr) _ = transfer.Amount.Neg(amount) // We already have the Int.
if err != nil { if appendNEP17Transfer(cache, transCache, fromAddr, transfer) != nil {
return
}
balances.LastUpdated[id] = b.Index
transfer.Amount = *new(big.Int).Sub(&transfer.Amount, amount)
balances.NewBatch, err = cache.AppendNEP17Transfer(fromAddr,
balances.NextTransferBatch, balances.NewBatch, transfer)
if err != nil {
return
}
if balances.NewBatch {
balances.NextTransferBatch++
}
if err := cache.PutNEP17TransferInfo(fromAddr, balances); err != nil {
return return
} }
} }
if !toAddr.Equals(util.Uint160{}) { if !toAddr.Equals(util.Uint160{}) {
balances, err := cache.GetNEP17TransferInfo(toAddr) _ = transfer.Amount.Set(amount) // We already have the Int.
if err != nil { _ = appendNEP17Transfer(cache, transCache, toAddr, transfer) // Nothing useful we can do.
return
}
balances.LastUpdated[id] = b.Index
transfer.Amount = *amount
balances.NewBatch, err = cache.AppendNEP17Transfer(toAddr,
balances.NextTransferBatch, balances.NewBatch, transfer)
if err != nil {
return
}
if balances.NewBatch {
balances.NextTransferBatch++
}
if err := cache.PutNEP17TransferInfo(toAddr, balances); err != nil {
return
}
} }
} }
func appendNEP17Transfer(cache dao.DAO, transCache map[util.Uint160]transferData, addr util.Uint160, transfer *state.NEP17Transfer) error {
transferData, ok := transCache[addr]
if !ok {
balances, err := cache.GetNEP17TransferInfo(addr)
if err != nil {
return err
}
if !balances.NewBatch {
trLog, err := cache.GetNEP17TransferLog(addr, balances.NextTransferBatch)
if err != nil {
return err
}
transferData.Log = *trLog
}
transferData.Info = *balances
}
err := transferData.Log.Append(transfer)
if err != nil {
return err
}
transferData.Info.LastUpdated[transfer.Asset] = transfer.Block
transferData.Info.NewBatch = transferData.Log.Size() >= state.NEP17TransferBatchSize
if transferData.Info.NewBatch {
err = cache.PutNEP17TransferLog(addr, transferData.Info.NextTransferBatch, &transferData.Log)
if err != nil {
return err
}
transferData.Info.NextTransferBatch++
transferData.Log = state.NEP17TransferLog{}
}
transCache[addr] = transferData
return nil
}
// ForEachNEP17Transfer executes f for each nep17 transfer in log. // ForEachNEP17Transfer executes f for each nep17 transfer in log.
func (bc *Blockchain) ForEachNEP17Transfer(acc util.Uint160, f func(*state.NEP17Transfer) (bool, error)) error { func (bc *Blockchain) ForEachNEP17Transfer(acc util.Uint160, f func(*state.NEP17Transfer) (bool, error)) error {
balances, err := bc.dao.GetNEP17TransferInfo(acc) balances, err := bc.dao.GetNEP17TransferInfo(acc)

View file

@ -1,126 +0,0 @@
package dao
import (
"errors"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/util"
)
// Cached is a data access object that mimics DAO, but has a write cache
// for accounts and NEP17 transfer data. These are the most frequently used
// objects in the storeBlock().
type Cached struct {
DAO
balances map[util.Uint160]*state.NEP17TransferInfo
transfers map[util.Uint160]map[uint32]*state.NEP17TransferLog
}
// NewCached returns new Cached wrapping around given backing store.
func NewCached(d DAO) *Cached {
balances := make(map[util.Uint160]*state.NEP17TransferInfo)
transfers := make(map[util.Uint160]map[uint32]*state.NEP17TransferLog)
return &Cached{d.GetWrapped(), balances, transfers}
}
// GetNEP17TransferInfo retrieves NEP17TransferInfo for the acc.
func (cd *Cached) GetNEP17TransferInfo(acc util.Uint160) (*state.NEP17TransferInfo, error) {
if bs := cd.balances[acc]; bs != nil {
return bs, nil
}
return cd.DAO.GetNEP17TransferInfo(acc)
}
// PutNEP17TransferInfo saves NEP17TransferInfo for the acc.
func (cd *Cached) PutNEP17TransferInfo(acc util.Uint160, bs *state.NEP17TransferInfo) error {
cd.balances[acc] = bs
return nil
}
// GetNEP17TransferLog retrieves NEP17TransferLog for the acc.
func (cd *Cached) GetNEP17TransferLog(acc util.Uint160, index uint32) (*state.NEP17TransferLog, error) {
ts := cd.transfers[acc]
if ts != nil && ts[index] != nil {
return ts[index], nil
}
return cd.DAO.GetNEP17TransferLog(acc, index)
}
// PutNEP17TransferLog saves NEP17TransferLog for the acc.
func (cd *Cached) PutNEP17TransferLog(acc util.Uint160, index uint32, bs *state.NEP17TransferLog) error {
ts := cd.transfers[acc]
if ts == nil {
ts = make(map[uint32]*state.NEP17TransferLog, 2)
cd.transfers[acc] = ts
}
ts[index] = bs
return nil
}
// AppendNEP17Transfer appends new transfer to a transfer event log.
func (cd *Cached) AppendNEP17Transfer(acc util.Uint160, index uint32, isNew bool, tr *state.NEP17Transfer) (bool, error) {
var lg *state.NEP17TransferLog
if isNew {
lg = new(state.NEP17TransferLog)
} else {
var err error
lg, err = cd.GetNEP17TransferLog(acc, index)
if err != nil {
return false, err
}
}
if err := lg.Append(tr); err != nil {
return false, err
}
return lg.Size() >= state.NEP17TransferBatchSize, cd.PutNEP17TransferLog(acc, index, lg)
}
// Persist flushes all the changes made into the (supposedly) persistent
// underlying store.
func (cd *Cached) Persist() (int, error) {
lowerCache, ok := cd.DAO.(*Cached)
// If the lower DAO is Cached, we only need to flush the MemCached DB.
// This actually breaks DAO interface incapsulation, but for our current
// usage scenario it should be good enough if cd doesn't modify object
// caches (accounts/transfer data) in any way.
if ok {
var simpleCache *Simple
for simpleCache == nil {
simpleCache, ok = lowerCache.DAO.(*Simple)
if !ok {
lowerCache, ok = cd.DAO.(*Cached)
if !ok {
return 0, errors.New("unsupported lower DAO")
}
}
}
return simpleCache.Persist()
}
buf := io.NewBufBinWriter()
for acc, bs := range cd.balances {
err := cd.DAO.putNEP17TransferInfo(acc, bs, buf)
if err != nil {
return 0, err
}
buf.Reset()
}
for acc, ts := range cd.transfers {
for ind, lg := range ts {
err := cd.DAO.PutNEP17TransferLog(acc, ind, lg)
if err != nil {
return 0, err
}
}
}
return cd.DAO.Persist()
}
// GetWrapped implements DAO interface.
func (cd *Cached) GetWrapped() DAO {
return &Cached{cd.DAO.GetWrapped(),
cd.balances,
cd.transfers,
}
}

View file

@ -1,57 +0,0 @@
package dao
import (
"testing"
"github.com/nspcc-dev/neo-go/internal/random"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/storage"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestCachedCachedDao(t *testing.T) {
store := storage.NewMemoryStore()
// Persistent DAO to check for backing storage.
pdao := NewSimple(store, false)
assert.NotEqual(t, store, pdao.Store)
// Cached DAO.
cdao := NewCached(pdao)
cdaoDao := cdao.DAO.(*Simple)
assert.NotEqual(t, store, cdaoDao.Store)
assert.NotEqual(t, pdao.Store, cdaoDao.Store)
// Cached cached DAO.
ccdao := NewCached(cdao)
ccdaoDao := ccdao.DAO.(*Cached)
intDao := ccdaoDao.DAO.(*Simple)
assert.NotEqual(t, store, intDao.Store)
assert.NotEqual(t, pdao.Store, intDao.Store)
assert.NotEqual(t, cdaoDao.Store, intDao.Store)
id := int32(random.Int(0, 1024))
key := []byte("qwerty")
si := state.StorageItem("poiuyt")
require.NoError(t, ccdao.PutStorageItem(id, key, si))
resi := ccdao.GetStorageItem(id, key)
assert.Equal(t, si, resi)
resi = cdao.GetStorageItem(id, key)
assert.Equal(t, state.StorageItem(nil), resi)
resi = pdao.GetStorageItem(id, key)
assert.Equal(t, state.StorageItem(nil), resi)
cnt, err := ccdao.Persist()
assert.NoError(t, err)
assert.Equal(t, 1, cnt)
resi = cdao.GetStorageItem(id, key)
assert.Equal(t, si, resi)
resi = pdao.GetStorageItem(id, key)
assert.Equal(t, state.StorageItem(nil), resi)
cnt, err = cdao.Persist()
assert.NoError(t, err)
assert.Equal(t, 1, cnt)
resi = pdao.GetStorageItem(id, key)
assert.Equal(t, si, resi)
}

View file

@ -178,7 +178,7 @@ func TestCheckSig(t *testing.T) {
verifyFunc := ECDSASecp256r1CheckSig verifyFunc := ECDSASecp256r1CheckSig
d := dao.NewSimple(storage.NewMemoryStore(), false) d := dao.NewSimple(storage.NewMemoryStore(), false)
ic := &interop.Context{Network: uint32(netmode.UnitTestNet), DAO: dao.NewCached(d)} ic := &interop.Context{Network: uint32(netmode.UnitTestNet), DAO: d}
runCase := func(t *testing.T, isErr bool, result interface{}, args ...interface{}) { runCase := func(t *testing.T, isErr bool, result interface{}, args ...interface{}) {
ic.SpawnVM() ic.SpawnVM()
for i := range args { for i := range args {

View file

@ -45,7 +45,7 @@ func TestRuntimeGetRandomCompatibility(t *testing.T) {
b := getSharpTestGenesis(t) b := getSharpTestGenesis(t)
tx := getSharpTestTx(util.Uint160{}) tx := getSharpTestTx(util.Uint160{})
ic := bc.newInteropContext(trigger.Application, dao.NewCached(bc.dao), b, tx) ic := bc.newInteropContext(trigger.Application, bc.dao.GetWrapped(), b, tx)
ic.Network = 5195086 // Old mainnet magic used by C# tests. ic.Network = 5195086 // Old mainnet magic used by C# tests.
ic.VM = vm.New() ic.VM = vm.New()
@ -72,12 +72,12 @@ func TestRuntimeGetRandomDifferentTransactions(t *testing.T) {
b, _ := bc.GetBlock(bc.GetHeaderHash(0)) b, _ := bc.GetBlock(bc.GetHeaderHash(0))
tx1 := transaction.New([]byte{byte(opcode.PUSH1)}, 0) tx1 := transaction.New([]byte{byte(opcode.PUSH1)}, 0)
ic1 := bc.newInteropContext(trigger.Application, dao.NewCached(bc.dao), b, tx1) ic1 := bc.newInteropContext(trigger.Application, bc.dao.GetWrapped(), b, tx1)
ic1.VM = vm.New() ic1.VM = vm.New()
ic1.VM.LoadScript(tx1.Script) ic1.VM.LoadScript(tx1.Script)
tx2 := transaction.New([]byte{byte(opcode.PUSH2)}, 0) tx2 := transaction.New([]byte{byte(opcode.PUSH2)}, 0)
ic2 := bc.newInteropContext(trigger.Application, dao.NewCached(bc.dao), b, tx2) ic2 := bc.newInteropContext(trigger.Application, bc.dao.GetWrapped(), b, tx2)
ic2.VM = vm.New() ic2.VM = vm.New()
ic2.VM.LoadScript(tx2.Script) ic2.VM.LoadScript(tx2.Script)

View file

@ -17,7 +17,7 @@ import (
func TestDeployGetUpdateDestroyContract(t *testing.T) { func TestDeployGetUpdateDestroyContract(t *testing.T) {
mgmt := newManagement() mgmt := newManagement()
d := dao.NewCached(dao.NewSimple(storage.NewMemoryStore(), false)) d := dao.NewSimple(storage.NewMemoryStore(), false)
err := mgmt.Initialize(&interop.Context{DAO: d}) err := mgmt.Initialize(&interop.Context{DAO: d})
require.NoError(t, err) require.NoError(t, err)
script := []byte{byte(opcode.RET)} script := []byte{byte(opcode.RET)}
@ -86,7 +86,7 @@ func TestManagement_Initialize(t *testing.T) {
func TestManagement_GetNEP17Contracts(t *testing.T) { func TestManagement_GetNEP17Contracts(t *testing.T) {
mgmt := newManagement() mgmt := newManagement()
d := dao.NewCached(dao.NewSimple(storage.NewMemoryStore(), false)) d := dao.NewSimple(storage.NewMemoryStore(), false)
err := mgmt.Initialize(&interop.Context{DAO: d}) err := mgmt.Initialize(&interop.Context{DAO: d})
require.NoError(t, err) require.NoError(t, err)

View file

@ -102,6 +102,25 @@ func (b *BadgerDBStore) PutBatch(batch Batch) error {
return batch.(*BadgerDBBatch).batch.Flush() return batch.(*BadgerDBBatch).batch.Flush()
} }
// PutChangeSet implements the Store interface.
func (b *BadgerDBStore) PutChangeSet(puts map[string][]byte, dels map[string]bool) error {
return b.db.Update(func(txn *badger.Txn) error {
for k, v := range puts {
err := txn.Set([]byte(k), v)
if err != nil {
return err
}
}
for k := range dels {
err := txn.Delete([]byte(k))
if err != nil {
return err
}
}
return nil
})
}
// Seek implements the Store interface. // Seek implements the Store interface.
func (b *BadgerDBStore) Seek(key []byte, f func(k, v []byte)) { func (b *BadgerDBStore) Seek(key []byte, f func(k, v []byte)) {
err := b.db.View(func(txn *badger.Txn) error { err := b.db.View(func(txn *badger.Txn) error {

View file

@ -84,15 +84,21 @@ func (s *BoltDBStore) Delete(key []byte) error {
// PutBatch implements the Store interface. // PutBatch implements the Store interface.
func (s *BoltDBStore) PutBatch(batch Batch) error { func (s *BoltDBStore) PutBatch(batch Batch) error {
memBatch := batch.(*MemoryBatch)
return s.PutChangeSet(memBatch.mem, memBatch.del)
}
// PutChangeSet implements the Store interface.
func (s *BoltDBStore) PutChangeSet(puts map[string][]byte, dels map[string]bool) error {
return s.db.Batch(func(tx *bbolt.Tx) error { return s.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(Bucket) b := tx.Bucket(Bucket)
for k, v := range batch.(*MemoryBatch).mem { for k, v := range puts {
err := b.Put([]byte(k), v) err := b.Put([]byte(k), v)
if err != nil { if err != nil {
return err return err
} }
} }
for k := range batch.(*MemoryBatch).del { for k := range dels {
err := b.Delete([]byte(k)) err := b.Delete([]byte(k))
if err != nil { if err != nil {
return err return err

View file

@ -61,6 +61,29 @@ func (s *LevelDBStore) PutBatch(batch Batch) error {
return s.db.Write(lvldbBatch, nil) return s.db.Write(lvldbBatch, nil)
} }
// PutChangeSet implements the Store interface.
func (s *LevelDBStore) PutChangeSet(puts map[string][]byte, dels map[string]bool) error {
tx, err := s.db.OpenTransaction()
if err != nil {
return err
}
for k := range puts {
err = tx.Put([]byte(k), puts[k], nil)
if err != nil {
tx.Discard()
return err
}
}
for k := range dels {
err = tx.Delete([]byte(k), nil)
if err != nil {
tx.Discard()
return err
}
}
return tx.Commit()
}
// Seek implements the Store interface. // Seek implements the Store interface.
func (s *LevelDBStore) Seek(key []byte, f func(k, v []byte)) { func (s *LevelDBStore) Seek(key []byte, f func(k, v []byte)) {
iter := s.db.NewIterator(util.BytesPrefix(key), nil) iter := s.db.NewIterator(util.BytesPrefix(key), nil)

View file

@ -121,32 +121,8 @@ func (s *MemCachedStore) Persist() (int, error) {
s.del = make(map[string]bool) s.del = make(map[string]bool)
s.mut.Unlock() s.mut.Unlock()
memStore, ok := tempstore.ps.(*MemoryStore) err = tempstore.ps.PutChangeSet(tempstore.mem, tempstore.del)
if !ok {
memCachedStore, ok := tempstore.ps.(*MemCachedStore)
if ok {
memStore = &memCachedStore.MemoryStore
}
}
if memStore != nil {
memStore.mut.Lock()
for k := range tempstore.mem {
memStore.put(k, tempstore.mem[k])
}
for k := range tempstore.del {
memStore.drop(k)
}
memStore.mut.Unlock()
} else {
batch := tempstore.ps.Batch()
for k := range tempstore.mem {
batch.Put([]byte(k), tempstore.mem[k])
}
for k := range tempstore.del {
batch.Delete([]byte(k))
}
err = tempstore.ps.PutBatch(batch)
}
s.mut.Lock() s.mut.Lock()
if err == nil { if err == nil {
// tempstore.mem and tempstore.del are completely flushed now // tempstore.mem and tempstore.del are completely flushed now

View file

@ -200,6 +200,9 @@ func (b *BadStore) Put(k, v []byte) error {
return nil return nil
} }
func (b *BadStore) PutBatch(Batch) error { func (b *BadStore) PutBatch(Batch) error {
return nil
}
func (b *BadStore) PutChangeSet(_ map[string][]byte, _ map[string]bool) error {
b.onPutBatch() b.onPutBatch()
return ErrKeyNotFound return ErrKeyNotFound
} }

View file

@ -23,12 +23,12 @@ type MemoryBatch struct {
// Put implements the Batch interface. // Put implements the Batch interface.
func (b *MemoryBatch) Put(k, v []byte) { func (b *MemoryBatch) Put(k, v []byte) {
_ = b.MemoryStore.Put(k, v) b.MemoryStore.put(string(k), slice.Copy(v))
} }
// Delete implements Batch interface. // Delete implements Batch interface.
func (b *MemoryBatch) Delete(k []byte) { func (b *MemoryBatch) Delete(k []byte) {
_ = b.MemoryStore.Delete(k) b.MemoryStore.drop(string(k))
} }
// NewMemoryStore creates a new MemoryStore object. // NewMemoryStore creates a new MemoryStore object.
@ -85,14 +85,19 @@ func (s *MemoryStore) Delete(key []byte) error {
// PutBatch implements the Store interface. Never returns an error. // PutBatch implements the Store interface. Never returns an error.
func (s *MemoryStore) PutBatch(batch Batch) error { func (s *MemoryStore) PutBatch(batch Batch) error {
b := batch.(*MemoryBatch) b := batch.(*MemoryBatch)
return s.PutChangeSet(b.mem, b.del)
}
// PutChangeSet implements the Store interface. Never returns an error.
func (s *MemoryStore) PutChangeSet(puts map[string][]byte, dels map[string]bool) error {
s.mut.Lock() s.mut.Lock()
defer s.mut.Unlock() for k := range puts {
for k := range b.del { s.put(k, puts[k])
}
for k := range dels {
s.drop(k) s.drop(k)
} }
for k, v := range b.mem { s.mut.Unlock()
s.put(k, v)
}
return nil return nil
} }

View file

@ -62,11 +62,17 @@ func (s *RedisStore) Put(k, v []byte) error {
// PutBatch implements the Store interface. // PutBatch implements the Store interface.
func (s *RedisStore) PutBatch(b Batch) error { func (s *RedisStore) PutBatch(b Batch) error {
memBatch := b.(*MemoryBatch)
return s.PutChangeSet(memBatch.mem, memBatch.del)
}
// PutChangeSet implements the Store interface.
func (s *RedisStore) PutChangeSet(puts map[string][]byte, dels map[string]bool) error {
pipe := s.client.Pipeline() pipe := s.client.Pipeline()
for k, v := range b.(*MemoryBatch).mem { for k, v := range puts {
pipe.Set(k, v, 0) pipe.Set(k, v, 0)
} }
for k := range b.(*MemoryBatch).del { for k := range dels {
pipe.Del(k) pipe.Del(k)
} }
_, err := pipe.Exec() _, err := pipe.Exec()

View file

@ -46,6 +46,8 @@ type (
Get([]byte) ([]byte, error) Get([]byte) ([]byte, error)
Put(k, v []byte) error Put(k, v []byte) error
PutBatch(Batch) error PutBatch(Batch) error
// PutChangeSet allows to push prepared changeset to the Store.
PutChangeSet(puts map[string][]byte, dels map[string]bool) error
// Seek can guarantee that provided key (k) and value (v) are the only valid until the next call to f. // Seek can guarantee that provided key (k) and value (v) are the only valid until the next call to f.
// Key and value slices should not be modified. // Key and value slices should not be modified.
Seek(k []byte, f func(k, v []byte)) Seek(k []byte, f func(k, v []byte))
@ -54,7 +56,8 @@ type (
// Batch represents an abstraction on top of batch operations. // Batch represents an abstraction on top of batch operations.
// Each Store implementation is responsible of casting a Batch // Each Store implementation is responsible of casting a Batch
// to its appropriate type. // to its appropriate type. Batches can only be used in a single
// thread.
Batch interface { Batch interface {
Delete(k []byte) Delete(k []byte)
Put(k, v []byte) Put(k, v []byte)