Merge pull request #999 from restic/backend-use-semaphore

backends: Use new semaphore
This commit is contained in:
Alexander Neumann 2017-06-07 19:48:32 +02:00
commit 550e1feaec
15 changed files with 249 additions and 164 deletions

View file

@ -53,6 +53,11 @@ func Open(cfg Config) (restic.Backend, error) {
return nil, errors.Wrap(err, "Bucket") return nil, errors.Wrap(err, "Bucket")
} }
sem, err := backend.NewSemaphore(cfg.Connections)
if err != nil {
return nil, err
}
be := &b2Backend{ be := &b2Backend{
client: client, client: client,
bucket: bucket, bucket: bucket,
@ -61,7 +66,7 @@ func Open(cfg Config) (restic.Backend, error) {
Join: path.Join, Join: path.Join,
Path: cfg.Prefix, Path: cfg.Prefix,
}, },
sem: backend.NewSemaphore(cfg.Connections), sem: sem,
} }
return be, nil return be, nil
@ -88,6 +93,11 @@ func Create(cfg Config) (restic.Backend, error) {
return nil, errors.Wrap(err, "NewBucket") return nil, errors.Wrap(err, "NewBucket")
} }
sem, err := backend.NewSemaphore(cfg.Connections)
if err != nil {
return nil, err
}
be := &b2Backend{ be := &b2Backend{
client: client, client: client,
bucket: bucket, bucket: bucket,
@ -96,7 +106,7 @@ func Create(cfg Config) (restic.Backend, error) {
Join: path.Join, Join: path.Join,
Path: cfg.Prefix, Path: cfg.Prefix,
}, },
sem: backend.NewSemaphore(cfg.Connections), sem: sem,
} }
present, err := be.Test(context.TODO(), restic.Handle{Type: restic.ConfigFile}) present, err := be.Test(context.TODO(), restic.Handle{Type: restic.ConfigFile})

View file

@ -17,7 +17,7 @@ type Config struct {
Bucket string Bucket string
Prefix string Prefix string
Connections int `option:"connections" help:"set a limit for the number of concurrent connections (default: 5)"` Connections uint `option:"connections" help:"set a limit for the number of concurrent connections (default: 5)"`
} }
// NewConfig returns a new config with default options applied. // NewConfig returns a new config with default options applied.

View file

@ -120,9 +120,10 @@ var parseTests = []struct {
"s3://eu-central-1/bucketname", "s3://eu-central-1/bucketname",
Location{Scheme: "s3", Location{Scheme: "s3",
Config: s3.Config{ Config: s3.Config{
Endpoint: "eu-central-1", Endpoint: "eu-central-1",
Bucket: "bucketname", Bucket: "bucketname",
Prefix: "restic", Prefix: "restic",
Connections: 20,
}, },
}, },
}, },
@ -130,9 +131,10 @@ var parseTests = []struct {
"s3://hostname.foo/bucketname", "s3://hostname.foo/bucketname",
Location{Scheme: "s3", Location{Scheme: "s3",
Config: s3.Config{ Config: s3.Config{
Endpoint: "hostname.foo", Endpoint: "hostname.foo",
Bucket: "bucketname", Bucket: "bucketname",
Prefix: "restic", Prefix: "restic",
Connections: 20,
}, },
}, },
}, },
@ -140,9 +142,10 @@ var parseTests = []struct {
"s3://hostname.foo/bucketname/prefix/directory", "s3://hostname.foo/bucketname/prefix/directory",
Location{Scheme: "s3", Location{Scheme: "s3",
Config: s3.Config{ Config: s3.Config{
Endpoint: "hostname.foo", Endpoint: "hostname.foo",
Bucket: "bucketname", Bucket: "bucketname",
Prefix: "prefix/directory", Prefix: "prefix/directory",
Connections: 20,
}, },
}, },
}, },
@ -150,9 +153,10 @@ var parseTests = []struct {
"s3:eu-central-1/repo", "s3:eu-central-1/repo",
Location{Scheme: "s3", Location{Scheme: "s3",
Config: s3.Config{ Config: s3.Config{
Endpoint: "eu-central-1", Endpoint: "eu-central-1",
Bucket: "repo", Bucket: "repo",
Prefix: "restic", Prefix: "restic",
Connections: 20,
}, },
}, },
}, },
@ -160,9 +164,10 @@ var parseTests = []struct {
"s3:eu-central-1/repo/prefix/directory", "s3:eu-central-1/repo/prefix/directory",
Location{Scheme: "s3", Location{Scheme: "s3",
Config: s3.Config{ Config: s3.Config{
Endpoint: "eu-central-1", Endpoint: "eu-central-1",
Bucket: "repo", Bucket: "repo",
Prefix: "prefix/directory", Prefix: "prefix/directory",
Connections: 20,
}, },
}, },
}, },
@ -170,9 +175,10 @@ var parseTests = []struct {
"s3:https://hostname.foo/repo", "s3:https://hostname.foo/repo",
Location{Scheme: "s3", Location{Scheme: "s3",
Config: s3.Config{ Config: s3.Config{
Endpoint: "hostname.foo", Endpoint: "hostname.foo",
Bucket: "repo", Bucket: "repo",
Prefix: "restic", Prefix: "restic",
Connections: 20,
}, },
}, },
}, },
@ -180,9 +186,10 @@ var parseTests = []struct {
"s3:https://hostname.foo/repo/prefix/directory", "s3:https://hostname.foo/repo/prefix/directory",
Location{Scheme: "s3", Location{Scheme: "s3",
Config: s3.Config{ Config: s3.Config{
Endpoint: "hostname.foo", Endpoint: "hostname.foo",
Bucket: "repo", Bucket: "repo",
Prefix: "prefix/directory", Prefix: "prefix/directory",
Connections: 20,
}, },
}, },
}, },
@ -190,10 +197,11 @@ var parseTests = []struct {
"s3:http://hostname.foo/repo", "s3:http://hostname.foo/repo",
Location{Scheme: "s3", Location{Scheme: "s3",
Config: s3.Config{ Config: s3.Config{
Endpoint: "hostname.foo", Endpoint: "hostname.foo",
Bucket: "repo", Bucket: "repo",
Prefix: "restic", Prefix: "restic",
UseHTTP: true, UseHTTP: true,
Connections: 20,
}, },
}, },
}, },
@ -201,8 +209,9 @@ var parseTests = []struct {
"swift:container17:/", "swift:container17:/",
Location{Scheme: "swift", Location{Scheme: "swift",
Config: swift.Config{ Config: swift.Config{
Container: "container17", Container: "container17",
Prefix: "", Prefix: "",
Connections: 20,
}, },
}, },
}, },
@ -210,8 +219,9 @@ var parseTests = []struct {
"swift:container17:/prefix97", "swift:container17:/prefix97",
Location{Scheme: "swift", Location{Scheme: "swift",
Config: swift.Config{ Config: swift.Config{
Container: "container17", Container: "container17",
Prefix: "prefix97", Prefix: "prefix97",
Connections: 20,
}, },
}, },
}, },
@ -219,7 +229,8 @@ var parseTests = []struct {
"rest:http://hostname.foo:1234/", "rest:http://hostname.foo:1234/",
Location{Scheme: "rest", Location{Scheme: "rest",
Config: rest.Config{ Config: rest.Config{
URL: parseURL("http://hostname.foo:1234/"), URL: parseURL("http://hostname.foo:1234/"),
Connections: 20,
}, },
}, },
}, },

View file

@ -5,11 +5,24 @@ import (
"strings" "strings"
"restic/errors" "restic/errors"
"restic/options"
) )
// Config contains all configuration necessary to connect to a REST server. // Config contains all configuration necessary to connect to a REST server.
type Config struct { type Config struct {
URL *url.URL URL *url.URL
Connections uint `option:"connections" help:"set a limit for the number of concurrent connections (default: 20)"`
}
func init() {
options.Register("rest", Config{})
}
// NewConfig returns a new Config with the default values filled in.
func NewConfig() Config {
return Config{
Connections: 20,
}
} }
// ParseConfig parses the string s and extracts the REST server URL. // ParseConfig parses the string s and extracts the REST server URL.
@ -25,6 +38,7 @@ func ParseConfig(s string) (interface{}, error) {
return nil, errors.Wrap(err, "url.Parse") return nil, errors.Wrap(err, "url.Parse")
} }
cfg := Config{URL: u} cfg := NewConfig()
cfg.URL = u
return cfg, nil return cfg, nil
} }

View file

@ -20,7 +20,8 @@ var configTests = []struct {
cfg Config cfg Config
}{ }{
{"rest:http://localhost:1234", Config{ {"rest:http://localhost:1234", Config{
URL: parseURL("http://localhost:1234"), URL: parseURL("http://localhost:1234"),
Connections: 20,
}}, }},
} }

View file

@ -26,21 +26,21 @@ const connLimit = 40
var _ restic.Backend = &restBackend{} var _ restic.Backend = &restBackend{}
type restBackend struct { type restBackend struct {
url *url.URL url *url.URL
connChan chan struct{} sem *backend.Semaphore
client *http.Client client *http.Client
backend.Layout backend.Layout
} }
// Open opens the REST backend with the given config. // Open opens the REST backend with the given config.
func Open(cfg Config) (restic.Backend, error) { func Open(cfg Config) (restic.Backend, error) {
connChan := make(chan struct{}, connLimit)
for i := 0; i < connLimit; i++ {
connChan <- struct{}{}
}
client := &http.Client{Transport: backend.Transport()} client := &http.Client{Transport: backend.Transport()}
sem, err := backend.NewSemaphore(cfg.Connections)
if err != nil {
return nil, err
}
// use url without trailing slash for layout // use url without trailing slash for layout
url := cfg.URL.String() url := cfg.URL.String()
if url[len(url)-1] == '/' { if url[len(url)-1] == '/' {
@ -48,10 +48,10 @@ func Open(cfg Config) (restic.Backend, error) {
} }
be := &restBackend{ be := &restBackend{
url: cfg.URL, url: cfg.URL,
connChan: connChan, client: client,
client: client, Layout: &backend.RESTLayout{URL: url, Join: path.Join},
Layout: &backend.RESTLayout{URL: url, Join: path.Join}, sem: sem,
} }
return be, nil return be, nil
@ -114,9 +114,9 @@ func (b *restBackend) Save(ctx context.Context, h restic.Handle, rd io.Reader) (
// backend.Closer, which has a noop method. // backend.Closer, which has a noop method.
rd = backend.Closer{Reader: rd} rd = backend.Closer{Reader: rd}
<-b.connChan b.sem.GetToken()
resp, err := ctxhttp.Post(ctx, b.client, b.Filename(h), "binary/octet-stream", rd) resp, err := ctxhttp.Post(ctx, b.client, b.Filename(h), "binary/octet-stream", rd)
b.connChan <- struct{}{} b.sem.ReleaseToken()
if resp != nil { if resp != nil {
defer func() { defer func() {
@ -169,9 +169,9 @@ func (b *restBackend) Load(ctx context.Context, h restic.Handle, length int, off
req.Header.Add("Range", byteRange) req.Header.Add("Range", byteRange)
debug.Log("Load(%v) send range %v", h, byteRange) debug.Log("Load(%v) send range %v", h, byteRange)
<-b.connChan b.sem.GetToken()
resp, err := ctxhttp.Do(ctx, b.client, req) resp, err := ctxhttp.Do(ctx, b.client, req)
b.connChan <- struct{}{} b.sem.ReleaseToken()
if err != nil { if err != nil {
if resp != nil { if resp != nil {
@ -195,9 +195,9 @@ func (b *restBackend) Stat(ctx context.Context, h restic.Handle) (restic.FileInf
return restic.FileInfo{}, err return restic.FileInfo{}, err
} }
<-b.connChan b.sem.GetToken()
resp, err := ctxhttp.Head(ctx, b.client, b.Filename(h)) resp, err := ctxhttp.Head(ctx, b.client, b.Filename(h))
b.connChan <- struct{}{} b.sem.ReleaseToken()
if err != nil { if err != nil {
return restic.FileInfo{}, errors.Wrap(err, "client.Head") return restic.FileInfo{}, errors.Wrap(err, "client.Head")
} }
@ -242,9 +242,9 @@ func (b *restBackend) Remove(ctx context.Context, h restic.Handle) error {
if err != nil { if err != nil {
return errors.Wrap(err, "http.NewRequest") return errors.Wrap(err, "http.NewRequest")
} }
<-b.connChan b.sem.GetToken()
resp, err := ctxhttp.Do(ctx, b.client, req) resp, err := ctxhttp.Do(ctx, b.client, req)
b.connChan <- struct{}{} b.sem.ReleaseToken()
if err != nil { if err != nil {
return errors.Wrap(err, "client.Do") return errors.Wrap(err, "client.Do")
@ -273,9 +273,9 @@ func (b *restBackend) List(ctx context.Context, t restic.FileType) <-chan string
url += "/" url += "/"
} }
<-b.connChan b.sem.GetToken()
resp, err := ctxhttp.Get(ctx, b.client, url) resp, err := ctxhttp.Get(ctx, b.client, url)
b.connChan <- struct{}{} b.sem.ReleaseToken()
if resp != nil { if resp != nil {
defer func() { defer func() {

View file

@ -76,9 +76,8 @@ func newTestSuite(ctx context.Context, t testing.TB) *test.Suite {
t.Fatal(err) t.Fatal(err)
} }
cfg := rest.Config{ cfg := rest.NewConfig()
URL: url, cfg.URL = url
}
return cfg, nil return cfg, nil
}, },

View file

@ -18,6 +18,15 @@ type Config struct {
Bucket string Bucket string
Prefix string Prefix string
Layout string `option:"layout" help:"use this backend layout (default: auto-detect)"` Layout string `option:"layout" help:"use this backend layout (default: auto-detect)"`
Connections uint `option:"connections" help:"set a limit for the number of concurrent connections (default: 20)"`
}
// NewConfig returns a new Config with the default values filled in.
func NewConfig() Config {
return Config{
Connections: 20,
}
} }
func init() { func init() {
@ -70,10 +79,10 @@ func createConfig(endpoint string, p []string, useHTTP bool) (interface{}, error
default: default:
prefix = path.Clean(p[1]) prefix = path.Clean(p[1])
} }
return Config{ cfg := NewConfig()
Endpoint: endpoint, cfg.Endpoint = endpoint
UseHTTP: useHTTP, cfg.UseHTTP = useHTTP
Bucket: p[0], cfg.Bucket = p[0]
Prefix: prefix, cfg.Prefix = prefix
}, nil return cfg, nil
} }

View file

@ -7,78 +7,92 @@ var configTests = []struct {
cfg Config cfg Config
}{ }{
{"s3://eu-central-1/bucketname", Config{ {"s3://eu-central-1/bucketname", Config{
Endpoint: "eu-central-1", Endpoint: "eu-central-1",
Bucket: "bucketname", Bucket: "bucketname",
Prefix: "restic", Prefix: "restic",
Connections: 20,
}}, }},
{"s3://eu-central-1/bucketname/", Config{ {"s3://eu-central-1/bucketname/", Config{
Endpoint: "eu-central-1", Endpoint: "eu-central-1",
Bucket: "bucketname", Bucket: "bucketname",
Prefix: "restic", Prefix: "restic",
Connections: 20,
}}, }},
{"s3://eu-central-1/bucketname/prefix/directory", Config{ {"s3://eu-central-1/bucketname/prefix/directory", Config{
Endpoint: "eu-central-1", Endpoint: "eu-central-1",
Bucket: "bucketname", Bucket: "bucketname",
Prefix: "prefix/directory", Prefix: "prefix/directory",
Connections: 20,
}}, }},
{"s3://eu-central-1/bucketname/prefix/directory/", Config{ {"s3://eu-central-1/bucketname/prefix/directory/", Config{
Endpoint: "eu-central-1", Endpoint: "eu-central-1",
Bucket: "bucketname", Bucket: "bucketname",
Prefix: "prefix/directory", Prefix: "prefix/directory",
Connections: 20,
}}, }},
{"s3:eu-central-1/foobar", Config{ {"s3:eu-central-1/foobar", Config{
Endpoint: "eu-central-1", Endpoint: "eu-central-1",
Bucket: "foobar", Bucket: "foobar",
Prefix: "restic", Prefix: "restic",
Connections: 20,
}}, }},
{"s3:eu-central-1/foobar/", Config{ {"s3:eu-central-1/foobar/", Config{
Endpoint: "eu-central-1", Endpoint: "eu-central-1",
Bucket: "foobar", Bucket: "foobar",
Prefix: "restic", Prefix: "restic",
Connections: 20,
}}, }},
{"s3:eu-central-1/foobar/prefix/directory", Config{ {"s3:eu-central-1/foobar/prefix/directory", Config{
Endpoint: "eu-central-1", Endpoint: "eu-central-1",
Bucket: "foobar", Bucket: "foobar",
Prefix: "prefix/directory", Prefix: "prefix/directory",
Connections: 20,
}}, }},
{"s3:eu-central-1/foobar/prefix/directory/", Config{ {"s3:eu-central-1/foobar/prefix/directory/", Config{
Endpoint: "eu-central-1", Endpoint: "eu-central-1",
Bucket: "foobar", Bucket: "foobar",
Prefix: "prefix/directory", Prefix: "prefix/directory",
Connections: 20,
}}, }},
{"s3:https://hostname:9999/foobar", Config{ {"s3:https://hostname:9999/foobar", Config{
Endpoint: "hostname:9999", Endpoint: "hostname:9999",
Bucket: "foobar", Bucket: "foobar",
Prefix: "restic", Prefix: "restic",
Connections: 20,
}}, }},
{"s3:https://hostname:9999/foobar/", Config{ {"s3:https://hostname:9999/foobar/", Config{
Endpoint: "hostname:9999", Endpoint: "hostname:9999",
Bucket: "foobar", Bucket: "foobar",
Prefix: "restic", Prefix: "restic",
Connections: 20,
}}, }},
{"s3:http://hostname:9999/foobar", Config{ {"s3:http://hostname:9999/foobar", Config{
Endpoint: "hostname:9999", Endpoint: "hostname:9999",
Bucket: "foobar", Bucket: "foobar",
Prefix: "restic", Prefix: "restic",
UseHTTP: true, UseHTTP: true,
Connections: 20,
}}, }},
{"s3:http://hostname:9999/foobar/", Config{ {"s3:http://hostname:9999/foobar/", Config{
Endpoint: "hostname:9999", Endpoint: "hostname:9999",
Bucket: "foobar", Bucket: "foobar",
Prefix: "restic", Prefix: "restic",
UseHTTP: true, UseHTTP: true,
Connections: 20,
}}, }},
{"s3:http://hostname:9999/bucket/prefix/directory", Config{ {"s3:http://hostname:9999/bucket/prefix/directory", Config{
Endpoint: "hostname:9999", Endpoint: "hostname:9999",
Bucket: "bucket", Bucket: "bucket",
Prefix: "prefix/directory", Prefix: "prefix/directory",
UseHTTP: true, UseHTTP: true,
Connections: 20,
}}, }},
{"s3:http://hostname:9999/bucket/prefix/directory/", Config{ {"s3:http://hostname:9999/bucket/prefix/directory/", Config{
Endpoint: "hostname:9999", Endpoint: "hostname:9999",
Bucket: "bucket", Bucket: "bucket",
Prefix: "prefix/directory", Prefix: "prefix/directory",
UseHTTP: true, UseHTTP: true,
Connections: 20,
}}, }},
} }

View file

@ -24,7 +24,7 @@ const connLimit = 10
// s3 is a backend which stores the data on an S3 endpoint. // s3 is a backend which stores the data on an S3 endpoint.
type s3 struct { type s3 struct {
client *minio.Client client *minio.Client
connChan chan struct{} sem *backend.Semaphore
bucketname string bucketname string
prefix string prefix string
cacheMutex sync.RWMutex cacheMutex sync.RWMutex
@ -47,8 +47,14 @@ func Open(cfg Config) (restic.Backend, error) {
return nil, errors.Wrap(err, "minio.New") return nil, errors.Wrap(err, "minio.New")
} }
sem, err := backend.NewSemaphore(cfg.Connections)
if err != nil {
return nil, err
}
be := &s3{ be := &s3{
client: client, client: client,
sem: sem,
bucketname: cfg.Bucket, bucketname: cfg.Bucket,
prefix: cfg.Prefix, prefix: cfg.Prefix,
cacheObjSize: make(map[string]int64), cacheObjSize: make(map[string]int64),
@ -63,8 +69,6 @@ func Open(cfg Config) (restic.Backend, error) {
be.Layout = l be.Layout = l
be.createConnections()
found, err := client.BucketExists(cfg.Bucket) found, err := client.BucketExists(cfg.Bucket)
if err != nil { if err != nil {
debug.Log("BucketExists(%v) returned err %v", cfg.Bucket, err) debug.Log("BucketExists(%v) returned err %v", cfg.Bucket, err)
@ -82,13 +86,6 @@ func Open(cfg Config) (restic.Backend, error) {
return be, nil return be, nil
} }
func (be *s3) createConnections() {
be.connChan = make(chan struct{}, connLimit)
for i := 0; i < connLimit; i++ {
be.connChan <- struct{}{}
}
}
// IsNotExist returns true if the error is caused by a not existing file. // IsNotExist returns true if the error is caused by a not existing file.
func (be *s3) IsNotExist(err error) bool { func (be *s3) IsNotExist(err error) bool {
debug.Log("IsNotExist(%T, %#v)", err, err) debug.Log("IsNotExist(%T, %#v)", err, err)
@ -226,7 +223,7 @@ func (be *s3) Save(ctx context.Context, h restic.Handle, rd io.Reader) (err erro
return errors.New("key already exists") return errors.New("key already exists")
} }
<-be.connChan be.sem.GetToken()
// wrap the reader so that net/http client cannot close the reader, return // wrap the reader so that net/http client cannot close the reader, return
// the token instead. // the token instead.
@ -238,11 +235,10 @@ func (be *s3) Save(ctx context.Context, h restic.Handle, rd io.Reader) (err erro
} }
debug.Log("PutObject(%v, %v)", be.bucketname, objName) debug.Log("PutObject(%v, %v)", be.bucketname, objName)
coreClient := minio.Core{be.client} coreClient := minio.Core{Client: be.client}
info, err := coreClient.PutObject(be.bucketname, objName, size, rd, nil, nil, nil) info, err := coreClient.PutObject(be.bucketname, objName, size, rd, nil, nil, nil)
// return token be.sem.ReleaseToken()
be.connChan <- struct{}{}
debug.Log("%v -> %v bytes, err %#v", objName, info.Size, err) debug.Log("%v -> %v bytes, err %#v", objName, info.Size, err)
return errors.Wrap(err, "client.PutObject") return errors.Wrap(err, "client.PutObject")
@ -279,8 +275,7 @@ func (be *s3) Load(ctx context.Context, h restic.Handle, length int, offset int6
objName := be.Filename(h) objName := be.Filename(h)
// get token for connection be.sem.GetToken()
<-be.connChan
byteRange := fmt.Sprintf("bytes=%d-", offset) byteRange := fmt.Sprintf("bytes=%d-", offset)
if length > 0 { if length > 0 {
@ -290,11 +285,10 @@ func (be *s3) Load(ctx context.Context, h restic.Handle, length int, offset int6
headers.Add("Range", byteRange) headers.Add("Range", byteRange)
debug.Log("Load(%v) send range %v", h, byteRange) debug.Log("Load(%v) send range %v", h, byteRange)
coreClient := minio.Core{be.client} coreClient := minio.Core{Client: be.client}
rd, _, err := coreClient.GetObject(be.bucketname, objName, headers) rd, _, err := coreClient.GetObject(be.bucketname, objName, headers)
if err != nil { if err != nil {
// return token be.sem.ReleaseToken()
be.connChan <- struct{}{}
return nil, err return nil, err
} }
@ -302,8 +296,7 @@ func (be *s3) Load(ctx context.Context, h restic.Handle, length int, offset int6
ReadCloser: rd, ReadCloser: rd,
f: func() { f: func() {
debug.Log("Close()") debug.Log("Close()")
// return token be.sem.ReleaseToken()
be.connChan <- struct{}{}
}, },
} }

View file

@ -114,14 +114,13 @@ func newMinioTestSuite(ctx context.Context, t testing.TB) *test.Suite {
key, secret := newRandomCredentials(t) key, secret := newRandomCredentials(t)
cfg.stopServer = runMinio(ctx, t, cfg.tempdir, key, secret) cfg.stopServer = runMinio(ctx, t, cfg.tempdir, key, secret)
cfg.Config = s3.Config{ cfg.Config = s3.NewConfig()
Endpoint: "localhost:9000", cfg.Config.Endpoint = "localhost:9000"
Bucket: "restictestbucket", cfg.Config.Bucket = "restictestbucket"
Prefix: fmt.Sprintf("test-%d", time.Now().UnixNano()), cfg.Config.Prefix = fmt.Sprintf("test-%d", time.Now().UnixNano())
UseHTTP: true, cfg.Config.UseHTTP = true
KeyID: key, cfg.Config.KeyID = key
Secret: secret, cfg.Config.Secret = secret
}
return cfg, nil return cfg, nil
}, },

View file

@ -1,15 +1,20 @@
package backend package backend
import "restic/errors"
// Semaphore limits access to a restricted resource. // Semaphore limits access to a restricted resource.
type Semaphore struct { type Semaphore struct {
ch chan struct{} ch chan struct{}
} }
// NewSemaphore returns a new semaphore with capacity n. // NewSemaphore returns a new semaphore with capacity n.
func NewSemaphore(n int) *Semaphore { func NewSemaphore(n uint) (*Semaphore, error) {
if n <= 0 {
return nil, errors.New("must be a positive number")
}
return &Semaphore{ return &Semaphore{
ch: make(chan struct{}, n), ch: make(chan struct{}, n),
} }, nil
} }
// GetToken blocks until a Token is available. // GetToken blocks until a Token is available.

View file

@ -3,6 +3,7 @@ package swift
import ( import (
"os" "os"
"restic/errors" "restic/errors"
"restic/options"
"strings" "strings"
) )
@ -24,6 +25,19 @@ type Config struct {
Container string Container string
Prefix string Prefix string
DefaultContainerPolicy string DefaultContainerPolicy string
Connections uint `option:"connections" help:"set a limit for the number of concurrent connections (default: 20)"`
}
func init() {
options.Register("swift", Config{})
}
// NewConfig returns a new config with the default values filled in.
func NewConfig() Config {
return Config{
Connections: 20,
}
} }
// ParseConfig parses the string s and extract swift's container name and prefix. // ParseConfig parses the string s and extract swift's container name and prefix.
@ -47,10 +61,9 @@ func ParseConfig(s string) (interface{}, error) {
} }
prefix = prefix[1:] prefix = prefix[1:]
cfg := Config{ cfg := NewConfig()
Container: container, cfg.Container = container
Prefix: prefix, cfg.Prefix = prefix
}
return cfg, nil return cfg, nil
} }

View file

@ -6,9 +6,28 @@ var configTests = []struct {
s string s string
cfg Config cfg Config
}{ }{
{"swift:cnt1:/", Config{Container: "cnt1", Prefix: ""}}, {
{"swift:cnt2:/prefix", Config{Container: "cnt2", Prefix: "prefix"}}, "swift:cnt1:/",
{"swift:cnt3:/prefix/longer", Config{Container: "cnt3", Prefix: "prefix/longer"}}, Config{
Container: "cnt1",
Prefix: "",
Connections: 20,
},
},
{
"swift:cnt2:/prefix",
Config{Container: "cnt2",
Prefix: "prefix",
Connections: 20,
},
},
{
"swift:cnt3:/prefix/longer",
Config{Container: "cnt3",
Prefix: "prefix/longer",
Connections: 20,
},
},
} }
func TestParseConfig(t *testing.T) { func TestParseConfig(t *testing.T) {

View file

@ -22,7 +22,7 @@ const connLimit = 10
// beSwift is a backend which stores the data on a swift endpoint. // beSwift is a backend which stores the data on a swift endpoint.
type beSwift struct { type beSwift struct {
conn *swift.Connection conn *swift.Connection
connChan chan struct{} sem *backend.Semaphore
container string // Container name container string // Container name
prefix string // Prefix of object names in the container prefix string // Prefix of object names in the container
backend.Layout backend.Layout
@ -36,6 +36,11 @@ var _ restic.Backend = &beSwift{}
func Open(cfg Config) (restic.Backend, error) { func Open(cfg Config) (restic.Backend, error) {
debug.Log("config %#v", cfg) debug.Log("config %#v", cfg)
sem, err := backend.NewSemaphore(cfg.Connections)
if err != nil {
return nil, err
}
be := &beSwift{ be := &beSwift{
conn: &swift.Connection{ conn: &swift.Connection{
UserName: cfg.UserName, UserName: cfg.UserName,
@ -54,6 +59,7 @@ func Open(cfg Config) (restic.Backend, error) {
Transport: backend.Transport(), Transport: backend.Transport(),
}, },
sem: sem,
container: cfg.Container, container: cfg.Container,
prefix: cfg.Prefix, prefix: cfg.Prefix,
Layout: &backend.DefaultLayout{ Layout: &backend.DefaultLayout{
@ -61,7 +67,6 @@ func Open(cfg Config) (restic.Backend, error) {
Join: path.Join, Join: path.Join,
}, },
} }
be.createConnections()
// Authenticate if needed // Authenticate if needed
if !be.conn.Authenticated() { if !be.conn.Authenticated() {
@ -98,13 +103,6 @@ func Open(cfg Config) (restic.Backend, error) {
return be, nil return be, nil
} }
func (be *beSwift) createConnections() {
be.connChan = make(chan struct{}, connLimit)
for i := 0; i < connLimit; i++ {
be.connChan <- struct{}{}
}
}
func (be *beSwift) createContainer(policy string) error { func (be *beSwift) createContainer(policy string) error {
var h swift.Headers var h swift.Headers
if policy != "" { if policy != "" {
@ -140,9 +138,9 @@ func (be *beSwift) Load(ctx context.Context, h restic.Handle, length int, offset
objName := be.Filename(h) objName := be.Filename(h)
<-be.connChan be.sem.GetToken()
defer func() { defer func() {
be.connChan <- struct{}{} be.sem.ReleaseToken()
}() }()
headers := swift.Headers{} headers := swift.Headers{}
@ -190,9 +188,9 @@ func (be *beSwift) Save(ctx context.Context, h restic.Handle, rd io.Reader) (err
return errors.Wrap(err, "conn.Object") return errors.Wrap(err, "conn.Object")
} }
<-be.connChan be.sem.GetToken()
defer func() { defer func() {
be.connChan <- struct{}{} be.sem.ReleaseToken()
}() }()
encoding := "binary/octet-stream" encoding := "binary/octet-stream"