package rclone import ( "bufio" "context" "crypto/tls" "fmt" "io" "math/rand" "net" "net/http" "net/url" "os" "os/exec" "sync" "syscall" "time" "github.com/cenkalti/backoff/v4" "github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend/limiter" "github.com/restic/restic/internal/backend/location" "github.com/restic/restic/internal/backend/rest" "github.com/restic/restic/internal/backend/util" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "golang.org/x/net/http2" ) // Backend is used to access data stored somewhere via rclone. type Backend struct { *rest.Backend tr *http2.Transport cmd *exec.Cmd waitCh <-chan struct{} waitResult error wg *sync.WaitGroup conn *StdioConn } func NewFactory() location.Factory { return location.NewLimitedBackendFactory("rclone", ParseConfig, location.NoPassword, Create, Open) } // run starts command with args and initializes the StdioConn. func run(command string, args ...string) (*StdioConn, *sync.WaitGroup, chan struct{}, func() error, error) { cmd := exec.Command(command, args...) p, err := cmd.StderrPipe() if err != nil { return nil, nil, nil, nil, err } var wg sync.WaitGroup waitCh := make(chan struct{}) // start goroutine to add a prefix to all messages printed by to stderr by rclone wg.Add(1) go func() { defer wg.Done() defer close(waitCh) sc := bufio.NewScanner(p) for sc.Scan() { fmt.Fprintf(os.Stderr, "rclone: %v\n", sc.Text()) } debug.Log("command has exited, closing waitCh") }() r, stdin, err := os.Pipe() if err != nil { return nil, nil, nil, nil, err } stdout, w, err := os.Pipe() if err != nil { // close first pipe and ignore subsequent errors _ = r.Close() _ = stdin.Close() return nil, nil, nil, nil, err } cmd.Stdin = r cmd.Stdout = w bg, err := util.StartForeground(cmd) // close rclone side of pipes errR := r.Close() errW := w.Close() // return first error if err == nil { err = errR } if err == nil { err = errW } if err != nil { if errors.Is(err, exec.ErrDot) { return nil, nil, nil, nil, errors.Errorf("cannot implicitly run relative executable %v found in current directory, use -o rclone.program=./<program> to override", cmd.Path) } return nil, nil, nil, nil, err } c := &StdioConn{ receive: stdout, send: stdin, cmd: cmd, } return c, &wg, waitCh, bg, nil } // wrappedConn adds bandwidth limiting capabilities to the StdioConn by // wrapping the Read/Write methods. type wrappedConn struct { *StdioConn io.Reader io.Writer } func (c *wrappedConn) Read(p []byte) (int, error) { return c.Reader.Read(p) } func (c *wrappedConn) Write(p []byte) (int, error) { return c.Writer.Write(p) } func wrapConn(c *StdioConn, lim limiter.Limiter) *wrappedConn { wc := &wrappedConn{ StdioConn: c, Reader: c, Writer: c, } if lim != nil { wc.Reader = lim.Downstream(c) wc.Writer = lim.UpstreamWriter(c) } return wc } // New initializes a Backend and starts the process. func newBackend(ctx context.Context, cfg Config, lim limiter.Limiter) (*Backend, error) { var ( args []string err error ) // build program args, start with the program if cfg.Program != "" { a, err := backend.SplitShellStrings(cfg.Program) if err != nil { return nil, err } args = append(args, a...) } // then add the arguments if cfg.Args != "" { a, err := backend.SplitShellStrings(cfg.Args) if err != nil { return nil, err } args = append(args, a...) } // finally, add the remote args = append(args, cfg.Remote) arg0, args := args[0], args[1:] debug.Log("running command: %v %v", arg0, args) stdioConn, wg, waitCh, bg, err := run(arg0, args...) if err != nil { return nil, err } var conn net.Conn = stdioConn if lim != nil { conn = wrapConn(stdioConn, lim) } dialCount := 0 tr := &http2.Transport{ AllowHTTP: true, // this is not really HTTP, just stdin/stdout DialTLS: func(network, address string, _ *tls.Config) (net.Conn, error) { debug.Log("new connection requested, %v %v", network, address) if dialCount > 0 { // the connection to the child process is already closed return nil, backoff.Permanent(errors.New("rclone stdio connection already closed")) } dialCount++ return conn, nil }, } cmd := stdioConn.cmd be := &Backend{ tr: tr, cmd: cmd, waitCh: waitCh, conn: stdioConn, wg: wg, } ctx, cancel := context.WithCancel(ctx) defer cancel() wg.Add(1) go func() { defer wg.Done() <-waitCh cancel() // according to the documentation of StdErrPipe, Wait() must only be called after the former has completed err := cmd.Wait() debug.Log("Wait returned %v", err) be.waitResult = err // close our side of the pipes to rclone, ignore errors _ = stdioConn.CloseAll() }() // send an HTTP request to the base URL, see if the server is there client := http.Client{ Transport: debug.RoundTripper(tr), Timeout: cfg.Timeout, } // request a random file which does not exist. we just want to test when // rclone is able to accept HTTP requests. url := fmt.Sprintf("http://localhost/file-%d", rand.Uint64()) req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { return nil, err } req.Header.Set("Accept", rest.ContentTypeV2) res, err := client.Do(req) if err != nil { // ignore subsequent errors _ = bg() _ = cmd.Process.Kill() // wait for rclone to exit wg.Wait() // try to return the program exit code if communication with rclone has failed if be.waitResult != nil && (errors.Is(err, context.Canceled) || errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, syscall.EPIPE) || errors.Is(err, os.ErrClosed)) { err = be.waitResult } return nil, fmt.Errorf("error talking HTTP to rclone: %w", err) } _ = res.Body.Close() debug.Log("HTTP status %q returned, moving instance to background", res.Status) err = bg() if err != nil { return nil, fmt.Errorf("error moving process to background: %w", err) } return be, nil } // Open starts an rclone process with the given config. func Open(ctx context.Context, cfg Config, lim limiter.Limiter) (*Backend, error) { be, err := newBackend(ctx, cfg, lim) if err != nil { return nil, err } url, err := url.Parse("http://localhost/") if err != nil { return nil, err } restConfig := rest.Config{ Connections: cfg.Connections, URL: url, } restBackend, err := rest.Open(ctx, restConfig, debug.RoundTripper(be.tr)) if err != nil { _ = be.Close() return nil, err } be.Backend = restBackend return be, nil } // Create initializes a new restic repo with rclone. func Create(ctx context.Context, cfg Config, lim limiter.Limiter) (*Backend, error) { be, err := newBackend(ctx, cfg, lim) if err != nil { return nil, err } debug.Log("new backend created") url, err := url.Parse("http://localhost/") if err != nil { return nil, err } restConfig := rest.Config{ Connections: cfg.Connections, URL: url, } restBackend, err := rest.Create(ctx, restConfig, debug.RoundTripper(be.tr)) if err != nil { _ = be.Close() return nil, err } be.Backend = restBackend return be, nil } const waitForExit = 5 * time.Second // Close terminates the backend. func (be *Backend) Close() error { debug.Log("exiting rclone") be.tr.CloseIdleConnections() select { case <-be.waitCh: debug.Log("rclone exited") case <-time.After(waitForExit): debug.Log("timeout, closing file descriptors") err := be.conn.CloseAll() if err != nil { return err } } be.wg.Wait() debug.Log("wait for rclone returned: %v", be.waitResult) return be.waitResult }