diff --git a/pkg/core/storage/boltdb_store.go b/pkg/core/storage/boltdb_store.go index ac6cd3a16..41a8f8291 100644 --- a/pkg/core/storage/boltdb_store.go +++ b/pkg/core/storage/boltdb_store.go @@ -84,23 +84,25 @@ func (s *BoltDBStore) Delete(key []byte) error { // PutBatch implements the Store interface. func (s *BoltDBStore) PutBatch(batch Batch) error { memBatch := batch.(*MemoryBatch) - return s.PutChangeSet(memBatch.mem) + return s.PutChangeSet(memBatch.mem, memBatch.stor) } // PutChangeSet implements the Store interface. -func (s *BoltDBStore) PutChangeSet(puts map[string][]byte) error { +func (s *BoltDBStore) PutChangeSet(puts map[string][]byte, stores map[string][]byte) error { var err error return s.db.Update(func(tx *bbolt.Tx) error { b := tx.Bucket(Bucket) - for k, v := range puts { - if v != nil { - err = b.Put([]byte(k), v) - } else { - err = b.Delete([]byte(k)) - } - if err != nil { - return err + for _, m := range []map[string][]byte{puts, stores} { + for k, v := range m { + if v != nil { + err = b.Put([]byte(k), v) + } else { + err = b.Delete([]byte(k)) + } + if err != nil { + return err + } } } return nil diff --git a/pkg/core/storage/leveldb_store.go b/pkg/core/storage/leveldb_store.go index 0c843aba3..578a0157c 100644 --- a/pkg/core/storage/leveldb_store.go +++ b/pkg/core/storage/leveldb_store.go @@ -62,20 +62,22 @@ func (s *LevelDBStore) PutBatch(batch Batch) error { } // PutChangeSet implements the Store interface. -func (s *LevelDBStore) PutChangeSet(puts map[string][]byte) error { +func (s *LevelDBStore) PutChangeSet(puts map[string][]byte, stores map[string][]byte) error { tx, err := s.db.OpenTransaction() if err != nil { return err } - for k := range puts { - if puts[k] != nil { - err = tx.Put([]byte(k), puts[k], nil) - } else { - err = tx.Delete([]byte(k), nil) - } - if err != nil { - tx.Discard() - return err + for _, m := range []map[string][]byte{puts, stores} { + for k := range m { + if m[k] != nil { + err = tx.Put([]byte(k), m[k], nil) + } else { + err = tx.Delete([]byte(k), nil) + } + if err != nil { + tx.Discard() + return err + } } } return tx.Commit() diff --git a/pkg/core/storage/memcached_store.go b/pkg/core/storage/memcached_store.go index 019f25133..3429ecf0f 100644 --- a/pkg/core/storage/memcached_store.go +++ b/pkg/core/storage/memcached_store.go @@ -55,7 +55,8 @@ func NewMemCachedStore(lower Store) *MemCachedStore { func (s *MemCachedStore) Get(key []byte) ([]byte, error) { s.mut.RLock() defer s.mut.RUnlock() - if val, ok := s.mem[string(key)]; ok { + m := s.chooseMap(key) + if val, ok := m[string(key)]; ok { if val == nil { return nil, ErrKeyNotFound } @@ -71,15 +72,17 @@ func (s *MemCachedStore) GetBatch() *MemBatch { var b MemBatch - b.Put = make([]KeyValueExists, 0, len(s.mem)) + b.Put = make([]KeyValueExists, 0, len(s.mem)+len(s.stor)) b.Deleted = make([]KeyValueExists, 0) - for k, v := range s.mem { - key := []byte(k) - _, err := s.ps.Get(key) - if v == nil { - b.Deleted = append(b.Deleted, KeyValueExists{KeyValue: KeyValue{Key: key}, Exists: err == nil}) - } else { - b.Put = append(b.Put, KeyValueExists{KeyValue: KeyValue{Key: key, Value: v}, Exists: err == nil}) + for _, m := range []map[string][]byte{s.mem, s.stor} { + for k, v := range m { + key := []byte(k) + _, err := s.ps.Get(key) + if v == nil { + b.Deleted = append(b.Deleted, KeyValueExists{KeyValue: KeyValue{Key: key}, Exists: err == nil}) + } else { + b.Put = append(b.Put, KeyValueExists{KeyValue: KeyValue{Key: key, Value: v}, Exists: err == nil}) + } } } return &b @@ -131,7 +134,8 @@ func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool } } s.mut.RLock() - for k, v := range s.MemoryStore.mem { + m := s.MemoryStore.chooseMap(rng.Prefix) + for k, v := range m { if isKeyOK(k) { memRes = append(memRes, KeyValueExists{ KeyValue: KeyValue{ @@ -259,7 +263,7 @@ func (s *MemCachedStore) persist(isSync bool) (int, error) { defer s.plock.Unlock() s.mut.Lock() - keys = len(s.mem) + keys = len(s.mem) + len(s.stor) if keys == 0 { s.mut.Unlock() return 0, nil @@ -269,14 +273,15 @@ func (s *MemCachedStore) persist(isSync bool) (int, error) { // starts using fresh new maps. This tempstore is only known here and // nothing ever changes it, therefore accesses to it (reads) can go // unprotected while writes are handled by s proper. - var tempstore = &MemCachedStore{MemoryStore: MemoryStore{mem: s.mem}, ps: s.ps} + var tempstore = &MemCachedStore{MemoryStore: MemoryStore{mem: s.mem, stor: s.stor}, ps: s.ps} s.ps = tempstore s.mem = make(map[string][]byte, len(s.mem)) + s.stor = make(map[string][]byte, len(s.stor)) if !isSync { s.mut.Unlock() } - err = tempstore.ps.PutChangeSet(tempstore.mem) + err = tempstore.ps.PutChangeSet(tempstore.mem, tempstore.stor) if !isSync { s.mut.Lock() @@ -290,10 +295,14 @@ func (s *MemCachedStore) persist(isSync bool) (int, error) { // We're toast. We'll try to still keep proper state, but OOM // killer will get to us eventually. for k := range s.mem { - tempstore.put(k, s.mem[k]) + put(tempstore.mem, k, s.mem[k]) + } + for k := range s.stor { + put(tempstore.stor, k, s.stor[k]) } s.ps = tempstore.ps s.mem = tempstore.mem + s.stor = tempstore.stor } s.mut.Unlock() return keys, err diff --git a/pkg/core/storage/memcached_store_test.go b/pkg/core/storage/memcached_store_test.go index a284a7ad5..d7d524435 100644 --- a/pkg/core/storage/memcached_store_test.go +++ b/pkg/core/storage/memcached_store_test.go @@ -287,7 +287,7 @@ func (b *BadStore) Put(k, v []byte) error { func (b *BadStore) PutBatch(Batch) error { return nil } -func (b *BadStore) PutChangeSet(_ map[string][]byte) error { +func (b *BadStore) PutChangeSet(_ map[string][]byte, _ map[string][]byte) error { b.onPutBatch() return ErrKeyNotFound } diff --git a/pkg/core/storage/memory_store.go b/pkg/core/storage/memory_store.go index 371e4bf65..fa31a1b0f 100644 --- a/pkg/core/storage/memory_store.go +++ b/pkg/core/storage/memory_store.go @@ -12,8 +12,9 @@ import ( // MemoryStore is an in-memory implementation of a Store, mainly // used for testing. Do not use MemoryStore in production. type MemoryStore struct { - mut sync.RWMutex - mem map[string][]byte + mut sync.RWMutex + mem map[string][]byte + stor map[string][]byte } // MemoryBatch is an in-memory batch compatible with MemoryStore. @@ -23,18 +24,19 @@ type MemoryBatch struct { // Put implements the Batch interface. func (b *MemoryBatch) Put(k, v []byte) { - b.MemoryStore.put(string(k), slice.Copy(v)) + put(b.MemoryStore.chooseMap(k), string(k), slice.Copy(v)) } // Delete implements Batch interface. func (b *MemoryBatch) Delete(k []byte) { - b.MemoryStore.drop(string(k)) + drop(b.MemoryStore.chooseMap(k), string(k)) } // NewMemoryStore creates a new MemoryStore object. func NewMemoryStore() *MemoryStore { return &MemoryStore{ - mem: make(map[string][]byte), + mem: make(map[string][]byte), + stor: make(map[string][]byte), } } @@ -42,16 +44,26 @@ func NewMemoryStore() *MemoryStore { func (s *MemoryStore) Get(key []byte) ([]byte, error) { s.mut.RLock() defer s.mut.RUnlock() - if val, ok := s.mem[string(key)]; ok && val != nil { + m := s.chooseMap(key) + if val, ok := m[string(key)]; ok && val != nil { return val, nil } return nil, ErrKeyNotFound } +func (s *MemoryStore) chooseMap(key []byte) map[string][]byte { + switch KeyPrefix(key[0]) { + case STStorage, STTempStorage: + return s.stor + default: + return s.mem + } +} + // put puts a key-value pair into the store, it's supposed to be called // with mutex locked. -func (s *MemoryStore) put(key string, value []byte) { - s.mem[key] = value +func put(m map[string][]byte, key string, value []byte) { + m[key] = value } // Put implements the Store interface. Never returns an error. @@ -59,22 +71,22 @@ func (s *MemoryStore) Put(key, value []byte) error { newKey := string(key) vcopy := slice.Copy(value) s.mut.Lock() - s.put(newKey, vcopy) + put(s.chooseMap(key), newKey, vcopy) s.mut.Unlock() return nil } // drop deletes a key-value pair from the store, it's supposed to be called // with mutex locked. -func (s *MemoryStore) drop(key string) { - s.mem[key] = nil +func drop(m map[string][]byte, key string) { + m[key] = nil } // Delete implements Store interface. Never returns an error. func (s *MemoryStore) Delete(key []byte) error { newKey := string(key) s.mut.Lock() - s.drop(newKey) + drop(s.chooseMap(key), newKey) s.mut.Unlock() return nil } @@ -82,14 +94,17 @@ func (s *MemoryStore) Delete(key []byte) error { // PutBatch implements the Store interface. Never returns an error. func (s *MemoryStore) PutBatch(batch Batch) error { b := batch.(*MemoryBatch) - return s.PutChangeSet(b.mem) + return s.PutChangeSet(b.mem, b.stor) } // PutChangeSet implements the Store interface. Never returns an error. -func (s *MemoryStore) PutChangeSet(puts map[string][]byte) error { +func (s *MemoryStore) PutChangeSet(puts map[string][]byte, stores map[string][]byte) error { s.mut.Lock() for k := range puts { - s.put(k, puts[k]) + put(s.mem, k, puts[k]) + } + for k := range stores { + put(s.stor, k, stores[k]) } s.mut.Unlock() return nil @@ -109,7 +124,7 @@ func (s *MemoryStore) SeekGC(rng SeekRange, keep func(k, v []byte) bool) error { // sensitive to the order of KV pairs. s.seek(rng, func(k, v []byte) bool { if !keep(k, v) { - s.drop(string(k)) + drop(s.chooseMap(k), string(k)) } return true }) @@ -122,7 +137,8 @@ func (s *MemoryStore) SeekAll(key []byte, f func(k, v []byte)) { s.mut.RLock() defer s.mut.RUnlock() sk := string(key) - for k, v := range s.mem { + m := s.chooseMap(key) + for k, v := range m { if strings.HasPrefix(k, sk) { f([]byte(k), v) } @@ -152,7 +168,8 @@ func (s *MemoryStore) seek(rng SeekRange, f func(k, v []byte) bool) { return res != 0 && rng.Backwards == (res > 0) } - for k, v := range s.mem { + m := s.chooseMap(rng.Prefix) + for k, v := range m { if v != nil && isKeyOK(k) { memList = append(memList, KeyValue{ Key: []byte(k), @@ -185,6 +202,7 @@ func newMemoryBatch() *MemoryBatch { func (s *MemoryStore) Close() error { s.mut.Lock() s.mem = nil + s.stor = nil s.mut.Unlock() return nil } diff --git a/pkg/core/storage/store.go b/pkg/core/storage/store.go index 24372bc4c..de76669f0 100644 --- a/pkg/core/storage/store.go +++ b/pkg/core/storage/store.go @@ -61,8 +61,7 @@ type Operation struct { // SeekRange represents options for Store.Seek operation. type SeekRange struct { // Prefix denotes the Seek's lookup key. - // Empty Prefix means seeking through all keys in the DB starting from - // the Start if specified. + // Empty Prefix is not supported. Prefix []byte // Start denotes value appended to the Prefix to start Seek from. // Seeking starting from some key includes this key to the result; @@ -92,7 +91,7 @@ type ( Put(k, v []byte) error PutBatch(Batch) error // PutChangeSet allows to push prepared changeset to the Store. - PutChangeSet(puts map[string][]byte) error + PutChangeSet(puts map[string][]byte, stor map[string][]byte) error // Seek can guarantee that provided key (k) and value (v) are the only valid until the next call to f. // Seek continues iteration until false is returned from f. // Key and value slices should not be modified. diff --git a/pkg/core/storage/storeandbatch_test.go b/pkg/core/storage/storeandbatch_test.go index 6b8d7c102..fbb5a5f35 100644 --- a/pkg/core/storage/storeandbatch_test.go +++ b/pkg/core/storage/storeandbatch_test.go @@ -228,108 +228,6 @@ func testStoreSeek(t *testing.T, s Store) { }) }) }) - - t.Run("empty prefix, non-empty start", func(t *testing.T) { - t.Run("forwards", func(t *testing.T) { - t.Run("good", func(t *testing.T) { - goodprefix := []byte{} - start := []byte("21") - goodkvs := []KeyValue{ - kvs[3], // key = "21" - kvs[4], // key = "22" - kvs[5], // key = "30" - kvs[6], // key = "31" - } - check(t, goodprefix, start, goodkvs, false, nil) - }) - t.Run("no matching items", func(t *testing.T) { - goodprefix := []byte{} - start := []byte("32") // start is more than all keys. - check(t, goodprefix, start, []KeyValue{}, false, nil) - }) - t.Run("early stop", func(t *testing.T) { - goodprefix := []byte{} - start := []byte("21") - goodkvs := []KeyValue{ - kvs[3], // key = "21" - kvs[4], // key = "22" - kvs[5], // key = "30" - } - check(t, goodprefix, start, goodkvs, false, func(k, v []byte) bool { - return string(k) < "30" - }) - }) - }) - t.Run("backwards", func(t *testing.T) { - t.Run("good", func(t *testing.T) { - goodprefix := []byte{} - start := []byte("21") - goodkvs := []KeyValue{ - kvs[3], // key = "21" - kvs[2], // key = "20" - kvs[1], // key = "11" - kvs[0], // key = "10" - } - check(t, goodprefix, start, goodkvs, true, nil) - }) - t.Run("no matching items", func(t *testing.T) { - goodprefix := []byte{} - start := []byte("0") // start is less than all keys. - check(t, goodprefix, start, []KeyValue{}, true, nil) - }) - t.Run("early stop", func(t *testing.T) { - goodprefix := []byte{} - start := []byte("21") - goodkvs := []KeyValue{ - kvs[3], // key = "21" - kvs[2], // key = "20" - kvs[1], // key = "11" - } - check(t, goodprefix, start, goodkvs, true, func(k, v []byte) bool { - return string(k) > "11" - }) - }) - }) - }) - - t.Run("empty prefix, empty start", func(t *testing.T) { - goodprefix := []byte{} - start := []byte{} - goodkvs := make([]KeyValue, len(kvs)) - copy(goodkvs, kvs) - t.Run("forwards", func(t *testing.T) { - t.Run("good", func(t *testing.T) { - check(t, goodprefix, start, goodkvs, false, nil) - }) - t.Run("early stop", func(t *testing.T) { - goodkvs := []KeyValue{ - kvs[0], // key = "10" - kvs[1], // key = "11" - kvs[2], // key = "20" - kvs[3], // key = "21" - } - check(t, goodprefix, start, goodkvs, false, func(k, v []byte) bool { - return string(k) < "21" - }) - }) - }) - t.Run("backwards", func(t *testing.T) { - t.Run("good", func(t *testing.T) { - check(t, goodprefix, start, goodkvs, true, nil) - }) - t.Run("early stop", func(t *testing.T) { - goodkvs := []KeyValue{ - kvs[6], // key = "31" - kvs[5], // key = "30" - kvs[4], // key = "22" - kvs[3], // key = "21" - } - check(t, goodprefix, start, goodkvs, true, func(k, v []byte) bool { - return string(k) > "21" - }) - }) - }) - }) } func testStoreDeleteNonExistent(t *testing.T, s Store) {