diff --git a/go.mod b/go.mod index 5d3a1c14d..abcc6f397 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,7 @@ require ( github.com/docker/go-events v0.0.0-20190806004212-e31b211e4f1c github.com/docker/go-metrics v0.0.1 github.com/docker/libtrust v0.0.0-20150114040149-fa567046d9b1 - github.com/garyburd/redigo v0.0.0-20150301180006-535138d7bcd7 + github.com/gomodule/redigo v1.8.2 github.com/gorilla/handlers v0.0.0-20150720190736-60c7bfde3e33 github.com/gorilla/mux v1.7.2 github.com/inconshreveable/mousetrap v1.0.0 // indirect diff --git a/go.sum b/go.sum index 3df4dc12b..c2db8fc7e 100644 --- a/go.sum +++ b/go.sum @@ -41,8 +41,6 @@ github.com/docker/go-metrics v0.0.1 h1:AgB/0SvBxihN0X8OR4SjsblXkbMvalQ8cjmtKQ2rQ github.com/docker/go-metrics v0.0.1/go.mod h1:cG1hvH2utMXtqgqqYE9plW6lDxS3/5ayHzueweSI3Vw= github.com/docker/libtrust v0.0.0-20150114040149-fa567046d9b1 h1:ZClxb8laGDf5arXfYcAtECDFgAgHklGI8CxgjHnXKJ4= github.com/docker/libtrust v0.0.0-20150114040149-fa567046d9b1/go.mod h1:cyGadeNEkKy96OOhEzfZl+yxihPEzKnqJwvfuSUqbZE= -github.com/garyburd/redigo v0.0.0-20150301180006-535138d7bcd7 h1:LofdAjjjqCSXMwLGgOgnE+rdPuvX9DxCqaHwKy7i/ko= -github.com/garyburd/redigo v0.0.0-20150301180006-535138d7bcd7/go.mod h1:NR3MbYisc3/PwhQ00EMzDiPmrwpPxAn5GI05/YaO1SY= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= @@ -53,6 +51,8 @@ github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/gomodule/redigo v1.8.2 h1:H5XSIre1MB5NbPYFp+i1NBbb5qN1W8Y8YAQoAYbkm8k= +github.com/gomodule/redigo v1.8.2/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUzlpkBYO0= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/gorilla/handlers v0.0.0-20150720190736-60c7bfde3e33 h1:893HsJqtxp9z1SF76gg6hY70hRY1wVlTSnC/h1yUDCo= diff --git a/registry/handlers/app.go b/registry/handlers/app.go index 74682c344..f2be2b0d2 100644 --- a/registry/handlers/app.go +++ b/registry/handlers/app.go @@ -39,7 +39,7 @@ import ( events "github.com/docker/go-events" "github.com/docker/go-metrics" "github.com/docker/libtrust" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" "github.com/gorilla/mux" "github.com/sirupsen/logrus" ) @@ -514,11 +514,11 @@ func (app *App) configureRedis(configuration *configuration.Configuration) { } } - conn, err := redis.DialTimeout("tcp", + conn, err := redis.Dial("tcp", configuration.Redis.Addr, - configuration.Redis.DialTimeout, - configuration.Redis.ReadTimeout, - configuration.Redis.WriteTimeout) + redis.DialConnectTimeout(configuration.Redis.DialTimeout), + redis.DialReadTimeout(configuration.Redis.ReadTimeout), + redis.DialWriteTimeout(configuration.Redis.WriteTimeout)) if err != nil { dcontext.GetLogger(app).Errorf("error connecting to redis instance %s: %v", configuration.Redis.Addr, err) diff --git a/registry/storage/cache/redis/redis.go b/registry/storage/cache/redis/redis.go index ed03f9f0b..5f84249bb 100644 --- a/registry/storage/cache/redis/redis.go +++ b/registry/storage/cache/redis/redis.go @@ -8,7 +8,7 @@ import ( "github.com/docker/distribution/reference" "github.com/docker/distribution/registry/storage/cache" "github.com/docker/distribution/registry/storage/cache/metrics" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" "github.com/opencontainers/go-digest" ) diff --git a/registry/storage/cache/redis/redis_test.go b/registry/storage/cache/redis/redis_test.go index 03cc85f1e..5f163a747 100644 --- a/registry/storage/cache/redis/redis_test.go +++ b/registry/storage/cache/redis/redis_test.go @@ -7,7 +7,7 @@ import ( "time" "github.com/docker/distribution/registry/storage/cache/cachecheck" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" ) var redisAddr string diff --git a/vendor/github.com/garyburd/redigo/internal/commandinfo.go b/vendor/github.com/garyburd/redigo/internal/commandinfo.go deleted file mode 100644 index 7ad1ade57..000000000 --- a/vendor/github.com/garyburd/redigo/internal/commandinfo.go +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright 2014 Gary Burd -// -// Licensed under the Apache License, Version 2.0 (the "License"): you may -// not use this file except in compliance with the License. You may obtain -// a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -// License for the specific language governing permissions and limitations -// under the License. - -package internal // import "github.com/garyburd/redigo/internal" - -import ( - "strings" -) - -const ( - WatchState = 1 << iota - MultiState - SubscribeState - MonitorState -) - -type CommandInfo struct { - Set, Clear int -} - -var commandInfos = map[string]CommandInfo{ - "WATCH": {Set: WatchState}, - "UNWATCH": {Clear: WatchState}, - "MULTI": {Set: MultiState}, - "EXEC": {Clear: WatchState | MultiState}, - "DISCARD": {Clear: WatchState | MultiState}, - "PSUBSCRIBE": {Set: SubscribeState}, - "SUBSCRIBE": {Set: SubscribeState}, - "MONITOR": {Set: MonitorState}, -} - -func LookupCommandInfo(commandName string) CommandInfo { - return commandInfos[strings.ToUpper(commandName)] -} diff --git a/vendor/github.com/garyburd/redigo/redis/conn.go b/vendor/github.com/garyburd/redigo/redis/conn.go deleted file mode 100644 index ac0e971c4..000000000 --- a/vendor/github.com/garyburd/redigo/redis/conn.go +++ /dev/null @@ -1,455 +0,0 @@ -// Copyright 2012 Gary Burd -// -// Licensed under the Apache License, Version 2.0 (the "License"): you may -// not use this file except in compliance with the License. You may obtain -// a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -// License for the specific language governing permissions and limitations -// under the License. - -package redis - -import ( - "bufio" - "bytes" - "errors" - "fmt" - "io" - "net" - "strconv" - "sync" - "time" -) - -// conn is the low-level implementation of Conn -type conn struct { - - // Shared - mu sync.Mutex - pending int - err error - conn net.Conn - - // Read - readTimeout time.Duration - br *bufio.Reader - - // Write - writeTimeout time.Duration - bw *bufio.Writer - - // Scratch space for formatting argument length. - // '*' or '$', length, "\r\n" - lenScratch [32]byte - - // Scratch space for formatting integers and floats. - numScratch [40]byte -} - -// Dial connects to the Redis server at the given network and address. -func Dial(network, address string) (Conn, error) { - dialer := xDialer{} - return dialer.Dial(network, address) -} - -// DialTimeout acts like Dial but takes timeouts for establishing the -// connection to the server, writing a command and reading a reply. -func DialTimeout(network, address string, connectTimeout, readTimeout, writeTimeout time.Duration) (Conn, error) { - netDialer := net.Dialer{Timeout: connectTimeout} - dialer := xDialer{ - NetDial: netDialer.Dial, - ReadTimeout: readTimeout, - WriteTimeout: writeTimeout, - } - return dialer.Dial(network, address) -} - -// A Dialer specifies options for connecting to a Redis server. -type xDialer struct { - // NetDial specifies the dial function for creating TCP connections. If - // NetDial is nil, then net.Dial is used. - NetDial func(network, addr string) (net.Conn, error) - - // ReadTimeout specifies the timeout for reading a single command - // reply. If ReadTimeout is zero, then no timeout is used. - ReadTimeout time.Duration - - // WriteTimeout specifies the timeout for writing a single command. If - // WriteTimeout is zero, then no timeout is used. - WriteTimeout time.Duration -} - -// Dial connects to the Redis server at address on the named network. -func (d *xDialer) Dial(network, address string) (Conn, error) { - dial := d.NetDial - if dial == nil { - dial = net.Dial - } - netConn, err := dial(network, address) - if err != nil { - return nil, err - } - return &conn{ - conn: netConn, - bw: bufio.NewWriter(netConn), - br: bufio.NewReader(netConn), - readTimeout: d.ReadTimeout, - writeTimeout: d.WriteTimeout, - }, nil -} - -// NewConn returns a new Redigo connection for the given net connection. -func NewConn(netConn net.Conn, readTimeout, writeTimeout time.Duration) Conn { - return &conn{ - conn: netConn, - bw: bufio.NewWriter(netConn), - br: bufio.NewReader(netConn), - readTimeout: readTimeout, - writeTimeout: writeTimeout, - } -} - -func (c *conn) Close() error { - c.mu.Lock() - err := c.err - if c.err == nil { - c.err = errors.New("redigo: closed") - err = c.conn.Close() - } - c.mu.Unlock() - return err -} - -func (c *conn) fatal(err error) error { - c.mu.Lock() - if c.err == nil { - c.err = err - // Close connection to force errors on subsequent calls and to unblock - // other reader or writer. - c.conn.Close() - } - c.mu.Unlock() - return err -} - -func (c *conn) Err() error { - c.mu.Lock() - err := c.err - c.mu.Unlock() - return err -} - -func (c *conn) writeLen(prefix byte, n int) error { - c.lenScratch[len(c.lenScratch)-1] = '\n' - c.lenScratch[len(c.lenScratch)-2] = '\r' - i := len(c.lenScratch) - 3 - for { - c.lenScratch[i] = byte('0' + n%10) - i -= 1 - n = n / 10 - if n == 0 { - break - } - } - c.lenScratch[i] = prefix - _, err := c.bw.Write(c.lenScratch[i:]) - return err -} - -func (c *conn) writeString(s string) error { - c.writeLen('$', len(s)) - c.bw.WriteString(s) - _, err := c.bw.WriteString("\r\n") - return err -} - -func (c *conn) writeBytes(p []byte) error { - c.writeLen('$', len(p)) - c.bw.Write(p) - _, err := c.bw.WriteString("\r\n") - return err -} - -func (c *conn) writeInt64(n int64) error { - return c.writeBytes(strconv.AppendInt(c.numScratch[:0], n, 10)) -} - -func (c *conn) writeFloat64(n float64) error { - return c.writeBytes(strconv.AppendFloat(c.numScratch[:0], n, 'g', -1, 64)) -} - -func (c *conn) writeCommand(cmd string, args []interface{}) (err error) { - c.writeLen('*', 1+len(args)) - err = c.writeString(cmd) - for _, arg := range args { - if err != nil { - break - } - switch arg := arg.(type) { - case string: - err = c.writeString(arg) - case []byte: - err = c.writeBytes(arg) - case int: - err = c.writeInt64(int64(arg)) - case int64: - err = c.writeInt64(arg) - case float64: - err = c.writeFloat64(arg) - case bool: - if arg { - err = c.writeString("1") - } else { - err = c.writeString("0") - } - case nil: - err = c.writeString("") - default: - var buf bytes.Buffer - fmt.Fprint(&buf, arg) - err = c.writeBytes(buf.Bytes()) - } - } - return err -} - -type protocolError string - -func (pe protocolError) Error() string { - return fmt.Sprintf("redigo: %s (possible server error or unsupported concurrent read by application)", string(pe)) -} - -func (c *conn) readLine() ([]byte, error) { - p, err := c.br.ReadSlice('\n') - if err == bufio.ErrBufferFull { - return nil, protocolError("long response line") - } - if err != nil { - return nil, err - } - i := len(p) - 2 - if i < 0 || p[i] != '\r' { - return nil, protocolError("bad response line terminator") - } - return p[:i], nil -} - -// parseLen parses bulk string and array lengths. -func parseLen(p []byte) (int, error) { - if len(p) == 0 { - return -1, protocolError("malformed length") - } - - if p[0] == '-' && len(p) == 2 && p[1] == '1' { - // handle $-1 and $-1 null replies. - return -1, nil - } - - var n int - for _, b := range p { - n *= 10 - if b < '0' || b > '9' { - return -1, protocolError("illegal bytes in length") - } - n += int(b - '0') - } - - return n, nil -} - -// parseInt parses an integer reply. -func parseInt(p []byte) (interface{}, error) { - if len(p) == 0 { - return 0, protocolError("malformed integer") - } - - var negate bool - if p[0] == '-' { - negate = true - p = p[1:] - if len(p) == 0 { - return 0, protocolError("malformed integer") - } - } - - var n int64 - for _, b := range p { - n *= 10 - if b < '0' || b > '9' { - return 0, protocolError("illegal bytes in length") - } - n += int64(b - '0') - } - - if negate { - n = -n - } - return n, nil -} - -var ( - okReply interface{} = "OK" - pongReply interface{} = "PONG" -) - -func (c *conn) readReply() (interface{}, error) { - line, err := c.readLine() - if err != nil { - return nil, err - } - if len(line) == 0 { - return nil, protocolError("short response line") - } - switch line[0] { - case '+': - switch { - case len(line) == 3 && line[1] == 'O' && line[2] == 'K': - // Avoid allocation for frequent "+OK" response. - return okReply, nil - case len(line) == 5 && line[1] == 'P' && line[2] == 'O' && line[3] == 'N' && line[4] == 'G': - // Avoid allocation in PING command benchmarks :) - return pongReply, nil - default: - return string(line[1:]), nil - } - case '-': - return Error(string(line[1:])), nil - case ':': - return parseInt(line[1:]) - case '$': - n, err := parseLen(line[1:]) - if n < 0 || err != nil { - return nil, err - } - p := make([]byte, n) - _, err = io.ReadFull(c.br, p) - if err != nil { - return nil, err - } - if line, err := c.readLine(); err != nil { - return nil, err - } else if len(line) != 0 { - return nil, protocolError("bad bulk string format") - } - return p, nil - case '*': - n, err := parseLen(line[1:]) - if n < 0 || err != nil { - return nil, err - } - r := make([]interface{}, n) - for i := range r { - r[i], err = c.readReply() - if err != nil { - return nil, err - } - } - return r, nil - } - return nil, protocolError("unexpected response line") -} - -func (c *conn) Send(cmd string, args ...interface{}) error { - c.mu.Lock() - c.pending += 1 - c.mu.Unlock() - if c.writeTimeout != 0 { - c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout)) - } - if err := c.writeCommand(cmd, args); err != nil { - return c.fatal(err) - } - return nil -} - -func (c *conn) Flush() error { - if c.writeTimeout != 0 { - c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout)) - } - if err := c.bw.Flush(); err != nil { - return c.fatal(err) - } - return nil -} - -func (c *conn) Receive() (reply interface{}, err error) { - if c.readTimeout != 0 { - c.conn.SetReadDeadline(time.Now().Add(c.readTimeout)) - } - if reply, err = c.readReply(); err != nil { - return nil, c.fatal(err) - } - // When using pub/sub, the number of receives can be greater than the - // number of sends. To enable normal use of the connection after - // unsubscribing from all channels, we do not decrement pending to a - // negative value. - // - // The pending field is decremented after the reply is read to handle the - // case where Receive is called before Send. - c.mu.Lock() - if c.pending > 0 { - c.pending -= 1 - } - c.mu.Unlock() - if err, ok := reply.(Error); ok { - return nil, err - } - return -} - -func (c *conn) Do(cmd string, args ...interface{}) (interface{}, error) { - c.mu.Lock() - pending := c.pending - c.pending = 0 - c.mu.Unlock() - - if cmd == "" && pending == 0 { - return nil, nil - } - - if c.writeTimeout != 0 { - c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout)) - } - - if cmd != "" { - c.writeCommand(cmd, args) - } - - if err := c.bw.Flush(); err != nil { - return nil, c.fatal(err) - } - - if c.readTimeout != 0 { - c.conn.SetReadDeadline(time.Now().Add(c.readTimeout)) - } - - if cmd == "" { - reply := make([]interface{}, pending) - for i := range reply { - r, e := c.readReply() - if e != nil { - return nil, c.fatal(e) - } - reply[i] = r - } - return reply, nil - } - - var err error - var reply interface{} - for i := 0; i <= pending; i++ { - var e error - if reply, e = c.readReply(); e != nil { - return nil, c.fatal(e) - } - if e, ok := reply.(Error); ok && err == nil { - err = e - } - } - return reply, err -} diff --git a/vendor/github.com/garyburd/redigo/redis/pool.go b/vendor/github.com/garyburd/redigo/redis/pool.go deleted file mode 100644 index 9daf2e33f..000000000 --- a/vendor/github.com/garyburd/redigo/redis/pool.go +++ /dev/null @@ -1,389 +0,0 @@ -// Copyright 2012 Gary Burd -// -// Licensed under the Apache License, Version 2.0 (the "License"): you may -// not use this file except in compliance with the License. You may obtain -// a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -// License for the specific language governing permissions and limitations -// under the License. - -package redis - -import ( - "bytes" - "container/list" - "crypto/rand" - "crypto/sha1" - "errors" - "io" - "strconv" - "sync" - "time" - - "github.com/garyburd/redigo/internal" -) - -var nowFunc = time.Now // for testing - -// ErrPoolExhausted is returned from a pool connection method (Do, Send, -// Receive, Flush, Err) when the maximum number of database connections in the -// pool has been reached. -var ErrPoolExhausted = errors.New("redigo: connection pool exhausted") - -var ( - errPoolClosed = errors.New("redigo: connection pool closed") - errConnClosed = errors.New("redigo: connection closed") -) - -// Pool maintains a pool of connections. The application calls the Get method -// to get a connection from the pool and the connection's Close method to -// return the connection's resources to the pool. -// -// The following example shows how to use a pool in a web application. The -// application creates a pool at application startup and makes it available to -// request handlers using a global variable. -// -// func newPool(server, password string) *redis.Pool { -// return &redis.Pool{ -// MaxIdle: 3, -// IdleTimeout: 240 * time.Second, -// Dial: func () (redis.Conn, error) { -// c, err := redis.Dial("tcp", server) -// if err != nil { -// return nil, err -// } -// if _, err := c.Do("AUTH", password); err != nil { -// c.Close() -// return nil, err -// } -// return c, err -// }, -// TestOnBorrow: func(c redis.Conn, t time.Time) error { -// _, err := c.Do("PING") -// return err -// }, -// } -// } -// -// var ( -// pool *redis.Pool -// redisServer = flag.String("redisServer", ":6379", "") -// redisPassword = flag.String("redisPassword", "", "") -// ) -// -// func main() { -// flag.Parse() -// pool = newPool(*redisServer, *redisPassword) -// ... -// } -// -// A request handler gets a connection from the pool and closes the connection -// when the handler is done: -// -// func serveHome(w http.ResponseWriter, r *http.Request) { -// conn := pool.Get() -// defer conn.Close() -// .... -// } -// -type Pool struct { - - // Dial is an application supplied function for creating and configuring a - // connection - Dial func() (Conn, error) - - // TestOnBorrow is an optional application supplied function for checking - // the health of an idle connection before the connection is used again by - // the application. Argument t is the time that the connection was returned - // to the pool. If the function returns an error, then the connection is - // closed. - TestOnBorrow func(c Conn, t time.Time) error - - // Maximum number of idle connections in the pool. - MaxIdle int - - // Maximum number of connections allocated by the pool at a given time. - // When zero, there is no limit on the number of connections in the pool. - MaxActive int - - // Close connections after remaining idle for this duration. If the value - // is zero, then idle connections are not closed. Applications should set - // the timeout to a value less than the server's timeout. - IdleTimeout time.Duration - - // If Wait is true and the pool is at the MaxIdle limit, then Get() waits - // for a connection to be returned to the pool before returning. - Wait bool - - // mu protects fields defined below. - mu sync.Mutex - cond *sync.Cond - closed bool - active int - - // Stack of idleConn with most recently used at the front. - idle list.List -} - -type idleConn struct { - c Conn - t time.Time -} - -// NewPool creates a new pool. This function is deprecated. Applications should -// initialize the Pool fields directly as shown in example. -func NewPool(newFn func() (Conn, error), maxIdle int) *Pool { - return &Pool{Dial: newFn, MaxIdle: maxIdle} -} - -// Get gets a connection. The application must close the returned connection. -// This method always returns a valid connection so that applications can defer -// error handling to the first use of the connection. If there is an error -// getting an underlying connection, then the connection Err, Do, Send, Flush -// and Receive methods return that error. -func (p *Pool) Get() Conn { - c, err := p.get() - if err != nil { - return errorConnection{err} - } - return &pooledConnection{p: p, c: c} -} - -// ActiveCount returns the number of active connections in the pool. -func (p *Pool) ActiveCount() int { - p.mu.Lock() - active := p.active - p.mu.Unlock() - return active -} - -// Close releases the resources used by the pool. -func (p *Pool) Close() error { - p.mu.Lock() - idle := p.idle - p.idle.Init() - p.closed = true - p.active -= idle.Len() - if p.cond != nil { - p.cond.Broadcast() - } - p.mu.Unlock() - for e := idle.Front(); e != nil; e = e.Next() { - e.Value.(idleConn).c.Close() - } - return nil -} - -// release decrements the active count and signals waiters. The caller must -// hold p.mu during the call. -func (p *Pool) release() { - p.active -= 1 - if p.cond != nil { - p.cond.Signal() - } -} - -// get prunes stale connections and returns a connection from the idle list or -// creates a new connection. -func (p *Pool) get() (Conn, error) { - p.mu.Lock() - - // Prune stale connections. - - if timeout := p.IdleTimeout; timeout > 0 { - for i, n := 0, p.idle.Len(); i < n; i++ { - e := p.idle.Back() - if e == nil { - break - } - ic := e.Value.(idleConn) - if ic.t.Add(timeout).After(nowFunc()) { - break - } - p.idle.Remove(e) - p.release() - p.mu.Unlock() - ic.c.Close() - p.mu.Lock() - } - } - - for { - - // Get idle connection. - - for i, n := 0, p.idle.Len(); i < n; i++ { - e := p.idle.Front() - if e == nil { - break - } - ic := e.Value.(idleConn) - p.idle.Remove(e) - test := p.TestOnBorrow - p.mu.Unlock() - if test == nil || test(ic.c, ic.t) == nil { - return ic.c, nil - } - ic.c.Close() - p.mu.Lock() - p.release() - } - - // Check for pool closed before dialing a new connection. - - if p.closed { - p.mu.Unlock() - return nil, errors.New("redigo: get on closed pool") - } - - // Dial new connection if under limit. - - if p.MaxActive == 0 || p.active < p.MaxActive { - dial := p.Dial - p.active += 1 - p.mu.Unlock() - c, err := dial() - if err != nil { - p.mu.Lock() - p.release() - p.mu.Unlock() - c = nil - } - return c, err - } - - if !p.Wait { - p.mu.Unlock() - return nil, ErrPoolExhausted - } - - if p.cond == nil { - p.cond = sync.NewCond(&p.mu) - } - p.cond.Wait() - } -} - -func (p *Pool) put(c Conn, forceClose bool) error { - err := c.Err() - p.mu.Lock() - if !p.closed && err == nil && !forceClose { - p.idle.PushFront(idleConn{t: nowFunc(), c: c}) - if p.idle.Len() > p.MaxIdle { - c = p.idle.Remove(p.idle.Back()).(idleConn).c - } else { - c = nil - } - } - - if c == nil { - if p.cond != nil { - p.cond.Signal() - } - p.mu.Unlock() - return nil - } - - p.release() - p.mu.Unlock() - return c.Close() -} - -type pooledConnection struct { - p *Pool - c Conn - state int -} - -var ( - sentinel []byte - sentinelOnce sync.Once -) - -func initSentinel() { - p := make([]byte, 64) - if _, err := rand.Read(p); err == nil { - sentinel = p - } else { - h := sha1.New() - io.WriteString(h, "Oops, rand failed. Use time instead.") - io.WriteString(h, strconv.FormatInt(time.Now().UnixNano(), 10)) - sentinel = h.Sum(nil) - } -} - -func (pc *pooledConnection) Close() error { - c := pc.c - if _, ok := c.(errorConnection); ok { - return nil - } - pc.c = errorConnection{errConnClosed} - - if pc.state&internal.MultiState != 0 { - c.Send("DISCARD") - pc.state &^= (internal.MultiState | internal.WatchState) - } else if pc.state&internal.WatchState != 0 { - c.Send("UNWATCH") - pc.state &^= internal.WatchState - } - if pc.state&internal.SubscribeState != 0 { - c.Send("UNSUBSCRIBE") - c.Send("PUNSUBSCRIBE") - // To detect the end of the message stream, ask the server to echo - // a sentinel value and read until we see that value. - sentinelOnce.Do(initSentinel) - c.Send("ECHO", sentinel) - c.Flush() - for { - p, err := c.Receive() - if err != nil { - break - } - if p, ok := p.([]byte); ok && bytes.Equal(p, sentinel) { - pc.state &^= internal.SubscribeState - break - } - } - } - c.Do("") - pc.p.put(c, pc.state != 0) - return nil -} - -func (pc *pooledConnection) Err() error { - return pc.c.Err() -} - -func (pc *pooledConnection) Do(commandName string, args ...interface{}) (reply interface{}, err error) { - ci := internal.LookupCommandInfo(commandName) - pc.state = (pc.state | ci.Set) &^ ci.Clear - return pc.c.Do(commandName, args...) -} - -func (pc *pooledConnection) Send(commandName string, args ...interface{}) error { - ci := internal.LookupCommandInfo(commandName) - pc.state = (pc.state | ci.Set) &^ ci.Clear - return pc.c.Send(commandName, args...) -} - -func (pc *pooledConnection) Flush() error { - return pc.c.Flush() -} - -func (pc *pooledConnection) Receive() (reply interface{}, err error) { - return pc.c.Receive() -} - -type errorConnection struct{ err error } - -func (ec errorConnection) Do(string, ...interface{}) (interface{}, error) { return nil, ec.err } -func (ec errorConnection) Send(string, ...interface{}) error { return ec.err } -func (ec errorConnection) Err() error { return ec.err } -func (ec errorConnection) Close() error { return ec.err } -func (ec errorConnection) Flush() error { return ec.err } -func (ec errorConnection) Receive() (interface{}, error) { return nil, ec.err } diff --git a/vendor/github.com/garyburd/redigo/redis/redis.go b/vendor/github.com/garyburd/redigo/redis/redis.go deleted file mode 100644 index c90a48ed4..000000000 --- a/vendor/github.com/garyburd/redigo/redis/redis.go +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright 2012 Gary Burd -// -// Licensed under the Apache License, Version 2.0 (the "License"): you may -// not use this file except in compliance with the License. You may obtain -// a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -// License for the specific language governing permissions and limitations -// under the License. - -package redis - -// Error represents an error returned in a command reply. -type Error string - -func (err Error) Error() string { return string(err) } - -// Conn represents a connection to a Redis server. -type Conn interface { - // Close closes the connection. - Close() error - - // Err returns a non-nil value if the connection is broken. The returned - // value is either the first non-nil value returned from the underlying - // network connection or a protocol parsing error. Applications should - // close broken connections. - Err() error - - // Do sends a command to the server and returns the received reply. - Do(commandName string, args ...interface{}) (reply interface{}, err error) - - // Send writes the command to the client's output buffer. - Send(commandName string, args ...interface{}) error - - // Flush flushes the output buffer to the Redis server. - Flush() error - - // Receive receives a single reply from the Redis server - Receive() (reply interface{}, err error) -} diff --git a/vendor/github.com/garyburd/redigo/redis/reply.go b/vendor/github.com/garyburd/redigo/redis/reply.go deleted file mode 100644 index 5648f930d..000000000 --- a/vendor/github.com/garyburd/redigo/redis/reply.go +++ /dev/null @@ -1,312 +0,0 @@ -// Copyright 2012 Gary Burd -// -// Licensed under the Apache License, Version 2.0 (the "License"): you may -// not use this file except in compliance with the License. You may obtain -// a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -// License for the specific language governing permissions and limitations -// under the License. - -package redis - -import ( - "errors" - "fmt" - "strconv" -) - -// ErrNil indicates that a reply value is nil. -var ErrNil = errors.New("redigo: nil returned") - -// Int is a helper that converts a command reply to an integer. If err is not -// equal to nil, then Int returns 0, err. Otherwise, Int converts the -// reply to an int as follows: -// -// Reply type Result -// integer int(reply), nil -// bulk string parsed reply, nil -// nil 0, ErrNil -// other 0, error -func Int(reply interface{}, err error) (int, error) { - if err != nil { - return 0, err - } - switch reply := reply.(type) { - case int64: - x := int(reply) - if int64(x) != reply { - return 0, strconv.ErrRange - } - return x, nil - case []byte: - n, err := strconv.ParseInt(string(reply), 10, 0) - return int(n), err - case nil: - return 0, ErrNil - case Error: - return 0, reply - } - return 0, fmt.Errorf("redigo: unexpected type for Int, got type %T", reply) -} - -// Int64 is a helper that converts a command reply to 64 bit integer. If err is -// not equal to nil, then Int returns 0, err. Otherwise, Int64 converts the -// reply to an int64 as follows: -// -// Reply type Result -// integer reply, nil -// bulk string parsed reply, nil -// nil 0, ErrNil -// other 0, error -func Int64(reply interface{}, err error) (int64, error) { - if err != nil { - return 0, err - } - switch reply := reply.(type) { - case int64: - return reply, nil - case []byte: - n, err := strconv.ParseInt(string(reply), 10, 64) - return n, err - case nil: - return 0, ErrNil - case Error: - return 0, reply - } - return 0, fmt.Errorf("redigo: unexpected type for Int64, got type %T", reply) -} - -var errNegativeInt = errors.New("redigo: unexpected value for Uint64") - -// Uint64 is a helper that converts a command reply to 64 bit integer. If err is -// not equal to nil, then Int returns 0, err. Otherwise, Int64 converts the -// reply to an int64 as follows: -// -// Reply type Result -// integer reply, nil -// bulk string parsed reply, nil -// nil 0, ErrNil -// other 0, error -func Uint64(reply interface{}, err error) (uint64, error) { - if err != nil { - return 0, err - } - switch reply := reply.(type) { - case int64: - if reply < 0 { - return 0, errNegativeInt - } - return uint64(reply), nil - case []byte: - n, err := strconv.ParseUint(string(reply), 10, 64) - return n, err - case nil: - return 0, ErrNil - case Error: - return 0, reply - } - return 0, fmt.Errorf("redigo: unexpected type for Uint64, got type %T", reply) -} - -// Float64 is a helper that converts a command reply to 64 bit float. If err is -// not equal to nil, then Float64 returns 0, err. Otherwise, Float64 converts -// the reply to an int as follows: -// -// Reply type Result -// bulk string parsed reply, nil -// nil 0, ErrNil -// other 0, error -func Float64(reply interface{}, err error) (float64, error) { - if err != nil { - return 0, err - } - switch reply := reply.(type) { - case []byte: - n, err := strconv.ParseFloat(string(reply), 64) - return n, err - case nil: - return 0, ErrNil - case Error: - return 0, reply - } - return 0, fmt.Errorf("redigo: unexpected type for Float64, got type %T", reply) -} - -// String is a helper that converts a command reply to a string. If err is not -// equal to nil, then String returns "", err. Otherwise String converts the -// reply to a string as follows: -// -// Reply type Result -// bulk string string(reply), nil -// simple string reply, nil -// nil "", ErrNil -// other "", error -func String(reply interface{}, err error) (string, error) { - if err != nil { - return "", err - } - switch reply := reply.(type) { - case []byte: - return string(reply), nil - case string: - return reply, nil - case nil: - return "", ErrNil - case Error: - return "", reply - } - return "", fmt.Errorf("redigo: unexpected type for String, got type %T", reply) -} - -// Bytes is a helper that converts a command reply to a slice of bytes. If err -// is not equal to nil, then Bytes returns nil, err. Otherwise Bytes converts -// the reply to a slice of bytes as follows: -// -// Reply type Result -// bulk string reply, nil -// simple string []byte(reply), nil -// nil nil, ErrNil -// other nil, error -func Bytes(reply interface{}, err error) ([]byte, error) { - if err != nil { - return nil, err - } - switch reply := reply.(type) { - case []byte: - return reply, nil - case string: - return []byte(reply), nil - case nil: - return nil, ErrNil - case Error: - return nil, reply - } - return nil, fmt.Errorf("redigo: unexpected type for Bytes, got type %T", reply) -} - -// Bool is a helper that converts a command reply to a boolean. If err is not -// equal to nil, then Bool returns false, err. Otherwise Bool converts the -// reply to boolean as follows: -// -// Reply type Result -// integer value != 0, nil -// bulk string strconv.ParseBool(reply) -// nil false, ErrNil -// other false, error -func Bool(reply interface{}, err error) (bool, error) { - if err != nil { - return false, err - } - switch reply := reply.(type) { - case int64: - return reply != 0, nil - case []byte: - return strconv.ParseBool(string(reply)) - case nil: - return false, ErrNil - case Error: - return false, reply - } - return false, fmt.Errorf("redigo: unexpected type for Bool, got type %T", reply) -} - -// MultiBulk is deprecated. Use Values. -func MultiBulk(reply interface{}, err error) ([]interface{}, error) { return Values(reply, err) } - -// Values is a helper that converts an array command reply to a []interface{}. -// If err is not equal to nil, then Values returns nil, err. Otherwise, Values -// converts the reply as follows: -// -// Reply type Result -// array reply, nil -// nil nil, ErrNil -// other nil, error -func Values(reply interface{}, err error) ([]interface{}, error) { - if err != nil { - return nil, err - } - switch reply := reply.(type) { - case []interface{}: - return reply, nil - case nil: - return nil, ErrNil - case Error: - return nil, reply - } - return nil, fmt.Errorf("redigo: unexpected type for Values, got type %T", reply) -} - -// Strings is a helper that converts an array command reply to a []string. If -// err is not equal to nil, then Strings returns nil, err. Nil array items are -// converted to "" in the output slice. Strings returns an error if an array -// item is not a bulk string or nil. -func Strings(reply interface{}, err error) ([]string, error) { - if err != nil { - return nil, err - } - switch reply := reply.(type) { - case []interface{}: - result := make([]string, len(reply)) - for i := range reply { - if reply[i] == nil { - continue - } - p, ok := reply[i].([]byte) - if !ok { - return nil, fmt.Errorf("redigo: unexpected element type for Strings, got type %T", reply[i]) - } - result[i] = string(p) - } - return result, nil - case nil: - return nil, ErrNil - case Error: - return nil, reply - } - return nil, fmt.Errorf("redigo: unexpected type for Strings, got type %T", reply) -} - -// Ints is a helper that converts an array command reply to a []int. If -// err is not equal to nil, then Ints returns nil, err. -func Ints(reply interface{}, err error) ([]int, error) { - var ints []int - if reply == nil { - return ints, ErrNil - } - values, err := Values(reply, err) - if err != nil { - return ints, err - } - if err := ScanSlice(values, &ints); err != nil { - return ints, err - } - return ints, nil -} - -// StringMap is a helper that converts an array of strings (alternating key, value) -// into a map[string]string. The HGETALL and CONFIG GET commands return replies in this format. -// Requires an even number of values in result. -func StringMap(result interface{}, err error) (map[string]string, error) { - values, err := Values(result, err) - if err != nil { - return nil, err - } - if len(values)%2 != 0 { - return nil, errors.New("redigo: StringMap expects even number of values result") - } - m := make(map[string]string, len(values)/2) - for i := 0; i < len(values); i += 2 { - key, okKey := values[i].([]byte) - value, okValue := values[i+1].([]byte) - if !okKey || !okValue { - return nil, errors.New("redigo: ScanMap key not a bulk string value") - } - m[string(key)] = string(value) - } - return m, nil -} diff --git a/vendor/github.com/gomodule/redigo/LICENSE b/vendor/github.com/gomodule/redigo/LICENSE new file mode 100644 index 000000000..67db85882 --- /dev/null +++ b/vendor/github.com/gomodule/redigo/LICENSE @@ -0,0 +1,175 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. diff --git a/vendor/github.com/gomodule/redigo/redis/commandinfo.go b/vendor/github.com/gomodule/redigo/redis/commandinfo.go new file mode 100644 index 000000000..b6df6a25a --- /dev/null +++ b/vendor/github.com/gomodule/redigo/redis/commandinfo.go @@ -0,0 +1,55 @@ +// Copyright 2014 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package redis + +import ( + "strings" +) + +const ( + connectionWatchState = 1 << iota + connectionMultiState + connectionSubscribeState + connectionMonitorState +) + +type commandInfo struct { + // Set or Clear these states on connection. + Set, Clear int +} + +var commandInfos = map[string]commandInfo{ + "WATCH": {Set: connectionWatchState}, + "UNWATCH": {Clear: connectionWatchState}, + "MULTI": {Set: connectionMultiState}, + "EXEC": {Clear: connectionWatchState | connectionMultiState}, + "DISCARD": {Clear: connectionWatchState | connectionMultiState}, + "PSUBSCRIBE": {Set: connectionSubscribeState}, + "SUBSCRIBE": {Set: connectionSubscribeState}, + "MONITOR": {Set: connectionMonitorState}, +} + +func init() { + for n, ci := range commandInfos { + commandInfos[strings.ToLower(n)] = ci + } +} + +func lookupCommandInfo(commandName string) commandInfo { + if ci, ok := commandInfos[commandName]; ok { + return ci + } + return commandInfos[strings.ToUpper(commandName)] +} diff --git a/vendor/github.com/gomodule/redigo/redis/conn.go b/vendor/github.com/gomodule/redigo/redis/conn.go new file mode 100644 index 000000000..33b43be69 --- /dev/null +++ b/vendor/github.com/gomodule/redigo/redis/conn.go @@ -0,0 +1,736 @@ +// Copyright 2012 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package redis + +import ( + "bufio" + "bytes" + "context" + "crypto/tls" + "errors" + "fmt" + "io" + "net" + "net/url" + "regexp" + "strconv" + "sync" + "time" +) + +var ( + _ ConnWithTimeout = (*conn)(nil) +) + +// conn is the low-level implementation of Conn +type conn struct { + // Shared + mu sync.Mutex + pending int + err error + conn net.Conn + + // Read + readTimeout time.Duration + br *bufio.Reader + + // Write + writeTimeout time.Duration + bw *bufio.Writer + + // Scratch space for formatting argument length. + // '*' or '$', length, "\r\n" + lenScratch [32]byte + + // Scratch space for formatting integers and floats. + numScratch [40]byte +} + +// DialTimeout acts like Dial but takes timeouts for establishing the +// connection to the server, writing a command and reading a reply. +// +// Deprecated: Use Dial with options instead. +func DialTimeout(network, address string, connectTimeout, readTimeout, writeTimeout time.Duration) (Conn, error) { + return Dial(network, address, + DialConnectTimeout(connectTimeout), + DialReadTimeout(readTimeout), + DialWriteTimeout(writeTimeout)) +} + +// DialOption specifies an option for dialing a Redis server. +type DialOption struct { + f func(*dialOptions) +} + +type dialOptions struct { + readTimeout time.Duration + writeTimeout time.Duration + dialer *net.Dialer + dialContext func(ctx context.Context, network, addr string) (net.Conn, error) + db int + username string + password string + clientName string + useTLS bool + skipVerify bool + tlsConfig *tls.Config +} + +// DialReadTimeout specifies the timeout for reading a single command reply. +func DialReadTimeout(d time.Duration) DialOption { + return DialOption{func(do *dialOptions) { + do.readTimeout = d + }} +} + +// DialWriteTimeout specifies the timeout for writing a single command. +func DialWriteTimeout(d time.Duration) DialOption { + return DialOption{func(do *dialOptions) { + do.writeTimeout = d + }} +} + +// DialConnectTimeout specifies the timeout for connecting to the Redis server when +// no DialNetDial option is specified. +func DialConnectTimeout(d time.Duration) DialOption { + return DialOption{func(do *dialOptions) { + do.dialer.Timeout = d + }} +} + +// DialKeepAlive specifies the keep-alive period for TCP connections to the Redis server +// when no DialNetDial option is specified. +// If zero, keep-alives are not enabled. If no DialKeepAlive option is specified then +// the default of 5 minutes is used to ensure that half-closed TCP sessions are detected. +func DialKeepAlive(d time.Duration) DialOption { + return DialOption{func(do *dialOptions) { + do.dialer.KeepAlive = d + }} +} + +// DialNetDial specifies a custom dial function for creating TCP +// connections, otherwise a net.Dialer customized via the other options is used. +// DialNetDial overrides DialConnectTimeout and DialKeepAlive. +func DialNetDial(dial func(network, addr string) (net.Conn, error)) DialOption { + return DialOption{func(do *dialOptions) { + do.dialContext = func(ctx context.Context, network, addr string) (net.Conn, error) { + return dial(network, addr) + } + }} +} + +// DialContextFunc specifies a custom dial function with context for creating TCP +// connections, otherwise a net.Dialer customized via the other options is used. +// DialContextFunc overrides DialConnectTimeout and DialKeepAlive. +func DialContextFunc(f func(ctx context.Context, network, addr string) (net.Conn, error)) DialOption { + return DialOption{func(do *dialOptions) { + do.dialContext = f + }} +} + +// DialDatabase specifies the database to select when dialing a connection. +func DialDatabase(db int) DialOption { + return DialOption{func(do *dialOptions) { + do.db = db + }} +} + +// DialPassword specifies the password to use when connecting to +// the Redis server. +func DialPassword(password string) DialOption { + return DialOption{func(do *dialOptions) { + do.password = password + }} +} + +// DialUsername specifies the username to use when connecting to +// the Redis server when Redis ACLs are used. +func DialUsername(username string) DialOption { + return DialOption{func(do *dialOptions) { + do.username = username + }} +} + +// DialClientName specifies a client name to be used +// by the Redis server connection. +func DialClientName(name string) DialOption { + return DialOption{func(do *dialOptions) { + do.clientName = name + }} +} + +// DialTLSConfig specifies the config to use when a TLS connection is dialed. +// Has no effect when not dialing a TLS connection. +func DialTLSConfig(c *tls.Config) DialOption { + return DialOption{func(do *dialOptions) { + do.tlsConfig = c + }} +} + +// DialTLSSkipVerify disables server name verification when connecting over +// TLS. Has no effect when not dialing a TLS connection. +func DialTLSSkipVerify(skip bool) DialOption { + return DialOption{func(do *dialOptions) { + do.skipVerify = skip + }} +} + +// DialUseTLS specifies whether TLS should be used when connecting to the +// server. This option is ignore by DialURL. +func DialUseTLS(useTLS bool) DialOption { + return DialOption{func(do *dialOptions) { + do.useTLS = useTLS + }} +} + +// Dial connects to the Redis server at the given network and +// address using the specified options. +func Dial(network, address string, options ...DialOption) (Conn, error) { + return DialContext(context.Background(), network, address, options...) +} + +// DialContext connects to the Redis server at the given network and +// address using the specified options and context. +func DialContext(ctx context.Context, network, address string, options ...DialOption) (Conn, error) { + do := dialOptions{ + dialer: &net.Dialer{ + KeepAlive: time.Minute * 5, + }, + } + for _, option := range options { + option.f(&do) + } + if do.dialContext == nil { + do.dialContext = do.dialer.DialContext + } + + netConn, err := do.dialContext(ctx, network, address) + if err != nil { + return nil, err + } + + if do.useTLS { + var tlsConfig *tls.Config + if do.tlsConfig == nil { + tlsConfig = &tls.Config{InsecureSkipVerify: do.skipVerify} + } else { + tlsConfig = cloneTLSConfig(do.tlsConfig) + } + if tlsConfig.ServerName == "" { + host, _, err := net.SplitHostPort(address) + if err != nil { + netConn.Close() + return nil, err + } + tlsConfig.ServerName = host + } + + tlsConn := tls.Client(netConn, tlsConfig) + if err := tlsConn.Handshake(); err != nil { + netConn.Close() + return nil, err + } + netConn = tlsConn + } + + c := &conn{ + conn: netConn, + bw: bufio.NewWriter(netConn), + br: bufio.NewReader(netConn), + readTimeout: do.readTimeout, + writeTimeout: do.writeTimeout, + } + + if do.password != "" { + authArgs := make([]interface{}, 0, 2) + if do.username != "" { + authArgs = append(authArgs, do.username) + } + authArgs = append(authArgs, do.password) + if _, err := c.Do("AUTH", authArgs...); err != nil { + netConn.Close() + return nil, err + } + } + + if do.clientName != "" { + if _, err := c.Do("CLIENT", "SETNAME", do.clientName); err != nil { + netConn.Close() + return nil, err + } + } + + if do.db != 0 { + if _, err := c.Do("SELECT", do.db); err != nil { + netConn.Close() + return nil, err + } + } + + return c, nil +} + +var pathDBRegexp = regexp.MustCompile(`/(\d*)\z`) + +// DialURL connects to a Redis server at the given URL using the Redis +// URI scheme. URLs should follow the draft IANA specification for the +// scheme (https://www.iana.org/assignments/uri-schemes/prov/redis). +func DialURL(rawurl string, options ...DialOption) (Conn, error) { + u, err := url.Parse(rawurl) + if err != nil { + return nil, err + } + + if u.Scheme != "redis" && u.Scheme != "rediss" { + return nil, fmt.Errorf("invalid redis URL scheme: %s", u.Scheme) + } + + if u.Opaque != "" { + return nil, fmt.Errorf("invalid redis URL, url is opaque: %s", rawurl) + } + + // As per the IANA draft spec, the host defaults to localhost and + // the port defaults to 6379. + host, port, err := net.SplitHostPort(u.Host) + if err != nil { + // assume port is missing + host = u.Host + port = "6379" + } + if host == "" { + host = "localhost" + } + address := net.JoinHostPort(host, port) + + if u.User != nil { + password, isSet := u.User.Password() + if isSet { + options = append(options, DialUsername(u.User.Username()), DialPassword(password)) + } + } + + match := pathDBRegexp.FindStringSubmatch(u.Path) + if len(match) == 2 { + db := 0 + if len(match[1]) > 0 { + db, err = strconv.Atoi(match[1]) + if err != nil { + return nil, fmt.Errorf("invalid database: %s", u.Path[1:]) + } + } + if db != 0 { + options = append(options, DialDatabase(db)) + } + } else if u.Path != "" { + return nil, fmt.Errorf("invalid database: %s", u.Path[1:]) + } + + options = append(options, DialUseTLS(u.Scheme == "rediss")) + + return Dial("tcp", address, options...) +} + +// NewConn returns a new Redigo connection for the given net connection. +func NewConn(netConn net.Conn, readTimeout, writeTimeout time.Duration) Conn { + return &conn{ + conn: netConn, + bw: bufio.NewWriter(netConn), + br: bufio.NewReader(netConn), + readTimeout: readTimeout, + writeTimeout: writeTimeout, + } +} + +func (c *conn) Close() error { + c.mu.Lock() + err := c.err + if c.err == nil { + c.err = errors.New("redigo: closed") + err = c.conn.Close() + } + c.mu.Unlock() + return err +} + +func (c *conn) fatal(err error) error { + c.mu.Lock() + if c.err == nil { + c.err = err + // Close connection to force errors on subsequent calls and to unblock + // other reader or writer. + c.conn.Close() + } + c.mu.Unlock() + return err +} + +func (c *conn) Err() error { + c.mu.Lock() + err := c.err + c.mu.Unlock() + return err +} + +func (c *conn) writeLen(prefix byte, n int) error { + c.lenScratch[len(c.lenScratch)-1] = '\n' + c.lenScratch[len(c.lenScratch)-2] = '\r' + i := len(c.lenScratch) - 3 + for { + c.lenScratch[i] = byte('0' + n%10) + i -= 1 + n = n / 10 + if n == 0 { + break + } + } + c.lenScratch[i] = prefix + _, err := c.bw.Write(c.lenScratch[i:]) + return err +} + +func (c *conn) writeString(s string) error { + c.writeLen('$', len(s)) + c.bw.WriteString(s) + _, err := c.bw.WriteString("\r\n") + return err +} + +func (c *conn) writeBytes(p []byte) error { + c.writeLen('$', len(p)) + c.bw.Write(p) + _, err := c.bw.WriteString("\r\n") + return err +} + +func (c *conn) writeInt64(n int64) error { + return c.writeBytes(strconv.AppendInt(c.numScratch[:0], n, 10)) +} + +func (c *conn) writeFloat64(n float64) error { + return c.writeBytes(strconv.AppendFloat(c.numScratch[:0], n, 'g', -1, 64)) +} + +func (c *conn) writeCommand(cmd string, args []interface{}) error { + c.writeLen('*', 1+len(args)) + if err := c.writeString(cmd); err != nil { + return err + } + for _, arg := range args { + if err := c.writeArg(arg, true); err != nil { + return err + } + } + return nil +} + +func (c *conn) writeArg(arg interface{}, argumentTypeOK bool) (err error) { + switch arg := arg.(type) { + case string: + return c.writeString(arg) + case []byte: + return c.writeBytes(arg) + case int: + return c.writeInt64(int64(arg)) + case int64: + return c.writeInt64(arg) + case float64: + return c.writeFloat64(arg) + case bool: + if arg { + return c.writeString("1") + } else { + return c.writeString("0") + } + case nil: + return c.writeString("") + case Argument: + if argumentTypeOK { + return c.writeArg(arg.RedisArg(), false) + } + // See comment in default clause below. + var buf bytes.Buffer + fmt.Fprint(&buf, arg) + return c.writeBytes(buf.Bytes()) + default: + // This default clause is intended to handle builtin numeric types. + // The function should return an error for other types, but this is not + // done for compatibility with previous versions of the package. + var buf bytes.Buffer + fmt.Fprint(&buf, arg) + return c.writeBytes(buf.Bytes()) + } +} + +type protocolError string + +func (pe protocolError) Error() string { + return fmt.Sprintf("redigo: %s (possible server error or unsupported concurrent read by application)", string(pe)) +} + +// readLine reads a line of input from the RESP stream. +func (c *conn) readLine() ([]byte, error) { + // To avoid allocations, attempt to read the line using ReadSlice. This + // call typically succeeds. The known case where the call fails is when + // reading the output from the MONITOR command. + p, err := c.br.ReadSlice('\n') + if err == bufio.ErrBufferFull { + // The line does not fit in the bufio.Reader's buffer. Fall back to + // allocating a buffer for the line. + buf := append([]byte{}, p...) + for err == bufio.ErrBufferFull { + p, err = c.br.ReadSlice('\n') + buf = append(buf, p...) + } + p = buf + } + if err != nil { + return nil, err + } + i := len(p) - 2 + if i < 0 || p[i] != '\r' { + return nil, protocolError("bad response line terminator") + } + return p[:i], nil +} + +// parseLen parses bulk string and array lengths. +func parseLen(p []byte) (int, error) { + if len(p) == 0 { + return -1, protocolError("malformed length") + } + + if p[0] == '-' && len(p) == 2 && p[1] == '1' { + // handle $-1 and $-1 null replies. + return -1, nil + } + + var n int + for _, b := range p { + n *= 10 + if b < '0' || b > '9' { + return -1, protocolError("illegal bytes in length") + } + n += int(b - '0') + } + + return n, nil +} + +// parseInt parses an integer reply. +func parseInt(p []byte) (interface{}, error) { + if len(p) == 0 { + return 0, protocolError("malformed integer") + } + + var negate bool + if p[0] == '-' { + negate = true + p = p[1:] + if len(p) == 0 { + return 0, protocolError("malformed integer") + } + } + + var n int64 + for _, b := range p { + n *= 10 + if b < '0' || b > '9' { + return 0, protocolError("illegal bytes in length") + } + n += int64(b - '0') + } + + if negate { + n = -n + } + return n, nil +} + +var ( + okReply interface{} = "OK" + pongReply interface{} = "PONG" +) + +func (c *conn) readReply() (interface{}, error) { + line, err := c.readLine() + if err != nil { + return nil, err + } + if len(line) == 0 { + return nil, protocolError("short response line") + } + switch line[0] { + case '+': + switch string(line[1:]) { + case "OK": + // Avoid allocation for frequent "+OK" response. + return okReply, nil + case "PONG": + // Avoid allocation in PING command benchmarks :) + return pongReply, nil + default: + return string(line[1:]), nil + } + case '-': + return Error(string(line[1:])), nil + case ':': + return parseInt(line[1:]) + case '$': + n, err := parseLen(line[1:]) + if n < 0 || err != nil { + return nil, err + } + p := make([]byte, n) + _, err = io.ReadFull(c.br, p) + if err != nil { + return nil, err + } + if line, err := c.readLine(); err != nil { + return nil, err + } else if len(line) != 0 { + return nil, protocolError("bad bulk string format") + } + return p, nil + case '*': + n, err := parseLen(line[1:]) + if n < 0 || err != nil { + return nil, err + } + r := make([]interface{}, n) + for i := range r { + r[i], err = c.readReply() + if err != nil { + return nil, err + } + } + return r, nil + } + return nil, protocolError("unexpected response line") +} + +func (c *conn) Send(cmd string, args ...interface{}) error { + c.mu.Lock() + c.pending += 1 + c.mu.Unlock() + if c.writeTimeout != 0 { + c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout)) + } + if err := c.writeCommand(cmd, args); err != nil { + return c.fatal(err) + } + return nil +} + +func (c *conn) Flush() error { + if c.writeTimeout != 0 { + c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout)) + } + if err := c.bw.Flush(); err != nil { + return c.fatal(err) + } + return nil +} + +func (c *conn) Receive() (interface{}, error) { + return c.ReceiveWithTimeout(c.readTimeout) +} + +func (c *conn) ReceiveWithTimeout(timeout time.Duration) (reply interface{}, err error) { + var deadline time.Time + if timeout != 0 { + deadline = time.Now().Add(timeout) + } + c.conn.SetReadDeadline(deadline) + + if reply, err = c.readReply(); err != nil { + return nil, c.fatal(err) + } + // When using pub/sub, the number of receives can be greater than the + // number of sends. To enable normal use of the connection after + // unsubscribing from all channels, we do not decrement pending to a + // negative value. + // + // The pending field is decremented after the reply is read to handle the + // case where Receive is called before Send. + c.mu.Lock() + if c.pending > 0 { + c.pending -= 1 + } + c.mu.Unlock() + if err, ok := reply.(Error); ok { + return nil, err + } + return +} + +func (c *conn) Do(cmd string, args ...interface{}) (interface{}, error) { + return c.DoWithTimeout(c.readTimeout, cmd, args...) +} + +func (c *conn) DoWithTimeout(readTimeout time.Duration, cmd string, args ...interface{}) (interface{}, error) { + c.mu.Lock() + pending := c.pending + c.pending = 0 + c.mu.Unlock() + + if cmd == "" && pending == 0 { + return nil, nil + } + + if c.writeTimeout != 0 { + c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout)) + } + + if cmd != "" { + if err := c.writeCommand(cmd, args); err != nil { + return nil, c.fatal(err) + } + } + + if err := c.bw.Flush(); err != nil { + return nil, c.fatal(err) + } + + var deadline time.Time + if readTimeout != 0 { + deadline = time.Now().Add(readTimeout) + } + c.conn.SetReadDeadline(deadline) + + if cmd == "" { + reply := make([]interface{}, pending) + for i := range reply { + r, e := c.readReply() + if e != nil { + return nil, c.fatal(e) + } + reply[i] = r + } + return reply, nil + } + + var err error + var reply interface{} + for i := 0; i <= pending; i++ { + var e error + if reply, e = c.readReply(); e != nil { + return nil, c.fatal(e) + } + if e, ok := reply.(Error); ok && err == nil { + err = e + } + } + return reply, err +} diff --git a/vendor/github.com/garyburd/redigo/redis/doc.go b/vendor/github.com/gomodule/redigo/redis/doc.go similarity index 85% rename from vendor/github.com/garyburd/redigo/redis/doc.go rename to vendor/github.com/gomodule/redigo/redis/doc.go index a5cd454af..69ad506cd 100644 --- a/vendor/github.com/garyburd/redigo/redis/doc.go +++ b/vendor/github.com/gomodule/redigo/redis/doc.go @@ -14,7 +14,7 @@ // Package redis is a client for the Redis database. // -// The Redigo FAQ (https://github.com/garyburd/redigo/wiki/FAQ) contains more +// The Redigo FAQ (https://github.com/gomodule/redigo/wiki/FAQ) contains more // documentation about this package. // // Connections @@ -38,7 +38,7 @@ // // n, err := conn.Do("APPEND", "key", "value") // -// The Do method converts command arguments to binary strings for transmission +// The Do method converts command arguments to bulk strings for transmission // to the server as follows: // // Go Type Conversion @@ -48,7 +48,7 @@ // float64 strconv.FormatFloat(v, 'g', -1, 64) // bool true -> "1", false -> "0" // nil "" -// all other types fmt.Print(v) +// all other types fmt.Fprint(w, v) // // Redis command reply types are represented using the following Go types: // @@ -99,15 +99,14 @@ // // Concurrency // -// Connections do not support concurrent calls to the write methods (Send, -// Flush) or concurrent calls to the read method (Receive). Connections do -// allow a concurrent reader and writer. +// Connections support one concurrent caller to the Receive method and one +// concurrent caller to the Send and Flush methods. No other concurrency is +// supported including concurrent calls to the Do and Close methods. // -// Because the Do method combines the functionality of Send, Flush and Receive, -// the Do method cannot be called concurrently with the other methods. -// -// For full concurrent access to Redis, use the thread-safe Pool to get and -// release connections from within a goroutine. +// For full concurrent access to Redis, use the thread-safe Pool to get, use +// and release a connection from within a goroutine. Connections returned from +// a Pool have the concurrency restrictions described in the previous +// paragraph. // // Publish and Subscribe // @@ -128,7 +127,7 @@ // send and flush a subscription management command. The receive method // converts a pushed message to convenient types for use in a type switch. // -// psc := redis.PubSubConn{c} +// psc := redis.PubSubConn{Conn: c} // psc.Subscribe("example") // for { // switch v := psc.Receive().(type) { @@ -166,4 +165,13 @@ // if _, err := redis.Scan(reply, &value1, &value2); err != nil { // // handle error // } -package redis // import "github.com/garyburd/redigo/redis" +// +// Errors +// +// Connection methods return error replies from the server as type redis.Error. +// +// Call the connection Err() method to determine if the connection encountered +// non-recoverable error such as a network error or protocol parsing error. If +// Err() returns a non-nil value, then the connection is not usable and should +// be closed. +package redis diff --git a/vendor/github.com/gomodule/redigo/redis/go17.go b/vendor/github.com/gomodule/redigo/redis/go17.go new file mode 100644 index 000000000..5f3637911 --- /dev/null +++ b/vendor/github.com/gomodule/redigo/redis/go17.go @@ -0,0 +1,29 @@ +// +build go1.7,!go1.8 + +package redis + +import "crypto/tls" + +func cloneTLSConfig(cfg *tls.Config) *tls.Config { + return &tls.Config{ + Rand: cfg.Rand, + Time: cfg.Time, + Certificates: cfg.Certificates, + NameToCertificate: cfg.NameToCertificate, + GetCertificate: cfg.GetCertificate, + RootCAs: cfg.RootCAs, + NextProtos: cfg.NextProtos, + ServerName: cfg.ServerName, + ClientAuth: cfg.ClientAuth, + ClientCAs: cfg.ClientCAs, + InsecureSkipVerify: cfg.InsecureSkipVerify, + CipherSuites: cfg.CipherSuites, + PreferServerCipherSuites: cfg.PreferServerCipherSuites, + ClientSessionCache: cfg.ClientSessionCache, + MinVersion: cfg.MinVersion, + MaxVersion: cfg.MaxVersion, + CurvePreferences: cfg.CurvePreferences, + DynamicRecordSizingDisabled: cfg.DynamicRecordSizingDisabled, + Renegotiation: cfg.Renegotiation, + } +} diff --git a/vendor/github.com/gomodule/redigo/redis/go18.go b/vendor/github.com/gomodule/redigo/redis/go18.go new file mode 100644 index 000000000..558363be3 --- /dev/null +++ b/vendor/github.com/gomodule/redigo/redis/go18.go @@ -0,0 +1,9 @@ +// +build go1.8 + +package redis + +import "crypto/tls" + +func cloneTLSConfig(cfg *tls.Config) *tls.Config { + return cfg.Clone() +} diff --git a/vendor/github.com/garyburd/redigo/redis/log.go b/vendor/github.com/gomodule/redigo/redis/log.go similarity index 73% rename from vendor/github.com/garyburd/redigo/redis/log.go rename to vendor/github.com/gomodule/redigo/redis/log.go index 129b86d67..a06db9d6c 100644 --- a/vendor/github.com/garyburd/redigo/redis/log.go +++ b/vendor/github.com/gomodule/redigo/redis/log.go @@ -18,6 +18,11 @@ import ( "bytes" "fmt" "log" + "time" +) + +var ( + _ ConnWithTimeout = (*loggingConn)(nil) ) // NewLoggingConn returns a logging wrapper around a connection. @@ -25,13 +30,22 @@ func NewLoggingConn(conn Conn, logger *log.Logger, prefix string) Conn { if prefix != "" { prefix = prefix + "." } - return &loggingConn{conn, logger, prefix} + return &loggingConn{conn, logger, prefix, nil} +} + +//NewLoggingConnFilter returns a logging wrapper around a connection and a filter function. +func NewLoggingConnFilter(conn Conn, logger *log.Logger, prefix string, skip func(cmdName string) bool) Conn { + if prefix != "" { + prefix = prefix + "." + } + return &loggingConn{conn, logger, prefix, skip} } type loggingConn struct { Conn logger *log.Logger prefix string + skip func(cmdName string) bool } func (c *loggingConn) Close() error { @@ -80,6 +94,9 @@ func (c *loggingConn) printValue(buf *bytes.Buffer, v interface{}) { } func (c *loggingConn) print(method, commandName string, args []interface{}, reply interface{}, err error) { + if c.skip != nil && c.skip(commandName) { + return + } var buf bytes.Buffer fmt.Fprintf(&buf, "%s%s(", c.prefix, method) if method != "Receive" { @@ -104,6 +121,12 @@ func (c *loggingConn) Do(commandName string, args ...interface{}) (interface{}, return reply, err } +func (c *loggingConn) DoWithTimeout(timeout time.Duration, commandName string, args ...interface{}) (interface{}, error) { + reply, err := DoWithTimeout(c.Conn, timeout, commandName, args...) + c.print("DoWithTimeout", commandName, args, reply, err) + return reply, err +} + func (c *loggingConn) Send(commandName string, args ...interface{}) error { err := c.Conn.Send(commandName, args...) c.print("Send", commandName, args, nil, err) @@ -115,3 +138,9 @@ func (c *loggingConn) Receive() (interface{}, error) { c.print("Receive", "", nil, reply, err) return reply, err } + +func (c *loggingConn) ReceiveWithTimeout(timeout time.Duration) (interface{}, error) { + reply, err := ReceiveWithTimeout(c.Conn, timeout) + c.print("ReceiveWithTimeout", "", nil, reply, err) + return reply, err +} diff --git a/vendor/github.com/gomodule/redigo/redis/pool.go b/vendor/github.com/gomodule/redigo/redis/pool.go new file mode 100644 index 000000000..fef6bae55 --- /dev/null +++ b/vendor/github.com/gomodule/redigo/redis/pool.go @@ -0,0 +1,635 @@ +// Copyright 2012 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package redis + +import ( + "bytes" + "context" + "crypto/rand" + "crypto/sha1" + "errors" + "io" + "strconv" + "sync" + "sync/atomic" + "time" +) + +var ( + _ ConnWithTimeout = (*activeConn)(nil) + _ ConnWithTimeout = (*errorConn)(nil) +) + +var nowFunc = time.Now // for testing + +// ErrPoolExhausted is returned from a pool connection method (Do, Send, +// Receive, Flush, Err) when the maximum number of database connections in the +// pool has been reached. +var ErrPoolExhausted = errors.New("redigo: connection pool exhausted") + +var ( + errPoolClosed = errors.New("redigo: connection pool closed") + errConnClosed = errors.New("redigo: connection closed") +) + +// Pool maintains a pool of connections. The application calls the Get method +// to get a connection from the pool and the connection's Close method to +// return the connection's resources to the pool. +// +// The following example shows how to use a pool in a web application. The +// application creates a pool at application startup and makes it available to +// request handlers using a package level variable. The pool configuration used +// here is an example, not a recommendation. +// +// func newPool(addr string) *redis.Pool { +// return &redis.Pool{ +// MaxIdle: 3, +// IdleTimeout: 240 * time.Second, +// // Dial or DialContext must be set. When both are set, DialContext takes precedence over Dial. +// Dial: func () (redis.Conn, error) { return redis.Dial("tcp", addr) }, +// } +// } +// +// var ( +// pool *redis.Pool +// redisServer = flag.String("redisServer", ":6379", "") +// ) +// +// func main() { +// flag.Parse() +// pool = newPool(*redisServer) +// ... +// } +// +// A request handler gets a connection from the pool and closes the connection +// when the handler is done: +// +// func serveHome(w http.ResponseWriter, r *http.Request) { +// conn := pool.Get() +// defer conn.Close() +// ... +// } +// +// Use the Dial function to authenticate connections with the AUTH command or +// select a database with the SELECT command: +// +// pool := &redis.Pool{ +// // Other pool configuration not shown in this example. +// Dial: func () (redis.Conn, error) { +// c, err := redis.Dial("tcp", server) +// if err != nil { +// return nil, err +// } +// if _, err := c.Do("AUTH", password); err != nil { +// c.Close() +// return nil, err +// } +// if _, err := c.Do("SELECT", db); err != nil { +// c.Close() +// return nil, err +// } +// return c, nil +// }, +// } +// +// Use the TestOnBorrow function to check the health of an idle connection +// before the connection is returned to the application. This example PINGs +// connections that have been idle more than a minute: +// +// pool := &redis.Pool{ +// // Other pool configuration not shown in this example. +// TestOnBorrow: func(c redis.Conn, t time.Time) error { +// if time.Since(t) < time.Minute { +// return nil +// } +// _, err := c.Do("PING") +// return err +// }, +// } +// +type Pool struct { + // Dial is an application supplied function for creating and configuring a + // connection. + // + // The connection returned from Dial must not be in a special state + // (subscribed to pubsub channel, transaction started, ...). + Dial func() (Conn, error) + + // DialContext is an application supplied function for creating and configuring a + // connection with the given context. + // + // The connection returned from Dial must not be in a special state + // (subscribed to pubsub channel, transaction started, ...). + DialContext func(ctx context.Context) (Conn, error) + + // TestOnBorrow is an optional application supplied function for checking + // the health of an idle connection before the connection is used again by + // the application. Argument t is the time that the connection was returned + // to the pool. If the function returns an error, then the connection is + // closed. + TestOnBorrow func(c Conn, t time.Time) error + + // Maximum number of idle connections in the pool. + MaxIdle int + + // Maximum number of connections allocated by the pool at a given time. + // When zero, there is no limit on the number of connections in the pool. + MaxActive int + + // Close connections after remaining idle for this duration. If the value + // is zero, then idle connections are not closed. Applications should set + // the timeout to a value less than the server's timeout. + IdleTimeout time.Duration + + // If Wait is true and the pool is at the MaxActive limit, then Get() waits + // for a connection to be returned to the pool before returning. + Wait bool + + // Close connections older than this duration. If the value is zero, then + // the pool does not close connections based on age. + MaxConnLifetime time.Duration + + chInitialized uint32 // set to 1 when field ch is initialized + + mu sync.Mutex // mu protects the following fields + closed bool // set to true when the pool is closed. + active int // the number of open connections in the pool + ch chan struct{} // limits open connections when p.Wait is true + idle idleList // idle connections + waitCount int64 // total number of connections waited for. + waitDuration time.Duration // total time waited for new connections. +} + +// NewPool creates a new pool. +// +// Deprecated: Initialize the Pool directly as shown in the example. +func NewPool(newFn func() (Conn, error), maxIdle int) *Pool { + return &Pool{Dial: newFn, MaxIdle: maxIdle} +} + +// Get gets a connection. The application must close the returned connection. +// This method always returns a valid connection so that applications can defer +// error handling to the first use of the connection. If there is an error +// getting an underlying connection, then the connection Err, Do, Send, Flush +// and Receive methods return that error. +func (p *Pool) Get() Conn { + // GetContext returns errorConn in the first argument when an error occurs. + c, _ := p.GetContext(context.Background()) + return c +} + +// GetContext gets a connection using the provided context. +// +// The provided Context must be non-nil. If the context expires before the +// connection is complete, an error is returned. Any expiration on the context +// will not affect the returned connection. +// +// If the function completes without error, then the application must close the +// returned connection. +func (p *Pool) GetContext(ctx context.Context) (Conn, error) { + // Wait until there is a vacant connection in the pool. + waited, err := p.waitVacantConn(ctx) + if err != nil { + return nil, err + } + + p.mu.Lock() + + if waited > 0 { + p.waitCount++ + p.waitDuration += waited + } + + // Prune stale connections at the back of the idle list. + if p.IdleTimeout > 0 { + n := p.idle.count + for i := 0; i < n && p.idle.back != nil && p.idle.back.t.Add(p.IdleTimeout).Before(nowFunc()); i++ { + pc := p.idle.back + p.idle.popBack() + p.mu.Unlock() + pc.c.Close() + p.mu.Lock() + p.active-- + } + } + + // Get idle connection from the front of idle list. + for p.idle.front != nil { + pc := p.idle.front + p.idle.popFront() + p.mu.Unlock() + if (p.TestOnBorrow == nil || p.TestOnBorrow(pc.c, pc.t) == nil) && + (p.MaxConnLifetime == 0 || nowFunc().Sub(pc.created) < p.MaxConnLifetime) { + return &activeConn{p: p, pc: pc}, nil + } + pc.c.Close() + p.mu.Lock() + p.active-- + } + + // Check for pool closed before dialing a new connection. + if p.closed { + p.mu.Unlock() + err := errors.New("redigo: get on closed pool") + return errorConn{err}, err + } + + // Handle limit for p.Wait == false. + if !p.Wait && p.MaxActive > 0 && p.active >= p.MaxActive { + p.mu.Unlock() + return errorConn{ErrPoolExhausted}, ErrPoolExhausted + } + + p.active++ + p.mu.Unlock() + c, err := p.dial(ctx) + if err != nil { + c = nil + p.mu.Lock() + p.active-- + if p.ch != nil && !p.closed { + p.ch <- struct{}{} + } + p.mu.Unlock() + return errorConn{err}, err + } + return &activeConn{p: p, pc: &poolConn{c: c, created: nowFunc()}}, nil +} + +// PoolStats contains pool statistics. +type PoolStats struct { + // ActiveCount is the number of connections in the pool. The count includes + // idle connections and connections in use. + ActiveCount int + // IdleCount is the number of idle connections in the pool. + IdleCount int + + // WaitCount is the total number of connections waited for. + // This value is currently not guaranteed to be 100% accurate. + WaitCount int64 + + // WaitDuration is the total time blocked waiting for a new connection. + // This value is currently not guaranteed to be 100% accurate. + WaitDuration time.Duration +} + +// Stats returns pool's statistics. +func (p *Pool) Stats() PoolStats { + p.mu.Lock() + stats := PoolStats{ + ActiveCount: p.active, + IdleCount: p.idle.count, + WaitCount: p.waitCount, + WaitDuration: p.waitDuration, + } + p.mu.Unlock() + + return stats +} + +// ActiveCount returns the number of connections in the pool. The count +// includes idle connections and connections in use. +func (p *Pool) ActiveCount() int { + p.mu.Lock() + active := p.active + p.mu.Unlock() + return active +} + +// IdleCount returns the number of idle connections in the pool. +func (p *Pool) IdleCount() int { + p.mu.Lock() + idle := p.idle.count + p.mu.Unlock() + return idle +} + +// Close releases the resources used by the pool. +func (p *Pool) Close() error { + p.mu.Lock() + if p.closed { + p.mu.Unlock() + return nil + } + p.closed = true + p.active -= p.idle.count + pc := p.idle.front + p.idle.count = 0 + p.idle.front, p.idle.back = nil, nil + if p.ch != nil { + close(p.ch) + } + p.mu.Unlock() + for ; pc != nil; pc = pc.next { + pc.c.Close() + } + return nil +} + +func (p *Pool) lazyInit() { + // Fast path. + if atomic.LoadUint32(&p.chInitialized) == 1 { + return + } + // Slow path. + p.mu.Lock() + if p.chInitialized == 0 { + p.ch = make(chan struct{}, p.MaxActive) + if p.closed { + close(p.ch) + } else { + for i := 0; i < p.MaxActive; i++ { + p.ch <- struct{}{} + } + } + atomic.StoreUint32(&p.chInitialized, 1) + } + p.mu.Unlock() +} + +// waitVacantConn waits for a vacant connection in pool if waiting +// is enabled and pool size is limited, otherwise returns instantly. +// If ctx expires before that, an error is returned. +// +// If there were no vacant connection in the pool right away it returns the time spent waiting +// for that connection to appear in the pool. +func (p *Pool) waitVacantConn(ctx context.Context) (waited time.Duration, err error) { + if !p.Wait || p.MaxActive <= 0 { + // No wait or no connection limit. + return 0, nil + } + + p.lazyInit() + + // wait indicates if we believe it will block so its not 100% accurate + // however for stats it should be good enough. + wait := len(p.ch) == 0 + var start time.Time + if wait { + start = time.Now() + } + + if ctx == nil { + <-p.ch + } else { + select { + case <-p.ch: + // Additionally check that context hasn't expired while we were waiting, + // because `select` picks a random `case` if several of them are "ready". + select { + case <-ctx.Done(): + return 0, ctx.Err() + default: + } + case <-ctx.Done(): + return 0, ctx.Err() + } + } + + if wait { + return time.Since(start), nil + } + return 0, nil +} + +func (p *Pool) dial(ctx context.Context) (Conn, error) { + if p.DialContext != nil { + return p.DialContext(ctx) + } + if p.Dial != nil { + return p.Dial() + } + return nil, errors.New("redigo: must pass Dial or DialContext to pool") +} + +func (p *Pool) put(pc *poolConn, forceClose bool) error { + p.mu.Lock() + if !p.closed && !forceClose { + pc.t = nowFunc() + p.idle.pushFront(pc) + if p.idle.count > p.MaxIdle { + pc = p.idle.back + p.idle.popBack() + } else { + pc = nil + } + } + + if pc != nil { + p.mu.Unlock() + pc.c.Close() + p.mu.Lock() + p.active-- + } + + if p.ch != nil && !p.closed { + p.ch <- struct{}{} + } + p.mu.Unlock() + return nil +} + +type activeConn struct { + p *Pool + pc *poolConn + state int +} + +var ( + sentinel []byte + sentinelOnce sync.Once +) + +func initSentinel() { + p := make([]byte, 64) + if _, err := rand.Read(p); err == nil { + sentinel = p + } else { + h := sha1.New() + io.WriteString(h, "Oops, rand failed. Use time instead.") + io.WriteString(h, strconv.FormatInt(time.Now().UnixNano(), 10)) + sentinel = h.Sum(nil) + } +} + +func (ac *activeConn) Close() error { + pc := ac.pc + if pc == nil { + return nil + } + ac.pc = nil + + if ac.state&connectionMultiState != 0 { + pc.c.Send("DISCARD") + ac.state &^= (connectionMultiState | connectionWatchState) + } else if ac.state&connectionWatchState != 0 { + pc.c.Send("UNWATCH") + ac.state &^= connectionWatchState + } + if ac.state&connectionSubscribeState != 0 { + pc.c.Send("UNSUBSCRIBE") + pc.c.Send("PUNSUBSCRIBE") + // To detect the end of the message stream, ask the server to echo + // a sentinel value and read until we see that value. + sentinelOnce.Do(initSentinel) + pc.c.Send("ECHO", sentinel) + pc.c.Flush() + for { + p, err := pc.c.Receive() + if err != nil { + break + } + if p, ok := p.([]byte); ok && bytes.Equal(p, sentinel) { + ac.state &^= connectionSubscribeState + break + } + } + } + pc.c.Do("") + ac.p.put(pc, ac.state != 0 || pc.c.Err() != nil) + return nil +} + +func (ac *activeConn) Err() error { + pc := ac.pc + if pc == nil { + return errConnClosed + } + return pc.c.Err() +} + +func (ac *activeConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) { + pc := ac.pc + if pc == nil { + return nil, errConnClosed + } + ci := lookupCommandInfo(commandName) + ac.state = (ac.state | ci.Set) &^ ci.Clear + return pc.c.Do(commandName, args...) +} + +func (ac *activeConn) DoWithTimeout(timeout time.Duration, commandName string, args ...interface{}) (reply interface{}, err error) { + pc := ac.pc + if pc == nil { + return nil, errConnClosed + } + cwt, ok := pc.c.(ConnWithTimeout) + if !ok { + return nil, errTimeoutNotSupported + } + ci := lookupCommandInfo(commandName) + ac.state = (ac.state | ci.Set) &^ ci.Clear + return cwt.DoWithTimeout(timeout, commandName, args...) +} + +func (ac *activeConn) Send(commandName string, args ...interface{}) error { + pc := ac.pc + if pc == nil { + return errConnClosed + } + ci := lookupCommandInfo(commandName) + ac.state = (ac.state | ci.Set) &^ ci.Clear + return pc.c.Send(commandName, args...) +} + +func (ac *activeConn) Flush() error { + pc := ac.pc + if pc == nil { + return errConnClosed + } + return pc.c.Flush() +} + +func (ac *activeConn) Receive() (reply interface{}, err error) { + pc := ac.pc + if pc == nil { + return nil, errConnClosed + } + return pc.c.Receive() +} + +func (ac *activeConn) ReceiveWithTimeout(timeout time.Duration) (reply interface{}, err error) { + pc := ac.pc + if pc == nil { + return nil, errConnClosed + } + cwt, ok := pc.c.(ConnWithTimeout) + if !ok { + return nil, errTimeoutNotSupported + } + return cwt.ReceiveWithTimeout(timeout) +} + +type errorConn struct{ err error } + +func (ec errorConn) Do(string, ...interface{}) (interface{}, error) { return nil, ec.err } +func (ec errorConn) DoWithTimeout(time.Duration, string, ...interface{}) (interface{}, error) { + return nil, ec.err +} +func (ec errorConn) Send(string, ...interface{}) error { return ec.err } +func (ec errorConn) Err() error { return ec.err } +func (ec errorConn) Close() error { return nil } +func (ec errorConn) Flush() error { return ec.err } +func (ec errorConn) Receive() (interface{}, error) { return nil, ec.err } +func (ec errorConn) ReceiveWithTimeout(time.Duration) (interface{}, error) { return nil, ec.err } + +type idleList struct { + count int + front, back *poolConn +} + +type poolConn struct { + c Conn + t time.Time + created time.Time + next, prev *poolConn +} + +func (l *idleList) pushFront(pc *poolConn) { + pc.next = l.front + pc.prev = nil + if l.count == 0 { + l.back = pc + } else { + l.front.prev = pc + } + l.front = pc + l.count++ + return +} + +func (l *idleList) popFront() { + pc := l.front + l.count-- + if l.count == 0 { + l.front, l.back = nil, nil + } else { + pc.next.prev = nil + l.front = pc.next + } + pc.next, pc.prev = nil, nil +} + +func (l *idleList) popBack() { + pc := l.back + l.count-- + if l.count == 0 { + l.front, l.back = nil, nil + } else { + pc.prev.next = nil + l.back = pc.prev + } + pc.next, pc.prev = nil, nil +} diff --git a/vendor/github.com/garyburd/redigo/redis/pubsub.go b/vendor/github.com/gomodule/redigo/redis/pubsub.go similarity index 72% rename from vendor/github.com/garyburd/redigo/redis/pubsub.go rename to vendor/github.com/gomodule/redigo/redis/pubsub.go index f0790429f..2da60211d 100644 --- a/vendor/github.com/garyburd/redigo/redis/pubsub.go +++ b/vendor/github.com/gomodule/redigo/redis/pubsub.go @@ -16,11 +16,11 @@ package redis import ( "errors" + "time" ) // Subscription represents a subscribe or unsubscribe notification. type Subscription struct { - // Kind is "subscribe", "unsubscribe", "psubscribe" or "punsubscribe" Kind string @@ -33,25 +33,19 @@ type Subscription struct { // Message represents a message notification. type Message struct { - // The originating channel. Channel string + // The matched pattern, if any + Pattern string + // The message data. Data []byte } -// PMessage represents a pmessage notification. -type PMessage struct { - - // The matched pattern. - Pattern string - - // The originating channel. - Channel string - - // The message data. - Data []byte +// Pong represents a pubsub pong notification. +type Pong struct { + Data string } // PubSubConn wraps a Conn with convenience methods for subscribers. @@ -90,11 +84,30 @@ func (c PubSubConn) PUnsubscribe(channel ...interface{}) error { return c.Conn.Flush() } -// Receive returns a pushed message as a Subscription, Message, PMessage or -// error. The return value is intended to be used directly in a type switch as +// Ping sends a PING to the server with the specified data. +// +// The connection must be subscribed to at least one channel or pattern when +// calling this method. +func (c PubSubConn) Ping(data string) error { + c.Conn.Send("PING", data) + return c.Conn.Flush() +} + +// Receive returns a pushed message as a Subscription, Message, Pong or error. +// The return value is intended to be used directly in a type switch as // illustrated in the PubSubConn example. func (c PubSubConn) Receive() interface{} { - reply, err := Values(c.Conn.Receive()) + return c.receiveInternal(c.Conn.Receive()) +} + +// ReceiveWithTimeout is like Receive, but it allows the application to +// override the connection's default timeout. +func (c PubSubConn) ReceiveWithTimeout(timeout time.Duration) interface{} { + return c.receiveInternal(ReceiveWithTimeout(c.Conn, timeout)) +} + +func (c PubSubConn) receiveInternal(replyArg interface{}, errArg error) interface{} { + reply, err := Values(replyArg, errArg) if err != nil { return err } @@ -113,17 +126,23 @@ func (c PubSubConn) Receive() interface{} { } return m case "pmessage": - var pm PMessage - if _, err := Scan(reply, &pm.Pattern, &pm.Channel, &pm.Data); err != nil { + var m Message + if _, err := Scan(reply, &m.Pattern, &m.Channel, &m.Data); err != nil { return err } - return pm + return m case "subscribe", "psubscribe", "unsubscribe", "punsubscribe": s := Subscription{Kind: kind} if _, err := Scan(reply, &s.Channel, &s.Count); err != nil { return err } return s + case "pong": + var p Pong + if _, err := Scan(reply, &p.Data); err != nil { + return err + } + return p } return errors.New("redigo: unknown pubsub notification") } diff --git a/vendor/github.com/gomodule/redigo/redis/redis.go b/vendor/github.com/gomodule/redigo/redis/redis.go new file mode 100644 index 000000000..e44648744 --- /dev/null +++ b/vendor/github.com/gomodule/redigo/redis/redis.go @@ -0,0 +1,138 @@ +// Copyright 2012 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package redis + +import ( + "errors" + "time" +) + +// Error represents an error returned in a command reply. +type Error string + +func (err Error) Error() string { return string(err) } + +// Conn represents a connection to a Redis server. +type Conn interface { + // Close closes the connection. + Close() error + + // Err returns a non-nil value when the connection is not usable. + Err() error + + // Do sends a command to the server and returns the received reply. + Do(commandName string, args ...interface{}) (reply interface{}, err error) + + // Send writes the command to the client's output buffer. + Send(commandName string, args ...interface{}) error + + // Flush flushes the output buffer to the Redis server. + Flush() error + + // Receive receives a single reply from the Redis server + Receive() (reply interface{}, err error) +} + +// Argument is the interface implemented by an object which wants to control how +// the object is converted to Redis bulk strings. +type Argument interface { + // RedisArg returns a value to be encoded as a bulk string per the + // conversions listed in the section 'Executing Commands'. + // Implementations should typically return a []byte or string. + RedisArg() interface{} +} + +// Scanner is implemented by an object which wants to control its value is +// interpreted when read from Redis. +type Scanner interface { + // RedisScan assigns a value from a Redis value. The argument src is one of + // the reply types listed in the section `Executing Commands`. + // + // An error should be returned if the value cannot be stored without + // loss of information. + RedisScan(src interface{}) error +} + +// ConnWithTimeout is an optional interface that allows the caller to override +// a connection's default read timeout. This interface is useful for executing +// the BLPOP, BRPOP, BRPOPLPUSH, XREAD and other commands that block at the +// server. +// +// A connection's default read timeout is set with the DialReadTimeout dial +// option. Applications should rely on the default timeout for commands that do +// not block at the server. +// +// All of the Conn implementations in this package satisfy the ConnWithTimeout +// interface. +// +// Use the DoWithTimeout and ReceiveWithTimeout helper functions to simplify +// use of this interface. +type ConnWithTimeout interface { + Conn + + // Do sends a command to the server and returns the received reply. + // The timeout overrides the read timeout set when dialing the + // connection. + DoWithTimeout(timeout time.Duration, commandName string, args ...interface{}) (reply interface{}, err error) + + // Receive receives a single reply from the Redis server. The timeout + // overrides the read timeout set when dialing the connection. + ReceiveWithTimeout(timeout time.Duration) (reply interface{}, err error) +} + +var errTimeoutNotSupported = errors.New("redis: connection does not support ConnWithTimeout") + +// DoWithTimeout executes a Redis command with the specified read timeout. If +// the connection does not satisfy the ConnWithTimeout interface, then an error +// is returned. +func DoWithTimeout(c Conn, timeout time.Duration, cmd string, args ...interface{}) (interface{}, error) { + cwt, ok := c.(ConnWithTimeout) + if !ok { + return nil, errTimeoutNotSupported + } + return cwt.DoWithTimeout(timeout, cmd, args...) +} + +// ReceiveWithTimeout receives a reply with the specified read timeout. If the +// connection does not satisfy the ConnWithTimeout interface, then an error is +// returned. +func ReceiveWithTimeout(c Conn, timeout time.Duration) (interface{}, error) { + cwt, ok := c.(ConnWithTimeout) + if !ok { + return nil, errTimeoutNotSupported + } + return cwt.ReceiveWithTimeout(timeout) +} + +// SlowLog represents a redis SlowLog +type SlowLog struct { + // ID is a unique progressive identifier for every slow log entry. + ID int64 + + // Time is the unix timestamp at which the logged command was processed. + Time time.Time + + // ExecutationTime is the amount of time needed for the command execution. + ExecutionTime time.Duration + + // Args is the command name and arguments + Args []string + + // ClientAddr is the client IP address (4.0 only). + ClientAddr string + + // ClientName is the name set via the CLIENT SETNAME command (4.0 only). + ClientName string +} diff --git a/vendor/github.com/gomodule/redigo/redis/reply.go b/vendor/github.com/gomodule/redigo/redis/reply.go new file mode 100644 index 000000000..251a78846 --- /dev/null +++ b/vendor/github.com/gomodule/redigo/redis/reply.go @@ -0,0 +1,583 @@ +// Copyright 2012 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package redis + +import ( + "errors" + "fmt" + "strconv" + "time" +) + +// ErrNil indicates that a reply value is nil. +var ErrNil = errors.New("redigo: nil returned") + +// Int is a helper that converts a command reply to an integer. If err is not +// equal to nil, then Int returns 0, err. Otherwise, Int converts the +// reply to an int as follows: +// +// Reply type Result +// integer int(reply), nil +// bulk string parsed reply, nil +// nil 0, ErrNil +// other 0, error +func Int(reply interface{}, err error) (int, error) { + if err != nil { + return 0, err + } + switch reply := reply.(type) { + case int64: + x := int(reply) + if int64(x) != reply { + return 0, strconv.ErrRange + } + return x, nil + case []byte: + n, err := strconv.ParseInt(string(reply), 10, 0) + return int(n), err + case nil: + return 0, ErrNil + case Error: + return 0, reply + } + return 0, fmt.Errorf("redigo: unexpected type for Int, got type %T", reply) +} + +// Int64 is a helper that converts a command reply to 64 bit integer. If err is +// not equal to nil, then Int64 returns 0, err. Otherwise, Int64 converts the +// reply to an int64 as follows: +// +// Reply type Result +// integer reply, nil +// bulk string parsed reply, nil +// nil 0, ErrNil +// other 0, error +func Int64(reply interface{}, err error) (int64, error) { + if err != nil { + return 0, err + } + switch reply := reply.(type) { + case int64: + return reply, nil + case []byte: + n, err := strconv.ParseInt(string(reply), 10, 64) + return n, err + case nil: + return 0, ErrNil + case Error: + return 0, reply + } + return 0, fmt.Errorf("redigo: unexpected type for Int64, got type %T", reply) +} + +func errNegativeInt(v int64) error { + return fmt.Errorf("redigo: unexpected negative value %v for Uint64", v) +} + +// Uint64 is a helper that converts a command reply to 64 bit unsigned integer. +// If err is not equal to nil, then Uint64 returns 0, err. Otherwise, Uint64 converts the +// reply to an uint64 as follows: +// +// Reply type Result +// +integer reply, nil +// bulk string parsed reply, nil +// nil 0, ErrNil +// other 0, error +func Uint64(reply interface{}, err error) (uint64, error) { + if err != nil { + return 0, err + } + switch reply := reply.(type) { + case int64: + if reply < 0 { + return 0, errNegativeInt(reply) + } + return uint64(reply), nil + case []byte: + n, err := strconv.ParseUint(string(reply), 10, 64) + return n, err + case nil: + return 0, ErrNil + case Error: + return 0, reply + } + return 0, fmt.Errorf("redigo: unexpected type for Uint64, got type %T", reply) +} + +// Float64 is a helper that converts a command reply to 64 bit float. If err is +// not equal to nil, then Float64 returns 0, err. Otherwise, Float64 converts +// the reply to an int as follows: +// +// Reply type Result +// bulk string parsed reply, nil +// nil 0, ErrNil +// other 0, error +func Float64(reply interface{}, err error) (float64, error) { + if err != nil { + return 0, err + } + switch reply := reply.(type) { + case []byte: + n, err := strconv.ParseFloat(string(reply), 64) + return n, err + case nil: + return 0, ErrNil + case Error: + return 0, reply + } + return 0, fmt.Errorf("redigo: unexpected type for Float64, got type %T", reply) +} + +// String is a helper that converts a command reply to a string. If err is not +// equal to nil, then String returns "", err. Otherwise String converts the +// reply to a string as follows: +// +// Reply type Result +// bulk string string(reply), nil +// simple string reply, nil +// nil "", ErrNil +// other "", error +func String(reply interface{}, err error) (string, error) { + if err != nil { + return "", err + } + switch reply := reply.(type) { + case []byte: + return string(reply), nil + case string: + return reply, nil + case nil: + return "", ErrNil + case Error: + return "", reply + } + return "", fmt.Errorf("redigo: unexpected type for String, got type %T", reply) +} + +// Bytes is a helper that converts a command reply to a slice of bytes. If err +// is not equal to nil, then Bytes returns nil, err. Otherwise Bytes converts +// the reply to a slice of bytes as follows: +// +// Reply type Result +// bulk string reply, nil +// simple string []byte(reply), nil +// nil nil, ErrNil +// other nil, error +func Bytes(reply interface{}, err error) ([]byte, error) { + if err != nil { + return nil, err + } + switch reply := reply.(type) { + case []byte: + return reply, nil + case string: + return []byte(reply), nil + case nil: + return nil, ErrNil + case Error: + return nil, reply + } + return nil, fmt.Errorf("redigo: unexpected type for Bytes, got type %T", reply) +} + +// Bool is a helper that converts a command reply to a boolean. If err is not +// equal to nil, then Bool returns false, err. Otherwise Bool converts the +// reply to boolean as follows: +// +// Reply type Result +// integer value != 0, nil +// bulk string strconv.ParseBool(reply) +// nil false, ErrNil +// other false, error +func Bool(reply interface{}, err error) (bool, error) { + if err != nil { + return false, err + } + switch reply := reply.(type) { + case int64: + return reply != 0, nil + case []byte: + return strconv.ParseBool(string(reply)) + case nil: + return false, ErrNil + case Error: + return false, reply + } + return false, fmt.Errorf("redigo: unexpected type for Bool, got type %T", reply) +} + +// MultiBulk is a helper that converts an array command reply to a []interface{}. +// +// Deprecated: Use Values instead. +func MultiBulk(reply interface{}, err error) ([]interface{}, error) { return Values(reply, err) } + +// Values is a helper that converts an array command reply to a []interface{}. +// If err is not equal to nil, then Values returns nil, err. Otherwise, Values +// converts the reply as follows: +// +// Reply type Result +// array reply, nil +// nil nil, ErrNil +// other nil, error +func Values(reply interface{}, err error) ([]interface{}, error) { + if err != nil { + return nil, err + } + switch reply := reply.(type) { + case []interface{}: + return reply, nil + case nil: + return nil, ErrNil + case Error: + return nil, reply + } + return nil, fmt.Errorf("redigo: unexpected type for Values, got type %T", reply) +} + +func sliceHelper(reply interface{}, err error, name string, makeSlice func(int), assign func(int, interface{}) error) error { + if err != nil { + return err + } + switch reply := reply.(type) { + case []interface{}: + makeSlice(len(reply)) + for i := range reply { + if reply[i] == nil { + continue + } + if err := assign(i, reply[i]); err != nil { + return err + } + } + return nil + case nil: + return ErrNil + case Error: + return reply + } + return fmt.Errorf("redigo: unexpected type for %s, got type %T", name, reply) +} + +// Float64s is a helper that converts an array command reply to a []float64. If +// err is not equal to nil, then Float64s returns nil, err. Nil array items are +// converted to 0 in the output slice. Floats64 returns an error if an array +// item is not a bulk string or nil. +func Float64s(reply interface{}, err error) ([]float64, error) { + var result []float64 + err = sliceHelper(reply, err, "Float64s", func(n int) { result = make([]float64, n) }, func(i int, v interface{}) error { + p, ok := v.([]byte) + if !ok { + return fmt.Errorf("redigo: unexpected element type for Floats64, got type %T", v) + } + f, err := strconv.ParseFloat(string(p), 64) + result[i] = f + return err + }) + return result, err +} + +// Strings is a helper that converts an array command reply to a []string. If +// err is not equal to nil, then Strings returns nil, err. Nil array items are +// converted to "" in the output slice. Strings returns an error if an array +// item is not a bulk string or nil. +func Strings(reply interface{}, err error) ([]string, error) { + var result []string + err = sliceHelper(reply, err, "Strings", func(n int) { result = make([]string, n) }, func(i int, v interface{}) error { + switch v := v.(type) { + case string: + result[i] = v + return nil + case []byte: + result[i] = string(v) + return nil + default: + return fmt.Errorf("redigo: unexpected element type for Strings, got type %T", v) + } + }) + return result, err +} + +// ByteSlices is a helper that converts an array command reply to a [][]byte. +// If err is not equal to nil, then ByteSlices returns nil, err. Nil array +// items are stay nil. ByteSlices returns an error if an array item is not a +// bulk string or nil. +func ByteSlices(reply interface{}, err error) ([][]byte, error) { + var result [][]byte + err = sliceHelper(reply, err, "ByteSlices", func(n int) { result = make([][]byte, n) }, func(i int, v interface{}) error { + p, ok := v.([]byte) + if !ok { + return fmt.Errorf("redigo: unexpected element type for ByteSlices, got type %T", v) + } + result[i] = p + return nil + }) + return result, err +} + +// Int64s is a helper that converts an array command reply to a []int64. +// If err is not equal to nil, then Int64s returns nil, err. Nil array +// items are stay nil. Int64s returns an error if an array item is not a +// bulk string or nil. +func Int64s(reply interface{}, err error) ([]int64, error) { + var result []int64 + err = sliceHelper(reply, err, "Int64s", func(n int) { result = make([]int64, n) }, func(i int, v interface{}) error { + switch v := v.(type) { + case int64: + result[i] = v + return nil + case []byte: + n, err := strconv.ParseInt(string(v), 10, 64) + result[i] = n + return err + default: + return fmt.Errorf("redigo: unexpected element type for Int64s, got type %T", v) + } + }) + return result, err +} + +// Ints is a helper that converts an array command reply to a []in. +// If err is not equal to nil, then Ints returns nil, err. Nil array +// items are stay nil. Ints returns an error if an array item is not a +// bulk string or nil. +func Ints(reply interface{}, err error) ([]int, error) { + var result []int + err = sliceHelper(reply, err, "Ints", func(n int) { result = make([]int, n) }, func(i int, v interface{}) error { + switch v := v.(type) { + case int64: + n := int(v) + if int64(n) != v { + return strconv.ErrRange + } + result[i] = n + return nil + case []byte: + n, err := strconv.Atoi(string(v)) + result[i] = n + return err + default: + return fmt.Errorf("redigo: unexpected element type for Ints, got type %T", v) + } + }) + return result, err +} + +// StringMap is a helper that converts an array of strings (alternating key, value) +// into a map[string]string. The HGETALL and CONFIG GET commands return replies in this format. +// Requires an even number of values in result. +func StringMap(result interface{}, err error) (map[string]string, error) { + values, err := Values(result, err) + if err != nil { + return nil, err + } + if len(values)%2 != 0 { + return nil, errors.New("redigo: StringMap expects even number of values result") + } + m := make(map[string]string, len(values)/2) + for i := 0; i < len(values); i += 2 { + key, okKey := values[i].([]byte) + value, okValue := values[i+1].([]byte) + if !okKey || !okValue { + return nil, errors.New("redigo: StringMap key not a bulk string value") + } + m[string(key)] = string(value) + } + return m, nil +} + +// IntMap is a helper that converts an array of strings (alternating key, value) +// into a map[string]int. The HGETALL commands return replies in this format. +// Requires an even number of values in result. +func IntMap(result interface{}, err error) (map[string]int, error) { + values, err := Values(result, err) + if err != nil { + return nil, err + } + if len(values)%2 != 0 { + return nil, errors.New("redigo: IntMap expects even number of values result") + } + m := make(map[string]int, len(values)/2) + for i := 0; i < len(values); i += 2 { + key, ok := values[i].([]byte) + if !ok { + return nil, errors.New("redigo: IntMap key not a bulk string value") + } + value, err := Int(values[i+1], nil) + if err != nil { + return nil, err + } + m[string(key)] = value + } + return m, nil +} + +// Int64Map is a helper that converts an array of strings (alternating key, value) +// into a map[string]int64. The HGETALL commands return replies in this format. +// Requires an even number of values in result. +func Int64Map(result interface{}, err error) (map[string]int64, error) { + values, err := Values(result, err) + if err != nil { + return nil, err + } + if len(values)%2 != 0 { + return nil, errors.New("redigo: Int64Map expects even number of values result") + } + m := make(map[string]int64, len(values)/2) + for i := 0; i < len(values); i += 2 { + key, ok := values[i].([]byte) + if !ok { + return nil, errors.New("redigo: Int64Map key not a bulk string value") + } + value, err := Int64(values[i+1], nil) + if err != nil { + return nil, err + } + m[string(key)] = value + } + return m, nil +} + +// Positions is a helper that converts an array of positions (lat, long) +// into a [][2]float64. The GEOPOS command returns replies in this format. +func Positions(result interface{}, err error) ([]*[2]float64, error) { + values, err := Values(result, err) + if err != nil { + return nil, err + } + positions := make([]*[2]float64, len(values)) + for i := range values { + if values[i] == nil { + continue + } + p, ok := values[i].([]interface{}) + if !ok { + return nil, fmt.Errorf("redigo: unexpected element type for interface slice, got type %T", values[i]) + } + if len(p) != 2 { + return nil, fmt.Errorf("redigo: unexpected number of values for a member position, got %d", len(p)) + } + lat, err := Float64(p[0], nil) + if err != nil { + return nil, err + } + long, err := Float64(p[1], nil) + if err != nil { + return nil, err + } + positions[i] = &[2]float64{lat, long} + } + return positions, nil +} + +// Uint64s is a helper that converts an array command reply to a []uint64. +// If err is not equal to nil, then Uint64s returns nil, err. Nil array +// items are stay nil. Uint64s returns an error if an array item is not a +// bulk string or nil. +func Uint64s(reply interface{}, err error) ([]uint64, error) { + var result []uint64 + err = sliceHelper(reply, err, "Uint64s", func(n int) { result = make([]uint64, n) }, func(i int, v interface{}) error { + switch v := v.(type) { + case uint64: + result[i] = v + return nil + case []byte: + n, err := strconv.ParseUint(string(v), 10, 64) + result[i] = n + return err + default: + return fmt.Errorf("redigo: unexpected element type for Uint64s, got type %T", v) + } + }) + return result, err +} + +// Uint64Map is a helper that converts an array of strings (alternating key, value) +// into a map[string]uint64. The HGETALL commands return replies in this format. +// Requires an even number of values in result. +func Uint64Map(result interface{}, err error) (map[string]uint64, error) { + values, err := Values(result, err) + if err != nil { + return nil, err + } + if len(values)%2 != 0 { + return nil, errors.New("redigo: Uint64Map expects even number of values result") + } + m := make(map[string]uint64, len(values)/2) + for i := 0; i < len(values); i += 2 { + key, ok := values[i].([]byte) + if !ok { + return nil, errors.New("redigo: Uint64Map key not a bulk string value") + } + value, err := Uint64(values[i+1], nil) + if err != nil { + return nil, err + } + m[string(key)] = value + } + return m, nil +} + +// SlowLogs is a helper that parse the SLOWLOG GET command output and +// return the array of SlowLog +func SlowLogs(result interface{}, err error) ([]SlowLog, error) { + rawLogs, err := Values(result, err) + if err != nil { + return nil, err + } + logs := make([]SlowLog, len(rawLogs)) + for i, rawLog := range rawLogs { + rawLog, ok := rawLog.([]interface{}) + if !ok { + return nil, errors.New("redigo: slowlog element is not an array") + } + + var log SlowLog + + if len(rawLog) < 4 { + return nil, errors.New("redigo: slowlog element has less than four elements") + } + log.ID, ok = rawLog[0].(int64) + if !ok { + return nil, errors.New("redigo: slowlog element[0] not an int64") + } + timestamp, ok := rawLog[1].(int64) + if !ok { + return nil, errors.New("redigo: slowlog element[1] not an int64") + } + log.Time = time.Unix(timestamp, 0) + duration, ok := rawLog[2].(int64) + if !ok { + return nil, errors.New("redigo: slowlog element[2] not an int64") + } + log.ExecutionTime = time.Duration(duration) * time.Microsecond + + log.Args, err = Strings(rawLog[3], nil) + if err != nil { + return nil, fmt.Errorf("redigo: slowlog element[3] is not array of string. actual error is : %s", err.Error()) + } + if len(rawLog) >= 6 { + log.ClientAddr, err = String(rawLog[4], nil) + if err != nil { + return nil, fmt.Errorf("redigo: slowlog element[4] is not a string. actual error is : %s", err.Error()) + } + log.ClientName, err = String(rawLog[5], nil) + if err != nil { + return nil, fmt.Errorf("redigo: slowlog element[5] is not a string. actual error is : %s", err.Error()) + } + } + logs[i] = log + } + return logs, nil +} diff --git a/vendor/github.com/garyburd/redigo/redis/scan.go b/vendor/github.com/gomodule/redigo/redis/scan.go similarity index 64% rename from vendor/github.com/garyburd/redigo/redis/scan.go rename to vendor/github.com/gomodule/redigo/redis/scan.go index 8c9cfa18d..93d0c658c 100644 --- a/vendor/github.com/garyburd/redigo/redis/scan.go +++ b/vendor/github.com/gomodule/redigo/redis/scan.go @@ -23,6 +23,10 @@ import ( "sync" ) +var ( + scannerType = reflect.TypeOf((*Scanner)(nil)).Elem() +) + func ensureLen(d reflect.Value, n int) { if n > d.Cap() { d.Set(reflect.MakeSlice(d.Type(), n, n)) @@ -32,42 +36,117 @@ func ensureLen(d reflect.Value, n int) { } func cannotConvert(d reflect.Value, s interface{}) error { - return fmt.Errorf("redigo: Scan cannot convert from %s to %s", - reflect.TypeOf(s), d.Type()) + var sname string + switch s.(type) { + case string: + sname = "Redis simple string" + case Error: + sname = "Redis error" + case int64: + sname = "Redis integer" + case []byte: + sname = "Redis bulk string" + case []interface{}: + sname = "Redis array" + case nil: + sname = "Redis nil" + default: + sname = reflect.TypeOf(s).String() + } + return fmt.Errorf("cannot convert from %s to %s", sname, d.Type()) } -func convertAssignBytes(d reflect.Value, s []byte) (err error) { +func convertAssignNil(d reflect.Value) (err error) { + switch d.Type().Kind() { + case reflect.Slice, reflect.Interface: + d.Set(reflect.Zero(d.Type())) + default: + err = cannotConvert(d, nil) + } + return err +} + +func convertAssignError(d reflect.Value, s Error) (err error) { + if d.Kind() == reflect.String { + d.SetString(string(s)) + } else if d.Kind() == reflect.Slice && d.Type().Elem().Kind() == reflect.Uint8 { + d.SetBytes([]byte(s)) + } else { + err = cannotConvert(d, s) + } + return +} + +func convertAssignString(d reflect.Value, s string) (err error) { switch d.Type().Kind() { case reflect.Float32, reflect.Float64: var x float64 - x, err = strconv.ParseFloat(string(s), d.Type().Bits()) + x, err = strconv.ParseFloat(s, d.Type().Bits()) d.SetFloat(x) case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: var x int64 - x, err = strconv.ParseInt(string(s), 10, d.Type().Bits()) + x, err = strconv.ParseInt(s, 10, d.Type().Bits()) d.SetInt(x) case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: var x uint64 - x, err = strconv.ParseUint(string(s), 10, d.Type().Bits()) + x, err = strconv.ParseUint(s, 10, d.Type().Bits()) d.SetUint(x) case reflect.Bool: var x bool - x, err = strconv.ParseBool(string(s)) + x, err = strconv.ParseBool(s) d.SetBool(x) case reflect.String: - d.SetString(string(s)) + d.SetString(s) case reflect.Slice: - if d.Type().Elem().Kind() != reflect.Uint8 { - err = cannotConvert(d, s) + if d.Type().Elem().Kind() == reflect.Uint8 { + d.SetBytes([]byte(s)) } else { - d.SetBytes(s) + err = cannotConvert(d, s) } + case reflect.Ptr: + err = convertAssignString(d.Elem(), s) default: err = cannotConvert(d, s) } return } +func convertAssignBulkString(d reflect.Value, s []byte) (err error) { + switch d.Type().Kind() { + case reflect.Slice: + // Handle []byte destination here to avoid unnecessary + // []byte -> string -> []byte converion. + if d.Type().Elem().Kind() == reflect.Uint8 { + d.SetBytes(s) + } else { + err = cannotConvert(d, s) + } + case reflect.Ptr: + if d.CanInterface() && d.CanSet() { + if s == nil { + if d.IsNil() { + return nil + } + + d.Set(reflect.Zero(d.Type())) + return nil + } + + if d.IsNil() { + d.Set(reflect.New(d.Type().Elem())) + } + + if sc, ok := d.Interface().(Scanner); ok { + return sc.RedisScan(s) + } + } + err = convertAssignString(d, string(s)) + default: + err = convertAssignString(d, string(s)) + } + return err +} + func convertAssignInt(d reflect.Value, s int64) (err error) { switch d.Type().Kind() { case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: @@ -96,18 +175,43 @@ func convertAssignInt(d reflect.Value, s int64) (err error) { } func convertAssignValue(d reflect.Value, s interface{}) (err error) { + if d.Kind() != reflect.Ptr { + if d.CanAddr() { + d2 := d.Addr() + if d2.CanInterface() { + if scanner, ok := d2.Interface().(Scanner); ok { + return scanner.RedisScan(s) + } + } + } + } else if d.CanInterface() { + // Already a reflect.Ptr + if d.IsNil() { + d.Set(reflect.New(d.Type().Elem())) + } + if scanner, ok := d.Interface().(Scanner); ok { + return scanner.RedisScan(s) + } + } + switch s := s.(type) { + case nil: + err = convertAssignNil(d) case []byte: - err = convertAssignBytes(d, s) + err = convertAssignBulkString(d, s) case int64: err = convertAssignInt(d, s) + case string: + err = convertAssignString(d, s) + case Error: + err = convertAssignError(d, s) default: err = cannotConvert(d, s) } return err } -func convertAssignValues(d reflect.Value, s []interface{}) error { +func convertAssignArray(d reflect.Value, s []interface{}) error { if d.Type().Kind() != reflect.Slice { return cannotConvert(d, s) } @@ -121,11 +225,15 @@ func convertAssignValues(d reflect.Value, s []interface{}) error { } func convertAssign(d interface{}, s interface{}) (err error) { + if scanner, ok := d.(Scanner); ok { + return scanner.RedisScan(s) + } + // Handle the most common destination types using type switches and // fall back to reflection for all other types. switch s := s.(type) { case nil: - // ingore + // ignore case []byte: switch d := d.(type) { case *string: @@ -144,7 +252,7 @@ func convertAssign(d interface{}, s interface{}) (err error) { if d := reflect.ValueOf(d); d.Type().Kind() != reflect.Ptr { err = cannotConvert(d, s) } else { - err = convertAssignBytes(d.Elem(), s) + err = convertAssignBulkString(d.Elem(), s) } } case int64: @@ -169,6 +277,17 @@ func convertAssign(d interface{}, s interface{}) (err error) { err = convertAssignInt(d.Elem(), s) } } + case string: + switch d := d.(type) { + case *string: + *d = s + case *interface{}: + *d = s + case nil: + // skip value + default: + err = cannotConvert(reflect.ValueOf(d), s) + } case []interface{}: switch d := d.(type) { case *[]interface{}: @@ -181,7 +300,7 @@ func convertAssign(d interface{}, s interface{}) (err error) { if d := reflect.ValueOf(d); d.Type().Kind() != reflect.Ptr { err = cannotConvert(d, s) } else { - err = convertAssignValues(d.Elem(), s) + err = convertAssignArray(d.Elem(), s) } } case Error: @@ -194,6 +313,8 @@ func convertAssign(d interface{}, s interface{}) (err error) { // Scan copies from src to the values pointed at by dest. // +// Scan uses RedisScan if available otherwise: +// // The values pointed at by dest must be an integer, float, boolean, string, // []byte, interface{} or slices of these types. Scan uses the standard strconv // package to convert bulk strings to numeric and boolean types. @@ -206,12 +327,13 @@ func convertAssign(d interface{}, s interface{}) (err error) { // following the copied values. func Scan(src []interface{}, dest ...interface{}) ([]interface{}, error) { if len(src) < len(dest) { - return nil, errors.New("redigo: Scan array short") + return nil, errors.New("redigo.Scan: array short") } var err error for i, d := range dest { err = convertAssign(d, src[i]) if err != nil { + err = fmt.Errorf("redigo.Scan: cannot assign to dest %d: %v", i, err) break } } @@ -219,9 +341,9 @@ func Scan(src []interface{}, dest ...interface{}) ([]interface{}, error) { } type fieldSpec struct { - name string - index []int - //omitEmpty bool + name string + index []int + omitEmpty bool } type structSpec struct { @@ -237,13 +359,17 @@ func compileStructSpec(t reflect.Type, depth map[string]int, index []int, ss *st for i := 0; i < t.NumField(); i++ { f := t.Field(i) switch { - case f.PkgPath != "": + case f.PkgPath != "" && !f.Anonymous: // Ignore unexported fields. case f.Anonymous: - // TODO: Handle pointers. Requires change to decoder and - // protection against infinite recursion. - if f.Type.Kind() == reflect.Struct { + switch f.Type.Kind() { + case reflect.Struct: compileStructSpec(f.Type, depth, append(index, i), ss) + case reflect.Ptr: + // TODO(steve): Protect against infinite recursion. + if f.Type.Elem().Kind() == reflect.Struct { + compileStructSpec(f.Type.Elem(), depth, append(index, i), ss) + } } default: fs := &fieldSpec{name: f.Name} @@ -258,10 +384,10 @@ func compileStructSpec(t reflect.Type, depth map[string]int, index []int, ss *st } for _, s := range p[1:] { switch s { - //case "omitempty": - // fs.omitempty = true + case "omitempty": + fs.omitEmpty = true default: - panic(errors.New("redigo: unknown field flag " + s + " for type " + t.Name())) + panic(fmt.Errorf("redigo: unknown field tag %s for type %s", s, t.Name())) } } } @@ -321,7 +447,7 @@ func structSpecForType(t reflect.Type) *structSpec { return ss } -var errScanStructValue = errors.New("redigo: ScanStruct value must be non-nil pointer to a struct") +var errScanStructValue = errors.New("redigo.ScanStruct: value must be non-nil pointer to a struct") // ScanStruct scans alternating names and values from src to a struct. The // HGETALL and CONFIG GET commands return replies in this format. @@ -333,6 +459,7 @@ var errScanStructValue = errors.New("redigo: ScanStruct value must be non-nil po // // Fields with the tag redis:"-" are ignored. // +// Each field uses RedisScan if available otherwise: // Integer, float, boolean, string and []byte fields are supported. Scan uses the // standard strconv package to convert bulk string values to numeric and // boolean types. @@ -350,7 +477,7 @@ func ScanStruct(src []interface{}, dest interface{}) error { ss := structSpecForType(d.Type()) if len(src)%2 != 0 { - return errors.New("redigo: ScanStruct expects even number of values in values") + return errors.New("redigo.ScanStruct: number of values not a multiple of 2") } for i := 0; i < len(src); i += 2 { @@ -360,26 +487,30 @@ func ScanStruct(src []interface{}, dest interface{}) error { } name, ok := src[i].([]byte) if !ok { - return errors.New("redigo: ScanStruct key not a bulk string value") + return fmt.Errorf("redigo.ScanStruct: key %d not a bulk string value", i) } fs := ss.fieldSpec(name) if fs == nil { continue } if err := convertAssignValue(d.FieldByIndex(fs.index), s); err != nil { - return err + return fmt.Errorf("redigo.ScanStruct: cannot assign field %s: %v", fs.name, err) } } return nil } var ( - errScanSliceValue = errors.New("redigo: ScanSlice dest must be non-nil pointer to a struct") + errScanSliceValue = errors.New("redigo.ScanSlice: dest must be non-nil pointer to a struct") ) -// ScanSlice scans src to the slice pointed to by dest. The elements the dest -// slice must be integer, float, boolean, string, struct or pointer to struct -// values. +// ScanSlice scans src to the slice pointed to by dest. +// +// If the target is a slice of types which implement Scanner then the custom +// RedisScan method is used otherwise the following rules apply: +// +// The elements in the dest slice must be integer, float, boolean, string, struct +// or pointer to struct values. // // Struct fields must be integer, float, boolean or string values. All struct // fields are used unless a subset is specified using fieldNames. @@ -395,19 +526,20 @@ func ScanSlice(src []interface{}, dest interface{}, fieldNames ...string) error isPtr := false t := d.Type().Elem() + st := t if t.Kind() == reflect.Ptr && t.Elem().Kind() == reflect.Struct { isPtr = true t = t.Elem() } - if t.Kind() != reflect.Struct { + if t.Kind() != reflect.Struct || st.Implements(scannerType) { ensureLen(d, len(src)) for i, s := range src { if s == nil { continue } if err := convertAssignValue(d.Index(i), s); err != nil { - return err + return fmt.Errorf("redigo.ScanSlice: cannot assign element %d: %v", i, err) } } return nil @@ -420,18 +552,18 @@ func ScanSlice(src []interface{}, dest interface{}, fieldNames ...string) error for i, name := range fieldNames { fss[i] = ss.m[name] if fss[i] == nil { - return errors.New("redigo: ScanSlice bad field name " + name) + return fmt.Errorf("redigo.ScanSlice: ScanSlice bad field name %s", name) } } } if len(fss) == 0 { - return errors.New("redigo: ScanSlice no struct fields") + return errors.New("redigo.ScanSlice: no struct fields") } n := len(src) / len(fss) if n*len(fss) != len(src) { - return errors.New("redigo: ScanSlice length not a multiple of struct field count") + return errors.New("redigo.ScanSlice: length not a multiple of struct field count") } ensureLen(d, n) @@ -449,7 +581,7 @@ func ScanSlice(src []interface{}, dest interface{}, fieldNames ...string) error continue } if err := convertAssignValue(d.FieldByIndex(fs.index), s); err != nil { - return err + return fmt.Errorf("redigo.ScanSlice: cannot assign element %d to field %s: %v", i*len(fss)+j, fs.name, err) } } } @@ -507,7 +639,35 @@ func flattenStruct(args Args, v reflect.Value) Args { ss := structSpecForType(v.Type()) for _, fs := range ss.l { fv := v.FieldByIndex(fs.index) - args = append(args, fs.name, fv.Interface()) + if fs.omitEmpty { + var empty = false + switch fv.Kind() { + case reflect.Array, reflect.Map, reflect.Slice, reflect.String: + empty = fv.Len() == 0 + case reflect.Bool: + empty = !fv.Bool() + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + empty = fv.Int() == 0 + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Uintptr: + empty = fv.Uint() == 0 + case reflect.Float32, reflect.Float64: + empty = fv.Float() == 0 + case reflect.Interface, reflect.Ptr: + empty = fv.IsNil() + } + if empty { + continue + } + } + if arg, ok := fv.Interface().(Argument); ok { + args = append(args, fs.name, arg.RedisArg()) + } else if fv.Kind() == reflect.Ptr { + if !fv.IsNil() { + args = append(args, fs.name, fv.Elem().Interface()) + } + } else { + args = append(args, fs.name, fv.Interface()) + } } return args } diff --git a/vendor/github.com/garyburd/redigo/redis/script.go b/vendor/github.com/gomodule/redigo/redis/script.go similarity index 97% rename from vendor/github.com/garyburd/redigo/redis/script.go rename to vendor/github.com/gomodule/redigo/redis/script.go index 78605a90a..0ef1c821f 100644 --- a/vendor/github.com/garyburd/redigo/redis/script.go +++ b/vendor/github.com/gomodule/redigo/redis/script.go @@ -55,6 +55,11 @@ func (s *Script) args(spec string, keysAndArgs []interface{}) []interface{} { return args } +// Hash returns the script hash. +func (s *Script) Hash() string { + return s.hash +} + // Do evaluates the script. Under the covers, Do optimistically evaluates the // script using the EVALSHA command. If the command fails because the script is // not loaded, then Do evaluates the script using the EVAL command (thus diff --git a/vendor/modules.txt b/vendor/modules.txt index 97821559b..7a4b9bbdd 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -79,11 +79,10 @@ github.com/docker/go-events github.com/docker/go-metrics # github.com/docker/libtrust v0.0.0-20150114040149-fa567046d9b1 github.com/docker/libtrust -# github.com/garyburd/redigo v0.0.0-20150301180006-535138d7bcd7 -github.com/garyburd/redigo/internal -github.com/garyburd/redigo/redis # github.com/golang/protobuf v1.3.2 github.com/golang/protobuf/proto +# github.com/gomodule/redigo v1.8.2 +github.com/gomodule/redigo/redis # github.com/gorilla/handlers v0.0.0-20150720190736-60c7bfde3e33 github.com/gorilla/handlers # github.com/gorilla/mux v1.7.2