forked from TrueCloudLab/rclone
opendrive: refactor to use existing lib/rest facilities for uploads
This also checks the return of the call to make sure the number of bytes written was as expected.
This commit is contained in:
parent
71587344c6
commit
493dfb68fd
2 changed files with 21 additions and 68 deletions
|
@ -1,11 +1,9 @@
|
||||||
package opendrive
|
package opendrive
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"mime/multipart"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"path"
|
"path"
|
||||||
|
@ -947,84 +945,35 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
||||||
fs.Debugf(o, "Uploading chunk %d, size=%d, remain=%d", chunkCounter, currentChunkSize, remainingBytes)
|
fs.Debugf(o, "Uploading chunk %d, size=%d, remain=%d", chunkCounter, currentChunkSize, remainingBytes)
|
||||||
|
|
||||||
chunk := readers.NewRepeatableLimitReaderBuffer(in, buf, currentChunkSize)
|
chunk := readers.NewRepeatableLimitReaderBuffer(in, buf, currentChunkSize)
|
||||||
|
var reply uploadFileChunkReply
|
||||||
err = o.fs.pacer.Call(func() (bool, error) {
|
err = o.fs.pacer.Call(func() (bool, error) {
|
||||||
// seek to the start in case this is a retry
|
// seek to the start in case this is a retry
|
||||||
if _, err = chunk.Seek(0, io.SeekStart); err != nil {
|
if _, err = chunk.Seek(0, io.SeekStart); err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
var formBody bytes.Buffer
|
|
||||||
w := multipart.NewWriter(&formBody)
|
|
||||||
fw, err := w.CreateFormFile("file_data", o.remote)
|
|
||||||
if err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
if _, err = io.Copy(fw, chunk); err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
// Add session_id
|
|
||||||
if fw, err = w.CreateFormField("session_id"); err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
if _, err = fw.Write([]byte(o.fs.session.SessionID)); err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
// Add session_id
|
|
||||||
if fw, err = w.CreateFormField("session_id"); err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
if _, err = fw.Write([]byte(o.fs.session.SessionID)); err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
// Add file_id
|
|
||||||
if fw, err = w.CreateFormField("file_id"); err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
if _, err = fw.Write([]byte(o.id)); err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
// Add temp_location
|
|
||||||
if fw, err = w.CreateFormField("temp_location"); err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
if _, err = fw.Write([]byte(openResponse.TempLocation)); err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
// Add chunk_offset
|
|
||||||
if fw, err = w.CreateFormField("chunk_offset"); err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
if _, err = fw.Write([]byte(strconv.FormatInt(chunkOffset, 10))); err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
// Add chunk_size
|
|
||||||
if fw, err = w.CreateFormField("chunk_size"); err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
if _, err = fw.Write([]byte(strconv.FormatInt(currentChunkSize, 10))); err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
// Don't forget to close the multipart writer.
|
|
||||||
// If you don't close it, your request will be missing the terminating boundary.
|
|
||||||
err = w.Close()
|
|
||||||
if err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
|
|
||||||
opts := rest.Opts{
|
opts := rest.Opts{
|
||||||
Method: "POST",
|
Method: "POST",
|
||||||
Path: "/upload/upload_file_chunk.json",
|
Path: "/upload/upload_file_chunk.json",
|
||||||
Body: &formBody,
|
Body: chunk,
|
||||||
ExtraHeaders: map[string]string{"Content-Type": w.FormDataContentType()},
|
MultipartParams: url.Values{
|
||||||
|
"session_id": []string{o.fs.session.SessionID},
|
||||||
|
"file_id": []string{o.id},
|
||||||
|
"temp_location": []string{openResponse.TempLocation},
|
||||||
|
"chunk_offset": []string{strconv.FormatInt(chunkOffset, 10)},
|
||||||
|
"chunk_size": []string{strconv.FormatInt(currentChunkSize, 10)},
|
||||||
|
},
|
||||||
|
MultipartContentName: "file_data", // ..name of the parameter which is the attached file
|
||||||
|
MultipartFileName: o.remote, // ..name of the file for the attached file
|
||||||
|
|
||||||
}
|
}
|
||||||
resp, err = o.fs.srv.Call(&opts)
|
resp, err = o.fs.srv.CallJSON(&opts, nil, &reply)
|
||||||
return o.fs.shouldRetry(resp, err)
|
return o.fs.shouldRetry(resp, err)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "failed to create file")
|
return errors.Wrap(err, "failed to create file")
|
||||||
}
|
}
|
||||||
err = resp.Body.Close()
|
if reply.TotalWritten != currentChunkSize {
|
||||||
if err != nil {
|
return errors.Errorf("failed to create file: incomplete write of %d/%d bytes", reply.TotalWritten, currentChunkSize)
|
||||||
return errors.Wrap(err, "close failed on create file")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
chunkCounter++
|
chunkCounter++
|
||||||
|
|
|
@ -212,3 +212,7 @@ type permissions struct {
|
||||||
FileID string `json:"file_id"`
|
FileID string `json:"file_id"`
|
||||||
FileIsPublic int64 `json:"file_ispublic"`
|
FileIsPublic int64 `json:"file_ispublic"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type uploadFileChunkReply struct {
|
||||||
|
TotalWritten int64 `json:"TotalWritten"`
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue