diff --git a/cmd/serve/docker/api.go b/cmd/serve/docker/api.go new file mode 100644 index 000000000..b4c85a1fb --- /dev/null +++ b/cmd/serve/docker/api.go @@ -0,0 +1,175 @@ +package docker + +import ( + "encoding/json" + "net/http" + + "github.com/go-chi/chi/v5" + "github.com/rclone/rclone/fs" +) + +const ( + contentType = "application/vnd.docker.plugins.v1.1+json" + activatePath = "/Plugin.Activate" + createPath = "/VolumeDriver.Create" + getPath = "/VolumeDriver.Get" + listPath = "/VolumeDriver.List" + removePath = "/VolumeDriver.Remove" + pathPath = "/VolumeDriver.Path" + mountPath = "/VolumeDriver.Mount" + unmountPath = "/VolumeDriver.Unmount" + capsPath = "/VolumeDriver.Capabilities" +) + +// CreateRequest is the structure that docker's requests are deserialized to. +type CreateRequest struct { + Name string + Options map[string]string `json:"Opts,omitempty"` +} + +// RemoveRequest structure for a volume remove request +type RemoveRequest struct { + Name string +} + +// MountRequest structure for a volume mount request +type MountRequest struct { + Name string + ID string +} + +// MountResponse structure for a volume mount response +type MountResponse struct { + Mountpoint string +} + +// UnmountRequest structure for a volume unmount request +type UnmountRequest struct { + Name string + ID string +} + +// PathRequest structure for a volume path request +type PathRequest struct { + Name string +} + +// PathResponse structure for a volume path response +type PathResponse struct { + Mountpoint string +} + +// GetRequest structure for a volume get request +type GetRequest struct { + Name string +} + +// GetResponse structure for a volume get response +type GetResponse struct { + Volume *VolInfo +} + +// ListResponse structure for a volume list response +type ListResponse struct { + Volumes []*VolInfo +} + +// CapabilitiesResponse structure for a volume capability response +type CapabilitiesResponse struct { + Capabilities Capability +} + +// Capability represents the list of capabilities a volume driver can return +type Capability struct { + Scope string +} + +// ErrorResponse is a formatted error message that docker can understand +type ErrorResponse struct { + Err string +} + +func newRouter(drv *Driver) http.Handler { + r := chi.NewRouter() + r.Post(activatePath, func(w http.ResponseWriter, r *http.Request) { + res := map[string]interface{}{ + "Implements": []string{"VolumeDriver"}, + } + encodeResponse(w, res, nil, activatePath) + }) + r.Post(createPath, func(w http.ResponseWriter, r *http.Request) { + var req CreateRequest + if decodeRequest(w, r, &req) { + err := drv.Create(&req) + encodeResponse(w, nil, err, createPath) + } + }) + r.Post(removePath, func(w http.ResponseWriter, r *http.Request) { + var req RemoveRequest + if decodeRequest(w, r, &req) { + err := drv.Remove(&req) + encodeResponse(w, nil, err, removePath) + } + }) + r.Post(mountPath, func(w http.ResponseWriter, r *http.Request) { + var req MountRequest + if decodeRequest(w, r, &req) { + res, err := drv.Mount(&req) + encodeResponse(w, res, err, mountPath) + } + }) + r.Post(pathPath, func(w http.ResponseWriter, r *http.Request) { + var req PathRequest + if decodeRequest(w, r, &req) { + res, err := drv.Path(&req) + encodeResponse(w, res, err, pathPath) + } + }) + r.Post(getPath, func(w http.ResponseWriter, r *http.Request) { + var req GetRequest + if decodeRequest(w, r, &req) { + res, err := drv.Get(&req) + encodeResponse(w, res, err, getPath) + } + }) + r.Post(unmountPath, func(w http.ResponseWriter, r *http.Request) { + var req UnmountRequest + if decodeRequest(w, r, &req) { + err := drv.Unmount(&req) + encodeResponse(w, nil, err, unmountPath) + } + }) + r.Post(listPath, func(w http.ResponseWriter, r *http.Request) { + res, err := drv.List() + encodeResponse(w, res, err, listPath) + }) + r.Post(capsPath, func(w http.ResponseWriter, r *http.Request) { + res := &CapabilitiesResponse{ + Capabilities: Capability{Scope: pluginScope}, + } + encodeResponse(w, res, nil, capsPath) + }) + return r +} + +func decodeRequest(w http.ResponseWriter, r *http.Request, req interface{}) bool { + if err := json.NewDecoder(r.Body).Decode(req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return false + } + return true +} + +func encodeResponse(w http.ResponseWriter, res interface{}, err error, path string) { + w.Header().Set("Content-Type", contentType) + if err != nil { + fs.Debugf(path, "Request returned error: %v", err) + w.WriteHeader(http.StatusInternalServerError) + res = &ErrorResponse{Err: err.Error()} + } else if res == nil { + res = struct{}{} + } + if err = json.NewEncoder(w).Encode(res); err != nil { + fs.Debugf(path, "Response encoding failed: %v", err) + } +} diff --git a/cmd/serve/docker/docker.go b/cmd/serve/docker/docker.go new file mode 100644 index 000000000..7b7725180 --- /dev/null +++ b/cmd/serve/docker/docker.go @@ -0,0 +1,72 @@ +// Package docker serves a remote suitable for use with docker volume api +package docker + +import ( + "context" + "path/filepath" + "strings" + "syscall" + + "github.com/spf13/cobra" + + "github.com/rclone/rclone/cmd" + "github.com/rclone/rclone/cmd/mountlib" + "github.com/rclone/rclone/fs/config/flags" + "github.com/rclone/rclone/vfs" + "github.com/rclone/rclone/vfs/vfsflags" +) + +var ( + pluginName = "rclone" + pluginScope = "local" + baseDir = "/var/lib/docker-volumes/rclone" + sockDir = "/run/docker/plugins" + defSpecDir = "/etc/docker/plugins" + stateFile = "docker-plugin.state" + socketAddr = "" // TCP listening address or empty string for Unix socket + socketGid = syscall.Getgid() + canPersist = false // allows writing to config file + forgetState = false + noSpec = false +) + +func init() { + cmdFlags := Command.Flags() + // Add command specific flags + flags.StringVarP(cmdFlags, &baseDir, "base-dir", "", baseDir, "base directory for volumes") + flags.StringVarP(cmdFlags, &socketAddr, "socket-addr", "", socketAddr, " or absolute path (default: /run/docker/plugins/rclone.sock)") + flags.IntVarP(cmdFlags, &socketGid, "socket-gid", "", socketGid, "GID for unix socket (default: current process GID)") + flags.BoolVarP(cmdFlags, &forgetState, "forget-state", "", forgetState, "skip restoring previous state") + flags.BoolVarP(cmdFlags, &noSpec, "no-spec", "", noSpec, "do not write spec file") + // Add common mount/vfs flags + mountlib.AddFlags(cmdFlags) + vfsflags.AddFlags(cmdFlags) +} + +// Command definition for cobra +var Command = &cobra.Command{ + Use: "docker", + Short: `Serve any remote on docker's volume plugin API.`, + Long: strings.ReplaceAll(longHelp, "|", "`") + vfs.Help, + + Run: func(command *cobra.Command, args []string) { + cmd.CheckArgs(0, 0, command, args) + cmd.Run(false, false, command, func() error { + ctx := context.Background() + drv, err := NewDriver(ctx, baseDir, nil, nil, false, forgetState) + if err != nil { + return err + } + srv := NewServer(drv) + if socketAddr == "" { + // Listen on unix socket at /run/docker/plugins/.sock + return srv.ServeUnix(pluginName, socketGid) + } + if filepath.IsAbs(socketAddr) { + // Listen on unix socket at given path + return srv.ServeUnix(socketAddr, socketGid) + } + return srv.ServeTCP(socketAddr, "", nil, noSpec) + }) + }, +} diff --git a/cmd/serve/docker/docker_test.go b/cmd/serve/docker/docker_test.go new file mode 100644 index 000000000..9511b692f --- /dev/null +++ b/cmd/serve/docker/docker_test.go @@ -0,0 +1,414 @@ +package docker_test + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io/ioutil" + "net" + "net/http" + "os" + "path/filepath" + "runtime" + "strings" + "testing" + "time" + + "github.com/rclone/rclone/cmd/mountlib" + "github.com/rclone/rclone/cmd/serve/docker" + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/config" + "github.com/rclone/rclone/fstest" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + _ "github.com/rclone/rclone/backend/local" + _ "github.com/rclone/rclone/backend/memory" + _ "github.com/rclone/rclone/cmd/cmount" + _ "github.com/rclone/rclone/cmd/mount" +) + +func initialise(ctx context.Context, t *testing.T) (string, fs.Fs) { + fstest.Initialise() + + // Make test cache directory + testDir, err := fstest.LocalRemote() + require.NoError(t, err) + err = os.MkdirAll(testDir, 0755) + require.NoError(t, err) + + // Make test file system + testFs, err := fs.NewFs(ctx, testDir) + require.NoError(t, err) + return testDir, testFs +} + +func assertErrorContains(t *testing.T, err error, errString string, msgAndArgs ...interface{}) { + assert.Error(t, err) + if err != nil { + assert.Contains(t, err.Error(), errString, msgAndArgs...) + } +} + +func assertVolumeInfo(t *testing.T, v *docker.VolInfo, name, path string) { + assert.Equal(t, name, v.Name) + assert.Equal(t, path, v.Mountpoint) + assert.NotEmpty(t, v.CreatedAt) + _, err := time.Parse(time.RFC3339, v.CreatedAt) + assert.NoError(t, err) +} + +func TestDockerPluginLogic(t *testing.T) { + ctx := context.Background() + oldCacheDir := config.CacheDir + testDir, testFs := initialise(ctx, t) + config.CacheDir = testDir + defer func() { + config.CacheDir = oldCacheDir + if !t.Failed() { + fstest.Purge(testFs) + _ = os.RemoveAll(testDir) + } + }() + + // Create dummy volume driver + drv, err := docker.NewDriver(ctx, testDir, nil, nil, true, true) + require.NoError(t, err) + require.NotNil(t, drv) + + // 1st volume request + volReq := &docker.CreateRequest{ + Name: "vol1", + Options: docker.VolOpts{}, + } + assertErrorContains(t, drv.Create(volReq), "volume must have either remote or backend") + + volReq.Options["remote"] = testDir + assert.NoError(t, drv.Create(volReq)) + path1 := filepath.Join(testDir, "vol1") + + assert.ErrorIs(t, drv.Create(volReq), docker.ErrVolumeExists) + + getReq := &docker.GetRequest{Name: "vol1"} + getRes, err := drv.Get(getReq) + assert.NoError(t, err) + require.NotNil(t, getRes) + assertVolumeInfo(t, getRes.Volume, "vol1", path1) + + // 2nd volume request + volReq.Name = "vol2" + assert.NoError(t, drv.Create(volReq)) + path2 := filepath.Join(testDir, "vol2") + + listRes, err := drv.List() + require.NoError(t, err) + require.Equal(t, 2, len(listRes.Volumes)) + assertVolumeInfo(t, listRes.Volumes[0], "vol1", path1) + assertVolumeInfo(t, listRes.Volumes[1], "vol2", path2) + + // Try prohibited volume options + volReq.Name = "vol99" + volReq.Options["remote"] = testDir + volReq.Options["type"] = "memory" + err = drv.Create(volReq) + assertErrorContains(t, err, "volume must have either remote or backend") + + volReq.Options["persist"] = "WrongBoolean" + err = drv.Create(volReq) + assertErrorContains(t, err, "cannot parse option") + + volReq.Options["persist"] = "true" + delete(volReq.Options, "remote") + err = drv.Create(volReq) + assertErrorContains(t, err, "persist remotes is prohibited") + + volReq.Options["persist"] = "false" + volReq.Options["memory-option-broken"] = "some-value" + err = drv.Create(volReq) + assertErrorContains(t, err, "unsupported backend option") + + getReq.Name = "vol99" + getRes, err = drv.Get(getReq) + assert.Error(t, err) + assert.Nil(t, getRes) + + // Test mount requests + mountReq := &docker.MountRequest{ + Name: "vol2", + ID: "id1", + } + mountRes, err := drv.Mount(mountReq) + assert.NoError(t, err) + require.NotNil(t, mountRes) + assert.Equal(t, path2, mountRes.Mountpoint) + + mountRes, err = drv.Mount(mountReq) + assert.Error(t, err) + assert.Nil(t, mountRes) + assertErrorContains(t, err, "already mounted by this id") + + mountReq.ID = "id2" + mountRes, err = drv.Mount(mountReq) + assert.NoError(t, err) + require.NotNil(t, mountRes) + assert.Equal(t, path2, mountRes.Mountpoint) + + unmountReq := &docker.UnmountRequest{ + Name: "vol2", + ID: "id1", + } + err = drv.Unmount(unmountReq) + assert.NoError(t, err) + + err = drv.Unmount(unmountReq) + assert.Error(t, err) + assertErrorContains(t, err, "not mounted by this id") + + // Simulate plugin restart + drv2, err := docker.NewDriver(ctx, testDir, nil, nil, true, false) + assert.NoError(t, err) + require.NotNil(t, drv2) + + // New plugin instance should pick up the saved state + listRes, err = drv2.List() + require.NoError(t, err) + require.Equal(t, 2, len(listRes.Volumes)) + assertVolumeInfo(t, listRes.Volumes[0], "vol1", path1) + assertVolumeInfo(t, listRes.Volumes[1], "vol2", path2) + + rmReq := &docker.RemoveRequest{Name: "vol2"} + err = drv.Remove(rmReq) + assertErrorContains(t, err, "volume is in use") + + unmountReq.ID = "id1" + err = drv.Unmount(unmountReq) + assert.Error(t, err) + assertErrorContains(t, err, "not mounted by this id") + + unmountReq.ID = "id2" + err = drv.Unmount(unmountReq) + assert.NoError(t, err) + + err = drv.Unmount(unmountReq) + assert.EqualError(t, err, "volume is not mounted") + + err = drv.Remove(rmReq) + assert.NoError(t, err) +} + +const ( + httpTimeout = 2 * time.Second + tempDelay = 10 * time.Millisecond +) + +type APIClient struct { + t *testing.T + cli *http.Client + host string +} + +func newAPIClient(t *testing.T, host, unixPath string) *APIClient { + tr := &http.Transport{ + MaxIdleConns: 1, + IdleConnTimeout: httpTimeout, + DisableCompression: true, + } + + if unixPath != "" { + tr.DialContext = func(_ context.Context, _, _ string) (net.Conn, error) { + return net.Dial("unix", unixPath) + } + } else { + dialer := &net.Dialer{ + Timeout: httpTimeout, + KeepAlive: httpTimeout, + } + tr.DialContext = dialer.DialContext + } + + cli := &http.Client{ + Transport: tr, + Timeout: httpTimeout, + } + return &APIClient{ + t: t, + cli: cli, + host: host, + } +} + +func (a *APIClient) request(path string, in, out interface{}, wantErr bool) { + t := a.t + var ( + dataIn []byte + dataOut []byte + err error + ) + + realm := "VolumeDriver" + if path == "Activate" { + realm = "Plugin" + } + url := fmt.Sprintf("http://%s/%s.%s", a.host, realm, path) + + if str, isString := in.(string); isString { + dataIn = []byte(str) + } else { + dataIn, err = json.Marshal(in) + require.NoError(t, err) + } + fs.Logf(path, "<-- %s", dataIn) + + req, err := http.NewRequest("POST", url, bytes.NewBuffer(dataIn)) + require.NoError(t, err) + req.Header.Set("Content-Type", "application/json") + + res, err := a.cli.Do(req) + require.NoError(t, err) + + wantStatus := http.StatusOK + if wantErr { + wantStatus = http.StatusInternalServerError + } + assert.Equal(t, wantStatus, res.StatusCode) + + dataOut, err = ioutil.ReadAll(res.Body) + require.NoError(t, err) + err = res.Body.Close() + require.NoError(t, err) + + if strPtr, isString := out.(*string); isString || wantErr { + require.True(t, isString, "must use string for error response") + if wantErr { + var errRes docker.ErrorResponse + err = json.Unmarshal(dataOut, &errRes) + require.NoError(t, err) + *strPtr = errRes.Err + } else { + *strPtr = strings.TrimSpace(string(dataOut)) + } + } else { + err = json.Unmarshal(dataOut, out) + require.NoError(t, err) + } + fs.Logf(path, "--> %s", dataOut) + time.Sleep(tempDelay) +} + +func testMountAPI(t *testing.T, sockAddr string) { + if _, mountFn := mountlib.ResolveMountMethod(""); mountFn == nil { + t.Skip("Test requires working mount command") + } + + ctx := context.Background() + oldCacheDir := config.CacheDir + testDir, testFs := initialise(ctx, t) + config.CacheDir = testDir + defer func() { + config.CacheDir = oldCacheDir + if !t.Failed() { + fstest.Purge(testFs) + _ = os.RemoveAll(testDir) + } + }() + + // Prepare API client + var cli *APIClient + var unixPath string + if sockAddr != "" { + cli = newAPIClient(t, sockAddr, "") + } else { + unixPath = filepath.Join(testDir, "rclone.sock") + cli = newAPIClient(t, "localhost", unixPath) + } + + // Create mounting volume driver and listen for requests + drv, err := docker.NewDriver(ctx, testDir, nil, nil, false, true) + require.NoError(t, err) + require.NotNil(t, drv) + defer drv.Exit() + + srv := docker.NewServer(drv) + go func() { + var errServe error + if unixPath != "" { + errServe = srv.ServeUnix(unixPath, os.Getgid()) + } else { + errServe = srv.ServeTCP(sockAddr, testDir, nil, false) + } + assert.ErrorIs(t, errServe, http.ErrServerClosed) + }() + defer func() { + err := srv.Shutdown(ctx) + assert.NoError(t, err) + fs.Logf(nil, "Server stopped") + time.Sleep(tempDelay) + }() + time.Sleep(tempDelay) // Let server start + + // Run test sequence + path1 := filepath.Join(testDir, "path1") + require.NoError(t, os.MkdirAll(path1, 0755)) + mount1 := filepath.Join(testDir, "vol1") + res := "" + + cli.request("Activate", "{}", &res, false) + assert.Contains(t, res, `"VolumeDriver"`) + + createReq := docker.CreateRequest{ + Name: "vol1", + Options: docker.VolOpts{"remote": path1}, + } + cli.request("Create", createReq, &res, false) + assert.Equal(t, "{}", res) + cli.request("Create", createReq, &res, true) + assert.Contains(t, res, "volume already exists") + + mountReq := docker.MountRequest{Name: "vol1", ID: "id1"} + var mountRes docker.MountResponse + cli.request("Mount", mountReq, &mountRes, false) + assert.Equal(t, mount1, mountRes.Mountpoint) + cli.request("Mount", mountReq, &res, true) + assert.Contains(t, res, "already mounted by this id") + + removeReq := docker.RemoveRequest{Name: "vol1"} + cli.request("Remove", removeReq, &res, true) + assert.Contains(t, res, "volume is in use") + + text := []byte("banana") + err = ioutil.WriteFile(filepath.Join(mount1, "txt"), text, 0644) + assert.NoError(t, err) + time.Sleep(tempDelay) + + text2, err := ioutil.ReadFile(filepath.Join(path1, "txt")) + assert.NoError(t, err) + assert.Equal(t, text, text2) + + unmountReq := docker.UnmountRequest{Name: "vol1", ID: "id1"} + cli.request("Unmount", unmountReq, &res, false) + assert.Equal(t, "{}", res) + cli.request("Unmount", unmountReq, &res, true) + assert.Equal(t, "volume is not mounted", res) + + cli.request("Remove", removeReq, &res, false) + assert.Equal(t, "{}", res) + cli.request("Remove", removeReq, &res, true) + assert.Equal(t, "volume not found", res) + + var listRes docker.ListResponse + cli.request("List", "{}", &listRes, false) + assert.Empty(t, listRes.Volumes) +} + +func TestDockerPluginMountTCP(t *testing.T) { + testMountAPI(t, "localhost:53789") +} + +func TestDockerPluginMountUnix(t *testing.T) { + if runtime.GOOS != "linux" { + t.Skip("Test is Linux-only") + } + testMountAPI(t, "") +} diff --git a/cmd/serve/docker/driver.go b/cmd/serve/docker/driver.go new file mode 100644 index 000000000..6a7be6f32 --- /dev/null +++ b/cmd/serve/docker/driver.go @@ -0,0 +1,360 @@ +package docker + +import ( + "context" + "encoding/json" + "io/ioutil" + "os" + "path/filepath" + "reflect" + "sort" + "sync" + + sysdnotify "github.com/iguanesolutions/go-systemd/v5/notify" + "github.com/pkg/errors" + + "github.com/rclone/rclone/cmd/mountlib" + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/config" + "github.com/rclone/rclone/lib/atexit" + "github.com/rclone/rclone/vfs/vfscommon" + "github.com/rclone/rclone/vfs/vfsflags" +) + +// Driver implements docker driver api +type Driver struct { + root string + volumes map[string]*Volume + statePath string + dummy bool // disables real mounting + mntOpt mountlib.Options + vfsOpt vfscommon.Options + mu sync.Mutex + exitOnce sync.Once + hupChan chan os.Signal + monChan chan bool // exit if true for exit, refresh if false +} + +// NewDriver makes a new docker driver +func NewDriver(ctx context.Context, root string, mntOpt *mountlib.Options, vfsOpt *vfscommon.Options, dummy, forgetState bool) (*Driver, error) { + // setup directories + cacheDir, err := filepath.Abs(config.CacheDir) + if err != nil { + return nil, errors.Wrap(err, "failed to make --cache-dir absolute") + } + err = os.MkdirAll(cacheDir, 0700) + if err != nil { + return nil, errors.Wrapf(err, "failed to create cache directory: %s", cacheDir) + } + + //err = os.MkdirAll(root, 0755) + if err != nil { + return nil, errors.Wrapf(err, "failed to create mount root: %s", root) + } + + // setup driver state + if mntOpt == nil { + mntOpt = &mountlib.Opt + } + if vfsOpt == nil { + vfsOpt = &vfsflags.Opt + } + drv := &Driver{ + root: root, + statePath: filepath.Join(cacheDir, stateFile), + volumes: map[string]*Volume{}, + mntOpt: *mntOpt, + vfsOpt: *vfsOpt, + dummy: dummy, + } + drv.mntOpt.Daemon = false + + // restore from saved state + if !forgetState { + if err = drv.restoreState(ctx); err != nil { + return nil, errors.Wrap(err, "failed to restore state") + } + } + + // start mount monitoring + drv.hupChan = make(chan os.Signal, 1) + drv.monChan = make(chan bool, 1) + mountlib.NotifyOnSigHup(drv.hupChan) + go drv.monitor() + + // unmount all volumes on exit + atexit.Register(func() { + drv.exitOnce.Do(drv.Exit) + }) + + // notify systemd + if err := sysdnotify.Ready(); err != nil { + return nil, errors.Wrap(err, "failed to notify systemd") + } + + return drv, nil +} + +// Exit will unmount all currently mounted volumes +func (drv *Driver) Exit() { + fs.Debugf(nil, "Unmount all volumes") + drv.mu.Lock() + defer drv.mu.Unlock() + + reportErr(sysdnotify.Stopping()) + drv.monChan <- true // ask monitor to exit + for _, vol := range drv.volumes { + reportErr(vol.unmountAll()) + vol.Mounts = []string{} // never persist mounts at exit + } + reportErr(drv.saveState()) + drv.dummy = true // no more mounts +} + +// monitor all mounts +func (drv *Driver) monitor() { + for { + // https://stackoverflow.com/questions/19992334/how-to-listen-to-n-channels-dynamic-select-statement + monChan := reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(drv.monChan), + } + hupChan := reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(drv.monChan), + } + sources := []reflect.SelectCase{monChan, hupChan} + volumes := []*Volume{nil, nil} + + drv.mu.Lock() + for _, vol := range drv.volumes { + if vol.mnt.ErrChan != nil { + errSource := reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(vol.mnt.ErrChan), + } + sources = append(sources, errSource) + volumes = append(volumes, vol) + } + } + drv.mu.Unlock() + + fs.Debugf(nil, "Monitoring %d volumes", len(sources)-2) + idx, val, _ := reflect.Select(sources) + switch idx { + case 0: + if val.Bool() { + fs.Debugf(nil, "Monitoring stopped") + return + } + case 1: + // user sent SIGHUP to clear the cache + drv.clearCache() + default: + vol := volumes[idx] + if err := val.Interface(); err != nil { + fs.Logf(nil, "Volume %q unmounted externally: %v", vol.Name, err) + } else { + fs.Infof(nil, "Volume %q unmounted externally", vol.Name) + } + drv.mu.Lock() + reportErr(vol.unmountAll()) + drv.mu.Unlock() + } + } +} + +// clearCache will clear cache of all volumes +func (drv *Driver) clearCache() { + fs.Debugf(nil, "Clear all caches") + drv.mu.Lock() + defer drv.mu.Unlock() + + for _, vol := range drv.volumes { + reportErr(vol.clearCache()) + } +} + +func reportErr(err error) { + if err != nil { + fs.Errorf("docker plugin", "%v", err) + } +} + +// Create volume +// To use subpath we are limited to defining a new volume definition via alias +func (drv *Driver) Create(req *CreateRequest) error { + ctx := context.Background() + drv.mu.Lock() + defer drv.mu.Unlock() + + name := req.Name + fs.Debugf(nil, "Create volume %q", name) + + if vol, _ := drv.getVolume(name); vol != nil { + return ErrVolumeExists + } + + vol, err := newVolume(ctx, name, req.Options, drv) + if err != nil { + return err + } + drv.volumes[name] = vol + return drv.saveState() +} + +// Remove volume +func (drv *Driver) Remove(req *RemoveRequest) error { + ctx := context.Background() + drv.mu.Lock() + defer drv.mu.Unlock() + vol, err := drv.getVolume(req.Name) + if err != nil { + return err + } + if err = vol.remove(ctx); err != nil { + return err + } + delete(drv.volumes, vol.Name) + return drv.saveState() +} + +// List volumes handled by the driver +func (drv *Driver) List() (*ListResponse, error) { + drv.mu.Lock() + defer drv.mu.Unlock() + + volumeList := drv.listVolumes() + fs.Debugf(nil, "List: %v", volumeList) + + res := &ListResponse{ + Volumes: []*VolInfo{}, + } + for _, name := range volumeList { + vol := drv.volumes[name] + res.Volumes = append(res.Volumes, vol.getInfo()) + } + return res, nil +} + +// Get volume info +func (drv *Driver) Get(req *GetRequest) (*GetResponse, error) { + drv.mu.Lock() + defer drv.mu.Unlock() + vol, err := drv.getVolume(req.Name) + if err != nil { + return nil, err + } + return &GetResponse{Volume: vol.getInfo()}, nil +} + +// Path returns path of the requested volume +func (drv *Driver) Path(req *PathRequest) (*PathResponse, error) { + drv.mu.Lock() + defer drv.mu.Unlock() + vol, err := drv.getVolume(req.Name) + if err != nil { + return nil, err + } + return &PathResponse{Mountpoint: vol.MountPoint}, nil +} + +// Mount volume +func (drv *Driver) Mount(req *MountRequest) (*MountResponse, error) { + drv.mu.Lock() + defer drv.mu.Unlock() + vol, err := drv.getVolume(req.Name) + if err == nil { + err = vol.mount(req.ID) + } + if err == nil { + err = drv.saveState() + } + if err != nil { + return nil, err + } + return &MountResponse{Mountpoint: vol.MountPoint}, nil +} + +// Unmount volume +func (drv *Driver) Unmount(req *UnmountRequest) error { + drv.mu.Lock() + defer drv.mu.Unlock() + vol, err := drv.getVolume(req.Name) + if err == nil { + err = vol.unmount(req.ID) + } + if err == nil { + err = drv.saveState() + } + return err +} + +// getVolume returns volume by name +func (drv *Driver) getVolume(name string) (*Volume, error) { + vol := drv.volumes[name] + if vol == nil { + return nil, ErrVolumeNotFound + } + return vol, nil +} + +// listVolumes returns list volume listVolumes +func (drv *Driver) listVolumes() []string { + names := []string{} + for key := range drv.volumes { + names = append(names, key) + } + sort.Strings(names) + return names +} + +// saveState saves volumes handled by driver to persistent store +func (drv *Driver) saveState() error { + volumeList := drv.listVolumes() + fs.Debugf(nil, "Save state %v to %s", volumeList, drv.statePath) + + state := []*Volume{} + for _, key := range volumeList { + vol := drv.volumes[key] + vol.prepareState() + state = append(state, vol) + } + + data, err := json.Marshal(state) + if err == nil { + err = ioutil.WriteFile(drv.statePath, data, 0600) + } + if err != nil { + return errors.Wrap(err, "failed to write state") + } + return nil +} + +// restoreState recreates volumes from saved driver state +func (drv *Driver) restoreState(ctx context.Context) error { + fs.Debugf(nil, "Restore state from %s", drv.statePath) + + data, err := ioutil.ReadFile(drv.statePath) + if os.IsNotExist(err) { + return nil + } + + var state []*Volume + if err == nil { + err = json.Unmarshal(data, &state) + } + if err != nil { + fs.Logf(nil, "Failed to restore plugin state: %v", err) + return nil + } + + for _, vol := range state { + if err := vol.restoreState(ctx, drv); err != nil { + fs.Logf(nil, "Failed to restore volume %q: %v", vol.Name, err) + continue + } + drv.volumes[vol.Name] = vol + } + return nil +} diff --git a/cmd/serve/docker/help.go b/cmd/serve/docker/help.go new file mode 100644 index 000000000..4ea8d9b7c --- /dev/null +++ b/cmd/serve/docker/help.go @@ -0,0 +1,7 @@ +package docker + +// Note: "|" will be replaced by backticks +var longHelp = ` +This command implements the Docker volume plugin API allowing docker to use +rclone as a data storage mechanism for various cloud providers. +` diff --git a/cmd/serve/docker/options.go b/cmd/serve/docker/options.go new file mode 100644 index 000000000..0c5dc6ae8 --- /dev/null +++ b/cmd/serve/docker/options.go @@ -0,0 +1,307 @@ +package docker + +import ( + "strings" + + "github.com/rclone/rclone/cmd/mountlib" + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/config/configmap" + "github.com/rclone/rclone/fs/fspath" + "github.com/rclone/rclone/fs/rc" + "github.com/rclone/rclone/vfs/vfscommon" + "github.com/rclone/rclone/vfs/vfsflags" + + "github.com/pkg/errors" + "github.com/spf13/pflag" +) + +// applyOptions configures volume from request options. +// +// There are 5 special options: +// - "remote" aka "fs" determines existing remote from config file +// with a path or on-the-fly remote using the ":backend:" syntax. +// It is usually named "remote" in documentation but can be aliased as +// "fs" to avoid confusion with the "remote" option of some backends. +// - "type" is equivalent to the ":backend:" syntax (optional). +// - "path" provides explicit on-remote path for "type" (optional). +// - "mount-type" can be "mount", "cmount" or "mount2", defaults to +// first found (optional). +// - "persist" is reserved for future to create remotes persisted +// in rclone.conf similar to rcd (optional). +// +// Unlike rcd we use the flat naming scheme for mount, vfs and backend +// options without substructures. Dashes, underscores and mixed case +// in option names can be used interchangeably. Option name conflicts +// can be resolved in a manner similar to rclone CLI by adding prefixes: +// "vfs-", primary mount backend type like "sftp-", and so on. +// +// After triaging the options are put in MountOpt, VFSOpt or connect +// string for actual filesystem setup and in volume.Options for saving +// the state. +func (vol *Volume) applyOptions(volOpt VolOpts) error { + // copy options to override later + mntOpt := &vol.mnt.MountOpt + vfsOpt := &vol.mnt.VFSOpt + *mntOpt = vol.drv.mntOpt + *vfsOpt = vol.drv.vfsOpt + + // vol.Options has all options except "remote" and "type" + vol.Options = VolOpts{} + vol.fsString = "" + + var fsName, fsPath, fsType string + var explicitPath string + var fsOpt configmap.Simple + + // parse "remote" or "type" + for key, str := range volOpt { + switch key { + case "": + continue + case "remote", "fs": + p, err := fspath.Parse(str) + if err != nil || p.Name == ":" { + return errors.Wrapf(err, "cannot parse path %q", str) + } + fsName, fsPath, fsOpt = p.Name, p.Path, p.Config + vol.Fs = str + case "type": + fsType = str + vol.Type = str + case "path": + explicitPath = str + vol.Path = str + default: + vol.Options[key] = str + } + } + + // find options supported by backend + if strings.HasPrefix(fsName, ":") { + fsType = fsName[1:] + fsName = "" + } + if fsType == "" { + fsType = "local" + if fsName != "" { + var ok bool + fsType, ok = fs.ConfigMap(nil, fsName, nil).Get("type") + if !ok { + return fs.ErrorNotFoundInConfigFile + } + } + } + if explicitPath != "" { + if fsPath != "" { + fs.Logf(nil, "Explicit path will override connection string") + } + fsPath = explicitPath + } + fsInfo, err := fs.Find(fsType) + if err != nil { + return errors.Errorf("unknown filesystem type %q", fsType) + } + + // handle remaining options, override fsOpt + if fsOpt == nil { + fsOpt = configmap.Simple{} + } + opt := rc.Params{} + for key, val := range vol.Options { + opt[key] = val + } + for key := range opt { + var ok bool + var err error + + switch normalOptName(key) { + case "persist": + vol.persist, err = opt.GetBool(key) + ok = true + case "mount-type": + vol.mountType, err = opt.GetString(key) + ok = true + } + if err != nil { + return errors.Wrapf(err, "cannot parse option %q", key) + } + + if !ok { + // try to use as a mount option in mntOpt + ok, err = getMountOption(mntOpt, opt, key) + if ok && err != nil { + return errors.Wrapf(err, "cannot parse mount option %q", key) + } + } + if !ok { + // try as a vfs option in vfsOpt + ok, err = getVFSOption(vfsOpt, opt, key) + if ok && err != nil { + return errors.Wrapf(err, "cannot parse vfs option %q", key) + } + } + + if !ok { + // try as a backend option in fsOpt (backends use "_" instead of "-") + optWithPrefix := strings.ReplaceAll(normalOptName(key), "-", "_") + fsOptName := strings.TrimPrefix(optWithPrefix, fsType+"_") + hasFsPrefix := optWithPrefix != fsOptName + if !hasFsPrefix || fsInfo.Options.Get(fsOptName) == nil { + fs.Logf(nil, "Option %q is not supported by backend %q", key, fsType) + return errors.Errorf("unsupported backend option %q", key) + } + fsOpt[fsOptName], err = opt.GetString(key) + if err != nil { + return errors.Wrapf(err, "cannot parse backend option %q", key) + } + } + } + + // build remote string from fsName, fsType, fsOpt, fsPath + colon := ":" + comma := "," + if fsName == "" { + fsName = ":" + fsType + } + connString := fsOpt.String() + if fsName == "" && fsType == "" { + colon = "" + connString = "" + } + if connString == "" { + comma = "" + } + vol.fsString = fsName + comma + connString + colon + fsPath + + return vol.validate() +} + +func getMountOption(mntOpt *mountlib.Options, opt rc.Params, key string) (ok bool, err error) { + ok = true + switch normalOptName(key) { + case "debug-fuse": + mntOpt.DebugFUSE, err = opt.GetBool(key) + case "attr-timeout": + mntOpt.AttrTimeout, err = opt.GetDuration(key) + case "option": + mntOpt.ExtraOptions, err = getStringArray(opt, key) + case "fuse-flag": + mntOpt.ExtraFlags, err = getStringArray(opt, key) + case "daemon": + mntOpt.Daemon, err = opt.GetBool(key) + case "daemon-timeout": + mntOpt.DaemonTimeout, err = opt.GetDuration(key) + case "default-permissions": + mntOpt.DefaultPermissions, err = opt.GetBool(key) + case "allow-non-empty": + mntOpt.AllowNonEmpty, err = opt.GetBool(key) + case "allow-root": + mntOpt.AllowRoot, err = opt.GetBool(key) + case "allow-other": + mntOpt.AllowOther, err = opt.GetBool(key) + case "async-read": + mntOpt.AsyncRead, err = opt.GetBool(key) + case "max-read-ahead": + err = getFVarP(&mntOpt.MaxReadAhead, opt, key) + case "write-back-cache": + mntOpt.WritebackCache, err = opt.GetBool(key) + case "volname": + mntOpt.VolumeName, err = opt.GetString(key) + case "noappledouble": + mntOpt.NoAppleDouble, err = opt.GetBool(key) + case "noapplexattr": + mntOpt.NoAppleXattr, err = opt.GetBool(key) + case "network-mode": + mntOpt.NetworkMode, err = opt.GetBool(key) + default: + ok = false + } + return +} + +func getVFSOption(vfsOpt *vfscommon.Options, opt rc.Params, key string) (ok bool, err error) { + var intVal int64 + ok = true + switch normalOptName(key) { + + // options prefixed with "vfs-" + case "vfs-cache-mode": + err = getFVarP(&vfsOpt.CacheMode, opt, key) + case "vfs-cache-poll-interval": + vfsOpt.CachePollInterval, err = opt.GetDuration(key) + case "vfs-cache-max-age": + vfsOpt.CacheMaxAge, err = opt.GetDuration(key) + case "vfs-cache-max-size": + err = getFVarP(&vfsOpt.CacheMaxSize, opt, key) + case "vfs-read-chunk-size": + err = getFVarP(&vfsOpt.ChunkSize, opt, key) + case "vfs-read-chunk-size-limit": + err = getFVarP(&vfsOpt.ChunkSizeLimit, opt, key) + case "vfs-case-insensitive": + vfsOpt.CaseInsensitive, err = opt.GetBool(key) + case "vfs-write-wait": + vfsOpt.WriteWait, err = opt.GetDuration(key) + case "vfs-read-wait": + vfsOpt.ReadWait, err = opt.GetDuration(key) + case "vfs-write-back": + vfsOpt.WriteBack, err = opt.GetDuration(key) + case "vfs-read-ahead": + err = getFVarP(&vfsOpt.ReadAhead, opt, key) + case "vfs-used-is-size": + vfsOpt.UsedIsSize, err = opt.GetBool(key) + + // unprefixed vfs options + case "no-modtime": + vfsOpt.NoModTime, err = opt.GetBool(key) + case "no-checksum": + vfsOpt.NoChecksum, err = opt.GetBool(key) + case "dir-cache-time": + vfsOpt.DirCacheTime, err = opt.GetDuration(key) + case "poll-interval": + vfsOpt.PollInterval, err = opt.GetDuration(key) + case "read-only": + vfsOpt.ReadOnly, err = opt.GetBool(key) + case "dir-perms": + perms := &vfsflags.FileMode{Mode: &vfsOpt.DirPerms} + err = getFVarP(perms, opt, key) + case "file-perms": + perms := &vfsflags.FileMode{Mode: &vfsOpt.FilePerms} + err = getFVarP(perms, opt, key) + + // unprefixed unix-only vfs options + case "umask": + intVal, err = opt.GetInt64(key) + vfsOpt.Umask = int(intVal) + case "uid": + intVal, err = opt.GetInt64(key) + vfsOpt.UID = uint32(intVal) + case "gid": + intVal, err = opt.GetInt64(key) + vfsOpt.GID = uint32(intVal) + + // non-vfs options + default: + ok = false + } + return +} + +func getFVarP(pvalue pflag.Value, opt rc.Params, key string) error { + str, err := opt.GetString(key) + if err != nil { + return err + } + return pvalue.Set(str) +} + +func getStringArray(opt rc.Params, key string) ([]string, error) { + str, err := opt.GetString(key) + if err != nil { + return nil, err + } + return strings.Split(str, ","), nil +} + +func normalOptName(key string) string { + return strings.ReplaceAll(strings.TrimPrefix(strings.ToLower(key), "--"), "_", "-") +} diff --git a/cmd/serve/docker/serve.go b/cmd/serve/docker/serve.go new file mode 100644 index 000000000..63c5f8d2a --- /dev/null +++ b/cmd/serve/docker/serve.go @@ -0,0 +1,100 @@ +package docker + +import ( + "context" + "crypto/tls" + "fmt" + "io/ioutil" + "net" + "net/http" + "os" + "path/filepath" + "runtime" + + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/lib/atexit" +) + +// Server connects plugin with docker daemon by protocol +type Server http.Server + +// NewServer creates new docker plugin server +func NewServer(drv *Driver) *Server { + return &Server{Handler: newRouter(drv)} +} + +// Shutdown the server +func (s *Server) Shutdown(ctx context.Context) error { + hs := (*http.Server)(s) + return hs.Shutdown(ctx) +} + +func (s *Server) serve(listener net.Listener, addr, tempFile string) error { + if tempFile != "" { + atexit.Register(func() { + // remove spec file or self-created unix socket + fs.Debugf(nil, "Removing stale file %s", tempFile) + _ = os.Remove(tempFile) + }) + } + hs := (*http.Server)(s) + return hs.Serve(listener) +} + +// ServeUnix makes the handler to listen for requests in a unix socket. +// It also creates the socket file in the right directory for docker to read. +func (s *Server) ServeUnix(path string, gid int) error { + listener, socketPath, err := newUnixListener(path, gid) + if err != nil { + return err + } + if socketPath != "" { + path = socketPath + fs.Infof(nil, "Serving unix socket: %s", path) + } else { + fs.Infof(nil, "Serving systemd socket") + } + return s.serve(listener, path, socketPath) +} + +// ServeTCP makes the handler listen for request on a given TCP address. +// It also writes the spec file in the right directory for docker to read. +func (s *Server) ServeTCP(addr, specDir string, tlsConfig *tls.Config, noSpec bool) error { + listener, err := net.Listen("tcp", addr) + if err != nil { + return err + } + if tlsConfig != nil { + tlsConfig.NextProtos = []string{"http/1.1"} + listener = tls.NewListener(listener, tlsConfig) + } + addr = listener.Addr().String() + specFile := "" + if !noSpec { + specFile, err = writeSpecFile(addr, "tcp", specDir) + if err != nil { + return err + } + } + fs.Infof(nil, "Serving TCP socket: %s", addr) + return s.serve(listener, addr, specFile) +} + +func writeSpecFile(addr, proto, specDir string) (string, error) { + if specDir == "" && runtime.GOOS == "windows" { + specDir = os.TempDir() + } + if specDir == "" { + specDir = defSpecDir + } + if err := os.MkdirAll(specDir, 0755); err != nil { + return "", err + } + specFile := filepath.Join(specDir, "rclone.spec") + url := fmt.Sprintf("%s://%s", proto, addr) + if err := ioutil.WriteFile(specFile, []byte(url), 0644); err != nil { + return "", err + } + fs.Debugf(nil, "Plugin spec has been written to %s", specFile) + return specFile, nil +} diff --git a/cmd/serve/docker/systemd.go b/cmd/serve/docker/systemd.go new file mode 100644 index 000000000..049eb7dab --- /dev/null +++ b/cmd/serve/docker/systemd.go @@ -0,0 +1,17 @@ +// +build linux,!android + +package docker + +import ( + "os" + + "github.com/coreos/go-systemd/activation" + "github.com/coreos/go-systemd/util" +) + +func systemdActivationFiles() []*os.File { + if util.IsRunningSystemd() { + return activation.Files(false) + } + return nil +} diff --git a/cmd/serve/docker/systemd_unsupported.go b/cmd/serve/docker/systemd_unsupported.go new file mode 100644 index 000000000..acf97a2a7 --- /dev/null +++ b/cmd/serve/docker/systemd_unsupported.go @@ -0,0 +1,11 @@ +// +build !linux android + +package docker + +import ( + "os" +) + +func systemdActivationFiles() []*os.File { + return nil +} diff --git a/cmd/serve/docker/unix.go b/cmd/serve/docker/unix.go new file mode 100644 index 000000000..3b93ae36d --- /dev/null +++ b/cmd/serve/docker/unix.go @@ -0,0 +1,56 @@ +// +build linux freebsd + +package docker + +import ( + "fmt" + "net" + "os" + "path/filepath" +) + +func newUnixListener(path string, gid int) (net.Listener, string, error) { + // try systemd socket activation + fds := systemdActivationFiles() + switch len(fds) { + case 0: + // fall thru + case 1: + listener, err := net.FileListener(fds[0]) + return listener, "", err + default: + return nil, "", fmt.Errorf("expected only one socket from systemd, got %d", len(fds)) + } + + // create socket outselves + if filepath.Ext(path) == "" { + path += ".sock" + } + if !filepath.IsAbs(path) { + path = filepath.Join(sockDir, path) + } + + if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil { + return nil, "", err + } + if err := os.Remove(path); err != nil && !os.IsNotExist(err) { + return nil, "", err + } + + listener, err := net.Listen("unix", path) + if err != nil { + return nil, "", err + } + + if err = os.Chmod(path, 0660); err != nil { + return nil, "", err + } + if os.Geteuid() == 0 { + if err = os.Chown(path, 0, gid); err != nil { + return nil, "", err + } + } + + // we don't use spec file with unix sockets + return listener, path, nil +} diff --git a/cmd/serve/docker/unix_unsupported.go b/cmd/serve/docker/unix_unsupported.go new file mode 100644 index 000000000..75547c7c1 --- /dev/null +++ b/cmd/serve/docker/unix_unsupported.go @@ -0,0 +1,12 @@ +// +build !linux,!freebsd + +package docker + +import ( + "errors" + "net" +) + +func newUnixListener(path string, gid int) (net.Listener, string, error) { + return nil, "", errors.New("unix sockets require Linux or FreeBSD") +} diff --git a/cmd/serve/docker/volume.go b/cmd/serve/docker/volume.go new file mode 100644 index 000000000..98f577f3d --- /dev/null +++ b/cmd/serve/docker/volume.go @@ -0,0 +1,326 @@ +package docker + +import ( + "context" + "os" + "path/filepath" + "runtime" + "sort" + "time" + + "github.com/pkg/errors" + + "github.com/rclone/rclone/cmd/mountlib" + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/config" + "github.com/rclone/rclone/fs/rc" +) + +// Errors +var ( + ErrVolumeNotFound = errors.New("volume not found") + ErrVolumeExists = errors.New("volume already exists") + ErrMountpointExists = errors.New("non-empty mountpoint already exists") +) + +// Volume keeps volume runtime state +// Public members get persisted in saved state +type Volume struct { + Name string `json:"name"` + MountPoint string `json:"mountpoint"` + CreatedAt time.Time `json:"created"` + Fs string `json:"fs"` // remote[,connectString]:path + Type string `json:"type,omitempty"` // same as ":backend:" + Path string `json:"path,omitempty"` // for "remote:path" or ":backend:path" + Options VolOpts `json:"options"` // all options together + Mounts []string `json:"mounts"` // mountReqs as a string list + mountReqs map[string]interface{} + fsString string // result of merging Fs, Type and Options + persist bool + mountType string + drv *Driver + mnt *mountlib.MountPoint +} + +// VolOpts keeps volume options +type VolOpts map[string]string + +// VolInfo represents a volume for Get and List requests +type VolInfo struct { + Name string + Mountpoint string `json:",omitempty"` + CreatedAt string `json:",omitempty"` + Status map[string]interface{} `json:",omitempty"` +} + +func newVolume(ctx context.Context, name string, volOpt VolOpts, drv *Driver) (*Volume, error) { + path := filepath.Join(drv.root, name) + mnt := &mountlib.MountPoint{ + MountPoint: path, + } + vol := &Volume{ + Name: name, + MountPoint: path, + CreatedAt: time.Now(), + drv: drv, + mnt: mnt, + mountReqs: make(map[string]interface{}), + } + err := vol.applyOptions(volOpt) + if err == nil { + err = vol.setup(ctx) + } + if err != nil { + return nil, err + } + return vol, nil +} + +// getInfo returns short digest about volume +func (vol *Volume) getInfo() *VolInfo { + vol.prepareState() + return &VolInfo{ + Name: vol.Name, + CreatedAt: vol.CreatedAt.Format(time.RFC3339), + Mountpoint: vol.MountPoint, + Status: rc.Params{"Mounts": vol.Mounts}, + } +} + +// prepareState prepares volume for saving state +func (vol *Volume) prepareState() { + vol.Mounts = []string{} + for id := range vol.mountReqs { + vol.Mounts = append(vol.Mounts, id) + } + sort.Strings(vol.Mounts) +} + +// restoreState updates volume from saved state +func (vol *Volume) restoreState(ctx context.Context, drv *Driver) error { + vol.drv = drv + vol.mnt = &mountlib.MountPoint{ + MountPoint: vol.MountPoint, + } + volOpt := vol.Options + volOpt["fs"] = vol.Fs + volOpt["type"] = vol.Type + if err := vol.applyOptions(volOpt); err != nil { + return err + } + if err := vol.validate(); err != nil { + return err + } + if err := vol.setup(ctx); err != nil { + return err + } + for _, id := range vol.Mounts { + if err := vol.mount(id); err != nil { + return err + } + } + return nil +} + +// validate volume +func (vol *Volume) validate() error { + if vol.Name == "" { + return errors.New("volume name is required") + } + if (vol.Type != "" && vol.Fs != "") || (vol.Type == "" && vol.Fs == "") { + return errors.New("volume must have either remote or backend type") + } + if vol.persist && vol.Type == "" { + return errors.New("backend type is required to persist remotes") + } + if vol.persist && !canPersist { + return errors.New("using backend type to persist remotes is prohibited") + } + if vol.MountPoint == "" { + return errors.New("mount point is required") + } + if vol.mountReqs == nil { + vol.mountReqs = make(map[string]interface{}) + } + return nil +} + +// checkMountpoint verifies that mount point is an existing empty directory +func (vol *Volume) checkMountpoint() error { + path := vol.mnt.MountPoint + if runtime.GOOS == "windows" { + path = filepath.Dir(path) + } + _, err := os.Lstat(path) + if os.IsNotExist(err) { + if err = os.MkdirAll(path, 0700); err != nil { + return errors.Wrapf(err, "failed to create mountpoint: %s", path) + } + } else if err != nil { + return err + } + if runtime.GOOS != "windows" { + if err := mountlib.CheckMountEmpty(path); err != nil { + return ErrMountpointExists + } + } + return nil +} + +// setup volume filesystem +func (vol *Volume) setup(ctx context.Context) error { + fs.Debugf(nil, "Setup volume %q as %q at path %s", vol.Name, vol.fsString, vol.MountPoint) + + if err := vol.checkMountpoint(); err != nil { + return err + } + if vol.drv.dummy { + return nil + } + + _, mountFn := mountlib.ResolveMountMethod(vol.mountType) + if mountFn == nil { + if vol.mountType != "" { + return errors.Errorf("unsupported mount type %q", vol.mountType) + } + return errors.New("mount command unsupported by this build") + } + vol.mnt.MountFn = mountFn + + if vol.persist { + // Add remote to config file + params := rc.Params{} + for key, val := range vol.Options { + params[key] = val + } + updateMode := config.UpdateRemoteOpt{} + _, err := config.CreateRemote(ctx, vol.Name, vol.Type, params, updateMode) + if err != nil { + return err + } + } + + // Use existing remote + f, err := fs.NewFs(ctx, vol.fsString) + if err == nil { + vol.mnt.Fs = f + } + return err +} + +// remove volume filesystem and mounts +func (vol *Volume) remove(ctx context.Context) error { + count := len(vol.mountReqs) + fs.Debugf(nil, "Remove volume %q (count %d)", vol.Name, count) + + if count > 0 { + return errors.New("volume is in use") + } + + if !vol.drv.dummy { + shutdownFn := vol.mnt.Fs.Features().Shutdown + if shutdownFn != nil { + if err := shutdownFn(ctx); err != nil { + return err + } + } + } + + if vol.persist { + // Remote remote from config file + config.DeleteRemote(vol.Name) + } + return nil +} + +// clearCache will clear VFS cache for the volume +func (vol *Volume) clearCache() error { + VFS := vol.mnt.VFS + if VFS == nil { + return nil + } + root, err := VFS.Root() + if err != nil { + return errors.Wrapf(err, "error reading root: %v", VFS.Fs()) + } + root.ForgetAll() + return nil +} + +// mount volume filesystem +func (vol *Volume) mount(id string) error { + drv := vol.drv + count := len(vol.mountReqs) + fs.Debugf(nil, "Mount volume %q for id %q at path %s (count %d)", + vol.Name, id, vol.MountPoint, count) + + if _, found := vol.mountReqs[id]; found { + return errors.New("volume is already mounted by this id") + } + + if count > 0 { // already mounted + vol.mountReqs[id] = nil + return nil + } + if drv.dummy { + vol.mountReqs[id] = nil + return nil + } + if vol.mnt.Fs == nil { + return errors.New("volume filesystem is not ready") + } + + if err := vol.mnt.Mount(); err != nil { + return err + } + vol.mnt.MountedOn = time.Now() + vol.mountReqs[id] = nil + vol.drv.monChan <- false // ask monitor to refresh channels + return nil +} + +// unmount volume +func (vol *Volume) unmount(id string) error { + count := len(vol.mountReqs) + fs.Debugf(nil, "Unmount volume %q from id %q at path %s (count %d)", + vol.Name, id, vol.MountPoint, count) + + if count == 0 { + return errors.New("volume is not mounted") + } + if _, found := vol.mountReqs[id]; !found { + return errors.New("volume is not mounted by this id") + } + + delete(vol.mountReqs, id) + if len(vol.mountReqs) > 0 { + return nil // more mounts left + } + + if vol.drv.dummy { + return nil + } + + mnt := vol.mnt + if mnt.UnmountFn != nil { + if err := mnt.UnmountFn(); err != nil { + return err + } + } + mnt.ErrChan = nil + mnt.UnmountFn = nil + mnt.VFS = nil + vol.drv.monChan <- false // ask monitor to refresh channels + return nil +} + +func (vol *Volume) unmountAll() error { + var firstErr error + for id := range vol.mountReqs { + err := vol.unmount(id) + if firstErr == nil { + firstErr = err + } + } + return firstErr +} diff --git a/cmd/serve/serve.go b/cmd/serve/serve.go index 07a550716..7bed14c7e 100644 --- a/cmd/serve/serve.go +++ b/cmd/serve/serve.go @@ -5,6 +5,7 @@ import ( "github.com/rclone/rclone/cmd" "github.com/rclone/rclone/cmd/serve/dlna" + "github.com/rclone/rclone/cmd/serve/docker" "github.com/rclone/rclone/cmd/serve/ftp" "github.com/rclone/rclone/cmd/serve/http" "github.com/rclone/rclone/cmd/serve/restic" @@ -30,6 +31,9 @@ func init() { if sftp.Command != nil { Command.AddCommand(sftp.Command) } + if docker.Command != nil { + Command.AddCommand(docker.Command) + } cmd.Root.AddCommand(Command) } diff --git a/go.mod b/go.mod index 39c6277c5..9e03c3c4d 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( github.com/calebcase/tmpfile v1.0.2 // indirect github.com/colinmarc/hdfs/v2 v2.2.0 github.com/coreos/go-semver v0.3.0 + github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e github.com/dop251/scsu v0.0.0-20200422003335-8fadfb689669 github.com/dropbox/dropbox-sdk-go-unofficial v1.0.1-0.20210114204226-41fdcdae8a53 github.com/gabriel-vasile/mimetype v1.2.0 diff --git a/go.sum b/go.sum index cd83a4556..10706db3e 100644 --- a/go.sum +++ b/go.sum @@ -160,8 +160,10 @@ github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3Ee github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e h1:Wf6HqHfScWJN9/ZjdUKyjop4mf3Qdd+1TvvltAvM3m8= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= +github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.0 h1:EoUDS0afbrsXAZ9YQ9jdu/mZ2sXgT1/2yyNng4PGlyM=