forked from TrueCloudLab/restic
swift: Use semaphore
This commit is contained in:
parent
46049b4236
commit
aa5bc39311
3 changed files with 50 additions and 20 deletions
|
@ -3,6 +3,7 @@ package swift
|
|||
import (
|
||||
"os"
|
||||
"restic/errors"
|
||||
"restic/options"
|
||||
"strings"
|
||||
)
|
||||
|
||||
|
@ -24,6 +25,19 @@ type Config struct {
|
|||
Container string
|
||||
Prefix string
|
||||
DefaultContainerPolicy string
|
||||
|
||||
Connections uint `option:"connections" help:"set a limit for the number of concurrent connections (default: 20)"`
|
||||
}
|
||||
|
||||
func init() {
|
||||
options.Register("swift", Config{})
|
||||
}
|
||||
|
||||
// NewConfig returns a new config with the default values filled in.
|
||||
func NewConfig() Config {
|
||||
return Config{
|
||||
Connections: 20,
|
||||
}
|
||||
}
|
||||
|
||||
// ParseConfig parses the string s and extract swift's container name and prefix.
|
||||
|
@ -47,10 +61,9 @@ func ParseConfig(s string) (interface{}, error) {
|
|||
}
|
||||
prefix = prefix[1:]
|
||||
|
||||
cfg := Config{
|
||||
Container: container,
|
||||
Prefix: prefix,
|
||||
}
|
||||
cfg := NewConfig()
|
||||
cfg.Container = container
|
||||
cfg.Prefix = prefix
|
||||
|
||||
return cfg, nil
|
||||
}
|
||||
|
|
|
@ -6,9 +6,28 @@ var configTests = []struct {
|
|||
s string
|
||||
cfg Config
|
||||
}{
|
||||
{"swift:cnt1:/", Config{Container: "cnt1", Prefix: ""}},
|
||||
{"swift:cnt2:/prefix", Config{Container: "cnt2", Prefix: "prefix"}},
|
||||
{"swift:cnt3:/prefix/longer", Config{Container: "cnt3", Prefix: "prefix/longer"}},
|
||||
{
|
||||
"swift:cnt1:/",
|
||||
Config{
|
||||
Container: "cnt1",
|
||||
Prefix: "",
|
||||
Connections: 20,
|
||||
},
|
||||
},
|
||||
{
|
||||
"swift:cnt2:/prefix",
|
||||
Config{Container: "cnt2",
|
||||
Prefix: "prefix",
|
||||
Connections: 20,
|
||||
},
|
||||
},
|
||||
{
|
||||
"swift:cnt3:/prefix/longer",
|
||||
Config{Container: "cnt3",
|
||||
Prefix: "prefix/longer",
|
||||
Connections: 20,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
func TestParseConfig(t *testing.T) {
|
||||
|
|
|
@ -21,7 +21,7 @@ const connLimit = 10
|
|||
// beSwift is a backend which stores the data on a swift endpoint.
|
||||
type beSwift struct {
|
||||
conn *swift.Connection
|
||||
connChan chan struct{}
|
||||
sem *backend.Semaphore
|
||||
container string // Container name
|
||||
prefix string // Prefix of object names in the container
|
||||
backend.Layout
|
||||
|
@ -32,6 +32,11 @@ type beSwift struct {
|
|||
func Open(cfg Config) (restic.Backend, error) {
|
||||
debug.Log("config %#v", cfg)
|
||||
|
||||
sem, err := backend.NewSemaphore(cfg.Connections)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
be := &beSwift{
|
||||
conn: &swift.Connection{
|
||||
UserName: cfg.UserName,
|
||||
|
@ -50,6 +55,7 @@ func Open(cfg Config) (restic.Backend, error) {
|
|||
|
||||
Transport: backend.Transport(),
|
||||
},
|
||||
sem: sem,
|
||||
container: cfg.Container,
|
||||
prefix: cfg.Prefix,
|
||||
Layout: &backend.DefaultLayout{
|
||||
|
@ -57,7 +63,6 @@ func Open(cfg Config) (restic.Backend, error) {
|
|||
Join: path.Join,
|
||||
},
|
||||
}
|
||||
be.createConnections()
|
||||
|
||||
// Authenticate if needed
|
||||
if !be.conn.Authenticated() {
|
||||
|
@ -94,13 +99,6 @@ func Open(cfg Config) (restic.Backend, error) {
|
|||
return be, nil
|
||||
}
|
||||
|
||||
func (be *beSwift) createConnections() {
|
||||
be.connChan = make(chan struct{}, connLimit)
|
||||
for i := 0; i < connLimit; i++ {
|
||||
be.connChan <- struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
func (be *beSwift) createContainer(policy string) error {
|
||||
var h swift.Headers
|
||||
if policy != "" {
|
||||
|
@ -136,9 +134,9 @@ func (be *beSwift) Load(h restic.Handle, length int, offset int64) (io.ReadClose
|
|||
|
||||
objName := be.Filename(h)
|
||||
|
||||
<-be.connChan
|
||||
be.sem.GetToken()
|
||||
defer func() {
|
||||
be.connChan <- struct{}{}
|
||||
be.sem.ReleaseToken()
|
||||
}()
|
||||
|
||||
headers := swift.Headers{}
|
||||
|
@ -186,9 +184,9 @@ func (be *beSwift) Save(h restic.Handle, rd io.Reader) (err error) {
|
|||
return errors.Wrap(err, "conn.Object")
|
||||
}
|
||||
|
||||
<-be.connChan
|
||||
be.sem.GetToken()
|
||||
defer func() {
|
||||
be.connChan <- struct{}{}
|
||||
be.sem.ReleaseToken()
|
||||
}()
|
||||
|
||||
encoding := "binary/octet-stream"
|
||||
|
|
Loading…
Reference in a new issue