neoneo-go/pkg/core/storage/redis_store.go

96 lines
2.1 KiB
Go
Raw Normal View History

package storage
import (
"fmt"
"github.com/go-redis/redis"
)
// RedisDBOptions configuration for RedisDB.
type RedisDBOptions struct {
Addr string `yaml:"Addr"`
Password string `yaml:"Password"`
DB int `yaml:"DB"`
}
// RedisStore holds the client and maybe later some more metadata.
type RedisStore struct {
client *redis.Client
}
// NewRedisStore returns an new initialized - ready to use RedisStore object.
func NewRedisStore(cfg RedisDBOptions) (*RedisStore, error) {
c := redis.NewClient(&redis.Options{
Addr: cfg.Addr,
Password: cfg.Password,
DB: cfg.DB,
})
if _, err := c.Ping().Result(); err != nil {
return nil, err
}
return &RedisStore{client: c}, nil
}
// Batch implements the Store interface.
func (s *RedisStore) Batch() Batch {
return newMemoryBatch()
}
// Get implements the Store interface.
func (s *RedisStore) Get(k []byte) ([]byte, error) {
val, err := s.client.Get(string(k)).Result()
if err != nil {
if err == redis.Nil {
err = ErrKeyNotFound
}
return nil, err
}
return []byte(val), nil
}
// Delete implements the Store interface.
func (s *RedisStore) Delete(k []byte) error {
s.client.Del(string(k))
return nil
}
// Put implements the Store interface.
func (s *RedisStore) Put(k, v []byte) error {
s.client.Set(string(k), string(v), 0)
return nil
}
// PutBatch implements the Store interface.
func (s *RedisStore) PutBatch(b Batch) error {
storage: introduce PutChangeSet and use it for Persist We're using batches in wrong way during persist, we already have all changes accumulated in two maps and then we move them to batch and then this is applied. For some DBs like BoltDB this batch is just another MemoryStore, so we essentially just shuffle the changeset from one map to another, for others like LevelDB batch is just a serialized set of KV pairs, it doesn't help much on subsequent PutBatch, we just duplicate the changeset again. So introduce PutChangeSet that allows to take two maps with sets and deletes directly. It also allows to simplify MemCachedStore logic. neo-bench for single node with 10 workers, LevelDB: Reference: RPS 30189.132 30556.448 30390.482 ≈ 30379 ± 0.61% TPS 29427.344 29418.687 29434.273 ≈ 29427 ± 0.03% CPU % 33.304 27.179 33.860 ≈ 31.45 ± 11.79% Mem MB 800.677 798.389 715.042 ≈ 771 ± 6.33% Patched: RPS 30264.326 30386.364 30166.231 ≈ 30272 ± 0.36% ⇅ TPS 29444.673 29407.440 29452.478 ≈ 29435 ± 0.08% ⇅ CPU % 34.012 32.597 33.467 ≈ 33.36 ± 2.14% ⇅ Mem MB 549.126 523.656 517.684 ≈ 530 ± 3.15% ↓ 31.26% BoltDB: Reference: RPS 31937.647 31551.684 31850.408 ≈ 31780 ± 0.64% TPS 31292.049 30368.368 31307.724 ≈ 30989 ± 1.74% CPU % 33.792 22.339 35.887 ≈ 30.67 ± 23.78% Mem MB 1271.687 1254.472 1215.639 ≈ 1247 ± 2.30% Patched: RPS 31746.818 30859.485 31689.761 ≈ 31432 ± 1.58% ⇅ TPS 31271.499 30340.726 30342.568 ≈ 30652 ± 1.75% ⇅ CPU % 34.611 34.414 31.553 ≈ 33.53 ± 5.11% ⇅ Mem MB 1262.960 1231.389 1335.569 ≈ 1277 ± 4.18% ⇅
2021-08-12 10:35:09 +00:00
memBatch := b.(*MemoryBatch)
return s.PutChangeSet(memBatch.mem, memBatch.del)
}
// PutChangeSet implements the Store interface.
func (s *RedisStore) PutChangeSet(puts map[string][]byte, dels map[string]bool) error {
pipe := s.client.Pipeline()
storage: introduce PutChangeSet and use it for Persist We're using batches in wrong way during persist, we already have all changes accumulated in two maps and then we move them to batch and then this is applied. For some DBs like BoltDB this batch is just another MemoryStore, so we essentially just shuffle the changeset from one map to another, for others like LevelDB batch is just a serialized set of KV pairs, it doesn't help much on subsequent PutBatch, we just duplicate the changeset again. So introduce PutChangeSet that allows to take two maps with sets and deletes directly. It also allows to simplify MemCachedStore logic. neo-bench for single node with 10 workers, LevelDB: Reference: RPS 30189.132 30556.448 30390.482 ≈ 30379 ± 0.61% TPS 29427.344 29418.687 29434.273 ≈ 29427 ± 0.03% CPU % 33.304 27.179 33.860 ≈ 31.45 ± 11.79% Mem MB 800.677 798.389 715.042 ≈ 771 ± 6.33% Patched: RPS 30264.326 30386.364 30166.231 ≈ 30272 ± 0.36% ⇅ TPS 29444.673 29407.440 29452.478 ≈ 29435 ± 0.08% ⇅ CPU % 34.012 32.597 33.467 ≈ 33.36 ± 2.14% ⇅ Mem MB 549.126 523.656 517.684 ≈ 530 ± 3.15% ↓ 31.26% BoltDB: Reference: RPS 31937.647 31551.684 31850.408 ≈ 31780 ± 0.64% TPS 31292.049 30368.368 31307.724 ≈ 30989 ± 1.74% CPU % 33.792 22.339 35.887 ≈ 30.67 ± 23.78% Mem MB 1271.687 1254.472 1215.639 ≈ 1247 ± 2.30% Patched: RPS 31746.818 30859.485 31689.761 ≈ 31432 ± 1.58% ⇅ TPS 31271.499 30340.726 30342.568 ≈ 30652 ± 1.75% ⇅ CPU % 34.611 34.414 31.553 ≈ 33.53 ± 5.11% ⇅ Mem MB 1262.960 1231.389 1335.569 ≈ 1277 ± 4.18% ⇅
2021-08-12 10:35:09 +00:00
for k, v := range puts {
pipe.Set(k, v, 0)
}
storage: introduce PutChangeSet and use it for Persist We're using batches in wrong way during persist, we already have all changes accumulated in two maps and then we move them to batch and then this is applied. For some DBs like BoltDB this batch is just another MemoryStore, so we essentially just shuffle the changeset from one map to another, for others like LevelDB batch is just a serialized set of KV pairs, it doesn't help much on subsequent PutBatch, we just duplicate the changeset again. So introduce PutChangeSet that allows to take two maps with sets and deletes directly. It also allows to simplify MemCachedStore logic. neo-bench for single node with 10 workers, LevelDB: Reference: RPS 30189.132 30556.448 30390.482 ≈ 30379 ± 0.61% TPS 29427.344 29418.687 29434.273 ≈ 29427 ± 0.03% CPU % 33.304 27.179 33.860 ≈ 31.45 ± 11.79% Mem MB 800.677 798.389 715.042 ≈ 771 ± 6.33% Patched: RPS 30264.326 30386.364 30166.231 ≈ 30272 ± 0.36% ⇅ TPS 29444.673 29407.440 29452.478 ≈ 29435 ± 0.08% ⇅ CPU % 34.012 32.597 33.467 ≈ 33.36 ± 2.14% ⇅ Mem MB 549.126 523.656 517.684 ≈ 530 ± 3.15% ↓ 31.26% BoltDB: Reference: RPS 31937.647 31551.684 31850.408 ≈ 31780 ± 0.64% TPS 31292.049 30368.368 31307.724 ≈ 30989 ± 1.74% CPU % 33.792 22.339 35.887 ≈ 30.67 ± 23.78% Mem MB 1271.687 1254.472 1215.639 ≈ 1247 ± 2.30% Patched: RPS 31746.818 30859.485 31689.761 ≈ 31432 ± 1.58% ⇅ TPS 31271.499 30340.726 30342.568 ≈ 30652 ± 1.75% ⇅ CPU % 34.611 34.414 31.553 ≈ 33.53 ± 5.11% ⇅ Mem MB 1262.960 1231.389 1335.569 ≈ 1277 ± 4.18% ⇅
2021-08-12 10:35:09 +00:00
for k := range dels {
pipe.Del(k)
}
_, err := pipe.Exec()
return err
}
// Seek implements the Store interface.
func (s *RedisStore) Seek(k []byte, f func(k, v []byte)) {
iter := s.client.Scan(0, fmt.Sprintf("%s*", k), 0).Iterator()
for iter.Next() {
key := iter.Val()
val, _ := s.client.Get(key).Result()
f([]byte(key), []byte(val))
}
}
// Close implements the Store interface.
func (s *RedisStore) Close() error {
return s.client.Close()
}