forked from TrueCloudLab/rclone
fs/rc: add more infrastructure to help writing rc functions
- Fs cache for rc commands - Helper functions for parsing the input - Reshape command for manipulating JSON blobs - Background Job starting, control, query and expiry
This commit is contained in:
parent
a379eec9d9
commit
2089405e1b
12 changed files with 1308 additions and 8 deletions
|
@ -86,6 +86,73 @@ $ rclone rc --json '{ "p1": [1,"2",null,4], "p2": { "a":1, "b":2 } }' rc/noop
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
## Special parameters
|
||||||
|
|
||||||
|
The rc interface supports some special parameters which apply to
|
||||||
|
**all** commands. These start with `_` to show they are different.
|
||||||
|
|
||||||
|
### Running asynchronous jobs with _async = true
|
||||||
|
|
||||||
|
If `_async` has a true value when supplied to an rc call then it will
|
||||||
|
return immediately with a job id and the task will be run in the
|
||||||
|
background. The `job/status` call can be used to get information of
|
||||||
|
the background job. The job can be queried for up to 1 minute after
|
||||||
|
it has finished.
|
||||||
|
|
||||||
|
It is recommended that potentially long running jobs, eg `sync/sync`,
|
||||||
|
`sync/copy`, `sync/move`, `operations/purge` are run with the `_async`
|
||||||
|
flag to avoid any potential problems with the HTTP request and
|
||||||
|
response timing out.
|
||||||
|
|
||||||
|
Starting a job with the `_async` flag:
|
||||||
|
|
||||||
|
```
|
||||||
|
$ rclone rc --json '{ "p1": [1,"2",null,4], "p2": { "a":1, "b":2 }, "_async": true }' rc/noop
|
||||||
|
{
|
||||||
|
"jobid": 2
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
Query the status to see if the job has finished. For more information
|
||||||
|
on the meaning of these return parameters see the `job/status` call.
|
||||||
|
|
||||||
|
```
|
||||||
|
$ rclone rc --json '{ "jobid":2 }' job/status
|
||||||
|
{
|
||||||
|
"duration": 0.000124163,
|
||||||
|
"endTime": "2018-10-27T11:38:07.911245881+01:00",
|
||||||
|
"error": "",
|
||||||
|
"finished": true,
|
||||||
|
"id": 2,
|
||||||
|
"output": {
|
||||||
|
"_async": true,
|
||||||
|
"p1": [
|
||||||
|
1,
|
||||||
|
"2",
|
||||||
|
null,
|
||||||
|
4
|
||||||
|
],
|
||||||
|
"p2": {
|
||||||
|
"a": 1,
|
||||||
|
"b": 2
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"startTime": "2018-10-27T11:38:07.911121728+01:00",
|
||||||
|
"success": true
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
`job/list` can be used to show the running or recently completed jobs
|
||||||
|
|
||||||
|
```
|
||||||
|
$ rclone rc job/list
|
||||||
|
{
|
||||||
|
"jobids": [
|
||||||
|
2
|
||||||
|
]
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
## Supported commands
|
## Supported commands
|
||||||
<!--- autogenerated start - run make rcdocs - don't edit here -->
|
<!--- autogenerated start - run make rcdocs - don't edit here -->
|
||||||
### cache/expire: Purge a remote from cache
|
### cache/expire: Purge a remote from cache
|
||||||
|
|
60
fs/rc/cache.go
Normal file
60
fs/rc/cache.go
Normal file
|
@ -0,0 +1,60 @@
|
||||||
|
// This implements the Fs cache
|
||||||
|
|
||||||
|
package rc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/ncw/rclone/fs"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
fsCacheMu sync.Mutex
|
||||||
|
fsCache = map[string]fs.Fs{}
|
||||||
|
fsNewFs = fs.NewFs // for tests
|
||||||
|
)
|
||||||
|
|
||||||
|
// GetFsNamed gets a fs.Fs named fsName either from the cache or creates it afresh
|
||||||
|
func GetFsNamed(in Params, fsName string) (f fs.Fs, err error) {
|
||||||
|
fsCacheMu.Lock()
|
||||||
|
defer fsCacheMu.Unlock()
|
||||||
|
|
||||||
|
fsString, err := in.GetString(fsName)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
f = fsCache[fsString]
|
||||||
|
if f == nil {
|
||||||
|
f, err = fsNewFs(fsString)
|
||||||
|
if err == nil {
|
||||||
|
fsCache[fsString] = f
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return f, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetFs gets a fs.Fs named "fs" either from the cache or creates it afresh
|
||||||
|
func GetFs(in Params) (f fs.Fs, err error) {
|
||||||
|
return GetFsNamed(in, "fs")
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetFsAndRemoteNamed gets the fsName parameter from in, makes a
|
||||||
|
// remote or fetches it from the cache then gets the remoteName
|
||||||
|
// parameter from in too.
|
||||||
|
func GetFsAndRemoteNamed(in Params, fsName, remoteName string) (f fs.Fs, remote string, err error) {
|
||||||
|
remote, err = in.GetString(remoteName)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
f, err = GetFsNamed(in, fsName)
|
||||||
|
return
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetFsAndRemote gets the `fs` parameter from in, makes a remote or
|
||||||
|
// fetches it from the cache then gets the `remote` parameter from in
|
||||||
|
// too.
|
||||||
|
func GetFsAndRemote(in Params) (f fs.Fs, remote string, err error) {
|
||||||
|
return GetFsAndRemoteNamed(in, "fs", "remote")
|
||||||
|
}
|
107
fs/rc/cache_test.go
Normal file
107
fs/rc/cache_test.go
Normal file
|
@ -0,0 +1,107 @@
|
||||||
|
package rc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/ncw/rclone/fs"
|
||||||
|
"github.com/ncw/rclone/fstest/mockfs"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
var called = 0
|
||||||
|
|
||||||
|
func mockNewFs(t *testing.T) func() {
|
||||||
|
called = 0
|
||||||
|
oldFsNewFs := fsNewFs
|
||||||
|
fsNewFs = func(path string) (fs.Fs, error) {
|
||||||
|
assert.Equal(t, 0, called)
|
||||||
|
called++
|
||||||
|
assert.Equal(t, "/", path)
|
||||||
|
return mockfs.NewFs("mock", "mock"), nil
|
||||||
|
}
|
||||||
|
return func() {
|
||||||
|
fsNewFs = oldFsNewFs
|
||||||
|
fsCache = map[string]fs.Fs{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetCachedFs(t *testing.T) {
|
||||||
|
defer mockNewFs(t)()
|
||||||
|
|
||||||
|
assert.Equal(t, 0, len(fsCache))
|
||||||
|
|
||||||
|
f, err := GetCachedFs("/")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
assert.Equal(t, 1, len(fsCache))
|
||||||
|
|
||||||
|
f2, err := GetCachedFs("/")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
assert.Equal(t, f, f2)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetFsNamed(t *testing.T) {
|
||||||
|
defer mockNewFs(t)()
|
||||||
|
|
||||||
|
in := Params{
|
||||||
|
"potato": "/",
|
||||||
|
}
|
||||||
|
f, err := GetFsNamed(in, "potato")
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.NotNil(t, f)
|
||||||
|
|
||||||
|
in = Params{
|
||||||
|
"sausage": "/",
|
||||||
|
}
|
||||||
|
f, err = GetFsNamed(in, "potato")
|
||||||
|
require.Error(t, err)
|
||||||
|
assert.Nil(t, f)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetFs(t *testing.T) {
|
||||||
|
defer mockNewFs(t)()
|
||||||
|
|
||||||
|
in := Params{
|
||||||
|
"fs": "/",
|
||||||
|
}
|
||||||
|
f, err := GetFs(in)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.NotNil(t, f)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetFsAndRemoteNamed(t *testing.T) {
|
||||||
|
defer mockNewFs(t)()
|
||||||
|
|
||||||
|
in := Params{
|
||||||
|
"fs": "/",
|
||||||
|
"remote": "hello",
|
||||||
|
}
|
||||||
|
f, remote, err := GetFsAndRemoteNamed(in, "fs", "remote")
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.NotNil(t, f)
|
||||||
|
assert.Equal(t, "hello", remote)
|
||||||
|
|
||||||
|
f, remote, err = GetFsAndRemoteNamed(in, "fsX", "remote")
|
||||||
|
require.Error(t, err)
|
||||||
|
assert.Nil(t, f)
|
||||||
|
|
||||||
|
f, remote, err = GetFsAndRemoteNamed(in, "fs", "remoteX")
|
||||||
|
require.Error(t, err)
|
||||||
|
assert.Nil(t, f)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetFsAndRemote(t *testing.T) {
|
||||||
|
defer mockNewFs(t)()
|
||||||
|
|
||||||
|
in := Params{
|
||||||
|
"fs": "/",
|
||||||
|
"remote": "hello",
|
||||||
|
}
|
||||||
|
f, remote, err := GetFsAndRemote(in)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.NotNil(t, f)
|
||||||
|
assert.Equal(t, "hello", remote)
|
||||||
|
}
|
88
fs/rc/config_test.go
Normal file
88
fs/rc/config_test.go
Normal file
|
@ -0,0 +1,88 @@
|
||||||
|
package rc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func clearOptionBlock() {
|
||||||
|
optionBlock = map[string]interface{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
var testOptions = struct {
|
||||||
|
String string
|
||||||
|
Int int
|
||||||
|
}{
|
||||||
|
String: "hello",
|
||||||
|
Int: 42,
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAddOption(t *testing.T) {
|
||||||
|
defer clearOptionBlock()
|
||||||
|
assert.Equal(t, len(optionBlock), 0)
|
||||||
|
AddOption("potato", &testOptions)
|
||||||
|
assert.Equal(t, len(optionBlock), 1)
|
||||||
|
assert.Equal(t, &testOptions, optionBlock["potato"])
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOptionsBlocks(t *testing.T) {
|
||||||
|
defer clearOptionBlock()
|
||||||
|
AddOption("potato", &testOptions)
|
||||||
|
call := Calls.Get("options/blocks")
|
||||||
|
require.NotNil(t, call)
|
||||||
|
in := Params{}
|
||||||
|
out, err := call.Fn(in)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, out)
|
||||||
|
assert.Equal(t, Params{"options": []string{"potato"}}, out)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOptionsGet(t *testing.T) {
|
||||||
|
defer clearOptionBlock()
|
||||||
|
AddOption("potato", &testOptions)
|
||||||
|
call := Calls.Get("options/get")
|
||||||
|
require.NotNil(t, call)
|
||||||
|
in := Params{}
|
||||||
|
out, err := call.Fn(in)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, out)
|
||||||
|
assert.Equal(t, Params{"potato": &testOptions}, out)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOptionsSet(t *testing.T) {
|
||||||
|
defer clearOptionBlock()
|
||||||
|
AddOption("potato", &testOptions)
|
||||||
|
call := Calls.Get("options/set")
|
||||||
|
require.NotNil(t, call)
|
||||||
|
|
||||||
|
in := Params{
|
||||||
|
"potato": Params{
|
||||||
|
"Int": 50,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
out, err := call.Fn(in)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Nil(t, out)
|
||||||
|
assert.Equal(t, 50, testOptions.Int)
|
||||||
|
assert.Equal(t, "hello", testOptions.String)
|
||||||
|
|
||||||
|
// unknown option block
|
||||||
|
in = Params{
|
||||||
|
"sausage": Params{
|
||||||
|
"Int": 50,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
out, err = call.Fn(in)
|
||||||
|
require.Error(t, err)
|
||||||
|
assert.Contains(t, err.Error(), "unknown option block")
|
||||||
|
|
||||||
|
// bad shape
|
||||||
|
in = Params{
|
||||||
|
"potato": []string{"a", "b"},
|
||||||
|
}
|
||||||
|
out, err = call.Fn(in)
|
||||||
|
require.Error(t, err)
|
||||||
|
assert.Contains(t, err.Error(), "failed to write options")
|
||||||
|
}
|
76
fs/rc/internal_test.go
Normal file
76
fs/rc/internal_test.go
Normal file
|
@ -0,0 +1,76 @@
|
||||||
|
package rc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestInternalNoop(t *testing.T) {
|
||||||
|
call := Calls.Get("rc/noop")
|
||||||
|
assert.NotNil(t, call)
|
||||||
|
in := Params{
|
||||||
|
"String": "hello",
|
||||||
|
"Int": 42,
|
||||||
|
}
|
||||||
|
out, err := call.Fn(in)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, out)
|
||||||
|
assert.Equal(t, in, out)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestInternalError(t *testing.T) {
|
||||||
|
call := Calls.Get("rc/error")
|
||||||
|
assert.NotNil(t, call)
|
||||||
|
in := Params{}
|
||||||
|
out, err := call.Fn(in)
|
||||||
|
require.Error(t, err)
|
||||||
|
require.Nil(t, out)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestInternalList(t *testing.T) {
|
||||||
|
call := Calls.Get("rc/list")
|
||||||
|
assert.NotNil(t, call)
|
||||||
|
in := Params{}
|
||||||
|
out, err := call.Fn(in)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, out)
|
||||||
|
assert.Equal(t, Params{"commands": Calls.List()}, out)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCorePid(t *testing.T) {
|
||||||
|
call := Calls.Get("core/pid")
|
||||||
|
assert.NotNil(t, call)
|
||||||
|
in := Params{}
|
||||||
|
out, err := call.Fn(in)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, out)
|
||||||
|
pid := out["pid"]
|
||||||
|
assert.NotEqual(t, nil, pid)
|
||||||
|
_, ok := pid.(int)
|
||||||
|
assert.Equal(t, true, ok)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCoreMemstats(t *testing.T) {
|
||||||
|
call := Calls.Get("core/memstats")
|
||||||
|
assert.NotNil(t, call)
|
||||||
|
in := Params{}
|
||||||
|
out, err := call.Fn(in)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, out)
|
||||||
|
sys := out["Sys"]
|
||||||
|
assert.NotEqual(t, nil, sys)
|
||||||
|
_, ok := sys.(uint64)
|
||||||
|
assert.Equal(t, true, ok)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCoreGC(t *testing.T) {
|
||||||
|
call := Calls.Get("core/gc")
|
||||||
|
assert.NotNil(t, call)
|
||||||
|
in := Params{}
|
||||||
|
out, err := call.Fn(in)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Nil(t, out)
|
||||||
|
assert.Equal(t, Params(nil), out)
|
||||||
|
}
|
215
fs/rc/job.go
Normal file
215
fs/rc/job.go
Normal file
|
@ -0,0 +1,215 @@
|
||||||
|
// Manage background jobs that the rc is running
|
||||||
|
|
||||||
|
package rc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// expire the job when it is finished and older than this
|
||||||
|
expireDuration = 60 * time.Second
|
||||||
|
// inteval to run the expire cache
|
||||||
|
expireInterval = 10 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
|
// Job describes a asynchronous task started via the rc package
|
||||||
|
type Job struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
ID int64 `json:"id"`
|
||||||
|
StartTime time.Time `json:"startTime"`
|
||||||
|
EndTime time.Time `json:"endTime"`
|
||||||
|
Error string `json:"error"`
|
||||||
|
Finished bool `json:"finished"`
|
||||||
|
Success bool `json:"success"`
|
||||||
|
Duration float64 `json:"duration"`
|
||||||
|
Output Params `json:"output"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Jobs describes a collection of running tasks
|
||||||
|
type Jobs struct {
|
||||||
|
mu sync.RWMutex
|
||||||
|
jobs map[int64]*Job
|
||||||
|
expireInterval time.Duration
|
||||||
|
expireRunning bool
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
running = newJobs()
|
||||||
|
jobID = int64(0)
|
||||||
|
)
|
||||||
|
|
||||||
|
// newJobs makes a new Jobs structure
|
||||||
|
func newJobs() *Jobs {
|
||||||
|
return &Jobs{
|
||||||
|
jobs: map[int64]*Job{},
|
||||||
|
expireInterval: expireInterval,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// kickExpire makes sure Expire is running
|
||||||
|
func (jobs *Jobs) kickExpire() {
|
||||||
|
jobs.mu.Lock()
|
||||||
|
defer jobs.mu.Unlock()
|
||||||
|
if !jobs.expireRunning {
|
||||||
|
time.AfterFunc(jobs.expireInterval, jobs.Expire)
|
||||||
|
jobs.expireRunning = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Expire expires any jobs that haven't been collected
|
||||||
|
func (jobs *Jobs) Expire() {
|
||||||
|
jobs.mu.Lock()
|
||||||
|
defer jobs.mu.Unlock()
|
||||||
|
now := time.Now()
|
||||||
|
for ID, job := range jobs.jobs {
|
||||||
|
job.mu.Lock()
|
||||||
|
if job.Finished && now.Sub(job.EndTime) > expireDuration {
|
||||||
|
delete(jobs.jobs, ID)
|
||||||
|
}
|
||||||
|
job.mu.Unlock()
|
||||||
|
}
|
||||||
|
if len(jobs.jobs) != 0 {
|
||||||
|
time.AfterFunc(jobs.expireInterval, jobs.Expire)
|
||||||
|
jobs.expireRunning = true
|
||||||
|
} else {
|
||||||
|
jobs.expireRunning = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// IDs returns the IDs of the running jobs
|
||||||
|
func (jobs *Jobs) IDs() (IDs []int64) {
|
||||||
|
jobs.mu.RLock()
|
||||||
|
defer jobs.mu.RUnlock()
|
||||||
|
IDs = []int64{}
|
||||||
|
for ID := range jobs.jobs {
|
||||||
|
IDs = append(IDs, ID)
|
||||||
|
}
|
||||||
|
return IDs
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get a job with a given ID or nil if it doesn't exist
|
||||||
|
func (jobs *Jobs) Get(ID int64) *Job {
|
||||||
|
jobs.mu.RLock()
|
||||||
|
defer jobs.mu.RUnlock()
|
||||||
|
return jobs.jobs[ID]
|
||||||
|
}
|
||||||
|
|
||||||
|
// mark the job as finished
|
||||||
|
func (job *Job) finish(out Params, err error) {
|
||||||
|
job.mu.Lock()
|
||||||
|
job.EndTime = time.Now()
|
||||||
|
if out == nil {
|
||||||
|
out = make(Params)
|
||||||
|
}
|
||||||
|
job.Output = out
|
||||||
|
job.Duration = job.EndTime.Sub(job.StartTime).Seconds()
|
||||||
|
if err != nil {
|
||||||
|
job.Error = err.Error()
|
||||||
|
job.Success = false
|
||||||
|
} else {
|
||||||
|
job.Error = ""
|
||||||
|
job.Success = true
|
||||||
|
}
|
||||||
|
job.Finished = true
|
||||||
|
job.mu.Unlock()
|
||||||
|
running.kickExpire() // make sure this job gets expired
|
||||||
|
}
|
||||||
|
|
||||||
|
// run the job until completion writing the return status
|
||||||
|
func (job *Job) run(fn Func, in Params) {
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
job.finish(nil, errors.Errorf("panic received: %v", r))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
job.finish(fn(in))
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewJob start a new Job off
|
||||||
|
func (jobs *Jobs) NewJob(fn Func, in Params) *Job {
|
||||||
|
job := &Job{
|
||||||
|
ID: atomic.AddInt64(&jobID, 1),
|
||||||
|
StartTime: time.Now(),
|
||||||
|
}
|
||||||
|
go job.run(fn, in)
|
||||||
|
jobs.mu.Lock()
|
||||||
|
jobs.jobs[job.ID] = job
|
||||||
|
jobs.mu.Unlock()
|
||||||
|
return job
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// StartJob starts a new job and returns a Param suitable for output
|
||||||
|
func StartJob(fn Func, in Params) (Params, error) {
|
||||||
|
job := running.NewJob(fn, in)
|
||||||
|
out := make(Params)
|
||||||
|
out["jobid"] = job.ID
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
Add(Call{
|
||||||
|
Path: "job/status",
|
||||||
|
Fn: rcJobStatus,
|
||||||
|
Title: "Reads the status of the job ID",
|
||||||
|
Help: `Parameters
|
||||||
|
- jobid - id of the job (integer)
|
||||||
|
|
||||||
|
Results
|
||||||
|
- finished - boolean
|
||||||
|
- duration - time in seconds that the job ran for
|
||||||
|
- endTime - time the job finished (eg "2018-10-26T18:50:20.528746884+01:00")
|
||||||
|
- error - error from the job or empty string for no error
|
||||||
|
- finished - boolean whether the job has finished or not
|
||||||
|
- id - as passed in above
|
||||||
|
- startTime - time the job started (eg "2018-10-26T18:50:20.528336039+01:00")
|
||||||
|
- success - boolean - true for success false otherwise
|
||||||
|
- output - output of the job as would have been returned if called synchronously
|
||||||
|
`,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns the status of a job
|
||||||
|
func rcJobStatus(in Params) (out Params, err error) {
|
||||||
|
jobID, err := in.GetInt64("jobid")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
job := running.Get(jobID)
|
||||||
|
if job == nil {
|
||||||
|
return nil, errors.New("job not found")
|
||||||
|
}
|
||||||
|
job.mu.Lock()
|
||||||
|
defer job.mu.Unlock()
|
||||||
|
out = make(Params)
|
||||||
|
err = Reshape(&out, job)
|
||||||
|
if job == nil {
|
||||||
|
return nil, errors.New("Reshape failed in job status")
|
||||||
|
}
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
Add(Call{
|
||||||
|
Path: "job/list",
|
||||||
|
Fn: rcJobList,
|
||||||
|
Title: "Lists the IDs of the running jobs",
|
||||||
|
Help: `Parameters - None
|
||||||
|
|
||||||
|
Results
|
||||||
|
- jobids - array of integer job ids
|
||||||
|
`,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns the status of a job
|
||||||
|
func rcJobList(in Params) (out Params, err error) {
|
||||||
|
out = make(Params)
|
||||||
|
out["jobids"] = running.IDs()
|
||||||
|
return out, nil
|
||||||
|
}
|
194
fs/rc/job_test.go
Normal file
194
fs/rc/job_test.go
Normal file
|
@ -0,0 +1,194 @@
|
||||||
|
package rc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNewJobs(t *testing.T) {
|
||||||
|
jobs := newJobs()
|
||||||
|
assert.Equal(t, 0, len(jobs.jobs))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestJobsKickExpire(t *testing.T) {
|
||||||
|
jobs := newJobs()
|
||||||
|
jobs.expireInterval = time.Millisecond
|
||||||
|
assert.Equal(t, false, jobs.expireRunning)
|
||||||
|
jobs.kickExpire()
|
||||||
|
assert.Equal(t, true, jobs.expireRunning)
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
assert.Equal(t, false, jobs.expireRunning)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestJobsExpire(t *testing.T) {
|
||||||
|
wait := make(chan struct{})
|
||||||
|
jobs := newJobs()
|
||||||
|
jobs.expireInterval = time.Millisecond
|
||||||
|
assert.Equal(t, false, jobs.expireRunning)
|
||||||
|
job := jobs.NewJob(func(in Params) (Params, error) {
|
||||||
|
defer close(wait)
|
||||||
|
return in, nil
|
||||||
|
}, Params{})
|
||||||
|
<-wait
|
||||||
|
assert.Equal(t, 1, len(jobs.jobs))
|
||||||
|
jobs.Expire()
|
||||||
|
assert.Equal(t, 1, len(jobs.jobs))
|
||||||
|
job.EndTime = time.Now().Add(-expireDuration - 60*time.Second)
|
||||||
|
assert.Equal(t, true, jobs.expireRunning)
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
assert.Equal(t, false, jobs.expireRunning)
|
||||||
|
assert.Equal(t, 0, len(jobs.jobs))
|
||||||
|
}
|
||||||
|
|
||||||
|
var noopFn = func(in Params) (Params, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestJobsIDs(t *testing.T) {
|
||||||
|
jobs := newJobs()
|
||||||
|
job1 := jobs.NewJob(noopFn, Params{})
|
||||||
|
job2 := jobs.NewJob(noopFn, Params{})
|
||||||
|
wantIDs := []int64{job1.ID, job2.ID}
|
||||||
|
gotIDs := jobs.IDs()
|
||||||
|
require.Equal(t, 2, len(gotIDs))
|
||||||
|
if gotIDs[0] != wantIDs[0] {
|
||||||
|
gotIDs[0], gotIDs[1] = gotIDs[1], gotIDs[0]
|
||||||
|
}
|
||||||
|
assert.Equal(t, wantIDs, gotIDs)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestJobsGet(t *testing.T) {
|
||||||
|
jobs := newJobs()
|
||||||
|
job := jobs.NewJob(noopFn, Params{})
|
||||||
|
assert.Equal(t, job, jobs.Get(job.ID))
|
||||||
|
assert.Nil(t, jobs.Get(123123123123))
|
||||||
|
}
|
||||||
|
|
||||||
|
var longFn = func(in Params) (Params, error) {
|
||||||
|
time.Sleep(1 * time.Hour)
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestJobFinish(t *testing.T) {
|
||||||
|
jobs := newJobs()
|
||||||
|
job := jobs.NewJob(longFn, Params{})
|
||||||
|
|
||||||
|
assert.Equal(t, true, job.EndTime.IsZero())
|
||||||
|
assert.Equal(t, Params(nil), job.Output)
|
||||||
|
assert.Equal(t, 0.0, job.Duration)
|
||||||
|
assert.Equal(t, "", job.Error)
|
||||||
|
assert.Equal(t, false, job.Success)
|
||||||
|
assert.Equal(t, false, job.Finished)
|
||||||
|
|
||||||
|
wantOut := Params{"a": 1}
|
||||||
|
job.finish(wantOut, nil)
|
||||||
|
|
||||||
|
assert.Equal(t, false, job.EndTime.IsZero())
|
||||||
|
assert.Equal(t, wantOut, job.Output)
|
||||||
|
assert.NotEqual(t, 0.0, job.Duration)
|
||||||
|
assert.Equal(t, "", job.Error)
|
||||||
|
assert.Equal(t, true, job.Success)
|
||||||
|
assert.Equal(t, true, job.Finished)
|
||||||
|
|
||||||
|
job = jobs.NewJob(longFn, Params{})
|
||||||
|
job.finish(nil, nil)
|
||||||
|
|
||||||
|
assert.Equal(t, false, job.EndTime.IsZero())
|
||||||
|
assert.Equal(t, Params{}, job.Output)
|
||||||
|
assert.NotEqual(t, 0.0, job.Duration)
|
||||||
|
assert.Equal(t, "", job.Error)
|
||||||
|
assert.Equal(t, true, job.Success)
|
||||||
|
assert.Equal(t, true, job.Finished)
|
||||||
|
|
||||||
|
job = jobs.NewJob(longFn, Params{})
|
||||||
|
job.finish(wantOut, errors.New("potato"))
|
||||||
|
|
||||||
|
assert.Equal(t, false, job.EndTime.IsZero())
|
||||||
|
assert.Equal(t, wantOut, job.Output)
|
||||||
|
assert.NotEqual(t, 0.0, job.Duration)
|
||||||
|
assert.Equal(t, "potato", job.Error)
|
||||||
|
assert.Equal(t, false, job.Success)
|
||||||
|
assert.Equal(t, true, job.Finished)
|
||||||
|
}
|
||||||
|
|
||||||
|
// We've tested the functionality of run() already as it is
|
||||||
|
// part of NewJob, now just test the panic catching
|
||||||
|
func TestJobRunPanic(t *testing.T) {
|
||||||
|
wait := make(chan struct{})
|
||||||
|
boom := func(in Params) (Params, error) {
|
||||||
|
defer close(wait)
|
||||||
|
panic("boom")
|
||||||
|
}
|
||||||
|
|
||||||
|
jobs := newJobs()
|
||||||
|
job := jobs.NewJob(boom, Params{})
|
||||||
|
<-wait
|
||||||
|
|
||||||
|
assert.Equal(t, false, job.EndTime.IsZero())
|
||||||
|
assert.Equal(t, Params{}, job.Output)
|
||||||
|
assert.NotEqual(t, 0.0, job.Duration)
|
||||||
|
assert.Equal(t, "panic received: boom", job.Error)
|
||||||
|
assert.Equal(t, false, job.Success)
|
||||||
|
assert.Equal(t, true, job.Finished)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestJobsNewJob(t *testing.T) {
|
||||||
|
jobID = 0
|
||||||
|
jobs := newJobs()
|
||||||
|
job := jobs.NewJob(noopFn, Params{})
|
||||||
|
assert.Equal(t, int64(1), job.ID)
|
||||||
|
assert.Equal(t, job, jobs.Get(1))
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStartJob(t *testing.T) {
|
||||||
|
jobID = 0
|
||||||
|
out, err := StartJob(longFn, Params{})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, Params{"jobid": int64(1)}, out)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRcJobStatus(t *testing.T) {
|
||||||
|
jobID = 0
|
||||||
|
_, err := StartJob(longFn, Params{})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
call := Calls.Get("job/status")
|
||||||
|
assert.NotNil(t, call)
|
||||||
|
in := Params{"jobid": 1}
|
||||||
|
out, err := call.Fn(in)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, out)
|
||||||
|
assert.Equal(t, float64(1), out["id"])
|
||||||
|
assert.Equal(t, "", out["error"])
|
||||||
|
assert.Equal(t, false, out["finished"])
|
||||||
|
assert.Equal(t, false, out["success"])
|
||||||
|
|
||||||
|
in = Params{"jobid": 123123123}
|
||||||
|
_, err = call.Fn(in)
|
||||||
|
require.Error(t, err)
|
||||||
|
assert.Contains(t, err.Error(), "job not found")
|
||||||
|
|
||||||
|
in = Params{"jobidx": 123123123}
|
||||||
|
_, err = call.Fn(in)
|
||||||
|
require.Error(t, err)
|
||||||
|
assert.Contains(t, err.Error(), "Didn't find key")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRcJobList(t *testing.T) {
|
||||||
|
jobID = 0
|
||||||
|
_, err := StartJob(longFn, Params{})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
call := Calls.Get("job/list")
|
||||||
|
assert.NotNil(t, call)
|
||||||
|
in := Params{}
|
||||||
|
out, err := call.Fn(in)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, out)
|
||||||
|
assert.Equal(t, Params{"jobids": []int64{1}}, out)
|
||||||
|
}
|
204
fs/rc/params.go
Normal file
204
fs/rc/params.go
Normal file
|
@ -0,0 +1,204 @@
|
||||||
|
// Parameter parsing
|
||||||
|
|
||||||
|
package rc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"math"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Params is the input and output type for the Func
|
||||||
|
type Params map[string]interface{}
|
||||||
|
|
||||||
|
// ErrParamNotFound - this is returned from the Get* functions if the
|
||||||
|
// parameter isn't found along with a zero value of the requested
|
||||||
|
// item.
|
||||||
|
//
|
||||||
|
// Returning an error of this type from an rc.Func will cause the http
|
||||||
|
// method to return http.StatusBadRequest
|
||||||
|
type ErrParamNotFound string
|
||||||
|
|
||||||
|
// Error turns this error into a string
|
||||||
|
func (e ErrParamNotFound) Error() string {
|
||||||
|
return fmt.Sprintf("Didn't find key %q in input", string(e))
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsErrParamNotFound returns whether err is ErrParamNotFound
|
||||||
|
func IsErrParamNotFound(err error) bool {
|
||||||
|
_, isNotFound := err.(ErrParamNotFound)
|
||||||
|
return isNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
// NotErrParamNotFound returns true if err != nil and
|
||||||
|
// !IsErrParamNotFound(err)
|
||||||
|
//
|
||||||
|
// This is for checking error returns of the Get* functions to ignore
|
||||||
|
// error not found returns and take the default value.
|
||||||
|
func NotErrParamNotFound(err error) bool {
|
||||||
|
return err != nil && !IsErrParamNotFound(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ErrParamInvalid - this is returned from the Get* functions if the
|
||||||
|
// parameter is invalid.
|
||||||
|
//
|
||||||
|
//
|
||||||
|
// Returning an error of this type from an rc.Func will cause the http
|
||||||
|
// method to return http.StatusBadRequest
|
||||||
|
type ErrParamInvalid struct {
|
||||||
|
error
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsErrParamInvalid returns whether err is ErrParamInvalid
|
||||||
|
func IsErrParamInvalid(err error) bool {
|
||||||
|
_, isInvalid := err.(ErrParamInvalid)
|
||||||
|
return isInvalid
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reshape reshapes one blob of data into another via json serialization
|
||||||
|
//
|
||||||
|
// out should be a pointer type
|
||||||
|
//
|
||||||
|
// This isn't a very efficient way of dealing with this!
|
||||||
|
func Reshape(out interface{}, in interface{}) error {
|
||||||
|
b, err := json.Marshal(in)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrapf(err, "Reshape failed to Marshal")
|
||||||
|
}
|
||||||
|
err = json.Unmarshal(b, out)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrapf(err, "Reshape failed to Unmarshal")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get gets a parameter from the input
|
||||||
|
//
|
||||||
|
// If the parameter isn't found then error will be of type
|
||||||
|
// ErrParamNotFound and the returned value will be nil.
|
||||||
|
func (p Params) Get(key string) (interface{}, error) {
|
||||||
|
value, ok := p[key]
|
||||||
|
if !ok {
|
||||||
|
return nil, ErrParamNotFound(key)
|
||||||
|
}
|
||||||
|
return value, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetString gets a string parameter from the input
|
||||||
|
//
|
||||||
|
// If the parameter isn't found then error will be of type
|
||||||
|
// ErrParamNotFound and the returned value will be "".
|
||||||
|
func (p Params) GetString(key string) (string, error) {
|
||||||
|
value, err := p.Get(key)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
str, ok := value.(string)
|
||||||
|
if !ok {
|
||||||
|
return "", ErrParamInvalid{errors.Errorf("expecting string value for key %q (was %T)", key, value)}
|
||||||
|
}
|
||||||
|
return str, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetInt64 gets a int64 parameter from the input
|
||||||
|
//
|
||||||
|
// If the parameter isn't found then error will be of type
|
||||||
|
// ErrParamNotFound and the returned value will be 0.
|
||||||
|
func (p Params) GetInt64(key string) (int64, error) {
|
||||||
|
value, err := p.Get(key)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
switch x := value.(type) {
|
||||||
|
case int:
|
||||||
|
return int64(x), nil
|
||||||
|
case int64:
|
||||||
|
return x, nil
|
||||||
|
case float64:
|
||||||
|
if x > math.MaxInt64 || x < math.MinInt64 {
|
||||||
|
return 0, ErrParamInvalid{errors.Errorf("key %q (%v) overflows int64 ", key, value)}
|
||||||
|
}
|
||||||
|
return int64(x), nil
|
||||||
|
case string:
|
||||||
|
i, err := strconv.ParseInt(x, 10, 0)
|
||||||
|
if err != nil {
|
||||||
|
return 0, ErrParamInvalid{errors.Wrapf(err, "couldn't parse key %q (%v) as int64", key, value)}
|
||||||
|
}
|
||||||
|
return i, nil
|
||||||
|
}
|
||||||
|
return 0, ErrParamInvalid{errors.Errorf("expecting int64 value for key %q (was %T)", key, value)}
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetFloat64 gets a float64 parameter from the input
|
||||||
|
//
|
||||||
|
// If the parameter isn't found then error will be of type
|
||||||
|
// ErrParamNotFound and the returned value will be 0.
|
||||||
|
func (p Params) GetFloat64(key string) (float64, error) {
|
||||||
|
value, err := p.Get(key)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
switch x := value.(type) {
|
||||||
|
case float64:
|
||||||
|
return x, nil
|
||||||
|
case int:
|
||||||
|
return float64(x), nil
|
||||||
|
case int64:
|
||||||
|
return float64(x), nil
|
||||||
|
case string:
|
||||||
|
f, err := strconv.ParseFloat(x, 64)
|
||||||
|
if err != nil {
|
||||||
|
return 0, ErrParamInvalid{errors.Wrapf(err, "couldn't parse key %q (%v) as float64", key, value)}
|
||||||
|
}
|
||||||
|
return f, nil
|
||||||
|
}
|
||||||
|
return 0, ErrParamInvalid{errors.Errorf("expecting float64 value for key %q (was %T)", key, value)}
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetBool gets a boolean parameter from the input
|
||||||
|
//
|
||||||
|
// If the parameter isn't found then error will be of type
|
||||||
|
// ErrParamNotFound and the returned value will be false.
|
||||||
|
func (p Params) GetBool(key string) (bool, error) {
|
||||||
|
value, err := p.Get(key)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
switch x := value.(type) {
|
||||||
|
case int:
|
||||||
|
return x != 0, nil
|
||||||
|
case int64:
|
||||||
|
return x != 0, nil
|
||||||
|
case float64:
|
||||||
|
return x != 0, nil
|
||||||
|
case bool:
|
||||||
|
return x, nil
|
||||||
|
case string:
|
||||||
|
b, err := strconv.ParseBool(x)
|
||||||
|
if err != nil {
|
||||||
|
return false, ErrParamInvalid{errors.Wrapf(err, "couldn't parse key %q (%v) as bool", key, value)}
|
||||||
|
}
|
||||||
|
return b, nil
|
||||||
|
}
|
||||||
|
return false, ErrParamInvalid{errors.Errorf("expecting bool value for key %q (was %T)", key, value)}
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetStruct gets a struct from key from the input into the struct
|
||||||
|
// pointed to by out. out must be a pointer type.
|
||||||
|
//
|
||||||
|
// If the parameter isn't found then error will be of type
|
||||||
|
// ErrParamNotFound and out will be unchanged.
|
||||||
|
func (p Params) GetStruct(key string, out interface{}) error {
|
||||||
|
value, err := p.Get(key)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = Reshape(out, value)
|
||||||
|
if err != nil {
|
||||||
|
return ErrParamInvalid{errors.Wrapf(err, "key %q", key)}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
251
fs/rc/params_test.go
Normal file
251
fs/rc/params_test.go
Normal file
|
@ -0,0 +1,251 @@
|
||||||
|
package rc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestErrParamNotFoundError(t *testing.T) {
|
||||||
|
e := ErrParamNotFound("key")
|
||||||
|
assert.Equal(t, "Didn't find key \"key\" in input", e.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestIsErrParamNotFound(t *testing.T) {
|
||||||
|
assert.Equal(t, true, IsErrParamNotFound(ErrParamNotFound("key")))
|
||||||
|
assert.Equal(t, false, IsErrParamNotFound(nil))
|
||||||
|
assert.Equal(t, false, IsErrParamNotFound(errors.New("potato")))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNotErrParamNotFound(t *testing.T) {
|
||||||
|
assert.Equal(t, false, NotErrParamNotFound(ErrParamNotFound("key")))
|
||||||
|
assert.Equal(t, false, NotErrParamNotFound(nil))
|
||||||
|
assert.Equal(t, true, NotErrParamNotFound(errors.New("potato")))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestIsErrParamInvalid(t *testing.T) {
|
||||||
|
e := ErrParamInvalid{errors.New("potato")}
|
||||||
|
assert.Equal(t, true, IsErrParamInvalid(e))
|
||||||
|
assert.Equal(t, false, IsErrParamInvalid(nil))
|
||||||
|
assert.Equal(t, false, IsErrParamInvalid(errors.New("potato")))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReshape(t *testing.T) {
|
||||||
|
in := Params{
|
||||||
|
"String": "hello",
|
||||||
|
"Float": 4.2,
|
||||||
|
}
|
||||||
|
var out struct {
|
||||||
|
String string
|
||||||
|
Float float64
|
||||||
|
}
|
||||||
|
require.NoError(t, Reshape(&out, in))
|
||||||
|
assert.Equal(t, "hello", out.String)
|
||||||
|
assert.Equal(t, 4.2, out.Float)
|
||||||
|
var inCopy = Params{}
|
||||||
|
require.NoError(t, Reshape(&inCopy, out))
|
||||||
|
assert.Equal(t, in, inCopy)
|
||||||
|
|
||||||
|
// Now a failure to marshal
|
||||||
|
var in2 func()
|
||||||
|
require.Error(t, Reshape(&inCopy, in2))
|
||||||
|
|
||||||
|
// Now a failure to unmarshal
|
||||||
|
require.Error(t, Reshape(&out, "string"))
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestParamsGet(t *testing.T) {
|
||||||
|
in := Params{
|
||||||
|
"ok": 1,
|
||||||
|
}
|
||||||
|
v1, e1 := in.Get("ok")
|
||||||
|
assert.NoError(t, e1)
|
||||||
|
assert.Equal(t, 1, v1)
|
||||||
|
v2, e2 := in.Get("notOK")
|
||||||
|
assert.Error(t, e2)
|
||||||
|
assert.Equal(t, nil, v2)
|
||||||
|
assert.Equal(t, ErrParamNotFound("notOK"), e2)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestParamsGetString(t *testing.T) {
|
||||||
|
in := Params{
|
||||||
|
"string": "one",
|
||||||
|
"notString": 17,
|
||||||
|
}
|
||||||
|
v1, e1 := in.GetString("string")
|
||||||
|
assert.NoError(t, e1)
|
||||||
|
assert.Equal(t, "one", v1)
|
||||||
|
v2, e2 := in.GetString("notOK")
|
||||||
|
assert.Error(t, e2)
|
||||||
|
assert.Equal(t, "", v2)
|
||||||
|
assert.Equal(t, ErrParamNotFound("notOK"), e2)
|
||||||
|
v3, e3 := in.GetString("notString")
|
||||||
|
assert.Error(t, e3)
|
||||||
|
assert.Equal(t, "", v3)
|
||||||
|
assert.Equal(t, true, IsErrParamInvalid(e3), e3.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestParamsGetInt64(t *testing.T) {
|
||||||
|
for _, test := range []struct {
|
||||||
|
value interface{}
|
||||||
|
result int64
|
||||||
|
errString string
|
||||||
|
}{
|
||||||
|
{"123", 123, ""},
|
||||||
|
{"123x", 0, "couldn't parse"},
|
||||||
|
{int(12), 12, ""},
|
||||||
|
{int64(13), 13, ""},
|
||||||
|
{float64(14), 14, ""},
|
||||||
|
{float64(9.3E18), 0, "overflows int64"},
|
||||||
|
{float64(-9.3E18), 0, "overflows int64"},
|
||||||
|
} {
|
||||||
|
t.Run(fmt.Sprintf("%T=%v", test.value, test.value), func(t *testing.T) {
|
||||||
|
in := Params{
|
||||||
|
"key": test.value,
|
||||||
|
}
|
||||||
|
v1, e1 := in.GetInt64("key")
|
||||||
|
if test.errString == "" {
|
||||||
|
require.NoError(t, e1)
|
||||||
|
assert.Equal(t, test.result, v1)
|
||||||
|
} else {
|
||||||
|
require.NotNil(t, e1)
|
||||||
|
require.Error(t, e1)
|
||||||
|
assert.Contains(t, e1.Error(), test.errString)
|
||||||
|
assert.Equal(t, int64(0), v1)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
in := Params{
|
||||||
|
"notInt64": []string{"a", "b"},
|
||||||
|
}
|
||||||
|
v2, e2 := in.GetInt64("notOK")
|
||||||
|
assert.Error(t, e2)
|
||||||
|
assert.Equal(t, int64(0), v2)
|
||||||
|
assert.Equal(t, ErrParamNotFound("notOK"), e2)
|
||||||
|
v3, e3 := in.GetInt64("notInt64")
|
||||||
|
assert.Error(t, e3)
|
||||||
|
assert.Equal(t, int64(0), v3)
|
||||||
|
assert.Equal(t, true, IsErrParamInvalid(e3), e3.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestParamsGetFloat64(t *testing.T) {
|
||||||
|
for _, test := range []struct {
|
||||||
|
value interface{}
|
||||||
|
result float64
|
||||||
|
errString string
|
||||||
|
}{
|
||||||
|
{"123.1", 123.1, ""},
|
||||||
|
{"123x1", 0, "couldn't parse"},
|
||||||
|
{int(12), 12, ""},
|
||||||
|
{int64(13), 13, ""},
|
||||||
|
{float64(14), 14, ""},
|
||||||
|
} {
|
||||||
|
t.Run(fmt.Sprintf("%T=%v", test.value, test.value), func(t *testing.T) {
|
||||||
|
in := Params{
|
||||||
|
"key": test.value,
|
||||||
|
}
|
||||||
|
v1, e1 := in.GetFloat64("key")
|
||||||
|
if test.errString == "" {
|
||||||
|
require.NoError(t, e1)
|
||||||
|
assert.Equal(t, test.result, v1)
|
||||||
|
} else {
|
||||||
|
require.NotNil(t, e1)
|
||||||
|
require.Error(t, e1)
|
||||||
|
assert.Contains(t, e1.Error(), test.errString)
|
||||||
|
assert.Equal(t, float64(0), v1)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
in := Params{
|
||||||
|
"notFloat64": []string{"a", "b"},
|
||||||
|
}
|
||||||
|
v2, e2 := in.GetFloat64("notOK")
|
||||||
|
assert.Error(t, e2)
|
||||||
|
assert.Equal(t, float64(0), v2)
|
||||||
|
assert.Equal(t, ErrParamNotFound("notOK"), e2)
|
||||||
|
v3, e3 := in.GetFloat64("notFloat64")
|
||||||
|
assert.Error(t, e3)
|
||||||
|
assert.Equal(t, float64(0), v3)
|
||||||
|
assert.Equal(t, true, IsErrParamInvalid(e3), e3.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestParamsGetBool(t *testing.T) {
|
||||||
|
for _, test := range []struct {
|
||||||
|
value interface{}
|
||||||
|
result bool
|
||||||
|
errString string
|
||||||
|
}{
|
||||||
|
{true, true, ""},
|
||||||
|
{false, false, ""},
|
||||||
|
{"true", true, ""},
|
||||||
|
{"false", false, ""},
|
||||||
|
{"fasle", false, "couldn't parse"},
|
||||||
|
{int(12), true, ""},
|
||||||
|
{int(0), false, ""},
|
||||||
|
{int64(13), true, ""},
|
||||||
|
{int64(0), false, ""},
|
||||||
|
{float64(14), true, ""},
|
||||||
|
{float64(0), false, ""},
|
||||||
|
} {
|
||||||
|
t.Run(fmt.Sprintf("%T=%v", test.value, test.value), func(t *testing.T) {
|
||||||
|
in := Params{
|
||||||
|
"key": test.value,
|
||||||
|
}
|
||||||
|
v1, e1 := in.GetBool("key")
|
||||||
|
if test.errString == "" {
|
||||||
|
require.NoError(t, e1)
|
||||||
|
assert.Equal(t, test.result, v1)
|
||||||
|
} else {
|
||||||
|
require.NotNil(t, e1)
|
||||||
|
require.Error(t, e1)
|
||||||
|
assert.Contains(t, e1.Error(), test.errString)
|
||||||
|
assert.Equal(t, false, v1)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
in := Params{
|
||||||
|
"notBool": []string{"a", "b"},
|
||||||
|
}
|
||||||
|
v2, e2 := Params{}.GetBool("notOK")
|
||||||
|
assert.Error(t, e2)
|
||||||
|
assert.Equal(t, false, v2)
|
||||||
|
assert.Equal(t, ErrParamNotFound("notOK"), e2)
|
||||||
|
v3, e3 := in.GetBool("notBool")
|
||||||
|
assert.Error(t, e3)
|
||||||
|
assert.Equal(t, false, v3)
|
||||||
|
assert.Equal(t, true, IsErrParamInvalid(e3), e3.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestParamsGetStruct(t *testing.T) {
|
||||||
|
in := Params{
|
||||||
|
"struct": Params{
|
||||||
|
"String": "one",
|
||||||
|
"Float": 4.2,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
var out struct {
|
||||||
|
String string
|
||||||
|
Float float64
|
||||||
|
}
|
||||||
|
e1 := in.GetStruct("struct", &out)
|
||||||
|
assert.NoError(t, e1)
|
||||||
|
assert.Equal(t, "one", out.String)
|
||||||
|
assert.Equal(t, 4.2, out.Float)
|
||||||
|
|
||||||
|
e2 := in.GetStruct("notOK", &out)
|
||||||
|
assert.Error(t, e2)
|
||||||
|
assert.Equal(t, "one", out.String)
|
||||||
|
assert.Equal(t, 4.2, out.Float)
|
||||||
|
assert.Equal(t, ErrParamNotFound("notOK"), e2)
|
||||||
|
|
||||||
|
in["struct"] = "string"
|
||||||
|
e3 := in.GetStruct("struct", &out)
|
||||||
|
assert.Error(t, e3)
|
||||||
|
assert.Equal(t, "one", out.String)
|
||||||
|
assert.Equal(t, 4.2, out.Float)
|
||||||
|
assert.Equal(t, true, IsErrParamInvalid(e3), e3.Error())
|
||||||
|
}
|
28
fs/rc/rc.go
28
fs/rc/rc.go
|
@ -116,8 +116,6 @@ func (s *server) handler(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fs.Debugf(nil, "form = %+v", r.Form)
|
|
||||||
|
|
||||||
w.Header().Add("Access-Control-Allow-Origin", "*")
|
w.Header().Add("Access-Control-Allow-Origin", "*")
|
||||||
//echo back headers client needs
|
//echo back headers client needs
|
||||||
reqAccessHeaders := r.Header.Get("Access-Control-Request-Headers")
|
reqAccessHeaders := r.Header.Get("Access-Control-Request-Headers")
|
||||||
|
@ -137,6 +135,11 @@ func (s *server) handler(w http.ResponseWriter, r *http.Request) {
|
||||||
func (s *server) handlePost(w http.ResponseWriter, r *http.Request, path string, in Params) {
|
func (s *server) handlePost(w http.ResponseWriter, r *http.Request, path string, in Params) {
|
||||||
writeError := func(err error, status int) {
|
writeError := func(err error, status int) {
|
||||||
fs.Errorf(nil, "rc: %q: error: %v", path, err)
|
fs.Errorf(nil, "rc: %q: error: %v", path, err)
|
||||||
|
// Adjust the error return for some well known errors
|
||||||
|
switch errors.Cause(err) {
|
||||||
|
case fs.ErrorDirNotFound, fs.ErrorObjectNotFound:
|
||||||
|
status = http.StatusNotFound
|
||||||
|
}
|
||||||
w.WriteHeader(status)
|
w.WriteHeader(status)
|
||||||
err = WriteJSON(w, Params{
|
err = WriteJSON(w, Params{
|
||||||
"error": err.Error(),
|
"error": err.Error(),
|
||||||
|
@ -155,13 +158,28 @@ func (s *server) handlePost(w http.ResponseWriter, r *http.Request, path string,
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
fs.Debugf(nil, "rc: %q: with parameters %+v", path, in)
|
// Check to see if it is async or not
|
||||||
out, err := call.Fn(in)
|
isAsync, err := in.GetBool("_async")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writeError(errors.Wrap(err, "remote control command failed"), http.StatusInternalServerError)
|
writeError(err, http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fs.Debugf(nil, "rc: %q: with parameters %+v", path, in)
|
||||||
|
var out Params
|
||||||
|
if isAsync {
|
||||||
|
out, err = StartJob(call.Fn, in)
|
||||||
|
} else {
|
||||||
|
out, err = call.Fn(in)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
writeError(err, http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if out == nil {
|
||||||
|
out = make(Params)
|
||||||
|
}
|
||||||
|
|
||||||
fs.Debugf(nil, "rc: %q: reply %+v: %v", path, out, err)
|
fs.Debugf(nil, "rc: %q: reply %+v: %v", path, out, err)
|
||||||
err = WriteJSON(w, out)
|
err = WriteJSON(w, out)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
23
fs/rc/rc_test.go
Normal file
23
fs/rc/rc_test.go
Normal file
|
@ -0,0 +1,23 @@
|
||||||
|
package rc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestWriteJSON(t *testing.T) {
|
||||||
|
var buf bytes.Buffer
|
||||||
|
err := WriteJSON(&buf, Params{
|
||||||
|
"String": "hello",
|
||||||
|
"Int": 42,
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, `{
|
||||||
|
"Int": 42,
|
||||||
|
"String": "hello"
|
||||||
|
}
|
||||||
|
`, buf.String())
|
||||||
|
}
|
|
@ -10,9 +10,6 @@ import (
|
||||||
"github.com/ncw/rclone/fs"
|
"github.com/ncw/rclone/fs"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Params is the input and output type for the Func
|
|
||||||
type Params map[string]interface{}
|
|
||||||
|
|
||||||
// Func defines a type for a remote control function
|
// Func defines a type for a remote control function
|
||||||
type Func func(in Params) (out Params, err error)
|
type Func func(in Params) (out Params, err error)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue