cmd/serve: add serve docker command (#5415)

Fixes #4750

Co-authored-by: Ivan Andreev <ivandeex@gmail.com>
This commit is contained in:
Antoine GIRARD 2021-01-03 01:05:52 +01:00 committed by Ivan Andreev
parent 221dfc3882
commit daf449b5f2
15 changed files with 1864 additions and 0 deletions

175
cmd/serve/docker/api.go Normal file
View file

@ -0,0 +1,175 @@
package docker
import (
"encoding/json"
"net/http"
"github.com/go-chi/chi/v5"
"github.com/rclone/rclone/fs"
)
const (
contentType = "application/vnd.docker.plugins.v1.1+json"
activatePath = "/Plugin.Activate"
createPath = "/VolumeDriver.Create"
getPath = "/VolumeDriver.Get"
listPath = "/VolumeDriver.List"
removePath = "/VolumeDriver.Remove"
pathPath = "/VolumeDriver.Path"
mountPath = "/VolumeDriver.Mount"
unmountPath = "/VolumeDriver.Unmount"
capsPath = "/VolumeDriver.Capabilities"
)
// CreateRequest is the structure that docker's requests are deserialized to.
type CreateRequest struct {
Name string
Options map[string]string `json:"Opts,omitempty"`
}
// RemoveRequest structure for a volume remove request
type RemoveRequest struct {
Name string
}
// MountRequest structure for a volume mount request
type MountRequest struct {
Name string
ID string
}
// MountResponse structure for a volume mount response
type MountResponse struct {
Mountpoint string
}
// UnmountRequest structure for a volume unmount request
type UnmountRequest struct {
Name string
ID string
}
// PathRequest structure for a volume path request
type PathRequest struct {
Name string
}
// PathResponse structure for a volume path response
type PathResponse struct {
Mountpoint string
}
// GetRequest structure for a volume get request
type GetRequest struct {
Name string
}
// GetResponse structure for a volume get response
type GetResponse struct {
Volume *VolInfo
}
// ListResponse structure for a volume list response
type ListResponse struct {
Volumes []*VolInfo
}
// CapabilitiesResponse structure for a volume capability response
type CapabilitiesResponse struct {
Capabilities Capability
}
// Capability represents the list of capabilities a volume driver can return
type Capability struct {
Scope string
}
// ErrorResponse is a formatted error message that docker can understand
type ErrorResponse struct {
Err string
}
func newRouter(drv *Driver) http.Handler {
r := chi.NewRouter()
r.Post(activatePath, func(w http.ResponseWriter, r *http.Request) {
res := map[string]interface{}{
"Implements": []string{"VolumeDriver"},
}
encodeResponse(w, res, nil, activatePath)
})
r.Post(createPath, func(w http.ResponseWriter, r *http.Request) {
var req CreateRequest
if decodeRequest(w, r, &req) {
err := drv.Create(&req)
encodeResponse(w, nil, err, createPath)
}
})
r.Post(removePath, func(w http.ResponseWriter, r *http.Request) {
var req RemoveRequest
if decodeRequest(w, r, &req) {
err := drv.Remove(&req)
encodeResponse(w, nil, err, removePath)
}
})
r.Post(mountPath, func(w http.ResponseWriter, r *http.Request) {
var req MountRequest
if decodeRequest(w, r, &req) {
res, err := drv.Mount(&req)
encodeResponse(w, res, err, mountPath)
}
})
r.Post(pathPath, func(w http.ResponseWriter, r *http.Request) {
var req PathRequest
if decodeRequest(w, r, &req) {
res, err := drv.Path(&req)
encodeResponse(w, res, err, pathPath)
}
})
r.Post(getPath, func(w http.ResponseWriter, r *http.Request) {
var req GetRequest
if decodeRequest(w, r, &req) {
res, err := drv.Get(&req)
encodeResponse(w, res, err, getPath)
}
})
r.Post(unmountPath, func(w http.ResponseWriter, r *http.Request) {
var req UnmountRequest
if decodeRequest(w, r, &req) {
err := drv.Unmount(&req)
encodeResponse(w, nil, err, unmountPath)
}
})
r.Post(listPath, func(w http.ResponseWriter, r *http.Request) {
res, err := drv.List()
encodeResponse(w, res, err, listPath)
})
r.Post(capsPath, func(w http.ResponseWriter, r *http.Request) {
res := &CapabilitiesResponse{
Capabilities: Capability{Scope: pluginScope},
}
encodeResponse(w, res, nil, capsPath)
})
return r
}
func decodeRequest(w http.ResponseWriter, r *http.Request, req interface{}) bool {
if err := json.NewDecoder(r.Body).Decode(req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return false
}
return true
}
func encodeResponse(w http.ResponseWriter, res interface{}, err error, path string) {
w.Header().Set("Content-Type", contentType)
if err != nil {
fs.Debugf(path, "Request returned error: %v", err)
w.WriteHeader(http.StatusInternalServerError)
res = &ErrorResponse{Err: err.Error()}
} else if res == nil {
res = struct{}{}
}
if err = json.NewEncoder(w).Encode(res); err != nil {
fs.Debugf(path, "Response encoding failed: %v", err)
}
}

View file

@ -0,0 +1,72 @@
// Package docker serves a remote suitable for use with docker volume api
package docker
import (
"context"
"path/filepath"
"strings"
"syscall"
"github.com/spf13/cobra"
"github.com/rclone/rclone/cmd"
"github.com/rclone/rclone/cmd/mountlib"
"github.com/rclone/rclone/fs/config/flags"
"github.com/rclone/rclone/vfs"
"github.com/rclone/rclone/vfs/vfsflags"
)
var (
pluginName = "rclone"
pluginScope = "local"
baseDir = "/var/lib/docker-volumes/rclone"
sockDir = "/run/docker/plugins"
defSpecDir = "/etc/docker/plugins"
stateFile = "docker-plugin.state"
socketAddr = "" // TCP listening address or empty string for Unix socket
socketGid = syscall.Getgid()
canPersist = false // allows writing to config file
forgetState = false
noSpec = false
)
func init() {
cmdFlags := Command.Flags()
// Add command specific flags
flags.StringVarP(cmdFlags, &baseDir, "base-dir", "", baseDir, "base directory for volumes")
flags.StringVarP(cmdFlags, &socketAddr, "socket-addr", "", socketAddr, "<host:port> or absolute path (default: /run/docker/plugins/rclone.sock)")
flags.IntVarP(cmdFlags, &socketGid, "socket-gid", "", socketGid, "GID for unix socket (default: current process GID)")
flags.BoolVarP(cmdFlags, &forgetState, "forget-state", "", forgetState, "skip restoring previous state")
flags.BoolVarP(cmdFlags, &noSpec, "no-spec", "", noSpec, "do not write spec file")
// Add common mount/vfs flags
mountlib.AddFlags(cmdFlags)
vfsflags.AddFlags(cmdFlags)
}
// Command definition for cobra
var Command = &cobra.Command{
Use: "docker",
Short: `Serve any remote on docker's volume plugin API.`,
Long: strings.ReplaceAll(longHelp, "|", "`") + vfs.Help,
Run: func(command *cobra.Command, args []string) {
cmd.CheckArgs(0, 0, command, args)
cmd.Run(false, false, command, func() error {
ctx := context.Background()
drv, err := NewDriver(ctx, baseDir, nil, nil, false, forgetState)
if err != nil {
return err
}
srv := NewServer(drv)
if socketAddr == "" {
// Listen on unix socket at /run/docker/plugins/<pluginName>.sock
return srv.ServeUnix(pluginName, socketGid)
}
if filepath.IsAbs(socketAddr) {
// Listen on unix socket at given path
return srv.ServeUnix(socketAddr, socketGid)
}
return srv.ServeTCP(socketAddr, "", nil, noSpec)
})
},
}

View file

@ -0,0 +1,414 @@
package docker_test
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"path/filepath"
"runtime"
"strings"
"testing"
"time"
"github.com/rclone/rclone/cmd/mountlib"
"github.com/rclone/rclone/cmd/serve/docker"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/config"
"github.com/rclone/rclone/fstest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
_ "github.com/rclone/rclone/backend/local"
_ "github.com/rclone/rclone/backend/memory"
_ "github.com/rclone/rclone/cmd/cmount"
_ "github.com/rclone/rclone/cmd/mount"
)
func initialise(ctx context.Context, t *testing.T) (string, fs.Fs) {
fstest.Initialise()
// Make test cache directory
testDir, err := fstest.LocalRemote()
require.NoError(t, err)
err = os.MkdirAll(testDir, 0755)
require.NoError(t, err)
// Make test file system
testFs, err := fs.NewFs(ctx, testDir)
require.NoError(t, err)
return testDir, testFs
}
func assertErrorContains(t *testing.T, err error, errString string, msgAndArgs ...interface{}) {
assert.Error(t, err)
if err != nil {
assert.Contains(t, err.Error(), errString, msgAndArgs...)
}
}
func assertVolumeInfo(t *testing.T, v *docker.VolInfo, name, path string) {
assert.Equal(t, name, v.Name)
assert.Equal(t, path, v.Mountpoint)
assert.NotEmpty(t, v.CreatedAt)
_, err := time.Parse(time.RFC3339, v.CreatedAt)
assert.NoError(t, err)
}
func TestDockerPluginLogic(t *testing.T) {
ctx := context.Background()
oldCacheDir := config.CacheDir
testDir, testFs := initialise(ctx, t)
config.CacheDir = testDir
defer func() {
config.CacheDir = oldCacheDir
if !t.Failed() {
fstest.Purge(testFs)
_ = os.RemoveAll(testDir)
}
}()
// Create dummy volume driver
drv, err := docker.NewDriver(ctx, testDir, nil, nil, true, true)
require.NoError(t, err)
require.NotNil(t, drv)
// 1st volume request
volReq := &docker.CreateRequest{
Name: "vol1",
Options: docker.VolOpts{},
}
assertErrorContains(t, drv.Create(volReq), "volume must have either remote or backend")
volReq.Options["remote"] = testDir
assert.NoError(t, drv.Create(volReq))
path1 := filepath.Join(testDir, "vol1")
assert.ErrorIs(t, drv.Create(volReq), docker.ErrVolumeExists)
getReq := &docker.GetRequest{Name: "vol1"}
getRes, err := drv.Get(getReq)
assert.NoError(t, err)
require.NotNil(t, getRes)
assertVolumeInfo(t, getRes.Volume, "vol1", path1)
// 2nd volume request
volReq.Name = "vol2"
assert.NoError(t, drv.Create(volReq))
path2 := filepath.Join(testDir, "vol2")
listRes, err := drv.List()
require.NoError(t, err)
require.Equal(t, 2, len(listRes.Volumes))
assertVolumeInfo(t, listRes.Volumes[0], "vol1", path1)
assertVolumeInfo(t, listRes.Volumes[1], "vol2", path2)
// Try prohibited volume options
volReq.Name = "vol99"
volReq.Options["remote"] = testDir
volReq.Options["type"] = "memory"
err = drv.Create(volReq)
assertErrorContains(t, err, "volume must have either remote or backend")
volReq.Options["persist"] = "WrongBoolean"
err = drv.Create(volReq)
assertErrorContains(t, err, "cannot parse option")
volReq.Options["persist"] = "true"
delete(volReq.Options, "remote")
err = drv.Create(volReq)
assertErrorContains(t, err, "persist remotes is prohibited")
volReq.Options["persist"] = "false"
volReq.Options["memory-option-broken"] = "some-value"
err = drv.Create(volReq)
assertErrorContains(t, err, "unsupported backend option")
getReq.Name = "vol99"
getRes, err = drv.Get(getReq)
assert.Error(t, err)
assert.Nil(t, getRes)
// Test mount requests
mountReq := &docker.MountRequest{
Name: "vol2",
ID: "id1",
}
mountRes, err := drv.Mount(mountReq)
assert.NoError(t, err)
require.NotNil(t, mountRes)
assert.Equal(t, path2, mountRes.Mountpoint)
mountRes, err = drv.Mount(mountReq)
assert.Error(t, err)
assert.Nil(t, mountRes)
assertErrorContains(t, err, "already mounted by this id")
mountReq.ID = "id2"
mountRes, err = drv.Mount(mountReq)
assert.NoError(t, err)
require.NotNil(t, mountRes)
assert.Equal(t, path2, mountRes.Mountpoint)
unmountReq := &docker.UnmountRequest{
Name: "vol2",
ID: "id1",
}
err = drv.Unmount(unmountReq)
assert.NoError(t, err)
err = drv.Unmount(unmountReq)
assert.Error(t, err)
assertErrorContains(t, err, "not mounted by this id")
// Simulate plugin restart
drv2, err := docker.NewDriver(ctx, testDir, nil, nil, true, false)
assert.NoError(t, err)
require.NotNil(t, drv2)
// New plugin instance should pick up the saved state
listRes, err = drv2.List()
require.NoError(t, err)
require.Equal(t, 2, len(listRes.Volumes))
assertVolumeInfo(t, listRes.Volumes[0], "vol1", path1)
assertVolumeInfo(t, listRes.Volumes[1], "vol2", path2)
rmReq := &docker.RemoveRequest{Name: "vol2"}
err = drv.Remove(rmReq)
assertErrorContains(t, err, "volume is in use")
unmountReq.ID = "id1"
err = drv.Unmount(unmountReq)
assert.Error(t, err)
assertErrorContains(t, err, "not mounted by this id")
unmountReq.ID = "id2"
err = drv.Unmount(unmountReq)
assert.NoError(t, err)
err = drv.Unmount(unmountReq)
assert.EqualError(t, err, "volume is not mounted")
err = drv.Remove(rmReq)
assert.NoError(t, err)
}
const (
httpTimeout = 2 * time.Second
tempDelay = 10 * time.Millisecond
)
type APIClient struct {
t *testing.T
cli *http.Client
host string
}
func newAPIClient(t *testing.T, host, unixPath string) *APIClient {
tr := &http.Transport{
MaxIdleConns: 1,
IdleConnTimeout: httpTimeout,
DisableCompression: true,
}
if unixPath != "" {
tr.DialContext = func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", unixPath)
}
} else {
dialer := &net.Dialer{
Timeout: httpTimeout,
KeepAlive: httpTimeout,
}
tr.DialContext = dialer.DialContext
}
cli := &http.Client{
Transport: tr,
Timeout: httpTimeout,
}
return &APIClient{
t: t,
cli: cli,
host: host,
}
}
func (a *APIClient) request(path string, in, out interface{}, wantErr bool) {
t := a.t
var (
dataIn []byte
dataOut []byte
err error
)
realm := "VolumeDriver"
if path == "Activate" {
realm = "Plugin"
}
url := fmt.Sprintf("http://%s/%s.%s", a.host, realm, path)
if str, isString := in.(string); isString {
dataIn = []byte(str)
} else {
dataIn, err = json.Marshal(in)
require.NoError(t, err)
}
fs.Logf(path, "<-- %s", dataIn)
req, err := http.NewRequest("POST", url, bytes.NewBuffer(dataIn))
require.NoError(t, err)
req.Header.Set("Content-Type", "application/json")
res, err := a.cli.Do(req)
require.NoError(t, err)
wantStatus := http.StatusOK
if wantErr {
wantStatus = http.StatusInternalServerError
}
assert.Equal(t, wantStatus, res.StatusCode)
dataOut, err = ioutil.ReadAll(res.Body)
require.NoError(t, err)
err = res.Body.Close()
require.NoError(t, err)
if strPtr, isString := out.(*string); isString || wantErr {
require.True(t, isString, "must use string for error response")
if wantErr {
var errRes docker.ErrorResponse
err = json.Unmarshal(dataOut, &errRes)
require.NoError(t, err)
*strPtr = errRes.Err
} else {
*strPtr = strings.TrimSpace(string(dataOut))
}
} else {
err = json.Unmarshal(dataOut, out)
require.NoError(t, err)
}
fs.Logf(path, "--> %s", dataOut)
time.Sleep(tempDelay)
}
func testMountAPI(t *testing.T, sockAddr string) {
if _, mountFn := mountlib.ResolveMountMethod(""); mountFn == nil {
t.Skip("Test requires working mount command")
}
ctx := context.Background()
oldCacheDir := config.CacheDir
testDir, testFs := initialise(ctx, t)
config.CacheDir = testDir
defer func() {
config.CacheDir = oldCacheDir
if !t.Failed() {
fstest.Purge(testFs)
_ = os.RemoveAll(testDir)
}
}()
// Prepare API client
var cli *APIClient
var unixPath string
if sockAddr != "" {
cli = newAPIClient(t, sockAddr, "")
} else {
unixPath = filepath.Join(testDir, "rclone.sock")
cli = newAPIClient(t, "localhost", unixPath)
}
// Create mounting volume driver and listen for requests
drv, err := docker.NewDriver(ctx, testDir, nil, nil, false, true)
require.NoError(t, err)
require.NotNil(t, drv)
defer drv.Exit()
srv := docker.NewServer(drv)
go func() {
var errServe error
if unixPath != "" {
errServe = srv.ServeUnix(unixPath, os.Getgid())
} else {
errServe = srv.ServeTCP(sockAddr, testDir, nil, false)
}
assert.ErrorIs(t, errServe, http.ErrServerClosed)
}()
defer func() {
err := srv.Shutdown(ctx)
assert.NoError(t, err)
fs.Logf(nil, "Server stopped")
time.Sleep(tempDelay)
}()
time.Sleep(tempDelay) // Let server start
// Run test sequence
path1 := filepath.Join(testDir, "path1")
require.NoError(t, os.MkdirAll(path1, 0755))
mount1 := filepath.Join(testDir, "vol1")
res := ""
cli.request("Activate", "{}", &res, false)
assert.Contains(t, res, `"VolumeDriver"`)
createReq := docker.CreateRequest{
Name: "vol1",
Options: docker.VolOpts{"remote": path1},
}
cli.request("Create", createReq, &res, false)
assert.Equal(t, "{}", res)
cli.request("Create", createReq, &res, true)
assert.Contains(t, res, "volume already exists")
mountReq := docker.MountRequest{Name: "vol1", ID: "id1"}
var mountRes docker.MountResponse
cli.request("Mount", mountReq, &mountRes, false)
assert.Equal(t, mount1, mountRes.Mountpoint)
cli.request("Mount", mountReq, &res, true)
assert.Contains(t, res, "already mounted by this id")
removeReq := docker.RemoveRequest{Name: "vol1"}
cli.request("Remove", removeReq, &res, true)
assert.Contains(t, res, "volume is in use")
text := []byte("banana")
err = ioutil.WriteFile(filepath.Join(mount1, "txt"), text, 0644)
assert.NoError(t, err)
time.Sleep(tempDelay)
text2, err := ioutil.ReadFile(filepath.Join(path1, "txt"))
assert.NoError(t, err)
assert.Equal(t, text, text2)
unmountReq := docker.UnmountRequest{Name: "vol1", ID: "id1"}
cli.request("Unmount", unmountReq, &res, false)
assert.Equal(t, "{}", res)
cli.request("Unmount", unmountReq, &res, true)
assert.Equal(t, "volume is not mounted", res)
cli.request("Remove", removeReq, &res, false)
assert.Equal(t, "{}", res)
cli.request("Remove", removeReq, &res, true)
assert.Equal(t, "volume not found", res)
var listRes docker.ListResponse
cli.request("List", "{}", &listRes, false)
assert.Empty(t, listRes.Volumes)
}
func TestDockerPluginMountTCP(t *testing.T) {
testMountAPI(t, "localhost:53789")
}
func TestDockerPluginMountUnix(t *testing.T) {
if runtime.GOOS != "linux" {
t.Skip("Test is Linux-only")
}
testMountAPI(t, "")
}

360
cmd/serve/docker/driver.go Normal file
View file

@ -0,0 +1,360 @@
package docker
import (
"context"
"encoding/json"
"io/ioutil"
"os"
"path/filepath"
"reflect"
"sort"
"sync"
sysdnotify "github.com/iguanesolutions/go-systemd/v5/notify"
"github.com/pkg/errors"
"github.com/rclone/rclone/cmd/mountlib"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/config"
"github.com/rclone/rclone/lib/atexit"
"github.com/rclone/rclone/vfs/vfscommon"
"github.com/rclone/rclone/vfs/vfsflags"
)
// Driver implements docker driver api
type Driver struct {
root string
volumes map[string]*Volume
statePath string
dummy bool // disables real mounting
mntOpt mountlib.Options
vfsOpt vfscommon.Options
mu sync.Mutex
exitOnce sync.Once
hupChan chan os.Signal
monChan chan bool // exit if true for exit, refresh if false
}
// NewDriver makes a new docker driver
func NewDriver(ctx context.Context, root string, mntOpt *mountlib.Options, vfsOpt *vfscommon.Options, dummy, forgetState bool) (*Driver, error) {
// setup directories
cacheDir, err := filepath.Abs(config.CacheDir)
if err != nil {
return nil, errors.Wrap(err, "failed to make --cache-dir absolute")
}
err = os.MkdirAll(cacheDir, 0700)
if err != nil {
return nil, errors.Wrapf(err, "failed to create cache directory: %s", cacheDir)
}
//err = os.MkdirAll(root, 0755)
if err != nil {
return nil, errors.Wrapf(err, "failed to create mount root: %s", root)
}
// setup driver state
if mntOpt == nil {
mntOpt = &mountlib.Opt
}
if vfsOpt == nil {
vfsOpt = &vfsflags.Opt
}
drv := &Driver{
root: root,
statePath: filepath.Join(cacheDir, stateFile),
volumes: map[string]*Volume{},
mntOpt: *mntOpt,
vfsOpt: *vfsOpt,
dummy: dummy,
}
drv.mntOpt.Daemon = false
// restore from saved state
if !forgetState {
if err = drv.restoreState(ctx); err != nil {
return nil, errors.Wrap(err, "failed to restore state")
}
}
// start mount monitoring
drv.hupChan = make(chan os.Signal, 1)
drv.monChan = make(chan bool, 1)
mountlib.NotifyOnSigHup(drv.hupChan)
go drv.monitor()
// unmount all volumes on exit
atexit.Register(func() {
drv.exitOnce.Do(drv.Exit)
})
// notify systemd
if err := sysdnotify.Ready(); err != nil {
return nil, errors.Wrap(err, "failed to notify systemd")
}
return drv, nil
}
// Exit will unmount all currently mounted volumes
func (drv *Driver) Exit() {
fs.Debugf(nil, "Unmount all volumes")
drv.mu.Lock()
defer drv.mu.Unlock()
reportErr(sysdnotify.Stopping())
drv.monChan <- true // ask monitor to exit
for _, vol := range drv.volumes {
reportErr(vol.unmountAll())
vol.Mounts = []string{} // never persist mounts at exit
}
reportErr(drv.saveState())
drv.dummy = true // no more mounts
}
// monitor all mounts
func (drv *Driver) monitor() {
for {
// https://stackoverflow.com/questions/19992334/how-to-listen-to-n-channels-dynamic-select-statement
monChan := reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(drv.monChan),
}
hupChan := reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(drv.monChan),
}
sources := []reflect.SelectCase{monChan, hupChan}
volumes := []*Volume{nil, nil}
drv.mu.Lock()
for _, vol := range drv.volumes {
if vol.mnt.ErrChan != nil {
errSource := reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(vol.mnt.ErrChan),
}
sources = append(sources, errSource)
volumes = append(volumes, vol)
}
}
drv.mu.Unlock()
fs.Debugf(nil, "Monitoring %d volumes", len(sources)-2)
idx, val, _ := reflect.Select(sources)
switch idx {
case 0:
if val.Bool() {
fs.Debugf(nil, "Monitoring stopped")
return
}
case 1:
// user sent SIGHUP to clear the cache
drv.clearCache()
default:
vol := volumes[idx]
if err := val.Interface(); err != nil {
fs.Logf(nil, "Volume %q unmounted externally: %v", vol.Name, err)
} else {
fs.Infof(nil, "Volume %q unmounted externally", vol.Name)
}
drv.mu.Lock()
reportErr(vol.unmountAll())
drv.mu.Unlock()
}
}
}
// clearCache will clear cache of all volumes
func (drv *Driver) clearCache() {
fs.Debugf(nil, "Clear all caches")
drv.mu.Lock()
defer drv.mu.Unlock()
for _, vol := range drv.volumes {
reportErr(vol.clearCache())
}
}
func reportErr(err error) {
if err != nil {
fs.Errorf("docker plugin", "%v", err)
}
}
// Create volume
// To use subpath we are limited to defining a new volume definition via alias
func (drv *Driver) Create(req *CreateRequest) error {
ctx := context.Background()
drv.mu.Lock()
defer drv.mu.Unlock()
name := req.Name
fs.Debugf(nil, "Create volume %q", name)
if vol, _ := drv.getVolume(name); vol != nil {
return ErrVolumeExists
}
vol, err := newVolume(ctx, name, req.Options, drv)
if err != nil {
return err
}
drv.volumes[name] = vol
return drv.saveState()
}
// Remove volume
func (drv *Driver) Remove(req *RemoveRequest) error {
ctx := context.Background()
drv.mu.Lock()
defer drv.mu.Unlock()
vol, err := drv.getVolume(req.Name)
if err != nil {
return err
}
if err = vol.remove(ctx); err != nil {
return err
}
delete(drv.volumes, vol.Name)
return drv.saveState()
}
// List volumes handled by the driver
func (drv *Driver) List() (*ListResponse, error) {
drv.mu.Lock()
defer drv.mu.Unlock()
volumeList := drv.listVolumes()
fs.Debugf(nil, "List: %v", volumeList)
res := &ListResponse{
Volumes: []*VolInfo{},
}
for _, name := range volumeList {
vol := drv.volumes[name]
res.Volumes = append(res.Volumes, vol.getInfo())
}
return res, nil
}
// Get volume info
func (drv *Driver) Get(req *GetRequest) (*GetResponse, error) {
drv.mu.Lock()
defer drv.mu.Unlock()
vol, err := drv.getVolume(req.Name)
if err != nil {
return nil, err
}
return &GetResponse{Volume: vol.getInfo()}, nil
}
// Path returns path of the requested volume
func (drv *Driver) Path(req *PathRequest) (*PathResponse, error) {
drv.mu.Lock()
defer drv.mu.Unlock()
vol, err := drv.getVolume(req.Name)
if err != nil {
return nil, err
}
return &PathResponse{Mountpoint: vol.MountPoint}, nil
}
// Mount volume
func (drv *Driver) Mount(req *MountRequest) (*MountResponse, error) {
drv.mu.Lock()
defer drv.mu.Unlock()
vol, err := drv.getVolume(req.Name)
if err == nil {
err = vol.mount(req.ID)
}
if err == nil {
err = drv.saveState()
}
if err != nil {
return nil, err
}
return &MountResponse{Mountpoint: vol.MountPoint}, nil
}
// Unmount volume
func (drv *Driver) Unmount(req *UnmountRequest) error {
drv.mu.Lock()
defer drv.mu.Unlock()
vol, err := drv.getVolume(req.Name)
if err == nil {
err = vol.unmount(req.ID)
}
if err == nil {
err = drv.saveState()
}
return err
}
// getVolume returns volume by name
func (drv *Driver) getVolume(name string) (*Volume, error) {
vol := drv.volumes[name]
if vol == nil {
return nil, ErrVolumeNotFound
}
return vol, nil
}
// listVolumes returns list volume listVolumes
func (drv *Driver) listVolumes() []string {
names := []string{}
for key := range drv.volumes {
names = append(names, key)
}
sort.Strings(names)
return names
}
// saveState saves volumes handled by driver to persistent store
func (drv *Driver) saveState() error {
volumeList := drv.listVolumes()
fs.Debugf(nil, "Save state %v to %s", volumeList, drv.statePath)
state := []*Volume{}
for _, key := range volumeList {
vol := drv.volumes[key]
vol.prepareState()
state = append(state, vol)
}
data, err := json.Marshal(state)
if err == nil {
err = ioutil.WriteFile(drv.statePath, data, 0600)
}
if err != nil {
return errors.Wrap(err, "failed to write state")
}
return nil
}
// restoreState recreates volumes from saved driver state
func (drv *Driver) restoreState(ctx context.Context) error {
fs.Debugf(nil, "Restore state from %s", drv.statePath)
data, err := ioutil.ReadFile(drv.statePath)
if os.IsNotExist(err) {
return nil
}
var state []*Volume
if err == nil {
err = json.Unmarshal(data, &state)
}
if err != nil {
fs.Logf(nil, "Failed to restore plugin state: %v", err)
return nil
}
for _, vol := range state {
if err := vol.restoreState(ctx, drv); err != nil {
fs.Logf(nil, "Failed to restore volume %q: %v", vol.Name, err)
continue
}
drv.volumes[vol.Name] = vol
}
return nil
}

7
cmd/serve/docker/help.go Normal file
View file

@ -0,0 +1,7 @@
package docker
// Note: "|" will be replaced by backticks
var longHelp = `
This command implements the Docker volume plugin API allowing docker to use
rclone as a data storage mechanism for various cloud providers.
`

307
cmd/serve/docker/options.go Normal file
View file

@ -0,0 +1,307 @@
package docker
import (
"strings"
"github.com/rclone/rclone/cmd/mountlib"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/config/configmap"
"github.com/rclone/rclone/fs/fspath"
"github.com/rclone/rclone/fs/rc"
"github.com/rclone/rclone/vfs/vfscommon"
"github.com/rclone/rclone/vfs/vfsflags"
"github.com/pkg/errors"
"github.com/spf13/pflag"
)
// applyOptions configures volume from request options.
//
// There are 5 special options:
// - "remote" aka "fs" determines existing remote from config file
// with a path or on-the-fly remote using the ":backend:" syntax.
// It is usually named "remote" in documentation but can be aliased as
// "fs" to avoid confusion with the "remote" option of some backends.
// - "type" is equivalent to the ":backend:" syntax (optional).
// - "path" provides explicit on-remote path for "type" (optional).
// - "mount-type" can be "mount", "cmount" or "mount2", defaults to
// first found (optional).
// - "persist" is reserved for future to create remotes persisted
// in rclone.conf similar to rcd (optional).
//
// Unlike rcd we use the flat naming scheme for mount, vfs and backend
// options without substructures. Dashes, underscores and mixed case
// in option names can be used interchangeably. Option name conflicts
// can be resolved in a manner similar to rclone CLI by adding prefixes:
// "vfs-", primary mount backend type like "sftp-", and so on.
//
// After triaging the options are put in MountOpt, VFSOpt or connect
// string for actual filesystem setup and in volume.Options for saving
// the state.
func (vol *Volume) applyOptions(volOpt VolOpts) error {
// copy options to override later
mntOpt := &vol.mnt.MountOpt
vfsOpt := &vol.mnt.VFSOpt
*mntOpt = vol.drv.mntOpt
*vfsOpt = vol.drv.vfsOpt
// vol.Options has all options except "remote" and "type"
vol.Options = VolOpts{}
vol.fsString = ""
var fsName, fsPath, fsType string
var explicitPath string
var fsOpt configmap.Simple
// parse "remote" or "type"
for key, str := range volOpt {
switch key {
case "":
continue
case "remote", "fs":
p, err := fspath.Parse(str)
if err != nil || p.Name == ":" {
return errors.Wrapf(err, "cannot parse path %q", str)
}
fsName, fsPath, fsOpt = p.Name, p.Path, p.Config
vol.Fs = str
case "type":
fsType = str
vol.Type = str
case "path":
explicitPath = str
vol.Path = str
default:
vol.Options[key] = str
}
}
// find options supported by backend
if strings.HasPrefix(fsName, ":") {
fsType = fsName[1:]
fsName = ""
}
if fsType == "" {
fsType = "local"
if fsName != "" {
var ok bool
fsType, ok = fs.ConfigMap(nil, fsName, nil).Get("type")
if !ok {
return fs.ErrorNotFoundInConfigFile
}
}
}
if explicitPath != "" {
if fsPath != "" {
fs.Logf(nil, "Explicit path will override connection string")
}
fsPath = explicitPath
}
fsInfo, err := fs.Find(fsType)
if err != nil {
return errors.Errorf("unknown filesystem type %q", fsType)
}
// handle remaining options, override fsOpt
if fsOpt == nil {
fsOpt = configmap.Simple{}
}
opt := rc.Params{}
for key, val := range vol.Options {
opt[key] = val
}
for key := range opt {
var ok bool
var err error
switch normalOptName(key) {
case "persist":
vol.persist, err = opt.GetBool(key)
ok = true
case "mount-type":
vol.mountType, err = opt.GetString(key)
ok = true
}
if err != nil {
return errors.Wrapf(err, "cannot parse option %q", key)
}
if !ok {
// try to use as a mount option in mntOpt
ok, err = getMountOption(mntOpt, opt, key)
if ok && err != nil {
return errors.Wrapf(err, "cannot parse mount option %q", key)
}
}
if !ok {
// try as a vfs option in vfsOpt
ok, err = getVFSOption(vfsOpt, opt, key)
if ok && err != nil {
return errors.Wrapf(err, "cannot parse vfs option %q", key)
}
}
if !ok {
// try as a backend option in fsOpt (backends use "_" instead of "-")
optWithPrefix := strings.ReplaceAll(normalOptName(key), "-", "_")
fsOptName := strings.TrimPrefix(optWithPrefix, fsType+"_")
hasFsPrefix := optWithPrefix != fsOptName
if !hasFsPrefix || fsInfo.Options.Get(fsOptName) == nil {
fs.Logf(nil, "Option %q is not supported by backend %q", key, fsType)
return errors.Errorf("unsupported backend option %q", key)
}
fsOpt[fsOptName], err = opt.GetString(key)
if err != nil {
return errors.Wrapf(err, "cannot parse backend option %q", key)
}
}
}
// build remote string from fsName, fsType, fsOpt, fsPath
colon := ":"
comma := ","
if fsName == "" {
fsName = ":" + fsType
}
connString := fsOpt.String()
if fsName == "" && fsType == "" {
colon = ""
connString = ""
}
if connString == "" {
comma = ""
}
vol.fsString = fsName + comma + connString + colon + fsPath
return vol.validate()
}
func getMountOption(mntOpt *mountlib.Options, opt rc.Params, key string) (ok bool, err error) {
ok = true
switch normalOptName(key) {
case "debug-fuse":
mntOpt.DebugFUSE, err = opt.GetBool(key)
case "attr-timeout":
mntOpt.AttrTimeout, err = opt.GetDuration(key)
case "option":
mntOpt.ExtraOptions, err = getStringArray(opt, key)
case "fuse-flag":
mntOpt.ExtraFlags, err = getStringArray(opt, key)
case "daemon":
mntOpt.Daemon, err = opt.GetBool(key)
case "daemon-timeout":
mntOpt.DaemonTimeout, err = opt.GetDuration(key)
case "default-permissions":
mntOpt.DefaultPermissions, err = opt.GetBool(key)
case "allow-non-empty":
mntOpt.AllowNonEmpty, err = opt.GetBool(key)
case "allow-root":
mntOpt.AllowRoot, err = opt.GetBool(key)
case "allow-other":
mntOpt.AllowOther, err = opt.GetBool(key)
case "async-read":
mntOpt.AsyncRead, err = opt.GetBool(key)
case "max-read-ahead":
err = getFVarP(&mntOpt.MaxReadAhead, opt, key)
case "write-back-cache":
mntOpt.WritebackCache, err = opt.GetBool(key)
case "volname":
mntOpt.VolumeName, err = opt.GetString(key)
case "noappledouble":
mntOpt.NoAppleDouble, err = opt.GetBool(key)
case "noapplexattr":
mntOpt.NoAppleXattr, err = opt.GetBool(key)
case "network-mode":
mntOpt.NetworkMode, err = opt.GetBool(key)
default:
ok = false
}
return
}
func getVFSOption(vfsOpt *vfscommon.Options, opt rc.Params, key string) (ok bool, err error) {
var intVal int64
ok = true
switch normalOptName(key) {
// options prefixed with "vfs-"
case "vfs-cache-mode":
err = getFVarP(&vfsOpt.CacheMode, opt, key)
case "vfs-cache-poll-interval":
vfsOpt.CachePollInterval, err = opt.GetDuration(key)
case "vfs-cache-max-age":
vfsOpt.CacheMaxAge, err = opt.GetDuration(key)
case "vfs-cache-max-size":
err = getFVarP(&vfsOpt.CacheMaxSize, opt, key)
case "vfs-read-chunk-size":
err = getFVarP(&vfsOpt.ChunkSize, opt, key)
case "vfs-read-chunk-size-limit":
err = getFVarP(&vfsOpt.ChunkSizeLimit, opt, key)
case "vfs-case-insensitive":
vfsOpt.CaseInsensitive, err = opt.GetBool(key)
case "vfs-write-wait":
vfsOpt.WriteWait, err = opt.GetDuration(key)
case "vfs-read-wait":
vfsOpt.ReadWait, err = opt.GetDuration(key)
case "vfs-write-back":
vfsOpt.WriteBack, err = opt.GetDuration(key)
case "vfs-read-ahead":
err = getFVarP(&vfsOpt.ReadAhead, opt, key)
case "vfs-used-is-size":
vfsOpt.UsedIsSize, err = opt.GetBool(key)
// unprefixed vfs options
case "no-modtime":
vfsOpt.NoModTime, err = opt.GetBool(key)
case "no-checksum":
vfsOpt.NoChecksum, err = opt.GetBool(key)
case "dir-cache-time":
vfsOpt.DirCacheTime, err = opt.GetDuration(key)
case "poll-interval":
vfsOpt.PollInterval, err = opt.GetDuration(key)
case "read-only":
vfsOpt.ReadOnly, err = opt.GetBool(key)
case "dir-perms":
perms := &vfsflags.FileMode{Mode: &vfsOpt.DirPerms}
err = getFVarP(perms, opt, key)
case "file-perms":
perms := &vfsflags.FileMode{Mode: &vfsOpt.FilePerms}
err = getFVarP(perms, opt, key)
// unprefixed unix-only vfs options
case "umask":
intVal, err = opt.GetInt64(key)
vfsOpt.Umask = int(intVal)
case "uid":
intVal, err = opt.GetInt64(key)
vfsOpt.UID = uint32(intVal)
case "gid":
intVal, err = opt.GetInt64(key)
vfsOpt.GID = uint32(intVal)
// non-vfs options
default:
ok = false
}
return
}
func getFVarP(pvalue pflag.Value, opt rc.Params, key string) error {
str, err := opt.GetString(key)
if err != nil {
return err
}
return pvalue.Set(str)
}
func getStringArray(opt rc.Params, key string) ([]string, error) {
str, err := opt.GetString(key)
if err != nil {
return nil, err
}
return strings.Split(str, ","), nil
}
func normalOptName(key string) string {
return strings.ReplaceAll(strings.TrimPrefix(strings.ToLower(key), "--"), "_", "-")
}

100
cmd/serve/docker/serve.go Normal file
View file

@ -0,0 +1,100 @@
package docker
import (
"context"
"crypto/tls"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"path/filepath"
"runtime"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/lib/atexit"
)
// Server connects plugin with docker daemon by protocol
type Server http.Server
// NewServer creates new docker plugin server
func NewServer(drv *Driver) *Server {
return &Server{Handler: newRouter(drv)}
}
// Shutdown the server
func (s *Server) Shutdown(ctx context.Context) error {
hs := (*http.Server)(s)
return hs.Shutdown(ctx)
}
func (s *Server) serve(listener net.Listener, addr, tempFile string) error {
if tempFile != "" {
atexit.Register(func() {
// remove spec file or self-created unix socket
fs.Debugf(nil, "Removing stale file %s", tempFile)
_ = os.Remove(tempFile)
})
}
hs := (*http.Server)(s)
return hs.Serve(listener)
}
// ServeUnix makes the handler to listen for requests in a unix socket.
// It also creates the socket file in the right directory for docker to read.
func (s *Server) ServeUnix(path string, gid int) error {
listener, socketPath, err := newUnixListener(path, gid)
if err != nil {
return err
}
if socketPath != "" {
path = socketPath
fs.Infof(nil, "Serving unix socket: %s", path)
} else {
fs.Infof(nil, "Serving systemd socket")
}
return s.serve(listener, path, socketPath)
}
// ServeTCP makes the handler listen for request on a given TCP address.
// It also writes the spec file in the right directory for docker to read.
func (s *Server) ServeTCP(addr, specDir string, tlsConfig *tls.Config, noSpec bool) error {
listener, err := net.Listen("tcp", addr)
if err != nil {
return err
}
if tlsConfig != nil {
tlsConfig.NextProtos = []string{"http/1.1"}
listener = tls.NewListener(listener, tlsConfig)
}
addr = listener.Addr().String()
specFile := ""
if !noSpec {
specFile, err = writeSpecFile(addr, "tcp", specDir)
if err != nil {
return err
}
}
fs.Infof(nil, "Serving TCP socket: %s", addr)
return s.serve(listener, addr, specFile)
}
func writeSpecFile(addr, proto, specDir string) (string, error) {
if specDir == "" && runtime.GOOS == "windows" {
specDir = os.TempDir()
}
if specDir == "" {
specDir = defSpecDir
}
if err := os.MkdirAll(specDir, 0755); err != nil {
return "", err
}
specFile := filepath.Join(specDir, "rclone.spec")
url := fmt.Sprintf("%s://%s", proto, addr)
if err := ioutil.WriteFile(specFile, []byte(url), 0644); err != nil {
return "", err
}
fs.Debugf(nil, "Plugin spec has been written to %s", specFile)
return specFile, nil
}

View file

@ -0,0 +1,17 @@
// +build linux,!android
package docker
import (
"os"
"github.com/coreos/go-systemd/activation"
"github.com/coreos/go-systemd/util"
)
func systemdActivationFiles() []*os.File {
if util.IsRunningSystemd() {
return activation.Files(false)
}
return nil
}

View file

@ -0,0 +1,11 @@
// +build !linux android
package docker
import (
"os"
)
func systemdActivationFiles() []*os.File {
return nil
}

56
cmd/serve/docker/unix.go Normal file
View file

@ -0,0 +1,56 @@
// +build linux freebsd
package docker
import (
"fmt"
"net"
"os"
"path/filepath"
)
func newUnixListener(path string, gid int) (net.Listener, string, error) {
// try systemd socket activation
fds := systemdActivationFiles()
switch len(fds) {
case 0:
// fall thru
case 1:
listener, err := net.FileListener(fds[0])
return listener, "", err
default:
return nil, "", fmt.Errorf("expected only one socket from systemd, got %d", len(fds))
}
// create socket outselves
if filepath.Ext(path) == "" {
path += ".sock"
}
if !filepath.IsAbs(path) {
path = filepath.Join(sockDir, path)
}
if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
return nil, "", err
}
if err := os.Remove(path); err != nil && !os.IsNotExist(err) {
return nil, "", err
}
listener, err := net.Listen("unix", path)
if err != nil {
return nil, "", err
}
if err = os.Chmod(path, 0660); err != nil {
return nil, "", err
}
if os.Geteuid() == 0 {
if err = os.Chown(path, 0, gid); err != nil {
return nil, "", err
}
}
// we don't use spec file with unix sockets
return listener, path, nil
}

View file

@ -0,0 +1,12 @@
// +build !linux,!freebsd
package docker
import (
"errors"
"net"
)
func newUnixListener(path string, gid int) (net.Listener, string, error) {
return nil, "", errors.New("unix sockets require Linux or FreeBSD")
}

326
cmd/serve/docker/volume.go Normal file
View file

@ -0,0 +1,326 @@
package docker
import (
"context"
"os"
"path/filepath"
"runtime"
"sort"
"time"
"github.com/pkg/errors"
"github.com/rclone/rclone/cmd/mountlib"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/config"
"github.com/rclone/rclone/fs/rc"
)
// Errors
var (
ErrVolumeNotFound = errors.New("volume not found")
ErrVolumeExists = errors.New("volume already exists")
ErrMountpointExists = errors.New("non-empty mountpoint already exists")
)
// Volume keeps volume runtime state
// Public members get persisted in saved state
type Volume struct {
Name string `json:"name"`
MountPoint string `json:"mountpoint"`
CreatedAt time.Time `json:"created"`
Fs string `json:"fs"` // remote[,connectString]:path
Type string `json:"type,omitempty"` // same as ":backend:"
Path string `json:"path,omitempty"` // for "remote:path" or ":backend:path"
Options VolOpts `json:"options"` // all options together
Mounts []string `json:"mounts"` // mountReqs as a string list
mountReqs map[string]interface{}
fsString string // result of merging Fs, Type and Options
persist bool
mountType string
drv *Driver
mnt *mountlib.MountPoint
}
// VolOpts keeps volume options
type VolOpts map[string]string
// VolInfo represents a volume for Get and List requests
type VolInfo struct {
Name string
Mountpoint string `json:",omitempty"`
CreatedAt string `json:",omitempty"`
Status map[string]interface{} `json:",omitempty"`
}
func newVolume(ctx context.Context, name string, volOpt VolOpts, drv *Driver) (*Volume, error) {
path := filepath.Join(drv.root, name)
mnt := &mountlib.MountPoint{
MountPoint: path,
}
vol := &Volume{
Name: name,
MountPoint: path,
CreatedAt: time.Now(),
drv: drv,
mnt: mnt,
mountReqs: make(map[string]interface{}),
}
err := vol.applyOptions(volOpt)
if err == nil {
err = vol.setup(ctx)
}
if err != nil {
return nil, err
}
return vol, nil
}
// getInfo returns short digest about volume
func (vol *Volume) getInfo() *VolInfo {
vol.prepareState()
return &VolInfo{
Name: vol.Name,
CreatedAt: vol.CreatedAt.Format(time.RFC3339),
Mountpoint: vol.MountPoint,
Status: rc.Params{"Mounts": vol.Mounts},
}
}
// prepareState prepares volume for saving state
func (vol *Volume) prepareState() {
vol.Mounts = []string{}
for id := range vol.mountReqs {
vol.Mounts = append(vol.Mounts, id)
}
sort.Strings(vol.Mounts)
}
// restoreState updates volume from saved state
func (vol *Volume) restoreState(ctx context.Context, drv *Driver) error {
vol.drv = drv
vol.mnt = &mountlib.MountPoint{
MountPoint: vol.MountPoint,
}
volOpt := vol.Options
volOpt["fs"] = vol.Fs
volOpt["type"] = vol.Type
if err := vol.applyOptions(volOpt); err != nil {
return err
}
if err := vol.validate(); err != nil {
return err
}
if err := vol.setup(ctx); err != nil {
return err
}
for _, id := range vol.Mounts {
if err := vol.mount(id); err != nil {
return err
}
}
return nil
}
// validate volume
func (vol *Volume) validate() error {
if vol.Name == "" {
return errors.New("volume name is required")
}
if (vol.Type != "" && vol.Fs != "") || (vol.Type == "" && vol.Fs == "") {
return errors.New("volume must have either remote or backend type")
}
if vol.persist && vol.Type == "" {
return errors.New("backend type is required to persist remotes")
}
if vol.persist && !canPersist {
return errors.New("using backend type to persist remotes is prohibited")
}
if vol.MountPoint == "" {
return errors.New("mount point is required")
}
if vol.mountReqs == nil {
vol.mountReqs = make(map[string]interface{})
}
return nil
}
// checkMountpoint verifies that mount point is an existing empty directory
func (vol *Volume) checkMountpoint() error {
path := vol.mnt.MountPoint
if runtime.GOOS == "windows" {
path = filepath.Dir(path)
}
_, err := os.Lstat(path)
if os.IsNotExist(err) {
if err = os.MkdirAll(path, 0700); err != nil {
return errors.Wrapf(err, "failed to create mountpoint: %s", path)
}
} else if err != nil {
return err
}
if runtime.GOOS != "windows" {
if err := mountlib.CheckMountEmpty(path); err != nil {
return ErrMountpointExists
}
}
return nil
}
// setup volume filesystem
func (vol *Volume) setup(ctx context.Context) error {
fs.Debugf(nil, "Setup volume %q as %q at path %s", vol.Name, vol.fsString, vol.MountPoint)
if err := vol.checkMountpoint(); err != nil {
return err
}
if vol.drv.dummy {
return nil
}
_, mountFn := mountlib.ResolveMountMethod(vol.mountType)
if mountFn == nil {
if vol.mountType != "" {
return errors.Errorf("unsupported mount type %q", vol.mountType)
}
return errors.New("mount command unsupported by this build")
}
vol.mnt.MountFn = mountFn
if vol.persist {
// Add remote to config file
params := rc.Params{}
for key, val := range vol.Options {
params[key] = val
}
updateMode := config.UpdateRemoteOpt{}
_, err := config.CreateRemote(ctx, vol.Name, vol.Type, params, updateMode)
if err != nil {
return err
}
}
// Use existing remote
f, err := fs.NewFs(ctx, vol.fsString)
if err == nil {
vol.mnt.Fs = f
}
return err
}
// remove volume filesystem and mounts
func (vol *Volume) remove(ctx context.Context) error {
count := len(vol.mountReqs)
fs.Debugf(nil, "Remove volume %q (count %d)", vol.Name, count)
if count > 0 {
return errors.New("volume is in use")
}
if !vol.drv.dummy {
shutdownFn := vol.mnt.Fs.Features().Shutdown
if shutdownFn != nil {
if err := shutdownFn(ctx); err != nil {
return err
}
}
}
if vol.persist {
// Remote remote from config file
config.DeleteRemote(vol.Name)
}
return nil
}
// clearCache will clear VFS cache for the volume
func (vol *Volume) clearCache() error {
VFS := vol.mnt.VFS
if VFS == nil {
return nil
}
root, err := VFS.Root()
if err != nil {
return errors.Wrapf(err, "error reading root: %v", VFS.Fs())
}
root.ForgetAll()
return nil
}
// mount volume filesystem
func (vol *Volume) mount(id string) error {
drv := vol.drv
count := len(vol.mountReqs)
fs.Debugf(nil, "Mount volume %q for id %q at path %s (count %d)",
vol.Name, id, vol.MountPoint, count)
if _, found := vol.mountReqs[id]; found {
return errors.New("volume is already mounted by this id")
}
if count > 0 { // already mounted
vol.mountReqs[id] = nil
return nil
}
if drv.dummy {
vol.mountReqs[id] = nil
return nil
}
if vol.mnt.Fs == nil {
return errors.New("volume filesystem is not ready")
}
if err := vol.mnt.Mount(); err != nil {
return err
}
vol.mnt.MountedOn = time.Now()
vol.mountReqs[id] = nil
vol.drv.monChan <- false // ask monitor to refresh channels
return nil
}
// unmount volume
func (vol *Volume) unmount(id string) error {
count := len(vol.mountReqs)
fs.Debugf(nil, "Unmount volume %q from id %q at path %s (count %d)",
vol.Name, id, vol.MountPoint, count)
if count == 0 {
return errors.New("volume is not mounted")
}
if _, found := vol.mountReqs[id]; !found {
return errors.New("volume is not mounted by this id")
}
delete(vol.mountReqs, id)
if len(vol.mountReqs) > 0 {
return nil // more mounts left
}
if vol.drv.dummy {
return nil
}
mnt := vol.mnt
if mnt.UnmountFn != nil {
if err := mnt.UnmountFn(); err != nil {
return err
}
}
mnt.ErrChan = nil
mnt.UnmountFn = nil
mnt.VFS = nil
vol.drv.monChan <- false // ask monitor to refresh channels
return nil
}
func (vol *Volume) unmountAll() error {
var firstErr error
for id := range vol.mountReqs {
err := vol.unmount(id)
if firstErr == nil {
firstErr = err
}
}
return firstErr
}

View file

@ -5,6 +5,7 @@ import (
"github.com/rclone/rclone/cmd"
"github.com/rclone/rclone/cmd/serve/dlna"
"github.com/rclone/rclone/cmd/serve/docker"
"github.com/rclone/rclone/cmd/serve/ftp"
"github.com/rclone/rclone/cmd/serve/http"
"github.com/rclone/rclone/cmd/serve/restic"
@ -30,6 +31,9 @@ func init() {
if sftp.Command != nil {
Command.AddCommand(sftp.Command)
}
if docker.Command != nil {
Command.AddCommand(docker.Command)
}
cmd.Root.AddCommand(Command)
}

1
go.mod
View file

@ -22,6 +22,7 @@ require (
github.com/calebcase/tmpfile v1.0.2 // indirect
github.com/colinmarc/hdfs/v2 v2.2.0
github.com/coreos/go-semver v0.3.0
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e
github.com/dop251/scsu v0.0.0-20200422003335-8fadfb689669
github.com/dropbox/dropbox-sdk-go-unofficial v1.0.1-0.20210114204226-41fdcdae8a53
github.com/gabriel-vasile/mimetype v1.2.0

2
go.sum
View file

@ -160,8 +160,10 @@ github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3Ee
github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM=
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e h1:Wf6HqHfScWJN9/ZjdUKyjop4mf3Qdd+1TvvltAvM3m8=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/cpuguy83/go-md2man/v2 v2.0.0 h1:EoUDS0afbrsXAZ9YQ9jdu/mZ2sXgT1/2yyNng4PGlyM=