neoneo-go/pkg/core/storage/redis_store.go
Vsevolod Brekelov 264dfef370 storage: close function
add close function to storage interface
add common defer function call which will close db connection
remove context as soon as it's not needed anymore
updated unit tests
2019-09-16 18:52:47 +03:00

99 lines
2.2 KiB
Go

package storage
import (
"fmt"
"github.com/go-redis/redis"
)
// RedisDBOptions configuration for RedisDB.
type RedisDBOptions struct {
Addr string `yaml:"Addr"`
Password string `yaml:"Password"`
DB int `yaml:"DB"`
}
// RedisStore holds the client and maybe later some more metadata.
type RedisStore struct {
client *redis.Client
}
// RedisBatch simple batch implementation to satisfy the Store interface.
type RedisBatch struct {
mem map[string]string
}
// Len implements the Batch interface.
func (b *RedisBatch) Len() int {
return len(b.mem)
}
// Put implements the Batch interface.
func (b *RedisBatch) Put(k, v []byte) {
b.mem[string(k)] = string(v)
}
// NewRedisBatch returns a new ready to use RedisBatch.
func NewRedisBatch() *RedisBatch {
return &RedisBatch{
mem: make(map[string]string),
}
}
// NewRedisStore returns an new initialized - ready to use RedisStore object.
func NewRedisStore(cfg RedisDBOptions) (*RedisStore, error) {
c := redis.NewClient(&redis.Options{
Addr: cfg.Addr,
Password: cfg.Password,
DB: cfg.DB,
})
if _, err := c.Ping().Result(); err != nil {
return nil, err
}
return &RedisStore{client: c}, nil
}
// Batch implements the Store interface.
func (s *RedisStore) Batch() Batch {
return NewRedisBatch()
}
// Get implements the Store interface.
func (s *RedisStore) Get(k []byte) ([]byte, error) {
val, err := s.client.Get(string(k)).Result()
if err != nil {
return nil, err
}
return []byte(val), nil
}
// Put implements the Store interface.
func (s *RedisStore) Put(k, v []byte) error {
s.client.Set(string(k), string(v), 0)
return nil
}
// PutBatch implements the Store interface.
func (s *RedisStore) PutBatch(b Batch) error {
pipe := s.client.Pipeline()
for k, v := range b.(*RedisBatch).mem {
pipe.Set(k, v, 0)
}
_, err := pipe.Exec()
return err
}
// Seek implements the Store interface.
func (s *RedisStore) Seek(k []byte, f func(k, v []byte)) {
iter := s.client.Scan(0, fmt.Sprintf("%s*", k), 0).Iterator()
for iter.Next() {
key := iter.Val()
val, _ := s.client.Get(key).Result()
f([]byte(key), []byte(val))
}
}
// Close implements the Store interface.
func (s *RedisStore) Close() error {
return s.client.Close()
}