jottacloud: resume and deduplication support
This commit is contained in:
parent
ba84eecd94
commit
e4dfe78ef0
2 changed files with 258 additions and 47 deletions
|
@ -9,7 +9,10 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
// default time format for almost all request and responses
|
||||||
timeFormat = "2006-01-02-T15:04:05Z0700"
|
timeFormat = "2006-01-02-T15:04:05Z0700"
|
||||||
|
// the API server seems to use a different format
|
||||||
|
apiTimeFormat = "2006-01-02T15:04:05Z07:00"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Time represents time values in the Jottacloud API. It uses a custom RFC3339 like format.
|
// Time represents time values in the Jottacloud API. It uses a custom RFC3339 like format.
|
||||||
|
@ -40,6 +43,9 @@ func (t *Time) MarshalXML(e *xml.Encoder, start xml.StartElement) error {
|
||||||
// Return Time string in Jottacloud format
|
// Return Time string in Jottacloud format
|
||||||
func (t Time) String() string { return time.Time(t).Format(timeFormat) }
|
func (t Time) String() string { return time.Time(t).Format(timeFormat) }
|
||||||
|
|
||||||
|
// APIString returns Time string in Jottacloud API format
|
||||||
|
func (t Time) APIString() string { return time.Time(t).Format(apiTimeFormat) }
|
||||||
|
|
||||||
// Flag is a hacky type for checking if an attribute is present
|
// Flag is a hacky type for checking if an attribute is present
|
||||||
type Flag bool
|
type Flag bool
|
||||||
|
|
||||||
|
@ -265,3 +271,37 @@ func (e *Error) Error() string {
|
||||||
}
|
}
|
||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AllocateFileRequest to prepare an upload to Jottacloud
|
||||||
|
type AllocateFileRequest struct {
|
||||||
|
Bytes int64 `json:"bytes"`
|
||||||
|
Created string `json:"created"`
|
||||||
|
Md5 string `json:"md5"`
|
||||||
|
Modified string `json:"modified"`
|
||||||
|
Path string `json:"path"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// AllocateFileResponse for upload requests
|
||||||
|
type AllocateFileResponse struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Path string `json:"path"`
|
||||||
|
State string `json:"state"`
|
||||||
|
UploadID string `json:"upload_id"`
|
||||||
|
UploadURL string `json:"upload_url"`
|
||||||
|
Bytes int64 `json:"bytes"`
|
||||||
|
ResumePos int64 `json:"resume_pos"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// UploadResponse after an upload
|
||||||
|
type UploadResponse struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Path string `json:"path"`
|
||||||
|
Kind string `json:"kind"`
|
||||||
|
ContentID string `json:"content_id"`
|
||||||
|
Bytes int64 `json:"bytes"`
|
||||||
|
Md5 string `json:"md5"`
|
||||||
|
Created int64 `json:"created"`
|
||||||
|
Modified int64 `json:"modified"`
|
||||||
|
Deleted interface{} `json:"deleted"`
|
||||||
|
Mime string `json:"mime"`
|
||||||
|
}
|
||||||
|
|
|
@ -26,22 +26,42 @@ import (
|
||||||
"github.com/ncw/rclone/fs/fshttp"
|
"github.com/ncw/rclone/fs/fshttp"
|
||||||
"github.com/ncw/rclone/fs/hash"
|
"github.com/ncw/rclone/fs/hash"
|
||||||
"github.com/ncw/rclone/fs/walk"
|
"github.com/ncw/rclone/fs/walk"
|
||||||
|
"github.com/ncw/rclone/lib/oauthutil"
|
||||||
"github.com/ncw/rclone/lib/pacer"
|
"github.com/ncw/rclone/lib/pacer"
|
||||||
"github.com/ncw/rclone/lib/rest"
|
"github.com/ncw/rclone/lib/rest"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
"golang.org/x/oauth2"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Globals
|
// Globals
|
||||||
const (
|
const (
|
||||||
minSleep = 10 * time.Millisecond
|
minSleep = 10 * time.Millisecond
|
||||||
maxSleep = 2 * time.Second
|
maxSleep = 2 * time.Second
|
||||||
decayConstant = 2 // bigger for slower decay, exponential
|
decayConstant = 2 // bigger for slower decay, exponential
|
||||||
defaultDevice = "Jotta"
|
defaultDevice = "Jotta"
|
||||||
defaultMountpoint = "Sync"
|
defaultMountpoint = "Sync"
|
||||||
rootURL = "https://www.jottacloud.com/jfs/"
|
rootURL = "https://www.jottacloud.com/jfs/"
|
||||||
apiURL = "https://api.jottacloud.com"
|
apiURL = "https://api.jottacloud.com/files/v1/"
|
||||||
shareURL = "https://www.jottacloud.com/"
|
shareURL = "https://www.jottacloud.com/"
|
||||||
cachePrefix = "rclone-jcmd5-"
|
tokenURL = "https://api.jottacloud.com/auth/v1/token"
|
||||||
|
cachePrefix = "rclone-jcmd5-"
|
||||||
|
rcloneClientID = "nibfk8biu12ju7hpqomr8b1e40"
|
||||||
|
rcloneEncryptedClientSecret = "Vp8eAv7eVElMnQwN-kgU9cbhgApNDaMqWdlDi5qFydlQoji4JBxrGMF2"
|
||||||
|
configUsername = "user"
|
||||||
|
configPassword = "pass"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// Description of how to auth for this app for a personal account
|
||||||
|
oauthConfig = &oauth2.Config{
|
||||||
|
Endpoint: oauth2.Endpoint{
|
||||||
|
AuthURL: tokenURL,
|
||||||
|
TokenURL: tokenURL,
|
||||||
|
},
|
||||||
|
ClientID: rcloneClientID,
|
||||||
|
ClientSecret: obscure.MustReveal(rcloneEncryptedClientSecret),
|
||||||
|
RedirectURL: oauthutil.RedirectLocalhostURL,
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
// Register with Fs
|
// Register with Fs
|
||||||
|
@ -50,11 +70,70 @@ func init() {
|
||||||
Name: "jottacloud",
|
Name: "jottacloud",
|
||||||
Description: "JottaCloud",
|
Description: "JottaCloud",
|
||||||
NewFs: NewFs,
|
NewFs: NewFs,
|
||||||
|
Config: func(name string, m configmap.Mapper) {
|
||||||
|
username, ok := m.Get(configUsername)
|
||||||
|
if !ok {
|
||||||
|
fs.Errorf(nil, "No username defined")
|
||||||
|
}
|
||||||
|
var password string
|
||||||
|
password, ok = m.Get(configPassword)
|
||||||
|
if !ok {
|
||||||
|
fs.Errorf(nil, "No username defined")
|
||||||
|
}
|
||||||
|
password = obscure.MustReveal(password)
|
||||||
|
|
||||||
|
srv := rest.NewClient(fshttp.NewClient(fs.Config))
|
||||||
|
|
||||||
|
values := url.Values{}
|
||||||
|
values.Set("grant_type", "PASSWORD")
|
||||||
|
values.Set("password", password)
|
||||||
|
values.Set("username", username)
|
||||||
|
values.Set("client_id", rcloneClientID)
|
||||||
|
values.Set("client_secret", obscure.MustReveal(rcloneEncryptedClientSecret))
|
||||||
|
opts := rest.Opts{
|
||||||
|
Method: "POST",
|
||||||
|
RootURL: oauthConfig.Endpoint.AuthURL,
|
||||||
|
ContentType: "application/x-www-form-urlencoded",
|
||||||
|
Body: strings.NewReader(values.Encode()),
|
||||||
|
}
|
||||||
|
|
||||||
|
// tokenJSON is the struct representing the HTTP response from OAuth2
|
||||||
|
// providers returning a token in JSON form.
|
||||||
|
type tokenJSON struct {
|
||||||
|
AccessToken string `json:"access_token"`
|
||||||
|
TokenType string `json:"token_type"`
|
||||||
|
RefreshToken string `json:"refresh_token"`
|
||||||
|
ExpiresIn int32 `json:"expires_in"` // at least PayPal returns string, while most return number
|
||||||
|
}
|
||||||
|
var jsonToken tokenJSON
|
||||||
|
|
||||||
|
resp, err := srv.CallJSON(&opts, nil, &jsonToken)
|
||||||
|
if err != nil {
|
||||||
|
fs.Errorf(nil, "Failed to get resource token: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if resp.StatusCode != 200 {
|
||||||
|
fs.Errorf(nil, "Failed to get resource token: Got HTTP error code %d", resp.StatusCode)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var token oauth2.Token
|
||||||
|
token.AccessToken = jsonToken.AccessToken
|
||||||
|
token.RefreshToken = jsonToken.RefreshToken
|
||||||
|
token.TokenType = jsonToken.TokenType
|
||||||
|
token.Expiry = time.Now().Add(time.Duration(jsonToken.ExpiresIn) * time.Second)
|
||||||
|
|
||||||
|
// finally save them in the config
|
||||||
|
err = oauthutil.PutToken(name, m, &token, true)
|
||||||
|
if err != nil {
|
||||||
|
fs.Errorf(nil, "Error while setting token: %s", err)
|
||||||
|
}
|
||||||
|
},
|
||||||
Options: []fs.Option{{
|
Options: []fs.Option{{
|
||||||
Name: "user",
|
Name: configUsername,
|
||||||
Help: "User Name",
|
Help: "User Name",
|
||||||
}, {
|
}, {
|
||||||
Name: "pass",
|
Name: configPassword,
|
||||||
Help: "Password.",
|
Help: "Password.",
|
||||||
IsPassword: true,
|
IsPassword: true,
|
||||||
}, {
|
}, {
|
||||||
|
@ -83,6 +162,11 @@ func init() {
|
||||||
Help: "Remove existing public link to file/folder with link command rather than creating.\nDefault is false, meaning link command will create or retrieve public link.",
|
Help: "Remove existing public link to file/folder with link command rather than creating.\nDefault is false, meaning link command will create or retrieve public link.",
|
||||||
Default: false,
|
Default: false,
|
||||||
Advanced: true,
|
Advanced: true,
|
||||||
|
}, {
|
||||||
|
Name: "upload_resume_limit",
|
||||||
|
Help: "Files bigger than this can be resumed if the upload failes.",
|
||||||
|
Default: fs.SizeSuffix(10 * 1024 * 1024),
|
||||||
|
Advanced: true,
|
||||||
}},
|
}},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -95,18 +179,21 @@ type Options struct {
|
||||||
MD5MemoryThreshold fs.SizeSuffix `config:"md5_memory_limit"`
|
MD5MemoryThreshold fs.SizeSuffix `config:"md5_memory_limit"`
|
||||||
HardDelete bool `config:"hard_delete"`
|
HardDelete bool `config:"hard_delete"`
|
||||||
Unlink bool `config:"unlink"`
|
Unlink bool `config:"unlink"`
|
||||||
|
UploadThreshold fs.SizeSuffix `config:"upload_resume_limit"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fs represents a remote jottacloud
|
// Fs represents a remote jottacloud
|
||||||
type Fs struct {
|
type Fs struct {
|
||||||
name string
|
name string
|
||||||
root string
|
root string
|
||||||
user string
|
user string
|
||||||
opt Options
|
opt Options
|
||||||
features *fs.Features
|
features *fs.Features
|
||||||
endpointURL string
|
endpointURL string
|
||||||
srv *rest.Client
|
srv *rest.Client
|
||||||
pacer *pacer.Pacer
|
apiSrv *rest.Client
|
||||||
|
pacer *pacer.Pacer
|
||||||
|
tokenRenewer *oauthutil.Renew // renew the token on expiry
|
||||||
}
|
}
|
||||||
|
|
||||||
// Object describes a jottacloud object
|
// Object describes a jottacloud object
|
||||||
|
@ -261,6 +348,38 @@ func (o *Object) filePath() string {
|
||||||
return o.fs.filePath(o.remote)
|
return o.fs.filePath(o.remote)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// dummyClose is required for the grantTypeFilter below
|
||||||
|
// because http.Request.Body needs to be a io.ReadCloser
|
||||||
|
// and bytes.NewReader is only a io.Reader
|
||||||
|
type dummyCloser struct {
|
||||||
|
io.Reader
|
||||||
|
}
|
||||||
|
|
||||||
|
func (dummyCloser) Close() error { return nil }
|
||||||
|
|
||||||
|
// Jottacloud requires the grant_type 'refresh_token' string
|
||||||
|
// to be uppercase and throws a 400 Bad Request if we use the
|
||||||
|
// lower case used by the oauth2 module
|
||||||
|
//
|
||||||
|
// This filter catches all refresh requests, reads the body,
|
||||||
|
// changes the case and then sends it on
|
||||||
|
func grantTypeFilter(req *http.Request) {
|
||||||
|
if tokenURL == req.URL.String() {
|
||||||
|
// read the entire body
|
||||||
|
refreshBody, err := ioutil.ReadAll(req.Body)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
_ = req.Body.Close()
|
||||||
|
|
||||||
|
// make the refesh token upper case
|
||||||
|
refreshBody = []byte(strings.Replace(string(refreshBody), "grant_type=refresh_token", "grant_type=REFRESH_TOKEN", 1))
|
||||||
|
|
||||||
|
// set the new ReadCloser (with a dummy Close())
|
||||||
|
req.Body = &dummyCloser{bytes.NewReader(refreshBody)}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// NewFs constructs an Fs from the path, container:path
|
// NewFs constructs an Fs from the path, container:path
|
||||||
func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
||||||
// Parse config into Options struct
|
// Parse config into Options struct
|
||||||
|
@ -284,14 +403,30 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// the oauth client for the api servers needs
|
||||||
|
// a filter to fix the grant_type issues (see above)
|
||||||
|
baseClient := fshttp.NewClient(fs.Config)
|
||||||
|
if do, ok := baseClient.Transport.(interface {
|
||||||
|
SetRequestFilter(f func(req *http.Request))
|
||||||
|
}); ok {
|
||||||
|
do.SetRequestFilter(grantTypeFilter)
|
||||||
|
} else {
|
||||||
|
fs.Debugf(name+":", "Couldn't add request filter - uploads will fail")
|
||||||
|
}
|
||||||
|
oAuthClient, ts, err := oauthutil.NewClientWithBaseClient(name, m, oauthConfig, baseClient)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "Failed to configure Jottacloud oauth client")
|
||||||
|
}
|
||||||
|
|
||||||
f := &Fs{
|
f := &Fs{
|
||||||
name: name,
|
name: name,
|
||||||
root: root,
|
root: root,
|
||||||
user: opt.User,
|
user: opt.User,
|
||||||
opt: *opt,
|
opt: *opt,
|
||||||
//endpointURL: rest.URLPathEscape(path.Join(user, defaultDevice, opt.Mountpoint)),
|
//endpointURL: rest.URLPathEscape(path.Join(user, defaultDevice, opt.Mountpoint)),
|
||||||
srv: rest.NewClient(fshttp.NewClient(fs.Config)).SetRoot(rootURL),
|
srv: rest.NewClient(fshttp.NewClient(fs.Config)).SetRoot(rootURL),
|
||||||
pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
|
apiSrv: rest.NewClient(oAuthClient).SetRoot(apiURL),
|
||||||
|
pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
|
||||||
}
|
}
|
||||||
f.features = (&fs.Features{
|
f.features = (&fs.Features{
|
||||||
CaseInsensitive: true,
|
CaseInsensitive: true,
|
||||||
|
@ -300,6 +435,14 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
||||||
WriteMimeType: true,
|
WriteMimeType: true,
|
||||||
}).Fill(f)
|
}).Fill(f)
|
||||||
|
|
||||||
|
f.srv.SetErrorHandler(errorHandler)
|
||||||
|
|
||||||
|
// Renew the token in the background
|
||||||
|
f.tokenRenewer = oauthutil.NewRenew(f.String(), ts, func() error {
|
||||||
|
_, err := f.readMetaDataForPath("")
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
|
||||||
if user == "" || pass == "" {
|
if user == "" || pass == "" {
|
||||||
return nil, errors.New("jottacloud needs user and password")
|
return nil, errors.New("jottacloud needs user and password")
|
||||||
}
|
}
|
||||||
|
@ -1041,42 +1184,70 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio
|
||||||
}
|
}
|
||||||
|
|
||||||
var resp *http.Response
|
var resp *http.Response
|
||||||
var result api.JottaFile
|
// use the api to allocate the file first and get resume / deduplication info
|
||||||
opts := rest.Opts{
|
opts := rest.Opts{
|
||||||
Method: "POST",
|
Method: "POST",
|
||||||
Path: o.filePath(),
|
Path: "allocate",
|
||||||
Body: in,
|
ExtraHeaders: make(map[string]string),
|
||||||
ContentType: fs.MimeType(src),
|
}
|
||||||
ContentLength: &size,
|
fileDate := api.Time(src.ModTime()).APIString()
|
||||||
ExtraHeaders: make(map[string]string),
|
|
||||||
Parameters: url.Values{},
|
// the allocate request
|
||||||
|
var request = api.AllocateFileRequest{
|
||||||
|
Bytes: size,
|
||||||
|
Created: fileDate,
|
||||||
|
Modified: fileDate,
|
||||||
|
Md5: md5String,
|
||||||
|
Path: "/" + o.fs.opt.Mountpoint + "/" + replaceReservedChars(path.Join(o.fs.root, o.remote)),
|
||||||
}
|
}
|
||||||
|
|
||||||
opts.ExtraHeaders["JMd5"] = md5String
|
// send it
|
||||||
opts.Parameters.Set("cphash", md5String)
|
var response api.AllocateFileResponse
|
||||||
opts.ExtraHeaders["JSize"] = strconv.FormatInt(size, 10)
|
|
||||||
// opts.ExtraHeaders["JCreated"] = api.Time(src.ModTime()).String()
|
|
||||||
opts.ExtraHeaders["JModified"] = api.Time(src.ModTime()).String()
|
|
||||||
|
|
||||||
// Parameters observed in other implementations
|
|
||||||
//opts.ExtraHeaders["X-Jfs-DeviceName"] = "Jotta"
|
|
||||||
//opts.ExtraHeaders["X-Jfs-Devicename-Base64"] = ""
|
|
||||||
//opts.ExtraHeaders["X-Jftp-Version"] = "2.4" this appears to be the current version
|
|
||||||
//opts.ExtraHeaders["jx_csid"] = ""
|
|
||||||
//opts.ExtraHeaders["jx_lisence"] = ""
|
|
||||||
|
|
||||||
opts.Parameters.Set("umode", "nomultipart")
|
|
||||||
|
|
||||||
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
|
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
|
||||||
resp, err = o.fs.srv.CallXML(&opts, nil, &result)
|
resp, err = o.fs.apiSrv.CallJSON(&opts, &request, &response)
|
||||||
return shouldRetry(resp, err)
|
return shouldRetry(resp, err)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Check returned Metadata? Timeout on big uploads?
|
// Can be INCOMPLETE and CORRPUT, try to upload a then
|
||||||
return o.setMetaData(&result)
|
if response.State != "COMPLETED" {
|
||||||
|
// how much do we still have to upload?
|
||||||
|
remainingBytes := size - response.ResumePos
|
||||||
|
opts = rest.Opts{
|
||||||
|
Method: "POST",
|
||||||
|
RootURL: response.UploadURL,
|
||||||
|
ContentLength: &remainingBytes,
|
||||||
|
Body: in,
|
||||||
|
ExtraHeaders: make(map[string]string),
|
||||||
|
}
|
||||||
|
opts.ExtraHeaders["Content-Type"] = "application/octet-stream"
|
||||||
|
if response.ResumePos != 0 {
|
||||||
|
opts.ExtraHeaders["Range"] = "bytes=" + strconv.FormatInt(response.ResumePos, 10) + "-" + strconv.FormatInt(size-1, 10)
|
||||||
|
}
|
||||||
|
|
||||||
|
// copy the already uploaded bytes into the trash :)
|
||||||
|
var result api.UploadResponse
|
||||||
|
_, err = io.CopyN(ioutil.Discard, in, response.ResumePos)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// send the remaining bytes
|
||||||
|
resp, err = o.fs.apiSrv.CallJSON(&opts, nil, &result)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// finally update the meta data
|
||||||
|
o.hasMetaData = true
|
||||||
|
o.size = int64(result.Bytes)
|
||||||
|
o.md5 = result.Md5
|
||||||
|
o.modTime = time.Unix(result.Modified/1000, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove an object
|
// Remove an object
|
||||||
|
|
Loading…
Add table
Reference in a new issue