Add rclone backend
This commit is contained in:
parent
e377759c81
commit
fe99340e40
10 changed files with 512 additions and 1 deletions
|
@ -18,6 +18,7 @@ import (
|
|||
"github.com/restic/restic/internal/backend/gs"
|
||||
"github.com/restic/restic/internal/backend/local"
|
||||
"github.com/restic/restic/internal/backend/location"
|
||||
"github.com/restic/restic/internal/backend/rclone"
|
||||
"github.com/restic/restic/internal/backend/rest"
|
||||
"github.com/restic/restic/internal/backend/s3"
|
||||
"github.com/restic/restic/internal/backend/sftp"
|
||||
|
@ -509,6 +510,14 @@ func parseConfig(loc location.Location, opts options.Options) (interface{}, erro
|
|||
return nil, err
|
||||
}
|
||||
|
||||
debug.Log("opening rest repository at %#v", cfg)
|
||||
return cfg, nil
|
||||
case "rclone":
|
||||
cfg := loc.Config.(rclone.Config)
|
||||
if err := opts.Apply(loc.Scheme, &cfg); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
debug.Log("opening rest repository at %#v", cfg)
|
||||
return cfg, nil
|
||||
}
|
||||
|
@ -564,6 +573,8 @@ func open(s string, gopts GlobalOptions, opts options.Options) (restic.Backend,
|
|||
be, err = b2.Open(globalOptions.ctx, cfg.(b2.Config), rt)
|
||||
case "rest":
|
||||
be, err = rest.Open(cfg.(rest.Config), rt)
|
||||
case "rclone":
|
||||
be, err = rclone.Open(cfg.(rclone.Config))
|
||||
|
||||
default:
|
||||
return nil, errors.Fatalf("invalid backend: %q", loc.Scheme)
|
||||
|
@ -625,6 +636,8 @@ func create(s string, opts options.Options) (restic.Backend, error) {
|
|||
return b2.Create(globalOptions.ctx, cfg.(b2.Config), rt)
|
||||
case "rest":
|
||||
return rest.Create(cfg.(rest.Config), rt)
|
||||
case "rclone":
|
||||
return rclone.Open(cfg.(rclone.Config))
|
||||
}
|
||||
|
||||
debug.Log("invalid repository scheme: %v", s)
|
||||
|
|
|
@ -9,7 +9,7 @@ import (
|
|||
// StartForeground runs cmd in the foreground, by temporarily switching to the
|
||||
// new process group created for cmd. The returned function `bg` switches back
|
||||
// to the previous process group.
|
||||
func startForeground(cmd *exec.Cmd) (bg func() error, err error) {
|
||||
func StartForeground(cmd *exec.Cmd) (bg func() error, err error) {
|
||||
// just start the process and hope for the best
|
||||
err = cmd.Start()
|
||||
if err != nil {
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"github.com/restic/restic/internal/backend/b2"
|
||||
"github.com/restic/restic/internal/backend/gs"
|
||||
"github.com/restic/restic/internal/backend/local"
|
||||
"github.com/restic/restic/internal/backend/rclone"
|
||||
"github.com/restic/restic/internal/backend/rest"
|
||||
"github.com/restic/restic/internal/backend/s3"
|
||||
"github.com/restic/restic/internal/backend/sftp"
|
||||
|
@ -38,6 +39,7 @@ var parsers = []parser{
|
|||
{"azure", azure.ParseConfig},
|
||||
{"swift", swift.ParseConfig},
|
||||
{"rest", rest.ParseConfig},
|
||||
{"rclone", rclone.ParseConfig},
|
||||
}
|
||||
|
||||
func isPath(s string) bool {
|
||||
|
|
225
internal/backend/rclone/backend.go
Normal file
225
internal/backend/rclone/backend.go
Normal file
|
@ -0,0 +1,225 @@
|
|||
package rclone
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"os/exec"
|
||||
"time"
|
||||
|
||||
"github.com/restic/restic/internal/backend"
|
||||
"github.com/restic/restic/internal/backend/rest"
|
||||
"github.com/restic/restic/internal/debug"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"golang.org/x/net/context/ctxhttp"
|
||||
"golang.org/x/net/http2"
|
||||
)
|
||||
|
||||
// Backend is used to access data stored somewhere via rclone.
|
||||
type Backend struct {
|
||||
*rest.Backend
|
||||
tr *http2.Transport
|
||||
cmd *exec.Cmd
|
||||
waitCh <-chan struct{}
|
||||
waitResult error
|
||||
}
|
||||
|
||||
// run starts command with args and initializes the StdioConn.
|
||||
func run(command string, args ...string) (*StdioConn, *exec.Cmd, func() error, error) {
|
||||
cmd := exec.Command(command, args...)
|
||||
cmd.Stderr = os.Stderr
|
||||
|
||||
r, stdin, err := os.Pipe()
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
stdout, w, err := os.Pipe()
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
cmd.Stdin = r
|
||||
cmd.Stdout = w
|
||||
|
||||
bg, err := backend.StartForeground(cmd)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
c := &StdioConn{
|
||||
stdin: stdout,
|
||||
stdout: stdin,
|
||||
cmd: cmd,
|
||||
}
|
||||
|
||||
return c, cmd, bg, nil
|
||||
}
|
||||
|
||||
// New initializes a Backend and starts the process.
|
||||
func New(cfg Config) (*Backend, error) {
|
||||
var (
|
||||
args []string
|
||||
err error
|
||||
)
|
||||
|
||||
// build program args, start with the program
|
||||
if cfg.Program != "" {
|
||||
a, err := backend.SplitShellStrings(cfg.Program)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
args = append(args, a...)
|
||||
} else {
|
||||
args = append(args, "rclone")
|
||||
}
|
||||
|
||||
// then add the arguments
|
||||
if cfg.Args != "" {
|
||||
a, err := backend.SplitShellStrings(cfg.Args)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
args = append(args, a...)
|
||||
} else {
|
||||
args = append(args, "serve", "restic", "--stdio")
|
||||
}
|
||||
|
||||
// finally, add the remote
|
||||
args = append(args, cfg.Remote)
|
||||
arg0, args := args[0], args[1:]
|
||||
|
||||
debug.Log("running command: %v %v", arg0, args)
|
||||
conn, cmd, bg, err := run(arg0, args...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tr := &http2.Transport{
|
||||
AllowHTTP: true, // this is not really HTTP, just stdin/stdout
|
||||
DialTLS: func(network, address string, cfg *tls.Config) (net.Conn, error) {
|
||||
debug.Log("new connection requested, %v %v", network, address)
|
||||
return conn, nil
|
||||
},
|
||||
}
|
||||
|
||||
waitCh := make(chan struct{})
|
||||
be := &Backend{
|
||||
tr: tr,
|
||||
cmd: cmd,
|
||||
waitCh: waitCh,
|
||||
}
|
||||
|
||||
go func() {
|
||||
debug.Log("waiting for error result")
|
||||
err := cmd.Wait()
|
||||
debug.Log("Wait returned %v", err)
|
||||
be.waitResult = err
|
||||
close(waitCh)
|
||||
}()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
go func() {
|
||||
debug.Log("monitoring command to cancel first HTTP request context")
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
debug.Log("context has been cancelled, returning")
|
||||
case <-be.waitCh:
|
||||
debug.Log("command has exited, cancelling context")
|
||||
cancel()
|
||||
}
|
||||
}()
|
||||
|
||||
// send an HTTP request to the base URL, see if the server is there
|
||||
client := &http.Client{
|
||||
Transport: tr,
|
||||
Timeout: 5 * time.Second,
|
||||
}
|
||||
|
||||
req, err := http.NewRequest(http.MethodGet, "http://localhost/", nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
req.Header.Set("Accept", rest.ContentTypeV2)
|
||||
|
||||
res, err := ctxhttp.Do(ctx, client, req)
|
||||
if err != nil {
|
||||
bg()
|
||||
_ = cmd.Process.Kill()
|
||||
return nil, errors.Errorf("error talking HTTP to rclone: %v", err)
|
||||
}
|
||||
|
||||
debug.Log("HTTP status %q returned, moving instance to background", res.Status)
|
||||
bg()
|
||||
|
||||
return be, nil
|
||||
}
|
||||
|
||||
// Open starts an rclone process with the given config.
|
||||
func Open(cfg Config) (*Backend, error) {
|
||||
be, err := New(cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
url, err := url.Parse("http://localhost/")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
restConfig := rest.Config{
|
||||
Connections: 20,
|
||||
URL: url,
|
||||
}
|
||||
|
||||
restBackend, err := rest.Open(restConfig, be.tr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
be.Backend = restBackend
|
||||
return be, nil
|
||||
}
|
||||
|
||||
// Create initializes a new restic repo with clone.
|
||||
func Create(cfg Config) (*Backend, error) {
|
||||
be, err := New(cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
debug.Log("new backend created")
|
||||
|
||||
url, err := url.Parse("http://localhost/")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
restConfig := rest.Config{
|
||||
Connections: 20,
|
||||
URL: url,
|
||||
}
|
||||
|
||||
restBackend, err := rest.Create(restConfig, be.tr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
be.Backend = restBackend
|
||||
return be, nil
|
||||
}
|
||||
|
||||
// Close terminates the backend.
|
||||
func (be *Backend) Close() error {
|
||||
debug.Log("exting rclone")
|
||||
be.tr.CloseIdleConnections()
|
||||
<-be.waitCh
|
||||
debug.Log("wait for rclone returned: %v", be.waitResult)
|
||||
return be.waitResult
|
||||
}
|
83
internal/backend/rclone/backend_test.go
Normal file
83
internal/backend/rclone/backend_test.go
Normal file
|
@ -0,0 +1,83 @@
|
|||
package rclone_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/restic/restic/internal/backend/rclone"
|
||||
"github.com/restic/restic/internal/backend/test"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
rtest "github.com/restic/restic/internal/test"
|
||||
)
|
||||
|
||||
const rcloneConfig = `
|
||||
[local]
|
||||
type = local
|
||||
`
|
||||
|
||||
func newTestSuite(t testing.TB) *test.Suite {
|
||||
dir, cleanup := rtest.TempDir(t)
|
||||
|
||||
return &test.Suite{
|
||||
// NewConfig returns a config for a new temporary backend that will be used in tests.
|
||||
NewConfig: func() (interface{}, error) {
|
||||
cfgfile := filepath.Join(dir, "rclone.conf")
|
||||
t.Logf("write rclone config to %v", cfgfile)
|
||||
err := ioutil.WriteFile(cfgfile, []byte(rcloneConfig), 0644)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
t.Logf("use backend at %v", dir)
|
||||
|
||||
repodir := filepath.Join(dir, "repo")
|
||||
err = os.Mkdir(repodir, 0755)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cfg := rclone.NewConfig()
|
||||
cfg.Program = fmt.Sprintf("rclone --config %q", cfgfile)
|
||||
cfg.Remote = "local:" + repodir
|
||||
return cfg, nil
|
||||
},
|
||||
|
||||
// CreateFn is a function that creates a temporary repository for the tests.
|
||||
Create: func(config interface{}) (restic.Backend, error) {
|
||||
t.Logf("Create()")
|
||||
cfg := config.(rclone.Config)
|
||||
return rclone.Create(cfg)
|
||||
},
|
||||
|
||||
// OpenFn is a function that opens a previously created temporary repository.
|
||||
Open: func(config interface{}) (restic.Backend, error) {
|
||||
t.Logf("Open()")
|
||||
cfg := config.(rclone.Config)
|
||||
return rclone.Open(cfg)
|
||||
},
|
||||
|
||||
// CleanupFn removes data created during the tests.
|
||||
Cleanup: func(config interface{}) error {
|
||||
t.Logf("cleanup dir %v", dir)
|
||||
cleanup()
|
||||
return nil
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func TestBackendRclone(t *testing.T) {
|
||||
defer func() {
|
||||
if t.Skipped() {
|
||||
rtest.SkipDisallowed(t, "restic/backend/rclone.TestBackendRclone")
|
||||
}
|
||||
}()
|
||||
|
||||
newTestSuite(t).RunTests(t)
|
||||
}
|
||||
|
||||
func BenchmarkBackendREST(t *testing.B) {
|
||||
newTestSuite(t).RunBenchmarks(t)
|
||||
}
|
36
internal/backend/rclone/config.go
Normal file
36
internal/backend/rclone/config.go
Normal file
|
@ -0,0 +1,36 @@
|
|||
package rclone
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/options"
|
||||
)
|
||||
|
||||
// Config contains all configuration necessary to start rclone.
|
||||
type Config struct {
|
||||
Program string `option:"program" help:"path to rclone (default: rclone)"`
|
||||
Args string `option:"args" help:"arguments for running rclone (default: restic serve --stdio)"`
|
||||
Remote string
|
||||
}
|
||||
|
||||
func init() {
|
||||
options.Register("rclone", Config{})
|
||||
}
|
||||
|
||||
// NewConfig returns a new Config with the default values filled in.
|
||||
func NewConfig() Config {
|
||||
return Config{}
|
||||
}
|
||||
|
||||
// ParseConfig parses the string s and extracts the remote server URL.
|
||||
func ParseConfig(s string) (interface{}, error) {
|
||||
if !strings.HasPrefix(s, "rclone:") {
|
||||
return nil, errors.New("invalid rclone backend specification")
|
||||
}
|
||||
|
||||
s = s[7:]
|
||||
cfg := NewConfig()
|
||||
cfg.Remote = s
|
||||
return cfg, nil
|
||||
}
|
33
internal/backend/rclone/config_test.go
Normal file
33
internal/backend/rclone/config_test.go
Normal file
|
@ -0,0 +1,33 @@
|
|||
package rclone
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestParseConfig(t *testing.T) {
|
||||
var tests = []struct {
|
||||
s string
|
||||
cfg Config
|
||||
}{
|
||||
{
|
||||
"rclone:local:foo:/bar",
|
||||
Config{
|
||||
Remote: "local:foo:/bar",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run("", func(t *testing.T) {
|
||||
cfg, err := ParseConfig(test.s)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(cfg, test.cfg) {
|
||||
t.Fatalf("wrong config, want:\n %v\ngot:\n %v", test.cfg, cfg)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
72
internal/backend/rclone/stdio_conn.go
Normal file
72
internal/backend/rclone/stdio_conn.go
Normal file
|
@ -0,0 +1,72 @@
|
|||
package rclone
|
||||
|
||||
import (
|
||||
"net"
|
||||
"os"
|
||||
"os/exec"
|
||||
|
||||
"github.com/restic/restic/internal/debug"
|
||||
)
|
||||
|
||||
// StdioConn implements a net.Conn via stdin/stdout.
|
||||
type StdioConn struct {
|
||||
stdin *os.File
|
||||
stdout *os.File
|
||||
bytesWritten, bytesRead int
|
||||
cmd *exec.Cmd
|
||||
}
|
||||
|
||||
func (s *StdioConn) Read(p []byte) (int, error) {
|
||||
n, err := s.stdin.Read(p)
|
||||
s.bytesRead += n
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (s *StdioConn) Write(p []byte) (int, error) {
|
||||
n, err := s.stdout.Write(p)
|
||||
s.bytesWritten += n
|
||||
return n, err
|
||||
}
|
||||
|
||||
// Close closes both streams.
|
||||
func (s *StdioConn) Close() error {
|
||||
debug.Log("close server instance")
|
||||
var errs []error
|
||||
|
||||
for _, f := range []func() error{s.stdin.Close, s.stdout.Close} {
|
||||
err := f()
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
|
||||
if len(errs) > 0 {
|
||||
return errs[0]
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// LocalAddr returns nil.
|
||||
func (s *StdioConn) LocalAddr() net.Addr {
|
||||
return Addr{}
|
||||
}
|
||||
|
||||
// RemoteAddr returns nil.
|
||||
func (s *StdioConn) RemoteAddr() net.Addr {
|
||||
return Addr{}
|
||||
}
|
||||
|
||||
// make sure StdioConn implements net.Conn
|
||||
var _ net.Conn = &StdioConn{}
|
||||
|
||||
// Addr implements net.Addr for stdin/stdout.
|
||||
type Addr struct{}
|
||||
|
||||
// Network returns the network type as a string.
|
||||
func (a Addr) Network() string {
|
||||
return "stdio"
|
||||
}
|
||||
|
||||
func (a Addr) String() string {
|
||||
return "stdio"
|
||||
}
|
25
internal/backend/rclone/stdio_conn_go110.go
Normal file
25
internal/backend/rclone/stdio_conn_go110.go
Normal file
|
@ -0,0 +1,25 @@
|
|||
// +build go1.10
|
||||
|
||||
package rclone
|
||||
|
||||
import "time"
|
||||
|
||||
// SetDeadline sets the read/write deadline.
|
||||
func (s *StdioConn) SetDeadline(t time.Time) error {
|
||||
err1 := s.stdin.SetReadDeadline(t)
|
||||
err2 := s.stdout.SetWriteDeadline(t)
|
||||
if err1 != nil {
|
||||
return err1
|
||||
}
|
||||
return err2
|
||||
}
|
||||
|
||||
// SetReadDeadline sets the read/write deadline.
|
||||
func (s *StdioConn) SetReadDeadline(t time.Time) error {
|
||||
return s.stdin.SetReadDeadline(t)
|
||||
}
|
||||
|
||||
// SetWriteDeadline sets the read/write deadline.
|
||||
func (s *StdioConn) SetWriteDeadline(t time.Time) error {
|
||||
return s.stdout.SetWriteDeadline(t)
|
||||
}
|
22
internal/backend/rclone/stdio_conn_other.go
Normal file
22
internal/backend/rclone/stdio_conn_other.go
Normal file
|
@ -0,0 +1,22 @@
|
|||
// +build !go1.10
|
||||
|
||||
package rclone
|
||||
|
||||
import "time"
|
||||
|
||||
// On Go < 1.10, it's not possible to set read/write deadlines on files, so we just ignore that.
|
||||
|
||||
// SetDeadline sets the read/write deadline.
|
||||
func (s *StdioConn) SetDeadline(t time.Time) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetReadDeadline sets the read/write deadline.
|
||||
func (s *StdioConn) SetReadDeadline(t time.Time) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetWriteDeadline sets the read/write deadline.
|
||||
func (s *StdioConn) SetWriteDeadline(t time.Time) error {
|
||||
return nil
|
||||
}
|
Loading…
Reference in a new issue