Merge pull request #3239 from olegburov/bump-redigo

Upgrade Redigo to `1.8.2`.
This commit is contained in:
Arko Dasgupta 2020-11-04 18:19:51 -08:00 committed by GitHub
commit 065aec5688
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
24 changed files with 2668 additions and 1333 deletions

2
go.mod
View file

@ -19,7 +19,7 @@ require (
github.com/docker/go-events v0.0.0-20190806004212-e31b211e4f1c github.com/docker/go-events v0.0.0-20190806004212-e31b211e4f1c
github.com/docker/go-metrics v0.0.1 github.com/docker/go-metrics v0.0.1
github.com/docker/libtrust v0.0.0-20150114040149-fa567046d9b1 github.com/docker/libtrust v0.0.0-20150114040149-fa567046d9b1
github.com/garyburd/redigo v0.0.0-20150301180006-535138d7bcd7 github.com/gomodule/redigo v1.8.2
github.com/gorilla/handlers v0.0.0-20150720190736-60c7bfde3e33 github.com/gorilla/handlers v0.0.0-20150720190736-60c7bfde3e33
github.com/gorilla/mux v1.7.2 github.com/gorilla/mux v1.7.2
github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect

4
go.sum
View file

@ -41,8 +41,6 @@ github.com/docker/go-metrics v0.0.1 h1:AgB/0SvBxihN0X8OR4SjsblXkbMvalQ8cjmtKQ2rQ
github.com/docker/go-metrics v0.0.1/go.mod h1:cG1hvH2utMXtqgqqYE9plW6lDxS3/5ayHzueweSI3Vw= github.com/docker/go-metrics v0.0.1/go.mod h1:cG1hvH2utMXtqgqqYE9plW6lDxS3/5ayHzueweSI3Vw=
github.com/docker/libtrust v0.0.0-20150114040149-fa567046d9b1 h1:ZClxb8laGDf5arXfYcAtECDFgAgHklGI8CxgjHnXKJ4= github.com/docker/libtrust v0.0.0-20150114040149-fa567046d9b1 h1:ZClxb8laGDf5arXfYcAtECDFgAgHklGI8CxgjHnXKJ4=
github.com/docker/libtrust v0.0.0-20150114040149-fa567046d9b1/go.mod h1:cyGadeNEkKy96OOhEzfZl+yxihPEzKnqJwvfuSUqbZE= github.com/docker/libtrust v0.0.0-20150114040149-fa567046d9b1/go.mod h1:cyGadeNEkKy96OOhEzfZl+yxihPEzKnqJwvfuSUqbZE=
github.com/garyburd/redigo v0.0.0-20150301180006-535138d7bcd7 h1:LofdAjjjqCSXMwLGgOgnE+rdPuvX9DxCqaHwKy7i/ko=
github.com/garyburd/redigo v0.0.0-20150301180006-535138d7bcd7/go.mod h1:NR3MbYisc3/PwhQ00EMzDiPmrwpPxAn5GI05/YaO1SY=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
@ -53,6 +51,8 @@ github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/gomodule/redigo v1.8.2 h1:H5XSIre1MB5NbPYFp+i1NBbb5qN1W8Y8YAQoAYbkm8k=
github.com/gomodule/redigo v1.8.2/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUzlpkBYO0=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/gorilla/handlers v0.0.0-20150720190736-60c7bfde3e33 h1:893HsJqtxp9z1SF76gg6hY70hRY1wVlTSnC/h1yUDCo= github.com/gorilla/handlers v0.0.0-20150720190736-60c7bfde3e33 h1:893HsJqtxp9z1SF76gg6hY70hRY1wVlTSnC/h1yUDCo=

View file

@ -39,7 +39,7 @@ import (
events "github.com/docker/go-events" events "github.com/docker/go-events"
"github.com/docker/go-metrics" "github.com/docker/go-metrics"
"github.com/docker/libtrust" "github.com/docker/libtrust"
"github.com/garyburd/redigo/redis" "github.com/gomodule/redigo/redis"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
@ -514,11 +514,11 @@ func (app *App) configureRedis(configuration *configuration.Configuration) {
} }
} }
conn, err := redis.DialTimeout("tcp", conn, err := redis.Dial("tcp",
configuration.Redis.Addr, configuration.Redis.Addr,
configuration.Redis.DialTimeout, redis.DialConnectTimeout(configuration.Redis.DialTimeout),
configuration.Redis.ReadTimeout, redis.DialReadTimeout(configuration.Redis.ReadTimeout),
configuration.Redis.WriteTimeout) redis.DialWriteTimeout(configuration.Redis.WriteTimeout))
if err != nil { if err != nil {
dcontext.GetLogger(app).Errorf("error connecting to redis instance %s: %v", dcontext.GetLogger(app).Errorf("error connecting to redis instance %s: %v",
configuration.Redis.Addr, err) configuration.Redis.Addr, err)

View file

@ -8,7 +8,7 @@ import (
"github.com/docker/distribution/reference" "github.com/docker/distribution/reference"
"github.com/docker/distribution/registry/storage/cache" "github.com/docker/distribution/registry/storage/cache"
"github.com/docker/distribution/registry/storage/cache/metrics" "github.com/docker/distribution/registry/storage/cache/metrics"
"github.com/garyburd/redigo/redis" "github.com/gomodule/redigo/redis"
"github.com/opencontainers/go-digest" "github.com/opencontainers/go-digest"
) )

View file

@ -7,7 +7,7 @@ import (
"time" "time"
"github.com/docker/distribution/registry/storage/cache/cachecheck" "github.com/docker/distribution/registry/storage/cache/cachecheck"
"github.com/garyburd/redigo/redis" "github.com/gomodule/redigo/redis"
) )
var redisAddr string var redisAddr string

View file

@ -1,45 +0,0 @@
// Copyright 2014 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package internal // import "github.com/garyburd/redigo/internal"
import (
"strings"
)
const (
WatchState = 1 << iota
MultiState
SubscribeState
MonitorState
)
type CommandInfo struct {
Set, Clear int
}
var commandInfos = map[string]CommandInfo{
"WATCH": {Set: WatchState},
"UNWATCH": {Clear: WatchState},
"MULTI": {Set: MultiState},
"EXEC": {Clear: WatchState | MultiState},
"DISCARD": {Clear: WatchState | MultiState},
"PSUBSCRIBE": {Set: SubscribeState},
"SUBSCRIBE": {Set: SubscribeState},
"MONITOR": {Set: MonitorState},
}
func LookupCommandInfo(commandName string) CommandInfo {
return commandInfos[strings.ToUpper(commandName)]
}

View file

@ -1,455 +0,0 @@
// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package redis
import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"net"
"strconv"
"sync"
"time"
)
// conn is the low-level implementation of Conn
type conn struct {
// Shared
mu sync.Mutex
pending int
err error
conn net.Conn
// Read
readTimeout time.Duration
br *bufio.Reader
// Write
writeTimeout time.Duration
bw *bufio.Writer
// Scratch space for formatting argument length.
// '*' or '$', length, "\r\n"
lenScratch [32]byte
// Scratch space for formatting integers and floats.
numScratch [40]byte
}
// Dial connects to the Redis server at the given network and address.
func Dial(network, address string) (Conn, error) {
dialer := xDialer{}
return dialer.Dial(network, address)
}
// DialTimeout acts like Dial but takes timeouts for establishing the
// connection to the server, writing a command and reading a reply.
func DialTimeout(network, address string, connectTimeout, readTimeout, writeTimeout time.Duration) (Conn, error) {
netDialer := net.Dialer{Timeout: connectTimeout}
dialer := xDialer{
NetDial: netDialer.Dial,
ReadTimeout: readTimeout,
WriteTimeout: writeTimeout,
}
return dialer.Dial(network, address)
}
// A Dialer specifies options for connecting to a Redis server.
type xDialer struct {
// NetDial specifies the dial function for creating TCP connections. If
// NetDial is nil, then net.Dial is used.
NetDial func(network, addr string) (net.Conn, error)
// ReadTimeout specifies the timeout for reading a single command
// reply. If ReadTimeout is zero, then no timeout is used.
ReadTimeout time.Duration
// WriteTimeout specifies the timeout for writing a single command. If
// WriteTimeout is zero, then no timeout is used.
WriteTimeout time.Duration
}
// Dial connects to the Redis server at address on the named network.
func (d *xDialer) Dial(network, address string) (Conn, error) {
dial := d.NetDial
if dial == nil {
dial = net.Dial
}
netConn, err := dial(network, address)
if err != nil {
return nil, err
}
return &conn{
conn: netConn,
bw: bufio.NewWriter(netConn),
br: bufio.NewReader(netConn),
readTimeout: d.ReadTimeout,
writeTimeout: d.WriteTimeout,
}, nil
}
// NewConn returns a new Redigo connection for the given net connection.
func NewConn(netConn net.Conn, readTimeout, writeTimeout time.Duration) Conn {
return &conn{
conn: netConn,
bw: bufio.NewWriter(netConn),
br: bufio.NewReader(netConn),
readTimeout: readTimeout,
writeTimeout: writeTimeout,
}
}
func (c *conn) Close() error {
c.mu.Lock()
err := c.err
if c.err == nil {
c.err = errors.New("redigo: closed")
err = c.conn.Close()
}
c.mu.Unlock()
return err
}
func (c *conn) fatal(err error) error {
c.mu.Lock()
if c.err == nil {
c.err = err
// Close connection to force errors on subsequent calls and to unblock
// other reader or writer.
c.conn.Close()
}
c.mu.Unlock()
return err
}
func (c *conn) Err() error {
c.mu.Lock()
err := c.err
c.mu.Unlock()
return err
}
func (c *conn) writeLen(prefix byte, n int) error {
c.lenScratch[len(c.lenScratch)-1] = '\n'
c.lenScratch[len(c.lenScratch)-2] = '\r'
i := len(c.lenScratch) - 3
for {
c.lenScratch[i] = byte('0' + n%10)
i -= 1
n = n / 10
if n == 0 {
break
}
}
c.lenScratch[i] = prefix
_, err := c.bw.Write(c.lenScratch[i:])
return err
}
func (c *conn) writeString(s string) error {
c.writeLen('$', len(s))
c.bw.WriteString(s)
_, err := c.bw.WriteString("\r\n")
return err
}
func (c *conn) writeBytes(p []byte) error {
c.writeLen('$', len(p))
c.bw.Write(p)
_, err := c.bw.WriteString("\r\n")
return err
}
func (c *conn) writeInt64(n int64) error {
return c.writeBytes(strconv.AppendInt(c.numScratch[:0], n, 10))
}
func (c *conn) writeFloat64(n float64) error {
return c.writeBytes(strconv.AppendFloat(c.numScratch[:0], n, 'g', -1, 64))
}
func (c *conn) writeCommand(cmd string, args []interface{}) (err error) {
c.writeLen('*', 1+len(args))
err = c.writeString(cmd)
for _, arg := range args {
if err != nil {
break
}
switch arg := arg.(type) {
case string:
err = c.writeString(arg)
case []byte:
err = c.writeBytes(arg)
case int:
err = c.writeInt64(int64(arg))
case int64:
err = c.writeInt64(arg)
case float64:
err = c.writeFloat64(arg)
case bool:
if arg {
err = c.writeString("1")
} else {
err = c.writeString("0")
}
case nil:
err = c.writeString("")
default:
var buf bytes.Buffer
fmt.Fprint(&buf, arg)
err = c.writeBytes(buf.Bytes())
}
}
return err
}
type protocolError string
func (pe protocolError) Error() string {
return fmt.Sprintf("redigo: %s (possible server error or unsupported concurrent read by application)", string(pe))
}
func (c *conn) readLine() ([]byte, error) {
p, err := c.br.ReadSlice('\n')
if err == bufio.ErrBufferFull {
return nil, protocolError("long response line")
}
if err != nil {
return nil, err
}
i := len(p) - 2
if i < 0 || p[i] != '\r' {
return nil, protocolError("bad response line terminator")
}
return p[:i], nil
}
// parseLen parses bulk string and array lengths.
func parseLen(p []byte) (int, error) {
if len(p) == 0 {
return -1, protocolError("malformed length")
}
if p[0] == '-' && len(p) == 2 && p[1] == '1' {
// handle $-1 and $-1 null replies.
return -1, nil
}
var n int
for _, b := range p {
n *= 10
if b < '0' || b > '9' {
return -1, protocolError("illegal bytes in length")
}
n += int(b - '0')
}
return n, nil
}
// parseInt parses an integer reply.
func parseInt(p []byte) (interface{}, error) {
if len(p) == 0 {
return 0, protocolError("malformed integer")
}
var negate bool
if p[0] == '-' {
negate = true
p = p[1:]
if len(p) == 0 {
return 0, protocolError("malformed integer")
}
}
var n int64
for _, b := range p {
n *= 10
if b < '0' || b > '9' {
return 0, protocolError("illegal bytes in length")
}
n += int64(b - '0')
}
if negate {
n = -n
}
return n, nil
}
var (
okReply interface{} = "OK"
pongReply interface{} = "PONG"
)
func (c *conn) readReply() (interface{}, error) {
line, err := c.readLine()
if err != nil {
return nil, err
}
if len(line) == 0 {
return nil, protocolError("short response line")
}
switch line[0] {
case '+':
switch {
case len(line) == 3 && line[1] == 'O' && line[2] == 'K':
// Avoid allocation for frequent "+OK" response.
return okReply, nil
case len(line) == 5 && line[1] == 'P' && line[2] == 'O' && line[3] == 'N' && line[4] == 'G':
// Avoid allocation in PING command benchmarks :)
return pongReply, nil
default:
return string(line[1:]), nil
}
case '-':
return Error(string(line[1:])), nil
case ':':
return parseInt(line[1:])
case '$':
n, err := parseLen(line[1:])
if n < 0 || err != nil {
return nil, err
}
p := make([]byte, n)
_, err = io.ReadFull(c.br, p)
if err != nil {
return nil, err
}
if line, err := c.readLine(); err != nil {
return nil, err
} else if len(line) != 0 {
return nil, protocolError("bad bulk string format")
}
return p, nil
case '*':
n, err := parseLen(line[1:])
if n < 0 || err != nil {
return nil, err
}
r := make([]interface{}, n)
for i := range r {
r[i], err = c.readReply()
if err != nil {
return nil, err
}
}
return r, nil
}
return nil, protocolError("unexpected response line")
}
func (c *conn) Send(cmd string, args ...interface{}) error {
c.mu.Lock()
c.pending += 1
c.mu.Unlock()
if c.writeTimeout != 0 {
c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout))
}
if err := c.writeCommand(cmd, args); err != nil {
return c.fatal(err)
}
return nil
}
func (c *conn) Flush() error {
if c.writeTimeout != 0 {
c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout))
}
if err := c.bw.Flush(); err != nil {
return c.fatal(err)
}
return nil
}
func (c *conn) Receive() (reply interface{}, err error) {
if c.readTimeout != 0 {
c.conn.SetReadDeadline(time.Now().Add(c.readTimeout))
}
if reply, err = c.readReply(); err != nil {
return nil, c.fatal(err)
}
// When using pub/sub, the number of receives can be greater than the
// number of sends. To enable normal use of the connection after
// unsubscribing from all channels, we do not decrement pending to a
// negative value.
//
// The pending field is decremented after the reply is read to handle the
// case where Receive is called before Send.
c.mu.Lock()
if c.pending > 0 {
c.pending -= 1
}
c.mu.Unlock()
if err, ok := reply.(Error); ok {
return nil, err
}
return
}
func (c *conn) Do(cmd string, args ...interface{}) (interface{}, error) {
c.mu.Lock()
pending := c.pending
c.pending = 0
c.mu.Unlock()
if cmd == "" && pending == 0 {
return nil, nil
}
if c.writeTimeout != 0 {
c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout))
}
if cmd != "" {
c.writeCommand(cmd, args)
}
if err := c.bw.Flush(); err != nil {
return nil, c.fatal(err)
}
if c.readTimeout != 0 {
c.conn.SetReadDeadline(time.Now().Add(c.readTimeout))
}
if cmd == "" {
reply := make([]interface{}, pending)
for i := range reply {
r, e := c.readReply()
if e != nil {
return nil, c.fatal(e)
}
reply[i] = r
}
return reply, nil
}
var err error
var reply interface{}
for i := 0; i <= pending; i++ {
var e error
if reply, e = c.readReply(); e != nil {
return nil, c.fatal(e)
}
if e, ok := reply.(Error); ok && err == nil {
err = e
}
}
return reply, err
}

View file

@ -1,389 +0,0 @@
// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package redis
import (
"bytes"
"container/list"
"crypto/rand"
"crypto/sha1"
"errors"
"io"
"strconv"
"sync"
"time"
"github.com/garyburd/redigo/internal"
)
var nowFunc = time.Now // for testing
// ErrPoolExhausted is returned from a pool connection method (Do, Send,
// Receive, Flush, Err) when the maximum number of database connections in the
// pool has been reached.
var ErrPoolExhausted = errors.New("redigo: connection pool exhausted")
var (
errPoolClosed = errors.New("redigo: connection pool closed")
errConnClosed = errors.New("redigo: connection closed")
)
// Pool maintains a pool of connections. The application calls the Get method
// to get a connection from the pool and the connection's Close method to
// return the connection's resources to the pool.
//
// The following example shows how to use a pool in a web application. The
// application creates a pool at application startup and makes it available to
// request handlers using a global variable.
//
// func newPool(server, password string) *redis.Pool {
// return &redis.Pool{
// MaxIdle: 3,
// IdleTimeout: 240 * time.Second,
// Dial: func () (redis.Conn, error) {
// c, err := redis.Dial("tcp", server)
// if err != nil {
// return nil, err
// }
// if _, err := c.Do("AUTH", password); err != nil {
// c.Close()
// return nil, err
// }
// return c, err
// },
// TestOnBorrow: func(c redis.Conn, t time.Time) error {
// _, err := c.Do("PING")
// return err
// },
// }
// }
//
// var (
// pool *redis.Pool
// redisServer = flag.String("redisServer", ":6379", "")
// redisPassword = flag.String("redisPassword", "", "")
// )
//
// func main() {
// flag.Parse()
// pool = newPool(*redisServer, *redisPassword)
// ...
// }
//
// A request handler gets a connection from the pool and closes the connection
// when the handler is done:
//
// func serveHome(w http.ResponseWriter, r *http.Request) {
// conn := pool.Get()
// defer conn.Close()
// ....
// }
//
type Pool struct {
// Dial is an application supplied function for creating and configuring a
// connection
Dial func() (Conn, error)
// TestOnBorrow is an optional application supplied function for checking
// the health of an idle connection before the connection is used again by
// the application. Argument t is the time that the connection was returned
// to the pool. If the function returns an error, then the connection is
// closed.
TestOnBorrow func(c Conn, t time.Time) error
// Maximum number of idle connections in the pool.
MaxIdle int
// Maximum number of connections allocated by the pool at a given time.
// When zero, there is no limit on the number of connections in the pool.
MaxActive int
// Close connections after remaining idle for this duration. If the value
// is zero, then idle connections are not closed. Applications should set
// the timeout to a value less than the server's timeout.
IdleTimeout time.Duration
// If Wait is true and the pool is at the MaxIdle limit, then Get() waits
// for a connection to be returned to the pool before returning.
Wait bool
// mu protects fields defined below.
mu sync.Mutex
cond *sync.Cond
closed bool
active int
// Stack of idleConn with most recently used at the front.
idle list.List
}
type idleConn struct {
c Conn
t time.Time
}
// NewPool creates a new pool. This function is deprecated. Applications should
// initialize the Pool fields directly as shown in example.
func NewPool(newFn func() (Conn, error), maxIdle int) *Pool {
return &Pool{Dial: newFn, MaxIdle: maxIdle}
}
// Get gets a connection. The application must close the returned connection.
// This method always returns a valid connection so that applications can defer
// error handling to the first use of the connection. If there is an error
// getting an underlying connection, then the connection Err, Do, Send, Flush
// and Receive methods return that error.
func (p *Pool) Get() Conn {
c, err := p.get()
if err != nil {
return errorConnection{err}
}
return &pooledConnection{p: p, c: c}
}
// ActiveCount returns the number of active connections in the pool.
func (p *Pool) ActiveCount() int {
p.mu.Lock()
active := p.active
p.mu.Unlock()
return active
}
// Close releases the resources used by the pool.
func (p *Pool) Close() error {
p.mu.Lock()
idle := p.idle
p.idle.Init()
p.closed = true
p.active -= idle.Len()
if p.cond != nil {
p.cond.Broadcast()
}
p.mu.Unlock()
for e := idle.Front(); e != nil; e = e.Next() {
e.Value.(idleConn).c.Close()
}
return nil
}
// release decrements the active count and signals waiters. The caller must
// hold p.mu during the call.
func (p *Pool) release() {
p.active -= 1
if p.cond != nil {
p.cond.Signal()
}
}
// get prunes stale connections and returns a connection from the idle list or
// creates a new connection.
func (p *Pool) get() (Conn, error) {
p.mu.Lock()
// Prune stale connections.
if timeout := p.IdleTimeout; timeout > 0 {
for i, n := 0, p.idle.Len(); i < n; i++ {
e := p.idle.Back()
if e == nil {
break
}
ic := e.Value.(idleConn)
if ic.t.Add(timeout).After(nowFunc()) {
break
}
p.idle.Remove(e)
p.release()
p.mu.Unlock()
ic.c.Close()
p.mu.Lock()
}
}
for {
// Get idle connection.
for i, n := 0, p.idle.Len(); i < n; i++ {
e := p.idle.Front()
if e == nil {
break
}
ic := e.Value.(idleConn)
p.idle.Remove(e)
test := p.TestOnBorrow
p.mu.Unlock()
if test == nil || test(ic.c, ic.t) == nil {
return ic.c, nil
}
ic.c.Close()
p.mu.Lock()
p.release()
}
// Check for pool closed before dialing a new connection.
if p.closed {
p.mu.Unlock()
return nil, errors.New("redigo: get on closed pool")
}
// Dial new connection if under limit.
if p.MaxActive == 0 || p.active < p.MaxActive {
dial := p.Dial
p.active += 1
p.mu.Unlock()
c, err := dial()
if err != nil {
p.mu.Lock()
p.release()
p.mu.Unlock()
c = nil
}
return c, err
}
if !p.Wait {
p.mu.Unlock()
return nil, ErrPoolExhausted
}
if p.cond == nil {
p.cond = sync.NewCond(&p.mu)
}
p.cond.Wait()
}
}
func (p *Pool) put(c Conn, forceClose bool) error {
err := c.Err()
p.mu.Lock()
if !p.closed && err == nil && !forceClose {
p.idle.PushFront(idleConn{t: nowFunc(), c: c})
if p.idle.Len() > p.MaxIdle {
c = p.idle.Remove(p.idle.Back()).(idleConn).c
} else {
c = nil
}
}
if c == nil {
if p.cond != nil {
p.cond.Signal()
}
p.mu.Unlock()
return nil
}
p.release()
p.mu.Unlock()
return c.Close()
}
type pooledConnection struct {
p *Pool
c Conn
state int
}
var (
sentinel []byte
sentinelOnce sync.Once
)
func initSentinel() {
p := make([]byte, 64)
if _, err := rand.Read(p); err == nil {
sentinel = p
} else {
h := sha1.New()
io.WriteString(h, "Oops, rand failed. Use time instead.")
io.WriteString(h, strconv.FormatInt(time.Now().UnixNano(), 10))
sentinel = h.Sum(nil)
}
}
func (pc *pooledConnection) Close() error {
c := pc.c
if _, ok := c.(errorConnection); ok {
return nil
}
pc.c = errorConnection{errConnClosed}
if pc.state&internal.MultiState != 0 {
c.Send("DISCARD")
pc.state &^= (internal.MultiState | internal.WatchState)
} else if pc.state&internal.WatchState != 0 {
c.Send("UNWATCH")
pc.state &^= internal.WatchState
}
if pc.state&internal.SubscribeState != 0 {
c.Send("UNSUBSCRIBE")
c.Send("PUNSUBSCRIBE")
// To detect the end of the message stream, ask the server to echo
// a sentinel value and read until we see that value.
sentinelOnce.Do(initSentinel)
c.Send("ECHO", sentinel)
c.Flush()
for {
p, err := c.Receive()
if err != nil {
break
}
if p, ok := p.([]byte); ok && bytes.Equal(p, sentinel) {
pc.state &^= internal.SubscribeState
break
}
}
}
c.Do("")
pc.p.put(c, pc.state != 0)
return nil
}
func (pc *pooledConnection) Err() error {
return pc.c.Err()
}
func (pc *pooledConnection) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
ci := internal.LookupCommandInfo(commandName)
pc.state = (pc.state | ci.Set) &^ ci.Clear
return pc.c.Do(commandName, args...)
}
func (pc *pooledConnection) Send(commandName string, args ...interface{}) error {
ci := internal.LookupCommandInfo(commandName)
pc.state = (pc.state | ci.Set) &^ ci.Clear
return pc.c.Send(commandName, args...)
}
func (pc *pooledConnection) Flush() error {
return pc.c.Flush()
}
func (pc *pooledConnection) Receive() (reply interface{}, err error) {
return pc.c.Receive()
}
type errorConnection struct{ err error }
func (ec errorConnection) Do(string, ...interface{}) (interface{}, error) { return nil, ec.err }
func (ec errorConnection) Send(string, ...interface{}) error { return ec.err }
func (ec errorConnection) Err() error { return ec.err }
func (ec errorConnection) Close() error { return ec.err }
func (ec errorConnection) Flush() error { return ec.err }
func (ec errorConnection) Receive() (interface{}, error) { return nil, ec.err }

View file

@ -1,44 +0,0 @@
// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package redis
// Error represents an error returned in a command reply.
type Error string
func (err Error) Error() string { return string(err) }
// Conn represents a connection to a Redis server.
type Conn interface {
// Close closes the connection.
Close() error
// Err returns a non-nil value if the connection is broken. The returned
// value is either the first non-nil value returned from the underlying
// network connection or a protocol parsing error. Applications should
// close broken connections.
Err() error
// Do sends a command to the server and returns the received reply.
Do(commandName string, args ...interface{}) (reply interface{}, err error)
// Send writes the command to the client's output buffer.
Send(commandName string, args ...interface{}) error
// Flush flushes the output buffer to the Redis server.
Flush() error
// Receive receives a single reply from the Redis server
Receive() (reply interface{}, err error)
}

View file

@ -1,312 +0,0 @@
// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package redis
import (
"errors"
"fmt"
"strconv"
)
// ErrNil indicates that a reply value is nil.
var ErrNil = errors.New("redigo: nil returned")
// Int is a helper that converts a command reply to an integer. If err is not
// equal to nil, then Int returns 0, err. Otherwise, Int converts the
// reply to an int as follows:
//
// Reply type Result
// integer int(reply), nil
// bulk string parsed reply, nil
// nil 0, ErrNil
// other 0, error
func Int(reply interface{}, err error) (int, error) {
if err != nil {
return 0, err
}
switch reply := reply.(type) {
case int64:
x := int(reply)
if int64(x) != reply {
return 0, strconv.ErrRange
}
return x, nil
case []byte:
n, err := strconv.ParseInt(string(reply), 10, 0)
return int(n), err
case nil:
return 0, ErrNil
case Error:
return 0, reply
}
return 0, fmt.Errorf("redigo: unexpected type for Int, got type %T", reply)
}
// Int64 is a helper that converts a command reply to 64 bit integer. If err is
// not equal to nil, then Int returns 0, err. Otherwise, Int64 converts the
// reply to an int64 as follows:
//
// Reply type Result
// integer reply, nil
// bulk string parsed reply, nil
// nil 0, ErrNil
// other 0, error
func Int64(reply interface{}, err error) (int64, error) {
if err != nil {
return 0, err
}
switch reply := reply.(type) {
case int64:
return reply, nil
case []byte:
n, err := strconv.ParseInt(string(reply), 10, 64)
return n, err
case nil:
return 0, ErrNil
case Error:
return 0, reply
}
return 0, fmt.Errorf("redigo: unexpected type for Int64, got type %T", reply)
}
var errNegativeInt = errors.New("redigo: unexpected value for Uint64")
// Uint64 is a helper that converts a command reply to 64 bit integer. If err is
// not equal to nil, then Int returns 0, err. Otherwise, Int64 converts the
// reply to an int64 as follows:
//
// Reply type Result
// integer reply, nil
// bulk string parsed reply, nil
// nil 0, ErrNil
// other 0, error
func Uint64(reply interface{}, err error) (uint64, error) {
if err != nil {
return 0, err
}
switch reply := reply.(type) {
case int64:
if reply < 0 {
return 0, errNegativeInt
}
return uint64(reply), nil
case []byte:
n, err := strconv.ParseUint(string(reply), 10, 64)
return n, err
case nil:
return 0, ErrNil
case Error:
return 0, reply
}
return 0, fmt.Errorf("redigo: unexpected type for Uint64, got type %T", reply)
}
// Float64 is a helper that converts a command reply to 64 bit float. If err is
// not equal to nil, then Float64 returns 0, err. Otherwise, Float64 converts
// the reply to an int as follows:
//
// Reply type Result
// bulk string parsed reply, nil
// nil 0, ErrNil
// other 0, error
func Float64(reply interface{}, err error) (float64, error) {
if err != nil {
return 0, err
}
switch reply := reply.(type) {
case []byte:
n, err := strconv.ParseFloat(string(reply), 64)
return n, err
case nil:
return 0, ErrNil
case Error:
return 0, reply
}
return 0, fmt.Errorf("redigo: unexpected type for Float64, got type %T", reply)
}
// String is a helper that converts a command reply to a string. If err is not
// equal to nil, then String returns "", err. Otherwise String converts the
// reply to a string as follows:
//
// Reply type Result
// bulk string string(reply), nil
// simple string reply, nil
// nil "", ErrNil
// other "", error
func String(reply interface{}, err error) (string, error) {
if err != nil {
return "", err
}
switch reply := reply.(type) {
case []byte:
return string(reply), nil
case string:
return reply, nil
case nil:
return "", ErrNil
case Error:
return "", reply
}
return "", fmt.Errorf("redigo: unexpected type for String, got type %T", reply)
}
// Bytes is a helper that converts a command reply to a slice of bytes. If err
// is not equal to nil, then Bytes returns nil, err. Otherwise Bytes converts
// the reply to a slice of bytes as follows:
//
// Reply type Result
// bulk string reply, nil
// simple string []byte(reply), nil
// nil nil, ErrNil
// other nil, error
func Bytes(reply interface{}, err error) ([]byte, error) {
if err != nil {
return nil, err
}
switch reply := reply.(type) {
case []byte:
return reply, nil
case string:
return []byte(reply), nil
case nil:
return nil, ErrNil
case Error:
return nil, reply
}
return nil, fmt.Errorf("redigo: unexpected type for Bytes, got type %T", reply)
}
// Bool is a helper that converts a command reply to a boolean. If err is not
// equal to nil, then Bool returns false, err. Otherwise Bool converts the
// reply to boolean as follows:
//
// Reply type Result
// integer value != 0, nil
// bulk string strconv.ParseBool(reply)
// nil false, ErrNil
// other false, error
func Bool(reply interface{}, err error) (bool, error) {
if err != nil {
return false, err
}
switch reply := reply.(type) {
case int64:
return reply != 0, nil
case []byte:
return strconv.ParseBool(string(reply))
case nil:
return false, ErrNil
case Error:
return false, reply
}
return false, fmt.Errorf("redigo: unexpected type for Bool, got type %T", reply)
}
// MultiBulk is deprecated. Use Values.
func MultiBulk(reply interface{}, err error) ([]interface{}, error) { return Values(reply, err) }
// Values is a helper that converts an array command reply to a []interface{}.
// If err is not equal to nil, then Values returns nil, err. Otherwise, Values
// converts the reply as follows:
//
// Reply type Result
// array reply, nil
// nil nil, ErrNil
// other nil, error
func Values(reply interface{}, err error) ([]interface{}, error) {
if err != nil {
return nil, err
}
switch reply := reply.(type) {
case []interface{}:
return reply, nil
case nil:
return nil, ErrNil
case Error:
return nil, reply
}
return nil, fmt.Errorf("redigo: unexpected type for Values, got type %T", reply)
}
// Strings is a helper that converts an array command reply to a []string. If
// err is not equal to nil, then Strings returns nil, err. Nil array items are
// converted to "" in the output slice. Strings returns an error if an array
// item is not a bulk string or nil.
func Strings(reply interface{}, err error) ([]string, error) {
if err != nil {
return nil, err
}
switch reply := reply.(type) {
case []interface{}:
result := make([]string, len(reply))
for i := range reply {
if reply[i] == nil {
continue
}
p, ok := reply[i].([]byte)
if !ok {
return nil, fmt.Errorf("redigo: unexpected element type for Strings, got type %T", reply[i])
}
result[i] = string(p)
}
return result, nil
case nil:
return nil, ErrNil
case Error:
return nil, reply
}
return nil, fmt.Errorf("redigo: unexpected type for Strings, got type %T", reply)
}
// Ints is a helper that converts an array command reply to a []int. If
// err is not equal to nil, then Ints returns nil, err.
func Ints(reply interface{}, err error) ([]int, error) {
var ints []int
if reply == nil {
return ints, ErrNil
}
values, err := Values(reply, err)
if err != nil {
return ints, err
}
if err := ScanSlice(values, &ints); err != nil {
return ints, err
}
return ints, nil
}
// StringMap is a helper that converts an array of strings (alternating key, value)
// into a map[string]string. The HGETALL and CONFIG GET commands return replies in this format.
// Requires an even number of values in result.
func StringMap(result interface{}, err error) (map[string]string, error) {
values, err := Values(result, err)
if err != nil {
return nil, err
}
if len(values)%2 != 0 {
return nil, errors.New("redigo: StringMap expects even number of values result")
}
m := make(map[string]string, len(values)/2)
for i := 0; i < len(values); i += 2 {
key, okKey := values[i].([]byte)
value, okValue := values[i+1].([]byte)
if !okKey || !okValue {
return nil, errors.New("redigo: ScanMap key not a bulk string value")
}
m[string(key)] = string(value)
}
return m, nil
}

175
vendor/github.com/gomodule/redigo/LICENSE generated vendored Normal file
View file

@ -0,0 +1,175 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.

55
vendor/github.com/gomodule/redigo/redis/commandinfo.go generated vendored Normal file
View file

@ -0,0 +1,55 @@
// Copyright 2014 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package redis
import (
"strings"
)
const (
connectionWatchState = 1 << iota
connectionMultiState
connectionSubscribeState
connectionMonitorState
)
type commandInfo struct {
// Set or Clear these states on connection.
Set, Clear int
}
var commandInfos = map[string]commandInfo{
"WATCH": {Set: connectionWatchState},
"UNWATCH": {Clear: connectionWatchState},
"MULTI": {Set: connectionMultiState},
"EXEC": {Clear: connectionWatchState | connectionMultiState},
"DISCARD": {Clear: connectionWatchState | connectionMultiState},
"PSUBSCRIBE": {Set: connectionSubscribeState},
"SUBSCRIBE": {Set: connectionSubscribeState},
"MONITOR": {Set: connectionMonitorState},
}
func init() {
for n, ci := range commandInfos {
commandInfos[strings.ToLower(n)] = ci
}
}
func lookupCommandInfo(commandName string) commandInfo {
if ci, ok := commandInfos[commandName]; ok {
return ci
}
return commandInfos[strings.ToUpper(commandName)]
}

736
vendor/github.com/gomodule/redigo/redis/conn.go generated vendored Normal file
View file

@ -0,0 +1,736 @@
// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package redis
import (
"bufio"
"bytes"
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"net"
"net/url"
"regexp"
"strconv"
"sync"
"time"
)
var (
_ ConnWithTimeout = (*conn)(nil)
)
// conn is the low-level implementation of Conn
type conn struct {
// Shared
mu sync.Mutex
pending int
err error
conn net.Conn
// Read
readTimeout time.Duration
br *bufio.Reader
// Write
writeTimeout time.Duration
bw *bufio.Writer
// Scratch space for formatting argument length.
// '*' or '$', length, "\r\n"
lenScratch [32]byte
// Scratch space for formatting integers and floats.
numScratch [40]byte
}
// DialTimeout acts like Dial but takes timeouts for establishing the
// connection to the server, writing a command and reading a reply.
//
// Deprecated: Use Dial with options instead.
func DialTimeout(network, address string, connectTimeout, readTimeout, writeTimeout time.Duration) (Conn, error) {
return Dial(network, address,
DialConnectTimeout(connectTimeout),
DialReadTimeout(readTimeout),
DialWriteTimeout(writeTimeout))
}
// DialOption specifies an option for dialing a Redis server.
type DialOption struct {
f func(*dialOptions)
}
type dialOptions struct {
readTimeout time.Duration
writeTimeout time.Duration
dialer *net.Dialer
dialContext func(ctx context.Context, network, addr string) (net.Conn, error)
db int
username string
password string
clientName string
useTLS bool
skipVerify bool
tlsConfig *tls.Config
}
// DialReadTimeout specifies the timeout for reading a single command reply.
func DialReadTimeout(d time.Duration) DialOption {
return DialOption{func(do *dialOptions) {
do.readTimeout = d
}}
}
// DialWriteTimeout specifies the timeout for writing a single command.
func DialWriteTimeout(d time.Duration) DialOption {
return DialOption{func(do *dialOptions) {
do.writeTimeout = d
}}
}
// DialConnectTimeout specifies the timeout for connecting to the Redis server when
// no DialNetDial option is specified.
func DialConnectTimeout(d time.Duration) DialOption {
return DialOption{func(do *dialOptions) {
do.dialer.Timeout = d
}}
}
// DialKeepAlive specifies the keep-alive period for TCP connections to the Redis server
// when no DialNetDial option is specified.
// If zero, keep-alives are not enabled. If no DialKeepAlive option is specified then
// the default of 5 minutes is used to ensure that half-closed TCP sessions are detected.
func DialKeepAlive(d time.Duration) DialOption {
return DialOption{func(do *dialOptions) {
do.dialer.KeepAlive = d
}}
}
// DialNetDial specifies a custom dial function for creating TCP
// connections, otherwise a net.Dialer customized via the other options is used.
// DialNetDial overrides DialConnectTimeout and DialKeepAlive.
func DialNetDial(dial func(network, addr string) (net.Conn, error)) DialOption {
return DialOption{func(do *dialOptions) {
do.dialContext = func(ctx context.Context, network, addr string) (net.Conn, error) {
return dial(network, addr)
}
}}
}
// DialContextFunc specifies a custom dial function with context for creating TCP
// connections, otherwise a net.Dialer customized via the other options is used.
// DialContextFunc overrides DialConnectTimeout and DialKeepAlive.
func DialContextFunc(f func(ctx context.Context, network, addr string) (net.Conn, error)) DialOption {
return DialOption{func(do *dialOptions) {
do.dialContext = f
}}
}
// DialDatabase specifies the database to select when dialing a connection.
func DialDatabase(db int) DialOption {
return DialOption{func(do *dialOptions) {
do.db = db
}}
}
// DialPassword specifies the password to use when connecting to
// the Redis server.
func DialPassword(password string) DialOption {
return DialOption{func(do *dialOptions) {
do.password = password
}}
}
// DialUsername specifies the username to use when connecting to
// the Redis server when Redis ACLs are used.
func DialUsername(username string) DialOption {
return DialOption{func(do *dialOptions) {
do.username = username
}}
}
// DialClientName specifies a client name to be used
// by the Redis server connection.
func DialClientName(name string) DialOption {
return DialOption{func(do *dialOptions) {
do.clientName = name
}}
}
// DialTLSConfig specifies the config to use when a TLS connection is dialed.
// Has no effect when not dialing a TLS connection.
func DialTLSConfig(c *tls.Config) DialOption {
return DialOption{func(do *dialOptions) {
do.tlsConfig = c
}}
}
// DialTLSSkipVerify disables server name verification when connecting over
// TLS. Has no effect when not dialing a TLS connection.
func DialTLSSkipVerify(skip bool) DialOption {
return DialOption{func(do *dialOptions) {
do.skipVerify = skip
}}
}
// DialUseTLS specifies whether TLS should be used when connecting to the
// server. This option is ignore by DialURL.
func DialUseTLS(useTLS bool) DialOption {
return DialOption{func(do *dialOptions) {
do.useTLS = useTLS
}}
}
// Dial connects to the Redis server at the given network and
// address using the specified options.
func Dial(network, address string, options ...DialOption) (Conn, error) {
return DialContext(context.Background(), network, address, options...)
}
// DialContext connects to the Redis server at the given network and
// address using the specified options and context.
func DialContext(ctx context.Context, network, address string, options ...DialOption) (Conn, error) {
do := dialOptions{
dialer: &net.Dialer{
KeepAlive: time.Minute * 5,
},
}
for _, option := range options {
option.f(&do)
}
if do.dialContext == nil {
do.dialContext = do.dialer.DialContext
}
netConn, err := do.dialContext(ctx, network, address)
if err != nil {
return nil, err
}
if do.useTLS {
var tlsConfig *tls.Config
if do.tlsConfig == nil {
tlsConfig = &tls.Config{InsecureSkipVerify: do.skipVerify}
} else {
tlsConfig = cloneTLSConfig(do.tlsConfig)
}
if tlsConfig.ServerName == "" {
host, _, err := net.SplitHostPort(address)
if err != nil {
netConn.Close()
return nil, err
}
tlsConfig.ServerName = host
}
tlsConn := tls.Client(netConn, tlsConfig)
if err := tlsConn.Handshake(); err != nil {
netConn.Close()
return nil, err
}
netConn = tlsConn
}
c := &conn{
conn: netConn,
bw: bufio.NewWriter(netConn),
br: bufio.NewReader(netConn),
readTimeout: do.readTimeout,
writeTimeout: do.writeTimeout,
}
if do.password != "" {
authArgs := make([]interface{}, 0, 2)
if do.username != "" {
authArgs = append(authArgs, do.username)
}
authArgs = append(authArgs, do.password)
if _, err := c.Do("AUTH", authArgs...); err != nil {
netConn.Close()
return nil, err
}
}
if do.clientName != "" {
if _, err := c.Do("CLIENT", "SETNAME", do.clientName); err != nil {
netConn.Close()
return nil, err
}
}
if do.db != 0 {
if _, err := c.Do("SELECT", do.db); err != nil {
netConn.Close()
return nil, err
}
}
return c, nil
}
var pathDBRegexp = regexp.MustCompile(`/(\d*)\z`)
// DialURL connects to a Redis server at the given URL using the Redis
// URI scheme. URLs should follow the draft IANA specification for the
// scheme (https://www.iana.org/assignments/uri-schemes/prov/redis).
func DialURL(rawurl string, options ...DialOption) (Conn, error) {
u, err := url.Parse(rawurl)
if err != nil {
return nil, err
}
if u.Scheme != "redis" && u.Scheme != "rediss" {
return nil, fmt.Errorf("invalid redis URL scheme: %s", u.Scheme)
}
if u.Opaque != "" {
return nil, fmt.Errorf("invalid redis URL, url is opaque: %s", rawurl)
}
// As per the IANA draft spec, the host defaults to localhost and
// the port defaults to 6379.
host, port, err := net.SplitHostPort(u.Host)
if err != nil {
// assume port is missing
host = u.Host
port = "6379"
}
if host == "" {
host = "localhost"
}
address := net.JoinHostPort(host, port)
if u.User != nil {
password, isSet := u.User.Password()
if isSet {
options = append(options, DialUsername(u.User.Username()), DialPassword(password))
}
}
match := pathDBRegexp.FindStringSubmatch(u.Path)
if len(match) == 2 {
db := 0
if len(match[1]) > 0 {
db, err = strconv.Atoi(match[1])
if err != nil {
return nil, fmt.Errorf("invalid database: %s", u.Path[1:])
}
}
if db != 0 {
options = append(options, DialDatabase(db))
}
} else if u.Path != "" {
return nil, fmt.Errorf("invalid database: %s", u.Path[1:])
}
options = append(options, DialUseTLS(u.Scheme == "rediss"))
return Dial("tcp", address, options...)
}
// NewConn returns a new Redigo connection for the given net connection.
func NewConn(netConn net.Conn, readTimeout, writeTimeout time.Duration) Conn {
return &conn{
conn: netConn,
bw: bufio.NewWriter(netConn),
br: bufio.NewReader(netConn),
readTimeout: readTimeout,
writeTimeout: writeTimeout,
}
}
func (c *conn) Close() error {
c.mu.Lock()
err := c.err
if c.err == nil {
c.err = errors.New("redigo: closed")
err = c.conn.Close()
}
c.mu.Unlock()
return err
}
func (c *conn) fatal(err error) error {
c.mu.Lock()
if c.err == nil {
c.err = err
// Close connection to force errors on subsequent calls and to unblock
// other reader or writer.
c.conn.Close()
}
c.mu.Unlock()
return err
}
func (c *conn) Err() error {
c.mu.Lock()
err := c.err
c.mu.Unlock()
return err
}
func (c *conn) writeLen(prefix byte, n int) error {
c.lenScratch[len(c.lenScratch)-1] = '\n'
c.lenScratch[len(c.lenScratch)-2] = '\r'
i := len(c.lenScratch) - 3
for {
c.lenScratch[i] = byte('0' + n%10)
i -= 1
n = n / 10
if n == 0 {
break
}
}
c.lenScratch[i] = prefix
_, err := c.bw.Write(c.lenScratch[i:])
return err
}
func (c *conn) writeString(s string) error {
c.writeLen('$', len(s))
c.bw.WriteString(s)
_, err := c.bw.WriteString("\r\n")
return err
}
func (c *conn) writeBytes(p []byte) error {
c.writeLen('$', len(p))
c.bw.Write(p)
_, err := c.bw.WriteString("\r\n")
return err
}
func (c *conn) writeInt64(n int64) error {
return c.writeBytes(strconv.AppendInt(c.numScratch[:0], n, 10))
}
func (c *conn) writeFloat64(n float64) error {
return c.writeBytes(strconv.AppendFloat(c.numScratch[:0], n, 'g', -1, 64))
}
func (c *conn) writeCommand(cmd string, args []interface{}) error {
c.writeLen('*', 1+len(args))
if err := c.writeString(cmd); err != nil {
return err
}
for _, arg := range args {
if err := c.writeArg(arg, true); err != nil {
return err
}
}
return nil
}
func (c *conn) writeArg(arg interface{}, argumentTypeOK bool) (err error) {
switch arg := arg.(type) {
case string:
return c.writeString(arg)
case []byte:
return c.writeBytes(arg)
case int:
return c.writeInt64(int64(arg))
case int64:
return c.writeInt64(arg)
case float64:
return c.writeFloat64(arg)
case bool:
if arg {
return c.writeString("1")
} else {
return c.writeString("0")
}
case nil:
return c.writeString("")
case Argument:
if argumentTypeOK {
return c.writeArg(arg.RedisArg(), false)
}
// See comment in default clause below.
var buf bytes.Buffer
fmt.Fprint(&buf, arg)
return c.writeBytes(buf.Bytes())
default:
// This default clause is intended to handle builtin numeric types.
// The function should return an error for other types, but this is not
// done for compatibility with previous versions of the package.
var buf bytes.Buffer
fmt.Fprint(&buf, arg)
return c.writeBytes(buf.Bytes())
}
}
type protocolError string
func (pe protocolError) Error() string {
return fmt.Sprintf("redigo: %s (possible server error or unsupported concurrent read by application)", string(pe))
}
// readLine reads a line of input from the RESP stream.
func (c *conn) readLine() ([]byte, error) {
// To avoid allocations, attempt to read the line using ReadSlice. This
// call typically succeeds. The known case where the call fails is when
// reading the output from the MONITOR command.
p, err := c.br.ReadSlice('\n')
if err == bufio.ErrBufferFull {
// The line does not fit in the bufio.Reader's buffer. Fall back to
// allocating a buffer for the line.
buf := append([]byte{}, p...)
for err == bufio.ErrBufferFull {
p, err = c.br.ReadSlice('\n')
buf = append(buf, p...)
}
p = buf
}
if err != nil {
return nil, err
}
i := len(p) - 2
if i < 0 || p[i] != '\r' {
return nil, protocolError("bad response line terminator")
}
return p[:i], nil
}
// parseLen parses bulk string and array lengths.
func parseLen(p []byte) (int, error) {
if len(p) == 0 {
return -1, protocolError("malformed length")
}
if p[0] == '-' && len(p) == 2 && p[1] == '1' {
// handle $-1 and $-1 null replies.
return -1, nil
}
var n int
for _, b := range p {
n *= 10
if b < '0' || b > '9' {
return -1, protocolError("illegal bytes in length")
}
n += int(b - '0')
}
return n, nil
}
// parseInt parses an integer reply.
func parseInt(p []byte) (interface{}, error) {
if len(p) == 0 {
return 0, protocolError("malformed integer")
}
var negate bool
if p[0] == '-' {
negate = true
p = p[1:]
if len(p) == 0 {
return 0, protocolError("malformed integer")
}
}
var n int64
for _, b := range p {
n *= 10
if b < '0' || b > '9' {
return 0, protocolError("illegal bytes in length")
}
n += int64(b - '0')
}
if negate {
n = -n
}
return n, nil
}
var (
okReply interface{} = "OK"
pongReply interface{} = "PONG"
)
func (c *conn) readReply() (interface{}, error) {
line, err := c.readLine()
if err != nil {
return nil, err
}
if len(line) == 0 {
return nil, protocolError("short response line")
}
switch line[0] {
case '+':
switch string(line[1:]) {
case "OK":
// Avoid allocation for frequent "+OK" response.
return okReply, nil
case "PONG":
// Avoid allocation in PING command benchmarks :)
return pongReply, nil
default:
return string(line[1:]), nil
}
case '-':
return Error(string(line[1:])), nil
case ':':
return parseInt(line[1:])
case '$':
n, err := parseLen(line[1:])
if n < 0 || err != nil {
return nil, err
}
p := make([]byte, n)
_, err = io.ReadFull(c.br, p)
if err != nil {
return nil, err
}
if line, err := c.readLine(); err != nil {
return nil, err
} else if len(line) != 0 {
return nil, protocolError("bad bulk string format")
}
return p, nil
case '*':
n, err := parseLen(line[1:])
if n < 0 || err != nil {
return nil, err
}
r := make([]interface{}, n)
for i := range r {
r[i], err = c.readReply()
if err != nil {
return nil, err
}
}
return r, nil
}
return nil, protocolError("unexpected response line")
}
func (c *conn) Send(cmd string, args ...interface{}) error {
c.mu.Lock()
c.pending += 1
c.mu.Unlock()
if c.writeTimeout != 0 {
c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout))
}
if err := c.writeCommand(cmd, args); err != nil {
return c.fatal(err)
}
return nil
}
func (c *conn) Flush() error {
if c.writeTimeout != 0 {
c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout))
}
if err := c.bw.Flush(); err != nil {
return c.fatal(err)
}
return nil
}
func (c *conn) Receive() (interface{}, error) {
return c.ReceiveWithTimeout(c.readTimeout)
}
func (c *conn) ReceiveWithTimeout(timeout time.Duration) (reply interface{}, err error) {
var deadline time.Time
if timeout != 0 {
deadline = time.Now().Add(timeout)
}
c.conn.SetReadDeadline(deadline)
if reply, err = c.readReply(); err != nil {
return nil, c.fatal(err)
}
// When using pub/sub, the number of receives can be greater than the
// number of sends. To enable normal use of the connection after
// unsubscribing from all channels, we do not decrement pending to a
// negative value.
//
// The pending field is decremented after the reply is read to handle the
// case where Receive is called before Send.
c.mu.Lock()
if c.pending > 0 {
c.pending -= 1
}
c.mu.Unlock()
if err, ok := reply.(Error); ok {
return nil, err
}
return
}
func (c *conn) Do(cmd string, args ...interface{}) (interface{}, error) {
return c.DoWithTimeout(c.readTimeout, cmd, args...)
}
func (c *conn) DoWithTimeout(readTimeout time.Duration, cmd string, args ...interface{}) (interface{}, error) {
c.mu.Lock()
pending := c.pending
c.pending = 0
c.mu.Unlock()
if cmd == "" && pending == 0 {
return nil, nil
}
if c.writeTimeout != 0 {
c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout))
}
if cmd != "" {
if err := c.writeCommand(cmd, args); err != nil {
return nil, c.fatal(err)
}
}
if err := c.bw.Flush(); err != nil {
return nil, c.fatal(err)
}
var deadline time.Time
if readTimeout != 0 {
deadline = time.Now().Add(readTimeout)
}
c.conn.SetReadDeadline(deadline)
if cmd == "" {
reply := make([]interface{}, pending)
for i := range reply {
r, e := c.readReply()
if e != nil {
return nil, c.fatal(e)
}
reply[i] = r
}
return reply, nil
}
var err error
var reply interface{}
for i := 0; i <= pending; i++ {
var e error
if reply, e = c.readReply(); e != nil {
return nil, c.fatal(e)
}
if e, ok := reply.(Error); ok && err == nil {
err = e
}
}
return reply, err
}

View file

@ -14,7 +14,7 @@
// Package redis is a client for the Redis database. // Package redis is a client for the Redis database.
// //
// The Redigo FAQ (https://github.com/garyburd/redigo/wiki/FAQ) contains more // The Redigo FAQ (https://github.com/gomodule/redigo/wiki/FAQ) contains more
// documentation about this package. // documentation about this package.
// //
// Connections // Connections
@ -38,7 +38,7 @@
// //
// n, err := conn.Do("APPEND", "key", "value") // n, err := conn.Do("APPEND", "key", "value")
// //
// The Do method converts command arguments to binary strings for transmission // The Do method converts command arguments to bulk strings for transmission
// to the server as follows: // to the server as follows:
// //
// Go Type Conversion // Go Type Conversion
@ -48,7 +48,7 @@
// float64 strconv.FormatFloat(v, 'g', -1, 64) // float64 strconv.FormatFloat(v, 'g', -1, 64)
// bool true -> "1", false -> "0" // bool true -> "1", false -> "0"
// nil "" // nil ""
// all other types fmt.Print(v) // all other types fmt.Fprint(w, v)
// //
// Redis command reply types are represented using the following Go types: // Redis command reply types are represented using the following Go types:
// //
@ -99,15 +99,14 @@
// //
// Concurrency // Concurrency
// //
// Connections do not support concurrent calls to the write methods (Send, // Connections support one concurrent caller to the Receive method and one
// Flush) or concurrent calls to the read method (Receive). Connections do // concurrent caller to the Send and Flush methods. No other concurrency is
// allow a concurrent reader and writer. // supported including concurrent calls to the Do and Close methods.
// //
// Because the Do method combines the functionality of Send, Flush and Receive, // For full concurrent access to Redis, use the thread-safe Pool to get, use
// the Do method cannot be called concurrently with the other methods. // and release a connection from within a goroutine. Connections returned from
// // a Pool have the concurrency restrictions described in the previous
// For full concurrent access to Redis, use the thread-safe Pool to get and // paragraph.
// release connections from within a goroutine.
// //
// Publish and Subscribe // Publish and Subscribe
// //
@ -128,7 +127,7 @@
// send and flush a subscription management command. The receive method // send and flush a subscription management command. The receive method
// converts a pushed message to convenient types for use in a type switch. // converts a pushed message to convenient types for use in a type switch.
// //
// psc := redis.PubSubConn{c} // psc := redis.PubSubConn{Conn: c}
// psc.Subscribe("example") // psc.Subscribe("example")
// for { // for {
// switch v := psc.Receive().(type) { // switch v := psc.Receive().(type) {
@ -166,4 +165,13 @@
// if _, err := redis.Scan(reply, &value1, &value2); err != nil { // if _, err := redis.Scan(reply, &value1, &value2); err != nil {
// // handle error // // handle error
// } // }
package redis // import "github.com/garyburd/redigo/redis" //
// Errors
//
// Connection methods return error replies from the server as type redis.Error.
//
// Call the connection Err() method to determine if the connection encountered
// non-recoverable error such as a network error or protocol parsing error. If
// Err() returns a non-nil value, then the connection is not usable and should
// be closed.
package redis

29
vendor/github.com/gomodule/redigo/redis/go17.go generated vendored Normal file
View file

@ -0,0 +1,29 @@
// +build go1.7,!go1.8
package redis
import "crypto/tls"
func cloneTLSConfig(cfg *tls.Config) *tls.Config {
return &tls.Config{
Rand: cfg.Rand,
Time: cfg.Time,
Certificates: cfg.Certificates,
NameToCertificate: cfg.NameToCertificate,
GetCertificate: cfg.GetCertificate,
RootCAs: cfg.RootCAs,
NextProtos: cfg.NextProtos,
ServerName: cfg.ServerName,
ClientAuth: cfg.ClientAuth,
ClientCAs: cfg.ClientCAs,
InsecureSkipVerify: cfg.InsecureSkipVerify,
CipherSuites: cfg.CipherSuites,
PreferServerCipherSuites: cfg.PreferServerCipherSuites,
ClientSessionCache: cfg.ClientSessionCache,
MinVersion: cfg.MinVersion,
MaxVersion: cfg.MaxVersion,
CurvePreferences: cfg.CurvePreferences,
DynamicRecordSizingDisabled: cfg.DynamicRecordSizingDisabled,
Renegotiation: cfg.Renegotiation,
}
}

9
vendor/github.com/gomodule/redigo/redis/go18.go generated vendored Normal file
View file

@ -0,0 +1,9 @@
// +build go1.8
package redis
import "crypto/tls"
func cloneTLSConfig(cfg *tls.Config) *tls.Config {
return cfg.Clone()
}

View file

@ -18,6 +18,11 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"log" "log"
"time"
)
var (
_ ConnWithTimeout = (*loggingConn)(nil)
) )
// NewLoggingConn returns a logging wrapper around a connection. // NewLoggingConn returns a logging wrapper around a connection.
@ -25,13 +30,22 @@ func NewLoggingConn(conn Conn, logger *log.Logger, prefix string) Conn {
if prefix != "" { if prefix != "" {
prefix = prefix + "." prefix = prefix + "."
} }
return &loggingConn{conn, logger, prefix} return &loggingConn{conn, logger, prefix, nil}
}
//NewLoggingConnFilter returns a logging wrapper around a connection and a filter function.
func NewLoggingConnFilter(conn Conn, logger *log.Logger, prefix string, skip func(cmdName string) bool) Conn {
if prefix != "" {
prefix = prefix + "."
}
return &loggingConn{conn, logger, prefix, skip}
} }
type loggingConn struct { type loggingConn struct {
Conn Conn
logger *log.Logger logger *log.Logger
prefix string prefix string
skip func(cmdName string) bool
} }
func (c *loggingConn) Close() error { func (c *loggingConn) Close() error {
@ -80,6 +94,9 @@ func (c *loggingConn) printValue(buf *bytes.Buffer, v interface{}) {
} }
func (c *loggingConn) print(method, commandName string, args []interface{}, reply interface{}, err error) { func (c *loggingConn) print(method, commandName string, args []interface{}, reply interface{}, err error) {
if c.skip != nil && c.skip(commandName) {
return
}
var buf bytes.Buffer var buf bytes.Buffer
fmt.Fprintf(&buf, "%s%s(", c.prefix, method) fmt.Fprintf(&buf, "%s%s(", c.prefix, method)
if method != "Receive" { if method != "Receive" {
@ -104,6 +121,12 @@ func (c *loggingConn) Do(commandName string, args ...interface{}) (interface{},
return reply, err return reply, err
} }
func (c *loggingConn) DoWithTimeout(timeout time.Duration, commandName string, args ...interface{}) (interface{}, error) {
reply, err := DoWithTimeout(c.Conn, timeout, commandName, args...)
c.print("DoWithTimeout", commandName, args, reply, err)
return reply, err
}
func (c *loggingConn) Send(commandName string, args ...interface{}) error { func (c *loggingConn) Send(commandName string, args ...interface{}) error {
err := c.Conn.Send(commandName, args...) err := c.Conn.Send(commandName, args...)
c.print("Send", commandName, args, nil, err) c.print("Send", commandName, args, nil, err)
@ -115,3 +138,9 @@ func (c *loggingConn) Receive() (interface{}, error) {
c.print("Receive", "", nil, reply, err) c.print("Receive", "", nil, reply, err)
return reply, err return reply, err
} }
func (c *loggingConn) ReceiveWithTimeout(timeout time.Duration) (interface{}, error) {
reply, err := ReceiveWithTimeout(c.Conn, timeout)
c.print("ReceiveWithTimeout", "", nil, reply, err)
return reply, err
}

635
vendor/github.com/gomodule/redigo/redis/pool.go generated vendored Normal file
View file

@ -0,0 +1,635 @@
// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package redis
import (
"bytes"
"context"
"crypto/rand"
"crypto/sha1"
"errors"
"io"
"strconv"
"sync"
"sync/atomic"
"time"
)
var (
_ ConnWithTimeout = (*activeConn)(nil)
_ ConnWithTimeout = (*errorConn)(nil)
)
var nowFunc = time.Now // for testing
// ErrPoolExhausted is returned from a pool connection method (Do, Send,
// Receive, Flush, Err) when the maximum number of database connections in the
// pool has been reached.
var ErrPoolExhausted = errors.New("redigo: connection pool exhausted")
var (
errPoolClosed = errors.New("redigo: connection pool closed")
errConnClosed = errors.New("redigo: connection closed")
)
// Pool maintains a pool of connections. The application calls the Get method
// to get a connection from the pool and the connection's Close method to
// return the connection's resources to the pool.
//
// The following example shows how to use a pool in a web application. The
// application creates a pool at application startup and makes it available to
// request handlers using a package level variable. The pool configuration used
// here is an example, not a recommendation.
//
// func newPool(addr string) *redis.Pool {
// return &redis.Pool{
// MaxIdle: 3,
// IdleTimeout: 240 * time.Second,
// // Dial or DialContext must be set. When both are set, DialContext takes precedence over Dial.
// Dial: func () (redis.Conn, error) { return redis.Dial("tcp", addr) },
// }
// }
//
// var (
// pool *redis.Pool
// redisServer = flag.String("redisServer", ":6379", "")
// )
//
// func main() {
// flag.Parse()
// pool = newPool(*redisServer)
// ...
// }
//
// A request handler gets a connection from the pool and closes the connection
// when the handler is done:
//
// func serveHome(w http.ResponseWriter, r *http.Request) {
// conn := pool.Get()
// defer conn.Close()
// ...
// }
//
// Use the Dial function to authenticate connections with the AUTH command or
// select a database with the SELECT command:
//
// pool := &redis.Pool{
// // Other pool configuration not shown in this example.
// Dial: func () (redis.Conn, error) {
// c, err := redis.Dial("tcp", server)
// if err != nil {
// return nil, err
// }
// if _, err := c.Do("AUTH", password); err != nil {
// c.Close()
// return nil, err
// }
// if _, err := c.Do("SELECT", db); err != nil {
// c.Close()
// return nil, err
// }
// return c, nil
// },
// }
//
// Use the TestOnBorrow function to check the health of an idle connection
// before the connection is returned to the application. This example PINGs
// connections that have been idle more than a minute:
//
// pool := &redis.Pool{
// // Other pool configuration not shown in this example.
// TestOnBorrow: func(c redis.Conn, t time.Time) error {
// if time.Since(t) < time.Minute {
// return nil
// }
// _, err := c.Do("PING")
// return err
// },
// }
//
type Pool struct {
// Dial is an application supplied function for creating and configuring a
// connection.
//
// The connection returned from Dial must not be in a special state
// (subscribed to pubsub channel, transaction started, ...).
Dial func() (Conn, error)
// DialContext is an application supplied function for creating and configuring a
// connection with the given context.
//
// The connection returned from Dial must not be in a special state
// (subscribed to pubsub channel, transaction started, ...).
DialContext func(ctx context.Context) (Conn, error)
// TestOnBorrow is an optional application supplied function for checking
// the health of an idle connection before the connection is used again by
// the application. Argument t is the time that the connection was returned
// to the pool. If the function returns an error, then the connection is
// closed.
TestOnBorrow func(c Conn, t time.Time) error
// Maximum number of idle connections in the pool.
MaxIdle int
// Maximum number of connections allocated by the pool at a given time.
// When zero, there is no limit on the number of connections in the pool.
MaxActive int
// Close connections after remaining idle for this duration. If the value
// is zero, then idle connections are not closed. Applications should set
// the timeout to a value less than the server's timeout.
IdleTimeout time.Duration
// If Wait is true and the pool is at the MaxActive limit, then Get() waits
// for a connection to be returned to the pool before returning.
Wait bool
// Close connections older than this duration. If the value is zero, then
// the pool does not close connections based on age.
MaxConnLifetime time.Duration
chInitialized uint32 // set to 1 when field ch is initialized
mu sync.Mutex // mu protects the following fields
closed bool // set to true when the pool is closed.
active int // the number of open connections in the pool
ch chan struct{} // limits open connections when p.Wait is true
idle idleList // idle connections
waitCount int64 // total number of connections waited for.
waitDuration time.Duration // total time waited for new connections.
}
// NewPool creates a new pool.
//
// Deprecated: Initialize the Pool directly as shown in the example.
func NewPool(newFn func() (Conn, error), maxIdle int) *Pool {
return &Pool{Dial: newFn, MaxIdle: maxIdle}
}
// Get gets a connection. The application must close the returned connection.
// This method always returns a valid connection so that applications can defer
// error handling to the first use of the connection. If there is an error
// getting an underlying connection, then the connection Err, Do, Send, Flush
// and Receive methods return that error.
func (p *Pool) Get() Conn {
// GetContext returns errorConn in the first argument when an error occurs.
c, _ := p.GetContext(context.Background())
return c
}
// GetContext gets a connection using the provided context.
//
// The provided Context must be non-nil. If the context expires before the
// connection is complete, an error is returned. Any expiration on the context
// will not affect the returned connection.
//
// If the function completes without error, then the application must close the
// returned connection.
func (p *Pool) GetContext(ctx context.Context) (Conn, error) {
// Wait until there is a vacant connection in the pool.
waited, err := p.waitVacantConn(ctx)
if err != nil {
return nil, err
}
p.mu.Lock()
if waited > 0 {
p.waitCount++
p.waitDuration += waited
}
// Prune stale connections at the back of the idle list.
if p.IdleTimeout > 0 {
n := p.idle.count
for i := 0; i < n && p.idle.back != nil && p.idle.back.t.Add(p.IdleTimeout).Before(nowFunc()); i++ {
pc := p.idle.back
p.idle.popBack()
p.mu.Unlock()
pc.c.Close()
p.mu.Lock()
p.active--
}
}
// Get idle connection from the front of idle list.
for p.idle.front != nil {
pc := p.idle.front
p.idle.popFront()
p.mu.Unlock()
if (p.TestOnBorrow == nil || p.TestOnBorrow(pc.c, pc.t) == nil) &&
(p.MaxConnLifetime == 0 || nowFunc().Sub(pc.created) < p.MaxConnLifetime) {
return &activeConn{p: p, pc: pc}, nil
}
pc.c.Close()
p.mu.Lock()
p.active--
}
// Check for pool closed before dialing a new connection.
if p.closed {
p.mu.Unlock()
err := errors.New("redigo: get on closed pool")
return errorConn{err}, err
}
// Handle limit for p.Wait == false.
if !p.Wait && p.MaxActive > 0 && p.active >= p.MaxActive {
p.mu.Unlock()
return errorConn{ErrPoolExhausted}, ErrPoolExhausted
}
p.active++
p.mu.Unlock()
c, err := p.dial(ctx)
if err != nil {
c = nil
p.mu.Lock()
p.active--
if p.ch != nil && !p.closed {
p.ch <- struct{}{}
}
p.mu.Unlock()
return errorConn{err}, err
}
return &activeConn{p: p, pc: &poolConn{c: c, created: nowFunc()}}, nil
}
// PoolStats contains pool statistics.
type PoolStats struct {
// ActiveCount is the number of connections in the pool. The count includes
// idle connections and connections in use.
ActiveCount int
// IdleCount is the number of idle connections in the pool.
IdleCount int
// WaitCount is the total number of connections waited for.
// This value is currently not guaranteed to be 100% accurate.
WaitCount int64
// WaitDuration is the total time blocked waiting for a new connection.
// This value is currently not guaranteed to be 100% accurate.
WaitDuration time.Duration
}
// Stats returns pool's statistics.
func (p *Pool) Stats() PoolStats {
p.mu.Lock()
stats := PoolStats{
ActiveCount: p.active,
IdleCount: p.idle.count,
WaitCount: p.waitCount,
WaitDuration: p.waitDuration,
}
p.mu.Unlock()
return stats
}
// ActiveCount returns the number of connections in the pool. The count
// includes idle connections and connections in use.
func (p *Pool) ActiveCount() int {
p.mu.Lock()
active := p.active
p.mu.Unlock()
return active
}
// IdleCount returns the number of idle connections in the pool.
func (p *Pool) IdleCount() int {
p.mu.Lock()
idle := p.idle.count
p.mu.Unlock()
return idle
}
// Close releases the resources used by the pool.
func (p *Pool) Close() error {
p.mu.Lock()
if p.closed {
p.mu.Unlock()
return nil
}
p.closed = true
p.active -= p.idle.count
pc := p.idle.front
p.idle.count = 0
p.idle.front, p.idle.back = nil, nil
if p.ch != nil {
close(p.ch)
}
p.mu.Unlock()
for ; pc != nil; pc = pc.next {
pc.c.Close()
}
return nil
}
func (p *Pool) lazyInit() {
// Fast path.
if atomic.LoadUint32(&p.chInitialized) == 1 {
return
}
// Slow path.
p.mu.Lock()
if p.chInitialized == 0 {
p.ch = make(chan struct{}, p.MaxActive)
if p.closed {
close(p.ch)
} else {
for i := 0; i < p.MaxActive; i++ {
p.ch <- struct{}{}
}
}
atomic.StoreUint32(&p.chInitialized, 1)
}
p.mu.Unlock()
}
// waitVacantConn waits for a vacant connection in pool if waiting
// is enabled and pool size is limited, otherwise returns instantly.
// If ctx expires before that, an error is returned.
//
// If there were no vacant connection in the pool right away it returns the time spent waiting
// for that connection to appear in the pool.
func (p *Pool) waitVacantConn(ctx context.Context) (waited time.Duration, err error) {
if !p.Wait || p.MaxActive <= 0 {
// No wait or no connection limit.
return 0, nil
}
p.lazyInit()
// wait indicates if we believe it will block so its not 100% accurate
// however for stats it should be good enough.
wait := len(p.ch) == 0
var start time.Time
if wait {
start = time.Now()
}
if ctx == nil {
<-p.ch
} else {
select {
case <-p.ch:
// Additionally check that context hasn't expired while we were waiting,
// because `select` picks a random `case` if several of them are "ready".
select {
case <-ctx.Done():
return 0, ctx.Err()
default:
}
case <-ctx.Done():
return 0, ctx.Err()
}
}
if wait {
return time.Since(start), nil
}
return 0, nil
}
func (p *Pool) dial(ctx context.Context) (Conn, error) {
if p.DialContext != nil {
return p.DialContext(ctx)
}
if p.Dial != nil {
return p.Dial()
}
return nil, errors.New("redigo: must pass Dial or DialContext to pool")
}
func (p *Pool) put(pc *poolConn, forceClose bool) error {
p.mu.Lock()
if !p.closed && !forceClose {
pc.t = nowFunc()
p.idle.pushFront(pc)
if p.idle.count > p.MaxIdle {
pc = p.idle.back
p.idle.popBack()
} else {
pc = nil
}
}
if pc != nil {
p.mu.Unlock()
pc.c.Close()
p.mu.Lock()
p.active--
}
if p.ch != nil && !p.closed {
p.ch <- struct{}{}
}
p.mu.Unlock()
return nil
}
type activeConn struct {
p *Pool
pc *poolConn
state int
}
var (
sentinel []byte
sentinelOnce sync.Once
)
func initSentinel() {
p := make([]byte, 64)
if _, err := rand.Read(p); err == nil {
sentinel = p
} else {
h := sha1.New()
io.WriteString(h, "Oops, rand failed. Use time instead.")
io.WriteString(h, strconv.FormatInt(time.Now().UnixNano(), 10))
sentinel = h.Sum(nil)
}
}
func (ac *activeConn) Close() error {
pc := ac.pc
if pc == nil {
return nil
}
ac.pc = nil
if ac.state&connectionMultiState != 0 {
pc.c.Send("DISCARD")
ac.state &^= (connectionMultiState | connectionWatchState)
} else if ac.state&connectionWatchState != 0 {
pc.c.Send("UNWATCH")
ac.state &^= connectionWatchState
}
if ac.state&connectionSubscribeState != 0 {
pc.c.Send("UNSUBSCRIBE")
pc.c.Send("PUNSUBSCRIBE")
// To detect the end of the message stream, ask the server to echo
// a sentinel value and read until we see that value.
sentinelOnce.Do(initSentinel)
pc.c.Send("ECHO", sentinel)
pc.c.Flush()
for {
p, err := pc.c.Receive()
if err != nil {
break
}
if p, ok := p.([]byte); ok && bytes.Equal(p, sentinel) {
ac.state &^= connectionSubscribeState
break
}
}
}
pc.c.Do("")
ac.p.put(pc, ac.state != 0 || pc.c.Err() != nil)
return nil
}
func (ac *activeConn) Err() error {
pc := ac.pc
if pc == nil {
return errConnClosed
}
return pc.c.Err()
}
func (ac *activeConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
pc := ac.pc
if pc == nil {
return nil, errConnClosed
}
ci := lookupCommandInfo(commandName)
ac.state = (ac.state | ci.Set) &^ ci.Clear
return pc.c.Do(commandName, args...)
}
func (ac *activeConn) DoWithTimeout(timeout time.Duration, commandName string, args ...interface{}) (reply interface{}, err error) {
pc := ac.pc
if pc == nil {
return nil, errConnClosed
}
cwt, ok := pc.c.(ConnWithTimeout)
if !ok {
return nil, errTimeoutNotSupported
}
ci := lookupCommandInfo(commandName)
ac.state = (ac.state | ci.Set) &^ ci.Clear
return cwt.DoWithTimeout(timeout, commandName, args...)
}
func (ac *activeConn) Send(commandName string, args ...interface{}) error {
pc := ac.pc
if pc == nil {
return errConnClosed
}
ci := lookupCommandInfo(commandName)
ac.state = (ac.state | ci.Set) &^ ci.Clear
return pc.c.Send(commandName, args...)
}
func (ac *activeConn) Flush() error {
pc := ac.pc
if pc == nil {
return errConnClosed
}
return pc.c.Flush()
}
func (ac *activeConn) Receive() (reply interface{}, err error) {
pc := ac.pc
if pc == nil {
return nil, errConnClosed
}
return pc.c.Receive()
}
func (ac *activeConn) ReceiveWithTimeout(timeout time.Duration) (reply interface{}, err error) {
pc := ac.pc
if pc == nil {
return nil, errConnClosed
}
cwt, ok := pc.c.(ConnWithTimeout)
if !ok {
return nil, errTimeoutNotSupported
}
return cwt.ReceiveWithTimeout(timeout)
}
type errorConn struct{ err error }
func (ec errorConn) Do(string, ...interface{}) (interface{}, error) { return nil, ec.err }
func (ec errorConn) DoWithTimeout(time.Duration, string, ...interface{}) (interface{}, error) {
return nil, ec.err
}
func (ec errorConn) Send(string, ...interface{}) error { return ec.err }
func (ec errorConn) Err() error { return ec.err }
func (ec errorConn) Close() error { return nil }
func (ec errorConn) Flush() error { return ec.err }
func (ec errorConn) Receive() (interface{}, error) { return nil, ec.err }
func (ec errorConn) ReceiveWithTimeout(time.Duration) (interface{}, error) { return nil, ec.err }
type idleList struct {
count int
front, back *poolConn
}
type poolConn struct {
c Conn
t time.Time
created time.Time
next, prev *poolConn
}
func (l *idleList) pushFront(pc *poolConn) {
pc.next = l.front
pc.prev = nil
if l.count == 0 {
l.back = pc
} else {
l.front.prev = pc
}
l.front = pc
l.count++
return
}
func (l *idleList) popFront() {
pc := l.front
l.count--
if l.count == 0 {
l.front, l.back = nil, nil
} else {
pc.next.prev = nil
l.front = pc.next
}
pc.next, pc.prev = nil, nil
}
func (l *idleList) popBack() {
pc := l.back
l.count--
if l.count == 0 {
l.front, l.back = nil, nil
} else {
pc.prev.next = nil
l.back = pc.prev
}
pc.next, pc.prev = nil, nil
}

View file

@ -16,11 +16,11 @@ package redis
import ( import (
"errors" "errors"
"time"
) )
// Subscription represents a subscribe or unsubscribe notification. // Subscription represents a subscribe or unsubscribe notification.
type Subscription struct { type Subscription struct {
// Kind is "subscribe", "unsubscribe", "psubscribe" or "punsubscribe" // Kind is "subscribe", "unsubscribe", "psubscribe" or "punsubscribe"
Kind string Kind string
@ -33,25 +33,19 @@ type Subscription struct {
// Message represents a message notification. // Message represents a message notification.
type Message struct { type Message struct {
// The originating channel. // The originating channel.
Channel string Channel string
// The matched pattern, if any
Pattern string
// The message data. // The message data.
Data []byte Data []byte
} }
// PMessage represents a pmessage notification. // Pong represents a pubsub pong notification.
type PMessage struct { type Pong struct {
Data string
// The matched pattern.
Pattern string
// The originating channel.
Channel string
// The message data.
Data []byte
} }
// PubSubConn wraps a Conn with convenience methods for subscribers. // PubSubConn wraps a Conn with convenience methods for subscribers.
@ -90,11 +84,30 @@ func (c PubSubConn) PUnsubscribe(channel ...interface{}) error {
return c.Conn.Flush() return c.Conn.Flush()
} }
// Receive returns a pushed message as a Subscription, Message, PMessage or // Ping sends a PING to the server with the specified data.
// error. The return value is intended to be used directly in a type switch as //
// The connection must be subscribed to at least one channel or pattern when
// calling this method.
func (c PubSubConn) Ping(data string) error {
c.Conn.Send("PING", data)
return c.Conn.Flush()
}
// Receive returns a pushed message as a Subscription, Message, Pong or error.
// The return value is intended to be used directly in a type switch as
// illustrated in the PubSubConn example. // illustrated in the PubSubConn example.
func (c PubSubConn) Receive() interface{} { func (c PubSubConn) Receive() interface{} {
reply, err := Values(c.Conn.Receive()) return c.receiveInternal(c.Conn.Receive())
}
// ReceiveWithTimeout is like Receive, but it allows the application to
// override the connection's default timeout.
func (c PubSubConn) ReceiveWithTimeout(timeout time.Duration) interface{} {
return c.receiveInternal(ReceiveWithTimeout(c.Conn, timeout))
}
func (c PubSubConn) receiveInternal(replyArg interface{}, errArg error) interface{} {
reply, err := Values(replyArg, errArg)
if err != nil { if err != nil {
return err return err
} }
@ -113,17 +126,23 @@ func (c PubSubConn) Receive() interface{} {
} }
return m return m
case "pmessage": case "pmessage":
var pm PMessage var m Message
if _, err := Scan(reply, &pm.Pattern, &pm.Channel, &pm.Data); err != nil { if _, err := Scan(reply, &m.Pattern, &m.Channel, &m.Data); err != nil {
return err return err
} }
return pm return m
case "subscribe", "psubscribe", "unsubscribe", "punsubscribe": case "subscribe", "psubscribe", "unsubscribe", "punsubscribe":
s := Subscription{Kind: kind} s := Subscription{Kind: kind}
if _, err := Scan(reply, &s.Channel, &s.Count); err != nil { if _, err := Scan(reply, &s.Channel, &s.Count); err != nil {
return err return err
} }
return s return s
case "pong":
var p Pong
if _, err := Scan(reply, &p.Data); err != nil {
return err
}
return p
} }
return errors.New("redigo: unknown pubsub notification") return errors.New("redigo: unknown pubsub notification")
} }

138
vendor/github.com/gomodule/redigo/redis/redis.go generated vendored Normal file
View file

@ -0,0 +1,138 @@
// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package redis
import (
"errors"
"time"
)
// Error represents an error returned in a command reply.
type Error string
func (err Error) Error() string { return string(err) }
// Conn represents a connection to a Redis server.
type Conn interface {
// Close closes the connection.
Close() error
// Err returns a non-nil value when the connection is not usable.
Err() error
// Do sends a command to the server and returns the received reply.
Do(commandName string, args ...interface{}) (reply interface{}, err error)
// Send writes the command to the client's output buffer.
Send(commandName string, args ...interface{}) error
// Flush flushes the output buffer to the Redis server.
Flush() error
// Receive receives a single reply from the Redis server
Receive() (reply interface{}, err error)
}
// Argument is the interface implemented by an object which wants to control how
// the object is converted to Redis bulk strings.
type Argument interface {
// RedisArg returns a value to be encoded as a bulk string per the
// conversions listed in the section 'Executing Commands'.
// Implementations should typically return a []byte or string.
RedisArg() interface{}
}
// Scanner is implemented by an object which wants to control its value is
// interpreted when read from Redis.
type Scanner interface {
// RedisScan assigns a value from a Redis value. The argument src is one of
// the reply types listed in the section `Executing Commands`.
//
// An error should be returned if the value cannot be stored without
// loss of information.
RedisScan(src interface{}) error
}
// ConnWithTimeout is an optional interface that allows the caller to override
// a connection's default read timeout. This interface is useful for executing
// the BLPOP, BRPOP, BRPOPLPUSH, XREAD and other commands that block at the
// server.
//
// A connection's default read timeout is set with the DialReadTimeout dial
// option. Applications should rely on the default timeout for commands that do
// not block at the server.
//
// All of the Conn implementations in this package satisfy the ConnWithTimeout
// interface.
//
// Use the DoWithTimeout and ReceiveWithTimeout helper functions to simplify
// use of this interface.
type ConnWithTimeout interface {
Conn
// Do sends a command to the server and returns the received reply.
// The timeout overrides the read timeout set when dialing the
// connection.
DoWithTimeout(timeout time.Duration, commandName string, args ...interface{}) (reply interface{}, err error)
// Receive receives a single reply from the Redis server. The timeout
// overrides the read timeout set when dialing the connection.
ReceiveWithTimeout(timeout time.Duration) (reply interface{}, err error)
}
var errTimeoutNotSupported = errors.New("redis: connection does not support ConnWithTimeout")
// DoWithTimeout executes a Redis command with the specified read timeout. If
// the connection does not satisfy the ConnWithTimeout interface, then an error
// is returned.
func DoWithTimeout(c Conn, timeout time.Duration, cmd string, args ...interface{}) (interface{}, error) {
cwt, ok := c.(ConnWithTimeout)
if !ok {
return nil, errTimeoutNotSupported
}
return cwt.DoWithTimeout(timeout, cmd, args...)
}
// ReceiveWithTimeout receives a reply with the specified read timeout. If the
// connection does not satisfy the ConnWithTimeout interface, then an error is
// returned.
func ReceiveWithTimeout(c Conn, timeout time.Duration) (interface{}, error) {
cwt, ok := c.(ConnWithTimeout)
if !ok {
return nil, errTimeoutNotSupported
}
return cwt.ReceiveWithTimeout(timeout)
}
// SlowLog represents a redis SlowLog
type SlowLog struct {
// ID is a unique progressive identifier for every slow log entry.
ID int64
// Time is the unix timestamp at which the logged command was processed.
Time time.Time
// ExecutationTime is the amount of time needed for the command execution.
ExecutionTime time.Duration
// Args is the command name and arguments
Args []string
// ClientAddr is the client IP address (4.0 only).
ClientAddr string
// ClientName is the name set via the CLIENT SETNAME command (4.0 only).
ClientName string
}

583
vendor/github.com/gomodule/redigo/redis/reply.go generated vendored Normal file
View file

@ -0,0 +1,583 @@
// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package redis
import (
"errors"
"fmt"
"strconv"
"time"
)
// ErrNil indicates that a reply value is nil.
var ErrNil = errors.New("redigo: nil returned")
// Int is a helper that converts a command reply to an integer. If err is not
// equal to nil, then Int returns 0, err. Otherwise, Int converts the
// reply to an int as follows:
//
// Reply type Result
// integer int(reply), nil
// bulk string parsed reply, nil
// nil 0, ErrNil
// other 0, error
func Int(reply interface{}, err error) (int, error) {
if err != nil {
return 0, err
}
switch reply := reply.(type) {
case int64:
x := int(reply)
if int64(x) != reply {
return 0, strconv.ErrRange
}
return x, nil
case []byte:
n, err := strconv.ParseInt(string(reply), 10, 0)
return int(n), err
case nil:
return 0, ErrNil
case Error:
return 0, reply
}
return 0, fmt.Errorf("redigo: unexpected type for Int, got type %T", reply)
}
// Int64 is a helper that converts a command reply to 64 bit integer. If err is
// not equal to nil, then Int64 returns 0, err. Otherwise, Int64 converts the
// reply to an int64 as follows:
//
// Reply type Result
// integer reply, nil
// bulk string parsed reply, nil
// nil 0, ErrNil
// other 0, error
func Int64(reply interface{}, err error) (int64, error) {
if err != nil {
return 0, err
}
switch reply := reply.(type) {
case int64:
return reply, nil
case []byte:
n, err := strconv.ParseInt(string(reply), 10, 64)
return n, err
case nil:
return 0, ErrNil
case Error:
return 0, reply
}
return 0, fmt.Errorf("redigo: unexpected type for Int64, got type %T", reply)
}
func errNegativeInt(v int64) error {
return fmt.Errorf("redigo: unexpected negative value %v for Uint64", v)
}
// Uint64 is a helper that converts a command reply to 64 bit unsigned integer.
// If err is not equal to nil, then Uint64 returns 0, err. Otherwise, Uint64 converts the
// reply to an uint64 as follows:
//
// Reply type Result
// +integer reply, nil
// bulk string parsed reply, nil
// nil 0, ErrNil
// other 0, error
func Uint64(reply interface{}, err error) (uint64, error) {
if err != nil {
return 0, err
}
switch reply := reply.(type) {
case int64:
if reply < 0 {
return 0, errNegativeInt(reply)
}
return uint64(reply), nil
case []byte:
n, err := strconv.ParseUint(string(reply), 10, 64)
return n, err
case nil:
return 0, ErrNil
case Error:
return 0, reply
}
return 0, fmt.Errorf("redigo: unexpected type for Uint64, got type %T", reply)
}
// Float64 is a helper that converts a command reply to 64 bit float. If err is
// not equal to nil, then Float64 returns 0, err. Otherwise, Float64 converts
// the reply to an int as follows:
//
// Reply type Result
// bulk string parsed reply, nil
// nil 0, ErrNil
// other 0, error
func Float64(reply interface{}, err error) (float64, error) {
if err != nil {
return 0, err
}
switch reply := reply.(type) {
case []byte:
n, err := strconv.ParseFloat(string(reply), 64)
return n, err
case nil:
return 0, ErrNil
case Error:
return 0, reply
}
return 0, fmt.Errorf("redigo: unexpected type for Float64, got type %T", reply)
}
// String is a helper that converts a command reply to a string. If err is not
// equal to nil, then String returns "", err. Otherwise String converts the
// reply to a string as follows:
//
// Reply type Result
// bulk string string(reply), nil
// simple string reply, nil
// nil "", ErrNil
// other "", error
func String(reply interface{}, err error) (string, error) {
if err != nil {
return "", err
}
switch reply := reply.(type) {
case []byte:
return string(reply), nil
case string:
return reply, nil
case nil:
return "", ErrNil
case Error:
return "", reply
}
return "", fmt.Errorf("redigo: unexpected type for String, got type %T", reply)
}
// Bytes is a helper that converts a command reply to a slice of bytes. If err
// is not equal to nil, then Bytes returns nil, err. Otherwise Bytes converts
// the reply to a slice of bytes as follows:
//
// Reply type Result
// bulk string reply, nil
// simple string []byte(reply), nil
// nil nil, ErrNil
// other nil, error
func Bytes(reply interface{}, err error) ([]byte, error) {
if err != nil {
return nil, err
}
switch reply := reply.(type) {
case []byte:
return reply, nil
case string:
return []byte(reply), nil
case nil:
return nil, ErrNil
case Error:
return nil, reply
}
return nil, fmt.Errorf("redigo: unexpected type for Bytes, got type %T", reply)
}
// Bool is a helper that converts a command reply to a boolean. If err is not
// equal to nil, then Bool returns false, err. Otherwise Bool converts the
// reply to boolean as follows:
//
// Reply type Result
// integer value != 0, nil
// bulk string strconv.ParseBool(reply)
// nil false, ErrNil
// other false, error
func Bool(reply interface{}, err error) (bool, error) {
if err != nil {
return false, err
}
switch reply := reply.(type) {
case int64:
return reply != 0, nil
case []byte:
return strconv.ParseBool(string(reply))
case nil:
return false, ErrNil
case Error:
return false, reply
}
return false, fmt.Errorf("redigo: unexpected type for Bool, got type %T", reply)
}
// MultiBulk is a helper that converts an array command reply to a []interface{}.
//
// Deprecated: Use Values instead.
func MultiBulk(reply interface{}, err error) ([]interface{}, error) { return Values(reply, err) }
// Values is a helper that converts an array command reply to a []interface{}.
// If err is not equal to nil, then Values returns nil, err. Otherwise, Values
// converts the reply as follows:
//
// Reply type Result
// array reply, nil
// nil nil, ErrNil
// other nil, error
func Values(reply interface{}, err error) ([]interface{}, error) {
if err != nil {
return nil, err
}
switch reply := reply.(type) {
case []interface{}:
return reply, nil
case nil:
return nil, ErrNil
case Error:
return nil, reply
}
return nil, fmt.Errorf("redigo: unexpected type for Values, got type %T", reply)
}
func sliceHelper(reply interface{}, err error, name string, makeSlice func(int), assign func(int, interface{}) error) error {
if err != nil {
return err
}
switch reply := reply.(type) {
case []interface{}:
makeSlice(len(reply))
for i := range reply {
if reply[i] == nil {
continue
}
if err := assign(i, reply[i]); err != nil {
return err
}
}
return nil
case nil:
return ErrNil
case Error:
return reply
}
return fmt.Errorf("redigo: unexpected type for %s, got type %T", name, reply)
}
// Float64s is a helper that converts an array command reply to a []float64. If
// err is not equal to nil, then Float64s returns nil, err. Nil array items are
// converted to 0 in the output slice. Floats64 returns an error if an array
// item is not a bulk string or nil.
func Float64s(reply interface{}, err error) ([]float64, error) {
var result []float64
err = sliceHelper(reply, err, "Float64s", func(n int) { result = make([]float64, n) }, func(i int, v interface{}) error {
p, ok := v.([]byte)
if !ok {
return fmt.Errorf("redigo: unexpected element type for Floats64, got type %T", v)
}
f, err := strconv.ParseFloat(string(p), 64)
result[i] = f
return err
})
return result, err
}
// Strings is a helper that converts an array command reply to a []string. If
// err is not equal to nil, then Strings returns nil, err. Nil array items are
// converted to "" in the output slice. Strings returns an error if an array
// item is not a bulk string or nil.
func Strings(reply interface{}, err error) ([]string, error) {
var result []string
err = sliceHelper(reply, err, "Strings", func(n int) { result = make([]string, n) }, func(i int, v interface{}) error {
switch v := v.(type) {
case string:
result[i] = v
return nil
case []byte:
result[i] = string(v)
return nil
default:
return fmt.Errorf("redigo: unexpected element type for Strings, got type %T", v)
}
})
return result, err
}
// ByteSlices is a helper that converts an array command reply to a [][]byte.
// If err is not equal to nil, then ByteSlices returns nil, err. Nil array
// items are stay nil. ByteSlices returns an error if an array item is not a
// bulk string or nil.
func ByteSlices(reply interface{}, err error) ([][]byte, error) {
var result [][]byte
err = sliceHelper(reply, err, "ByteSlices", func(n int) { result = make([][]byte, n) }, func(i int, v interface{}) error {
p, ok := v.([]byte)
if !ok {
return fmt.Errorf("redigo: unexpected element type for ByteSlices, got type %T", v)
}
result[i] = p
return nil
})
return result, err
}
// Int64s is a helper that converts an array command reply to a []int64.
// If err is not equal to nil, then Int64s returns nil, err. Nil array
// items are stay nil. Int64s returns an error if an array item is not a
// bulk string or nil.
func Int64s(reply interface{}, err error) ([]int64, error) {
var result []int64
err = sliceHelper(reply, err, "Int64s", func(n int) { result = make([]int64, n) }, func(i int, v interface{}) error {
switch v := v.(type) {
case int64:
result[i] = v
return nil
case []byte:
n, err := strconv.ParseInt(string(v), 10, 64)
result[i] = n
return err
default:
return fmt.Errorf("redigo: unexpected element type for Int64s, got type %T", v)
}
})
return result, err
}
// Ints is a helper that converts an array command reply to a []in.
// If err is not equal to nil, then Ints returns nil, err. Nil array
// items are stay nil. Ints returns an error if an array item is not a
// bulk string or nil.
func Ints(reply interface{}, err error) ([]int, error) {
var result []int
err = sliceHelper(reply, err, "Ints", func(n int) { result = make([]int, n) }, func(i int, v interface{}) error {
switch v := v.(type) {
case int64:
n := int(v)
if int64(n) != v {
return strconv.ErrRange
}
result[i] = n
return nil
case []byte:
n, err := strconv.Atoi(string(v))
result[i] = n
return err
default:
return fmt.Errorf("redigo: unexpected element type for Ints, got type %T", v)
}
})
return result, err
}
// StringMap is a helper that converts an array of strings (alternating key, value)
// into a map[string]string. The HGETALL and CONFIG GET commands return replies in this format.
// Requires an even number of values in result.
func StringMap(result interface{}, err error) (map[string]string, error) {
values, err := Values(result, err)
if err != nil {
return nil, err
}
if len(values)%2 != 0 {
return nil, errors.New("redigo: StringMap expects even number of values result")
}
m := make(map[string]string, len(values)/2)
for i := 0; i < len(values); i += 2 {
key, okKey := values[i].([]byte)
value, okValue := values[i+1].([]byte)
if !okKey || !okValue {
return nil, errors.New("redigo: StringMap key not a bulk string value")
}
m[string(key)] = string(value)
}
return m, nil
}
// IntMap is a helper that converts an array of strings (alternating key, value)
// into a map[string]int. The HGETALL commands return replies in this format.
// Requires an even number of values in result.
func IntMap(result interface{}, err error) (map[string]int, error) {
values, err := Values(result, err)
if err != nil {
return nil, err
}
if len(values)%2 != 0 {
return nil, errors.New("redigo: IntMap expects even number of values result")
}
m := make(map[string]int, len(values)/2)
for i := 0; i < len(values); i += 2 {
key, ok := values[i].([]byte)
if !ok {
return nil, errors.New("redigo: IntMap key not a bulk string value")
}
value, err := Int(values[i+1], nil)
if err != nil {
return nil, err
}
m[string(key)] = value
}
return m, nil
}
// Int64Map is a helper that converts an array of strings (alternating key, value)
// into a map[string]int64. The HGETALL commands return replies in this format.
// Requires an even number of values in result.
func Int64Map(result interface{}, err error) (map[string]int64, error) {
values, err := Values(result, err)
if err != nil {
return nil, err
}
if len(values)%2 != 0 {
return nil, errors.New("redigo: Int64Map expects even number of values result")
}
m := make(map[string]int64, len(values)/2)
for i := 0; i < len(values); i += 2 {
key, ok := values[i].([]byte)
if !ok {
return nil, errors.New("redigo: Int64Map key not a bulk string value")
}
value, err := Int64(values[i+1], nil)
if err != nil {
return nil, err
}
m[string(key)] = value
}
return m, nil
}
// Positions is a helper that converts an array of positions (lat, long)
// into a [][2]float64. The GEOPOS command returns replies in this format.
func Positions(result interface{}, err error) ([]*[2]float64, error) {
values, err := Values(result, err)
if err != nil {
return nil, err
}
positions := make([]*[2]float64, len(values))
for i := range values {
if values[i] == nil {
continue
}
p, ok := values[i].([]interface{})
if !ok {
return nil, fmt.Errorf("redigo: unexpected element type for interface slice, got type %T", values[i])
}
if len(p) != 2 {
return nil, fmt.Errorf("redigo: unexpected number of values for a member position, got %d", len(p))
}
lat, err := Float64(p[0], nil)
if err != nil {
return nil, err
}
long, err := Float64(p[1], nil)
if err != nil {
return nil, err
}
positions[i] = &[2]float64{lat, long}
}
return positions, nil
}
// Uint64s is a helper that converts an array command reply to a []uint64.
// If err is not equal to nil, then Uint64s returns nil, err. Nil array
// items are stay nil. Uint64s returns an error if an array item is not a
// bulk string or nil.
func Uint64s(reply interface{}, err error) ([]uint64, error) {
var result []uint64
err = sliceHelper(reply, err, "Uint64s", func(n int) { result = make([]uint64, n) }, func(i int, v interface{}) error {
switch v := v.(type) {
case uint64:
result[i] = v
return nil
case []byte:
n, err := strconv.ParseUint(string(v), 10, 64)
result[i] = n
return err
default:
return fmt.Errorf("redigo: unexpected element type for Uint64s, got type %T", v)
}
})
return result, err
}
// Uint64Map is a helper that converts an array of strings (alternating key, value)
// into a map[string]uint64. The HGETALL commands return replies in this format.
// Requires an even number of values in result.
func Uint64Map(result interface{}, err error) (map[string]uint64, error) {
values, err := Values(result, err)
if err != nil {
return nil, err
}
if len(values)%2 != 0 {
return nil, errors.New("redigo: Uint64Map expects even number of values result")
}
m := make(map[string]uint64, len(values)/2)
for i := 0; i < len(values); i += 2 {
key, ok := values[i].([]byte)
if !ok {
return nil, errors.New("redigo: Uint64Map key not a bulk string value")
}
value, err := Uint64(values[i+1], nil)
if err != nil {
return nil, err
}
m[string(key)] = value
}
return m, nil
}
// SlowLogs is a helper that parse the SLOWLOG GET command output and
// return the array of SlowLog
func SlowLogs(result interface{}, err error) ([]SlowLog, error) {
rawLogs, err := Values(result, err)
if err != nil {
return nil, err
}
logs := make([]SlowLog, len(rawLogs))
for i, rawLog := range rawLogs {
rawLog, ok := rawLog.([]interface{})
if !ok {
return nil, errors.New("redigo: slowlog element is not an array")
}
var log SlowLog
if len(rawLog) < 4 {
return nil, errors.New("redigo: slowlog element has less than four elements")
}
log.ID, ok = rawLog[0].(int64)
if !ok {
return nil, errors.New("redigo: slowlog element[0] not an int64")
}
timestamp, ok := rawLog[1].(int64)
if !ok {
return nil, errors.New("redigo: slowlog element[1] not an int64")
}
log.Time = time.Unix(timestamp, 0)
duration, ok := rawLog[2].(int64)
if !ok {
return nil, errors.New("redigo: slowlog element[2] not an int64")
}
log.ExecutionTime = time.Duration(duration) * time.Microsecond
log.Args, err = Strings(rawLog[3], nil)
if err != nil {
return nil, fmt.Errorf("redigo: slowlog element[3] is not array of string. actual error is : %s", err.Error())
}
if len(rawLog) >= 6 {
log.ClientAddr, err = String(rawLog[4], nil)
if err != nil {
return nil, fmt.Errorf("redigo: slowlog element[4] is not a string. actual error is : %s", err.Error())
}
log.ClientName, err = String(rawLog[5], nil)
if err != nil {
return nil, fmt.Errorf("redigo: slowlog element[5] is not a string. actual error is : %s", err.Error())
}
}
logs[i] = log
}
return logs, nil
}

View file

@ -23,6 +23,10 @@ import (
"sync" "sync"
) )
var (
scannerType = reflect.TypeOf((*Scanner)(nil)).Elem()
)
func ensureLen(d reflect.Value, n int) { func ensureLen(d reflect.Value, n int) {
if n > d.Cap() { if n > d.Cap() {
d.Set(reflect.MakeSlice(d.Type(), n, n)) d.Set(reflect.MakeSlice(d.Type(), n, n))
@ -32,42 +36,117 @@ func ensureLen(d reflect.Value, n int) {
} }
func cannotConvert(d reflect.Value, s interface{}) error { func cannotConvert(d reflect.Value, s interface{}) error {
return fmt.Errorf("redigo: Scan cannot convert from %s to %s", var sname string
reflect.TypeOf(s), d.Type()) switch s.(type) {
case string:
sname = "Redis simple string"
case Error:
sname = "Redis error"
case int64:
sname = "Redis integer"
case []byte:
sname = "Redis bulk string"
case []interface{}:
sname = "Redis array"
case nil:
sname = "Redis nil"
default:
sname = reflect.TypeOf(s).String()
}
return fmt.Errorf("cannot convert from %s to %s", sname, d.Type())
} }
func convertAssignBytes(d reflect.Value, s []byte) (err error) { func convertAssignNil(d reflect.Value) (err error) {
switch d.Type().Kind() {
case reflect.Slice, reflect.Interface:
d.Set(reflect.Zero(d.Type()))
default:
err = cannotConvert(d, nil)
}
return err
}
func convertAssignError(d reflect.Value, s Error) (err error) {
if d.Kind() == reflect.String {
d.SetString(string(s))
} else if d.Kind() == reflect.Slice && d.Type().Elem().Kind() == reflect.Uint8 {
d.SetBytes([]byte(s))
} else {
err = cannotConvert(d, s)
}
return
}
func convertAssignString(d reflect.Value, s string) (err error) {
switch d.Type().Kind() { switch d.Type().Kind() {
case reflect.Float32, reflect.Float64: case reflect.Float32, reflect.Float64:
var x float64 var x float64
x, err = strconv.ParseFloat(string(s), d.Type().Bits()) x, err = strconv.ParseFloat(s, d.Type().Bits())
d.SetFloat(x) d.SetFloat(x)
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
var x int64 var x int64
x, err = strconv.ParseInt(string(s), 10, d.Type().Bits()) x, err = strconv.ParseInt(s, 10, d.Type().Bits())
d.SetInt(x) d.SetInt(x)
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
var x uint64 var x uint64
x, err = strconv.ParseUint(string(s), 10, d.Type().Bits()) x, err = strconv.ParseUint(s, 10, d.Type().Bits())
d.SetUint(x) d.SetUint(x)
case reflect.Bool: case reflect.Bool:
var x bool var x bool
x, err = strconv.ParseBool(string(s)) x, err = strconv.ParseBool(s)
d.SetBool(x) d.SetBool(x)
case reflect.String: case reflect.String:
d.SetString(string(s)) d.SetString(s)
case reflect.Slice: case reflect.Slice:
if d.Type().Elem().Kind() != reflect.Uint8 { if d.Type().Elem().Kind() == reflect.Uint8 {
err = cannotConvert(d, s) d.SetBytes([]byte(s))
} else { } else {
d.SetBytes(s) err = cannotConvert(d, s)
} }
case reflect.Ptr:
err = convertAssignString(d.Elem(), s)
default: default:
err = cannotConvert(d, s) err = cannotConvert(d, s)
} }
return return
} }
func convertAssignBulkString(d reflect.Value, s []byte) (err error) {
switch d.Type().Kind() {
case reflect.Slice:
// Handle []byte destination here to avoid unnecessary
// []byte -> string -> []byte converion.
if d.Type().Elem().Kind() == reflect.Uint8 {
d.SetBytes(s)
} else {
err = cannotConvert(d, s)
}
case reflect.Ptr:
if d.CanInterface() && d.CanSet() {
if s == nil {
if d.IsNil() {
return nil
}
d.Set(reflect.Zero(d.Type()))
return nil
}
if d.IsNil() {
d.Set(reflect.New(d.Type().Elem()))
}
if sc, ok := d.Interface().(Scanner); ok {
return sc.RedisScan(s)
}
}
err = convertAssignString(d, string(s))
default:
err = convertAssignString(d, string(s))
}
return err
}
func convertAssignInt(d reflect.Value, s int64) (err error) { func convertAssignInt(d reflect.Value, s int64) (err error) {
switch d.Type().Kind() { switch d.Type().Kind() {
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
@ -96,18 +175,43 @@ func convertAssignInt(d reflect.Value, s int64) (err error) {
} }
func convertAssignValue(d reflect.Value, s interface{}) (err error) { func convertAssignValue(d reflect.Value, s interface{}) (err error) {
if d.Kind() != reflect.Ptr {
if d.CanAddr() {
d2 := d.Addr()
if d2.CanInterface() {
if scanner, ok := d2.Interface().(Scanner); ok {
return scanner.RedisScan(s)
}
}
}
} else if d.CanInterface() {
// Already a reflect.Ptr
if d.IsNil() {
d.Set(reflect.New(d.Type().Elem()))
}
if scanner, ok := d.Interface().(Scanner); ok {
return scanner.RedisScan(s)
}
}
switch s := s.(type) { switch s := s.(type) {
case nil:
err = convertAssignNil(d)
case []byte: case []byte:
err = convertAssignBytes(d, s) err = convertAssignBulkString(d, s)
case int64: case int64:
err = convertAssignInt(d, s) err = convertAssignInt(d, s)
case string:
err = convertAssignString(d, s)
case Error:
err = convertAssignError(d, s)
default: default:
err = cannotConvert(d, s) err = cannotConvert(d, s)
} }
return err return err
} }
func convertAssignValues(d reflect.Value, s []interface{}) error { func convertAssignArray(d reflect.Value, s []interface{}) error {
if d.Type().Kind() != reflect.Slice { if d.Type().Kind() != reflect.Slice {
return cannotConvert(d, s) return cannotConvert(d, s)
} }
@ -121,11 +225,15 @@ func convertAssignValues(d reflect.Value, s []interface{}) error {
} }
func convertAssign(d interface{}, s interface{}) (err error) { func convertAssign(d interface{}, s interface{}) (err error) {
if scanner, ok := d.(Scanner); ok {
return scanner.RedisScan(s)
}
// Handle the most common destination types using type switches and // Handle the most common destination types using type switches and
// fall back to reflection for all other types. // fall back to reflection for all other types.
switch s := s.(type) { switch s := s.(type) {
case nil: case nil:
// ingore // ignore
case []byte: case []byte:
switch d := d.(type) { switch d := d.(type) {
case *string: case *string:
@ -144,7 +252,7 @@ func convertAssign(d interface{}, s interface{}) (err error) {
if d := reflect.ValueOf(d); d.Type().Kind() != reflect.Ptr { if d := reflect.ValueOf(d); d.Type().Kind() != reflect.Ptr {
err = cannotConvert(d, s) err = cannotConvert(d, s)
} else { } else {
err = convertAssignBytes(d.Elem(), s) err = convertAssignBulkString(d.Elem(), s)
} }
} }
case int64: case int64:
@ -169,6 +277,17 @@ func convertAssign(d interface{}, s interface{}) (err error) {
err = convertAssignInt(d.Elem(), s) err = convertAssignInt(d.Elem(), s)
} }
} }
case string:
switch d := d.(type) {
case *string:
*d = s
case *interface{}:
*d = s
case nil:
// skip value
default:
err = cannotConvert(reflect.ValueOf(d), s)
}
case []interface{}: case []interface{}:
switch d := d.(type) { switch d := d.(type) {
case *[]interface{}: case *[]interface{}:
@ -181,7 +300,7 @@ func convertAssign(d interface{}, s interface{}) (err error) {
if d := reflect.ValueOf(d); d.Type().Kind() != reflect.Ptr { if d := reflect.ValueOf(d); d.Type().Kind() != reflect.Ptr {
err = cannotConvert(d, s) err = cannotConvert(d, s)
} else { } else {
err = convertAssignValues(d.Elem(), s) err = convertAssignArray(d.Elem(), s)
} }
} }
case Error: case Error:
@ -194,6 +313,8 @@ func convertAssign(d interface{}, s interface{}) (err error) {
// Scan copies from src to the values pointed at by dest. // Scan copies from src to the values pointed at by dest.
// //
// Scan uses RedisScan if available otherwise:
//
// The values pointed at by dest must be an integer, float, boolean, string, // The values pointed at by dest must be an integer, float, boolean, string,
// []byte, interface{} or slices of these types. Scan uses the standard strconv // []byte, interface{} or slices of these types. Scan uses the standard strconv
// package to convert bulk strings to numeric and boolean types. // package to convert bulk strings to numeric and boolean types.
@ -206,12 +327,13 @@ func convertAssign(d interface{}, s interface{}) (err error) {
// following the copied values. // following the copied values.
func Scan(src []interface{}, dest ...interface{}) ([]interface{}, error) { func Scan(src []interface{}, dest ...interface{}) ([]interface{}, error) {
if len(src) < len(dest) { if len(src) < len(dest) {
return nil, errors.New("redigo: Scan array short") return nil, errors.New("redigo.Scan: array short")
} }
var err error var err error
for i, d := range dest { for i, d := range dest {
err = convertAssign(d, src[i]) err = convertAssign(d, src[i])
if err != nil { if err != nil {
err = fmt.Errorf("redigo.Scan: cannot assign to dest %d: %v", i, err)
break break
} }
} }
@ -219,9 +341,9 @@ func Scan(src []interface{}, dest ...interface{}) ([]interface{}, error) {
} }
type fieldSpec struct { type fieldSpec struct {
name string name string
index []int index []int
//omitEmpty bool omitEmpty bool
} }
type structSpec struct { type structSpec struct {
@ -237,13 +359,17 @@ func compileStructSpec(t reflect.Type, depth map[string]int, index []int, ss *st
for i := 0; i < t.NumField(); i++ { for i := 0; i < t.NumField(); i++ {
f := t.Field(i) f := t.Field(i)
switch { switch {
case f.PkgPath != "": case f.PkgPath != "" && !f.Anonymous:
// Ignore unexported fields. // Ignore unexported fields.
case f.Anonymous: case f.Anonymous:
// TODO: Handle pointers. Requires change to decoder and switch f.Type.Kind() {
// protection against infinite recursion. case reflect.Struct:
if f.Type.Kind() == reflect.Struct {
compileStructSpec(f.Type, depth, append(index, i), ss) compileStructSpec(f.Type, depth, append(index, i), ss)
case reflect.Ptr:
// TODO(steve): Protect against infinite recursion.
if f.Type.Elem().Kind() == reflect.Struct {
compileStructSpec(f.Type.Elem(), depth, append(index, i), ss)
}
} }
default: default:
fs := &fieldSpec{name: f.Name} fs := &fieldSpec{name: f.Name}
@ -258,10 +384,10 @@ func compileStructSpec(t reflect.Type, depth map[string]int, index []int, ss *st
} }
for _, s := range p[1:] { for _, s := range p[1:] {
switch s { switch s {
//case "omitempty": case "omitempty":
// fs.omitempty = true fs.omitEmpty = true
default: default:
panic(errors.New("redigo: unknown field flag " + s + " for type " + t.Name())) panic(fmt.Errorf("redigo: unknown field tag %s for type %s", s, t.Name()))
} }
} }
} }
@ -321,7 +447,7 @@ func structSpecForType(t reflect.Type) *structSpec {
return ss return ss
} }
var errScanStructValue = errors.New("redigo: ScanStruct value must be non-nil pointer to a struct") var errScanStructValue = errors.New("redigo.ScanStruct: value must be non-nil pointer to a struct")
// ScanStruct scans alternating names and values from src to a struct. The // ScanStruct scans alternating names and values from src to a struct. The
// HGETALL and CONFIG GET commands return replies in this format. // HGETALL and CONFIG GET commands return replies in this format.
@ -333,6 +459,7 @@ var errScanStructValue = errors.New("redigo: ScanStruct value must be non-nil po
// //
// Fields with the tag redis:"-" are ignored. // Fields with the tag redis:"-" are ignored.
// //
// Each field uses RedisScan if available otherwise:
// Integer, float, boolean, string and []byte fields are supported. Scan uses the // Integer, float, boolean, string and []byte fields are supported. Scan uses the
// standard strconv package to convert bulk string values to numeric and // standard strconv package to convert bulk string values to numeric and
// boolean types. // boolean types.
@ -350,7 +477,7 @@ func ScanStruct(src []interface{}, dest interface{}) error {
ss := structSpecForType(d.Type()) ss := structSpecForType(d.Type())
if len(src)%2 != 0 { if len(src)%2 != 0 {
return errors.New("redigo: ScanStruct expects even number of values in values") return errors.New("redigo.ScanStruct: number of values not a multiple of 2")
} }
for i := 0; i < len(src); i += 2 { for i := 0; i < len(src); i += 2 {
@ -360,26 +487,30 @@ func ScanStruct(src []interface{}, dest interface{}) error {
} }
name, ok := src[i].([]byte) name, ok := src[i].([]byte)
if !ok { if !ok {
return errors.New("redigo: ScanStruct key not a bulk string value") return fmt.Errorf("redigo.ScanStruct: key %d not a bulk string value", i)
} }
fs := ss.fieldSpec(name) fs := ss.fieldSpec(name)
if fs == nil { if fs == nil {
continue continue
} }
if err := convertAssignValue(d.FieldByIndex(fs.index), s); err != nil { if err := convertAssignValue(d.FieldByIndex(fs.index), s); err != nil {
return err return fmt.Errorf("redigo.ScanStruct: cannot assign field %s: %v", fs.name, err)
} }
} }
return nil return nil
} }
var ( var (
errScanSliceValue = errors.New("redigo: ScanSlice dest must be non-nil pointer to a struct") errScanSliceValue = errors.New("redigo.ScanSlice: dest must be non-nil pointer to a struct")
) )
// ScanSlice scans src to the slice pointed to by dest. The elements the dest // ScanSlice scans src to the slice pointed to by dest.
// slice must be integer, float, boolean, string, struct or pointer to struct //
// values. // If the target is a slice of types which implement Scanner then the custom
// RedisScan method is used otherwise the following rules apply:
//
// The elements in the dest slice must be integer, float, boolean, string, struct
// or pointer to struct values.
// //
// Struct fields must be integer, float, boolean or string values. All struct // Struct fields must be integer, float, boolean or string values. All struct
// fields are used unless a subset is specified using fieldNames. // fields are used unless a subset is specified using fieldNames.
@ -395,19 +526,20 @@ func ScanSlice(src []interface{}, dest interface{}, fieldNames ...string) error
isPtr := false isPtr := false
t := d.Type().Elem() t := d.Type().Elem()
st := t
if t.Kind() == reflect.Ptr && t.Elem().Kind() == reflect.Struct { if t.Kind() == reflect.Ptr && t.Elem().Kind() == reflect.Struct {
isPtr = true isPtr = true
t = t.Elem() t = t.Elem()
} }
if t.Kind() != reflect.Struct { if t.Kind() != reflect.Struct || st.Implements(scannerType) {
ensureLen(d, len(src)) ensureLen(d, len(src))
for i, s := range src { for i, s := range src {
if s == nil { if s == nil {
continue continue
} }
if err := convertAssignValue(d.Index(i), s); err != nil { if err := convertAssignValue(d.Index(i), s); err != nil {
return err return fmt.Errorf("redigo.ScanSlice: cannot assign element %d: %v", i, err)
} }
} }
return nil return nil
@ -420,18 +552,18 @@ func ScanSlice(src []interface{}, dest interface{}, fieldNames ...string) error
for i, name := range fieldNames { for i, name := range fieldNames {
fss[i] = ss.m[name] fss[i] = ss.m[name]
if fss[i] == nil { if fss[i] == nil {
return errors.New("redigo: ScanSlice bad field name " + name) return fmt.Errorf("redigo.ScanSlice: ScanSlice bad field name %s", name)
} }
} }
} }
if len(fss) == 0 { if len(fss) == 0 {
return errors.New("redigo: ScanSlice no struct fields") return errors.New("redigo.ScanSlice: no struct fields")
} }
n := len(src) / len(fss) n := len(src) / len(fss)
if n*len(fss) != len(src) { if n*len(fss) != len(src) {
return errors.New("redigo: ScanSlice length not a multiple of struct field count") return errors.New("redigo.ScanSlice: length not a multiple of struct field count")
} }
ensureLen(d, n) ensureLen(d, n)
@ -449,7 +581,7 @@ func ScanSlice(src []interface{}, dest interface{}, fieldNames ...string) error
continue continue
} }
if err := convertAssignValue(d.FieldByIndex(fs.index), s); err != nil { if err := convertAssignValue(d.FieldByIndex(fs.index), s); err != nil {
return err return fmt.Errorf("redigo.ScanSlice: cannot assign element %d to field %s: %v", i*len(fss)+j, fs.name, err)
} }
} }
} }
@ -507,7 +639,35 @@ func flattenStruct(args Args, v reflect.Value) Args {
ss := structSpecForType(v.Type()) ss := structSpecForType(v.Type())
for _, fs := range ss.l { for _, fs := range ss.l {
fv := v.FieldByIndex(fs.index) fv := v.FieldByIndex(fs.index)
args = append(args, fs.name, fv.Interface()) if fs.omitEmpty {
var empty = false
switch fv.Kind() {
case reflect.Array, reflect.Map, reflect.Slice, reflect.String:
empty = fv.Len() == 0
case reflect.Bool:
empty = !fv.Bool()
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
empty = fv.Int() == 0
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Uintptr:
empty = fv.Uint() == 0
case reflect.Float32, reflect.Float64:
empty = fv.Float() == 0
case reflect.Interface, reflect.Ptr:
empty = fv.IsNil()
}
if empty {
continue
}
}
if arg, ok := fv.Interface().(Argument); ok {
args = append(args, fs.name, arg.RedisArg())
} else if fv.Kind() == reflect.Ptr {
if !fv.IsNil() {
args = append(args, fs.name, fv.Elem().Interface())
}
} else {
args = append(args, fs.name, fv.Interface())
}
} }
return args return args
} }

View file

@ -55,6 +55,11 @@ func (s *Script) args(spec string, keysAndArgs []interface{}) []interface{} {
return args return args
} }
// Hash returns the script hash.
func (s *Script) Hash() string {
return s.hash
}
// Do evaluates the script. Under the covers, Do optimistically evaluates the // Do evaluates the script. Under the covers, Do optimistically evaluates the
// script using the EVALSHA command. If the command fails because the script is // script using the EVALSHA command. If the command fails because the script is
// not loaded, then Do evaluates the script using the EVAL command (thus // not loaded, then Do evaluates the script using the EVAL command (thus

5
vendor/modules.txt vendored
View file

@ -79,11 +79,10 @@ github.com/docker/go-events
github.com/docker/go-metrics github.com/docker/go-metrics
# github.com/docker/libtrust v0.0.0-20150114040149-fa567046d9b1 # github.com/docker/libtrust v0.0.0-20150114040149-fa567046d9b1
github.com/docker/libtrust github.com/docker/libtrust
# github.com/garyburd/redigo v0.0.0-20150301180006-535138d7bcd7
github.com/garyburd/redigo/internal
github.com/garyburd/redigo/redis
# github.com/golang/protobuf v1.3.2 # github.com/golang/protobuf v1.3.2
github.com/golang/protobuf/proto github.com/golang/protobuf/proto
# github.com/gomodule/redigo v1.8.2
github.com/gomodule/redigo/redis
# github.com/gorilla/handlers v0.0.0-20150720190736-60c7bfde3e33 # github.com/gorilla/handlers v0.0.0-20150720190736-60c7bfde3e33
github.com/gorilla/handlers github.com/gorilla/handlers
# github.com/gorilla/mux v1.7.2 # github.com/gorilla/mux v1.7.2