Merge pull request #792 from restic/fix-791

s3: Increase MaxIdleConnsPerHost
This commit is contained in:
Alexander Neumann 2017-02-11 11:20:20 +01:00
commit 83538c745a
4 changed files with 49 additions and 8 deletions

View file

@ -14,11 +14,11 @@
# docker run --rm -v $PWD:/home/travis/restic restic/test gb test -v ./backend # docker run --rm -v $PWD:/home/travis/restic restic/test gb test -v ./backend
# #
# build the image for an older version of Go: # build the image for an older version of Go:
# docker build --build-arg GOVERSION=1.3.3 -t restic/test:go1.3.3 . # docker build --build-arg GOVERSION=1.6.4 -t restic/test:go1.6.4 .
FROM ubuntu:14.04 FROM ubuntu:14.04
ARG GOVERSION=1.7 ARG GOVERSION=1.7.5
ARG GOARCH=amd64 ARG GOARCH=amd64
# install dependencies # install dependencies

View file

@ -17,7 +17,7 @@ import (
"restic/backend" "restic/backend"
) )
const connLimit = 10 const connLimit = 40
// make sure the rest backend implements restic.Backend // make sure the rest backend implements restic.Backend
var _ restic.Backend = &restBackend{} var _ restic.Backend = &restBackend{}

View file

@ -3,6 +3,7 @@ package s3
import ( import (
"bytes" "bytes"
"io" "io"
"net/http"
"path" "path"
"restic" "restic"
"strings" "strings"
@ -15,7 +16,7 @@ import (
"restic/debug" "restic/debug"
) )
const connLimit = 10 const connLimit = 40
// s3 is a backend which stores the data on an S3 endpoint. // s3 is a backend which stores the data on an S3 endpoint.
type s3 struct { type s3 struct {
@ -36,6 +37,10 @@ func Open(cfg Config) (restic.Backend, error) {
} }
be := &s3{client: client, bucketname: cfg.Bucket, prefix: cfg.Prefix} be := &s3{client: client, bucketname: cfg.Bucket, prefix: cfg.Prefix}
tr := &http.Transport{MaxIdleConnsPerHost: connLimit}
client.SetCustomTransport(tr)
be.createConnections() be.createConnections()
found, err := client.BucketExists(cfg.Bucket) found, err := client.BucketExists(cfg.Bucket)
@ -104,6 +109,18 @@ func (be *s3) Save(h restic.Handle, rd io.Reader) (err error) {
return errors.Wrap(err, "client.PutObject") return errors.Wrap(err, "client.PutObject")
} }
// wrapReader wraps an io.ReadCloser to run an additional function on Close.
type wrapReader struct {
io.ReadCloser
f func()
}
func (wr wrapReader) Close() error {
err := wr.ReadCloser.Close()
wr.f()
return err
}
// Load returns a reader that yields the contents of the file at h at the // Load returns a reader that yields the contents of the file at h at the
// given offset. If length is nonzero, only a portion of the file is // given offset. If length is nonzero, only a portion of the file is
// returned. rd must be closed after use. // returned. rd must be closed after use.
@ -125,28 +142,48 @@ func (be *s3) Load(h restic.Handle, length int, offset int64) (io.ReadCloser, er
objName := be.s3path(h) objName := be.s3path(h)
// get token for connection
<-be.connChan <-be.connChan
defer func() {
be.connChan <- struct{}{}
}()
obj, err := be.client.GetObject(be.bucketname, objName) obj, err := be.client.GetObject(be.bucketname, objName)
if err != nil { if err != nil {
debug.Log(" err %v", err) debug.Log(" err %v", err)
// return token
be.connChan <- struct{}{}
return nil, errors.Wrap(err, "client.GetObject") return nil, errors.Wrap(err, "client.GetObject")
} }
// if we're going to read the whole object, just pass it on. // if we're going to read the whole object, just pass it on.
if length == 0 { if length == 0 {
debug.Log("Load %v: pass on object", h) debug.Log("Load %v: pass on object", h)
_, err = obj.Seek(offset, 0) _, err = obj.Seek(offset, 0)
if err != nil { if err != nil {
_ = obj.Close() _ = obj.Close()
// return token
be.connChan <- struct{}{}
return nil, errors.Wrap(err, "obj.Seek") return nil, errors.Wrap(err, "obj.Seek")
} }
return obj, nil rd := wrapReader{
ReadCloser: obj,
f: func() {
debug.Log("Close()")
// return token
be.connChan <- struct{}{}
},
} }
return rd, nil
}
defer func() {
// return token
be.connChan <- struct{}{}
}()
// otherwise use a buffer with ReadAt // otherwise use a buffer with ReadAt
info, err := obj.Stat() info, err := obj.Stat()

View file

@ -245,21 +245,25 @@ func TestLoad(t testing.TB) {
buf, err := ioutil.ReadAll(rd) buf, err := ioutil.ReadAll(rd)
if err != nil { if err != nil {
t.Errorf("Load(%d, %d) ReadAll() returned unexpected error: %v", l, o, err) t.Errorf("Load(%d, %d) ReadAll() returned unexpected error: %v", l, o, err)
rd.Close()
continue continue
} }
if l <= len(d) && len(buf) != l { if l <= len(d) && len(buf) != l {
t.Errorf("Load(%d, %d) wrong number of bytes read: want %d, got %d", l, o, l, len(buf)) t.Errorf("Load(%d, %d) wrong number of bytes read: want %d, got %d", l, o, l, len(buf))
rd.Close()
continue continue
} }
if l > len(d) && len(buf) != len(d) { if l > len(d) && len(buf) != len(d) {
t.Errorf("Load(%d, %d) wrong number of bytes read for overlong read: want %d, got %d", l, o, l, len(buf)) t.Errorf("Load(%d, %d) wrong number of bytes read for overlong read: want %d, got %d", l, o, l, len(buf))
rd.Close()
continue continue
} }
if !bytes.Equal(buf, d) { if !bytes.Equal(buf, d) {
t.Errorf("Load(%d, %d) returned wrong bytes", l, o) t.Errorf("Load(%d, %d) returned wrong bytes", l, o)
rd.Close()
continue continue
} }