forked from TrueCloudLab/rclone
qingstor: Support hash md5 for upload object
* Use a single-object upload when a file is less than or equal to 67108864 bytes * Use a multi-part upload when a file is larger than 67108864 bytes, and calculate the MD5 sum during the upload * For Mkdir and Rmdir, add a loop that waits for the QingStor service to sync its status, to handle extreme cases such as trying to create a just-deleted bucket or delete a just-created bucket
This commit is contained in:
parent
f407e3da55
commit
ee9f987234
3 changed files with 492 additions and 96 deletions
|
@ -30,7 +30,7 @@ Here is an overview of the major features of each cloud storage system.
|
||||||
| Microsoft Azure Blob Storage | MD5 | Yes | No | No | R/W |
|
| Microsoft Azure Blob Storage | MD5 | Yes | No | No | R/W |
|
||||||
| Microsoft OneDrive | SHA1 | Yes | Yes | No | R |
|
| Microsoft OneDrive | SHA1 | Yes | Yes | No | R |
|
||||||
| Openstack Swift | MD5 | Yes | No | No | R/W |
|
| Openstack Swift | MD5 | Yes | No | No | R/W |
|
||||||
| QingStor | - | No | No | No | R/W |
|
| QingStor | MD5 | No | No | No | R/W |
|
||||||
| SFTP | MD5, SHA1 * | Yes | Depends | No | - |
|
| SFTP | MD5, SHA1 * | Yes | Depends | No | - |
|
||||||
| Yandex Disk | MD5 | Yes | No | No | R/W |
|
| Yandex Disk | MD5 | Yes | No | No | R/W |
|
||||||
| The local filesystem | All | Yes | Depends | No | - |
|
| The local filesystem | All | Yes | Depends | No | - |
|
||||||
|
|
|
@ -6,7 +6,6 @@
|
||||||
package qingstor
|
package qingstor
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
@ -65,6 +64,11 @@ func init() {
|
||||||
|
|
||||||
Help: "The Shanghai (China) First Zone\nNeeds location constraint sh1a.",
|
Help: "The Shanghai (China) First Zone\nNeeds location constraint sh1a.",
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
Value: "gd2a",
|
||||||
|
|
||||||
|
Help: "The Guangdong (China) Second Zone\nNeeds location constraint gd2a.",
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}, {
|
}, {
|
||||||
Name: "connection_retries",
|
Name: "connection_retries",
|
||||||
|
@ -75,11 +79,8 @@ func init() {
|
||||||
|
|
||||||
// Constants
|
// Constants
|
||||||
const (
|
const (
|
||||||
listLimitSize = 1000 // Number of items to read at once
|
listLimitSize = 1000 // Number of items to read at once
|
||||||
maxSizeForCopy = 1024 * 1024 * 1024 * 5 // The maximum size of object we can COPY
|
maxSizeForCopy = 1024 * 1024 * 1024 * 5 // The maximum size of object we can COPY
|
||||||
maxSizeForPart = 1024 * 1024 * 1024 * 1 // The maximum size of object we can Upload in Multipart Upload API
|
|
||||||
multipartUploadSize = 1024 * 1024 * 64 // The size of multipart upload object as once.
|
|
||||||
MaxMultipleParts = 10000 // The maximum number of upload multiple parts
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Globals
|
// Globals
|
||||||
|
@ -193,6 +194,8 @@ func qsServiceConnection(name string) (*qs.Service, error) {
|
||||||
host = _host
|
host = _host
|
||||||
if _port != "" {
|
if _port != "" {
|
||||||
port, _ = strconv.Atoi(_port)
|
port, _ = strconv.Atoi(_port)
|
||||||
|
} else if protocol == "http" {
|
||||||
|
port = 80
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -300,9 +303,8 @@ func (f *Fs) Precision() time.Duration {
|
||||||
|
|
||||||
// Hashes returns the supported hash sets.
|
// Hashes returns the supported hash sets.
|
||||||
func (f *Fs) Hashes() fs.HashSet {
|
func (f *Fs) Hashes() fs.HashSet {
|
||||||
//return fs.HashSet(fs.HashMD5)
|
return fs.HashSet(fs.HashMD5)
|
||||||
//Not supported temporary
|
//return fs.HashSet(fs.HashNone)
|
||||||
return fs.HashSet(fs.HashNone)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Features returns the optional features of this Fs
|
// Features returns the optional features of this Fs
|
||||||
|
@ -631,6 +633,31 @@ func (f *Fs) Mkdir(dir string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bucketInit, err := f.svc.Bucket(f.bucket, f.zone)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
/* When delete a bucket, qingstor need about 60 second to sync status;
|
||||||
|
So, need wait for it sync end if we try to operation a just deleted bucket
|
||||||
|
*/
|
||||||
|
retries := 0
|
||||||
|
for retries <= 120 {
|
||||||
|
statistics, err := bucketInit.GetStatistics()
|
||||||
|
if statistics == nil || err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
switch *statistics.Status {
|
||||||
|
case "deleted":
|
||||||
|
fs.Debugf(f, "Wiat for qingstor sync bucket status, retries: %d", retries)
|
||||||
|
time.Sleep(time.Second * 1)
|
||||||
|
retries++
|
||||||
|
continue
|
||||||
|
default:
|
||||||
|
break
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
if !f.bucketDeleted {
|
if !f.bucketDeleted {
|
||||||
exists, err := f.dirExists()
|
exists, err := f.dirExists()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
@ -641,10 +668,6 @@ func (f *Fs) Mkdir(dir string) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bucketInit, err := f.svc.Bucket(f.bucket, f.zone)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
_, err = bucketInit.Put()
|
_, err = bucketInit.Put()
|
||||||
if e, ok := err.(*qsErr.QingStorError); ok {
|
if e, ok := err.(*qsErr.QingStorError); ok {
|
||||||
if e.StatusCode == http.StatusConflict {
|
if e.StatusCode == http.StatusConflict {
|
||||||
|
@ -662,21 +685,17 @@ func (f *Fs) Mkdir(dir string) error {
|
||||||
|
|
||||||
// dirIsEmpty check if the bucket empty
|
// dirIsEmpty check if the bucket empty
|
||||||
func (f *Fs) dirIsEmpty() (bool, error) {
|
func (f *Fs) dirIsEmpty() (bool, error) {
|
||||||
limit := 8
|
|
||||||
bucketInit, err := f.svc.Bucket(f.bucket, f.zone)
|
bucketInit, err := f.svc.Bucket(f.bucket, f.zone)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return true, err
|
return true, err
|
||||||
}
|
}
|
||||||
|
|
||||||
req := qs.ListObjectsInput{
|
statistics, err := bucketInit.GetStatistics()
|
||||||
Limit: &limit,
|
|
||||||
}
|
|
||||||
rsp, err := bucketInit.ListObjects(&req)
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return true, err
|
||||||
}
|
}
|
||||||
if len(rsp.Keys) == 0 {
|
|
||||||
|
if *statistics.Count == 0 {
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
return false, nil
|
return false, nil
|
||||||
|
@ -704,7 +723,30 @@ func (f *Fs) Rmdir(dir string) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
_, err = bucketInit.Delete()
|
retries := 0
|
||||||
|
for retries <= 10 {
|
||||||
|
_, delErr := bucketInit.Delete()
|
||||||
|
if delErr != nil {
|
||||||
|
if e, ok := delErr.(*qsErr.QingStorError); ok {
|
||||||
|
switch e.Code {
|
||||||
|
// The status of "lease" takes a few seconds to "ready" when creating a new bucket
|
||||||
|
// wait for lease status ready
|
||||||
|
case "lease_not_ready":
|
||||||
|
fs.Debugf(f, "QingStor bucket lease not ready, retries: %d", retries)
|
||||||
|
retries++
|
||||||
|
time.Sleep(time.Second * 1)
|
||||||
|
continue
|
||||||
|
default:
|
||||||
|
err = e
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
err = delErr
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
f.bucketOK = false
|
f.bucketOK = false
|
||||||
f.bucketDeleted = true
|
f.bucketDeleted = true
|
||||||
|
@ -839,85 +881,24 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
bucketInit, err := o.fs.svc.Bucket(o.fs.bucket, o.fs.zone)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
//Initiate Upload Multipart
|
|
||||||
key := o.fs.root + o.remote
|
key := o.fs.root + o.remote
|
||||||
var objectParts = []*qs.ObjectPartType{}
|
// Guess the content type
|
||||||
var uploadID *string
|
|
||||||
var partNumber int
|
|
||||||
|
|
||||||
defer func() {
|
|
||||||
if err != nil {
|
|
||||||
fs.Errorf(o, "Create Object Faild, API ERROR: %v", err)
|
|
||||||
// Abort Upload when init success and upload failed
|
|
||||||
if uploadID != nil {
|
|
||||||
fs.Debugf(o, "Abort Upload Multipart, upload_id: %s, objectParts: %+v", *uploadID, objectParts)
|
|
||||||
abortReq := qs.AbortMultipartUploadInput{
|
|
||||||
UploadID: uploadID,
|
|
||||||
}
|
|
||||||
_, _ = bucketInit.AbortMultipartUpload(key, &abortReq)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
fs.Debugf(o, "Initiate Upload Multipart, key: %s", key)
|
|
||||||
mimeType := fs.MimeType(src)
|
mimeType := fs.MimeType(src)
|
||||||
initReq := qs.InitiateMultipartUploadInput{
|
|
||||||
ContentType: &mimeType,
|
req := uploadInput{
|
||||||
|
body: in,
|
||||||
|
qsSvc: o.fs.svc,
|
||||||
|
bucket: o.fs.bucket,
|
||||||
|
zone: o.fs.zone,
|
||||||
|
key: key,
|
||||||
|
mimeType: mimeType,
|
||||||
}
|
}
|
||||||
rsp, err := bucketInit.InitiateMultipartUpload(key, &initReq)
|
uploader := newUploader(&req)
|
||||||
|
|
||||||
|
err = uploader.upload()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
uploadID = rsp.UploadID
|
|
||||||
|
|
||||||
// Create an new buffer
|
|
||||||
buffer := new(bytes.Buffer)
|
|
||||||
|
|
||||||
for {
|
|
||||||
size, er := io.CopyN(buffer, in, multipartUploadSize)
|
|
||||||
if er != nil && er != io.EOF {
|
|
||||||
err = fmt.Errorf("read upload data failed, error: %s", er)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if size == 0 && partNumber > 0 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
// Upload Multipart Object
|
|
||||||
number := partNumber
|
|
||||||
req := qs.UploadMultipartInput{
|
|
||||||
PartNumber: &number,
|
|
||||||
UploadID: uploadID,
|
|
||||||
ContentLength: &size,
|
|
||||||
Body: buffer,
|
|
||||||
}
|
|
||||||
fs.Debugf(o, "Upload Multipart, upload_id: %s, part_number: %d", *uploadID, number)
|
|
||||||
_, err = bucketInit.UploadMultipart(key, &req)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
part := qs.ObjectPartType{
|
|
||||||
PartNumber: &number,
|
|
||||||
Size: &size,
|
|
||||||
}
|
|
||||||
objectParts = append(objectParts, &part)
|
|
||||||
partNumber++
|
|
||||||
}
|
|
||||||
|
|
||||||
// Complete Multipart Upload
|
|
||||||
fs.Debugf(o, "Complete Upload Multipart, upload_id: %s, objectParts: %d", *uploadID, len(objectParts))
|
|
||||||
completeReq := qs.CompleteMultipartUploadInput{
|
|
||||||
UploadID: uploadID,
|
|
||||||
ObjectParts: objectParts,
|
|
||||||
}
|
|
||||||
_, err = bucketInit.CompleteMultipartUpload(key, &completeReq)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Read Metadata of object
|
// Read Metadata of object
|
||||||
err = o.readMetaData()
|
err = o.readMetaData()
|
||||||
return err
|
return err
|
||||||
|
|
415
qingstor/upload.go
Normal file
415
qingstor/upload.go
Normal file
|
@ -0,0 +1,415 @@
|
||||||
|
// Upload object to QingStor
|
||||||
|
|
||||||
|
// +build !plan9
|
||||||
|
|
||||||
|
package qingstor
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"crypto/md5"
|
||||||
|
"fmt"
|
||||||
|
"hash"
|
||||||
|
"io"
|
||||||
|
"sort"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/ncw/rclone/fs"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
qs "github.com/yunify/qingstor-sdk-go/service"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Limits applied to uploads to QingStor.
const (
	maxSinglePartSize = 5 << 30 // The maximum allowed size (5 GiB) when uploading a single object to QingStor
	maxMultiPartSize  = 1 << 30 // The maximum allowed part size (1 GiB) when uploading a part to QingStor
	minMultiPartSize  = 4 << 20 // The minimum allowed part size (4 MiB) when uploading a part to QingStor
	maxMultiParts     = 10000   // The maximum allowed number of parts in a multi-part upload
)
|
||||||
|
|
||||||
|
// Defaults applied by uploader.init when the corresponding option is left at zero.
const (
	defaultUploadPartSize    = 64 << 20 // The default part size (64 MiB) to buffer chunks of a payload into.
	defaultUploadConcurrency = 4        // The default number of goroutines to spin up when using multiPartUpload.
)
|
||||||
|
|
||||||
|
// readFillBuf reads from r into b until b is full or r returns an error.
//
// It returns the number of bytes stored in b together with the error that
// stopped the reading (nil when b was filled completely, io.EOF when r ran
// out of data first).
func readFillBuf(r io.Reader, b []byte) (offset int, err error) {
	for {
		// Stop as soon as the buffer is full or the previous Read failed.
		if offset >= len(b) || err != nil {
			return offset, err
		}
		var got int
		got, err = r.Read(b[offset:])
		offset += got
	}
}
|
||||||
|
|
||||||
|
// uploadInput contains all input for upload requests to QingStor.
|
||||||
|
type uploadInput struct {
|
||||||
|
body io.Reader
|
||||||
|
qsSvc *qs.Service
|
||||||
|
mimeType string
|
||||||
|
zone string
|
||||||
|
bucket string
|
||||||
|
key string
|
||||||
|
partSize int64
|
||||||
|
concurrency int
|
||||||
|
maxUploadParts int
|
||||||
|
}
|
||||||
|
|
||||||
|
// uploader internal structure to manage an upload to QingStor.
|
||||||
|
type uploader struct {
|
||||||
|
cfg *uploadInput
|
||||||
|
totalSize int64 // set to -1 if the size is not known
|
||||||
|
readerPos int64 // current reader position
|
||||||
|
readerSize int64 // current reader content size
|
||||||
|
}
|
||||||
|
|
||||||
|
// newUploader creates a new Uploader instance to upload objects to QingStor.
|
||||||
|
func newUploader(in *uploadInput) *uploader {
|
||||||
|
u := &uploader{
|
||||||
|
cfg: in,
|
||||||
|
}
|
||||||
|
return u
|
||||||
|
}
|
||||||
|
|
||||||
|
// bucketInit initiate as bucket controller
|
||||||
|
func (u *uploader) bucketInit() (*qs.Bucket, error) {
|
||||||
|
bucketInit, err := u.cfg.qsSvc.Bucket(u.cfg.bucket, u.cfg.zone)
|
||||||
|
return bucketInit, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// String converts uploader to a string
|
||||||
|
func (u *uploader) String() string {
|
||||||
|
return fmt.Sprintf("QingStor bucket %s key %s", u.cfg.bucket, u.cfg.key)
|
||||||
|
}
|
||||||
|
|
||||||
|
// nextReader returns a seekable reader representing the next packet of data.
|
||||||
|
// This operation increases the shared u.readerPos counter, but note that it
|
||||||
|
// does not need to be wrapped in a mutex because nextReader is only called
|
||||||
|
// from the main thread.
|
||||||
|
func (u *uploader) nextReader() (io.ReadSeeker, int, error) {
|
||||||
|
type readerAtSeeker interface {
|
||||||
|
io.ReaderAt
|
||||||
|
io.ReadSeeker
|
||||||
|
}
|
||||||
|
switch r := u.cfg.body.(type) {
|
||||||
|
case readerAtSeeker:
|
||||||
|
var err error
|
||||||
|
n := u.cfg.partSize
|
||||||
|
if u.totalSize >= 0 {
|
||||||
|
bytesLeft := u.totalSize - u.readerPos
|
||||||
|
|
||||||
|
if bytesLeft <= u.cfg.partSize {
|
||||||
|
err = io.EOF
|
||||||
|
n = bytesLeft
|
||||||
|
}
|
||||||
|
}
|
||||||
|
reader := io.NewSectionReader(r, u.readerPos, n)
|
||||||
|
u.readerPos += n
|
||||||
|
u.readerSize = n
|
||||||
|
return reader, int(n), err
|
||||||
|
|
||||||
|
default:
|
||||||
|
part := make([]byte, u.cfg.partSize)
|
||||||
|
n, err := readFillBuf(r, part)
|
||||||
|
u.readerPos += int64(n)
|
||||||
|
u.readerSize = int64(n)
|
||||||
|
return bytes.NewReader(part[0:n]), n, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// init will initialize all default options.
|
||||||
|
func (u *uploader) init() {
|
||||||
|
if u.cfg.concurrency == 0 {
|
||||||
|
u.cfg.concurrency = defaultUploadConcurrency
|
||||||
|
}
|
||||||
|
if u.cfg.partSize == 0 {
|
||||||
|
u.cfg.partSize = defaultUploadPartSize
|
||||||
|
}
|
||||||
|
if u.cfg.maxUploadParts == 0 {
|
||||||
|
u.cfg.maxUploadParts = maxMultiParts
|
||||||
|
}
|
||||||
|
// Try to get the total size for some optimizations
|
||||||
|
u.totalSize = -1
|
||||||
|
switch r := u.cfg.body.(type) {
|
||||||
|
case io.Seeker:
|
||||||
|
pos, _ := r.Seek(0, 1)
|
||||||
|
defer func() {
|
||||||
|
_, _ = r.Seek(pos, 0)
|
||||||
|
}()
|
||||||
|
|
||||||
|
n, err := r.Seek(0, 2)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
u.totalSize = n
|
||||||
|
|
||||||
|
// Try to adjust partSize if it is too small and account for
|
||||||
|
// integer division truncation.
|
||||||
|
if u.totalSize/u.cfg.partSize >= int64(u.cfg.partSize) {
|
||||||
|
// Add one to the part size to account for remainders
|
||||||
|
// during the size calculation. e.g odd number of bytes.
|
||||||
|
u.cfg.partSize = (u.totalSize / int64(u.cfg.maxUploadParts)) + 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// singlePartUpload upload a single object that contentLength less than "defaultUploadPartSize"
|
||||||
|
func (u *uploader) singlePartUpload(buf io.ReadSeeker) error {
|
||||||
|
bucketInit, _ := u.bucketInit()
|
||||||
|
|
||||||
|
req := qs.PutObjectInput{
|
||||||
|
ContentLength: &u.readerPos,
|
||||||
|
ContentType: &u.cfg.mimeType,
|
||||||
|
Body: buf,
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := bucketInit.PutObject(u.cfg.key, &req)
|
||||||
|
if err == nil {
|
||||||
|
fs.Debugf(u, "Upload single objcet finished")
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Upload upload a object into QingStor
|
||||||
|
func (u *uploader) upload() error {
|
||||||
|
u.init()
|
||||||
|
|
||||||
|
if u.cfg.partSize < minMultiPartSize {
|
||||||
|
return errors.Errorf("part size must be at least %d bytes", minMultiPartSize)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Do one read to determine if we have more than one part
|
||||||
|
reader, _, err := u.nextReader()
|
||||||
|
if err == io.EOF { // single part
|
||||||
|
fs.Debugf(u, "Tried to upload a singile object to QingStor")
|
||||||
|
return u.singlePartUpload(reader)
|
||||||
|
} else if err != nil {
|
||||||
|
return errors.Errorf("read upload data failed: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fs.Debugf(u, "Treied to upload a multi-part object to QingStor")
|
||||||
|
mu := multiUploader{uploader: u}
|
||||||
|
return mu.multiPartUpload(reader)
|
||||||
|
}
|
||||||
|
|
||||||
|
// internal structure to manage a specific multipart upload to QingStor.
|
||||||
|
type multiUploader struct {
|
||||||
|
*uploader
|
||||||
|
wg sync.WaitGroup
|
||||||
|
mtx sync.Mutex
|
||||||
|
err error
|
||||||
|
uploadID *string
|
||||||
|
objectParts completedParts
|
||||||
|
hashMd5 hash.Hash
|
||||||
|
}
|
||||||
|
|
||||||
|
// chunk describes one part of the payload queued for upload to QingStor.
type chunk struct {
	partNumber int           // part number sent with the upload request
	size       int64         // length of the part in bytes
	buffer     io.ReadSeeker // content of the part
}
|
||||||
|
|
||||||
|
// completedParts is a wrapper to make parts sortable by their part number,
|
||||||
|
// since QingStor required this list to be sent in sorted order.
|
||||||
|
type completedParts []*qs.ObjectPartType
|
||||||
|
|
||||||
|
func (a completedParts) Len() int { return len(a) }
|
||||||
|
func (a completedParts) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||||
|
func (a completedParts) Less(i, j int) bool { return *a[i].PartNumber < *a[j].PartNumber }
|
||||||
|
|
||||||
|
// String converts multiUploader to a string
|
||||||
|
func (mu *multiUploader) String() string {
|
||||||
|
if uploadID := mu.uploadID; uploadID != nil {
|
||||||
|
return fmt.Sprintf("QingStor bucket %s key %s uploadID %s", mu.cfg.bucket, mu.cfg.key, *uploadID)
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("QingStor bucket %s key %s uploadID <nil>", mu.cfg.bucket, mu.cfg.key)
|
||||||
|
}
|
||||||
|
|
||||||
|
// getErr is a thread-safe getter for the error object
|
||||||
|
func (mu *multiUploader) getErr() error {
|
||||||
|
mu.mtx.Lock()
|
||||||
|
defer mu.mtx.Unlock()
|
||||||
|
return mu.err
|
||||||
|
}
|
||||||
|
|
||||||
|
// setErr is a thread-safe setter for the error object
|
||||||
|
func (mu *multiUploader) setErr(e error) {
|
||||||
|
mu.mtx.Lock()
|
||||||
|
defer mu.mtx.Unlock()
|
||||||
|
mu.err = e
|
||||||
|
}
|
||||||
|
|
||||||
|
// readChunk runs in worker goroutines to pull chunks off of the ch channel
|
||||||
|
// and send() them as UploadPart requests.
|
||||||
|
func (mu *multiUploader) readChunk(ch chan chunk) {
|
||||||
|
defer mu.wg.Done()
|
||||||
|
for {
|
||||||
|
c, ok := <-ch
|
||||||
|
if !ok {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if mu.getErr() == nil {
|
||||||
|
if err := mu.send(c); err != nil {
|
||||||
|
mu.setErr(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// initiate init an Multiple Object and obtain UploadID
|
||||||
|
func (mu *multiUploader) initiate() error {
|
||||||
|
bucketInit, _ := mu.bucketInit()
|
||||||
|
req := qs.InitiateMultipartUploadInput{
|
||||||
|
ContentType: &mu.cfg.mimeType,
|
||||||
|
}
|
||||||
|
fs.Debugf(mu, "Tried to initiate a multi-part upload")
|
||||||
|
rsp, err := bucketInit.InitiateMultipartUpload(mu.cfg.key, &req)
|
||||||
|
if err == nil {
|
||||||
|
mu.uploadID = rsp.UploadID
|
||||||
|
mu.hashMd5 = md5.New()
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// send upload a part into QingStor
|
||||||
|
func (mu *multiUploader) send(c chunk) error {
|
||||||
|
bucketInit, _ := mu.bucketInit()
|
||||||
|
req := qs.UploadMultipartInput{
|
||||||
|
PartNumber: &c.partNumber,
|
||||||
|
UploadID: mu.uploadID,
|
||||||
|
ContentLength: &c.size,
|
||||||
|
Body: c.buffer,
|
||||||
|
}
|
||||||
|
fs.Debugf(mu, "Tried to upload a part to QingStor that partNumber %d and partSize %d", c.partNumber, c.size)
|
||||||
|
_, err := bucketInit.UploadMultipart(mu.cfg.key, &req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
fs.Debugf(mu, "Upload part finished that partNumber %d and partSize %d", c.partNumber, c.size)
|
||||||
|
|
||||||
|
mu.mtx.Lock()
|
||||||
|
defer mu.mtx.Unlock()
|
||||||
|
|
||||||
|
_, _ = c.buffer.Seek(0, 0)
|
||||||
|
_, _ = io.Copy(mu.hashMd5, c.buffer)
|
||||||
|
|
||||||
|
parts := qs.ObjectPartType{PartNumber: &c.partNumber, Size: &c.size}
|
||||||
|
mu.objectParts = append(mu.objectParts, &parts)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// list list the ObjectParts of an multipart upload
|
||||||
|
func (mu *multiUploader) list() error {
|
||||||
|
bucketInit, _ := mu.bucketInit()
|
||||||
|
|
||||||
|
req := qs.ListMultipartInput{
|
||||||
|
UploadID: mu.uploadID,
|
||||||
|
}
|
||||||
|
fs.Debugf(mu, "Tried to list a multi-part")
|
||||||
|
rsp, err := bucketInit.ListMultipart(mu.cfg.key, &req)
|
||||||
|
if err == nil {
|
||||||
|
mu.objectParts = rsp.ObjectParts
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// complete complete an multipart upload
|
||||||
|
func (mu *multiUploader) complete() error {
|
||||||
|
var err error
|
||||||
|
if err = mu.getErr(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
bucketInit, _ := mu.bucketInit()
|
||||||
|
//if err = mu.list(); err != nil {
|
||||||
|
// return err
|
||||||
|
//}
|
||||||
|
//md5String := fmt.Sprintf("\"%s\"", hex.EncodeToString(mu.hashMd5.Sum(nil)))
|
||||||
|
|
||||||
|
md5String := fmt.Sprintf("\"%x\"", mu.hashMd5.Sum(nil))
|
||||||
|
sort.Sort(mu.objectParts)
|
||||||
|
req := qs.CompleteMultipartUploadInput{
|
||||||
|
UploadID: mu.uploadID,
|
||||||
|
ObjectParts: mu.objectParts,
|
||||||
|
ETag: &md5String,
|
||||||
|
}
|
||||||
|
fs.Debugf(mu, "Tried to complete a multi-part")
|
||||||
|
_, err = bucketInit.CompleteMultipartUpload(mu.cfg.key, &req)
|
||||||
|
if err == nil {
|
||||||
|
fs.Debugf(mu, "Complete multi-part finished")
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// abort abort an multipart upload
|
||||||
|
func (mu *multiUploader) abort() error {
|
||||||
|
var err error
|
||||||
|
bucketInit, _ := mu.bucketInit()
|
||||||
|
|
||||||
|
if uploadID := mu.uploadID; uploadID != nil {
|
||||||
|
req := qs.AbortMultipartUploadInput{
|
||||||
|
UploadID: uploadID,
|
||||||
|
}
|
||||||
|
fs.Debugf(mu, "Tried to abort a multi-part")
|
||||||
|
_, err = bucketInit.AbortMultipartUpload(mu.cfg.key, &req)
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// multiPartUpload upload a multiple object into QingStor
|
||||||
|
func (mu *multiUploader) multiPartUpload(firstBuf io.ReadSeeker) error {
|
||||||
|
var err error
|
||||||
|
//Initiate an multi-part upload
|
||||||
|
if err = mu.initiate(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
ch := make(chan chunk, mu.cfg.concurrency)
|
||||||
|
for i := 0; i < mu.cfg.concurrency; i++ {
|
||||||
|
mu.wg.Add(1)
|
||||||
|
go mu.readChunk(ch)
|
||||||
|
}
|
||||||
|
|
||||||
|
var partNumber int
|
||||||
|
ch <- chunk{partNumber: partNumber, buffer: firstBuf, size: mu.readerSize}
|
||||||
|
|
||||||
|
for mu.getErr() == nil {
|
||||||
|
partNumber++
|
||||||
|
// This upload exceeded maximum number of supported parts, error now.
|
||||||
|
if partNumber > mu.cfg.maxUploadParts || partNumber > maxMultiParts {
|
||||||
|
var msg string
|
||||||
|
if partNumber > mu.cfg.maxUploadParts {
|
||||||
|
msg = fmt.Sprintf("exceeded total allowed configured maxUploadParts (%d). "+
|
||||||
|
"Adjust PartSize to fit in this limit", mu.cfg.maxUploadParts)
|
||||||
|
} else {
|
||||||
|
msg = fmt.Sprintf("exceeded total allowed QingStor limit maxUploadParts (%d). "+
|
||||||
|
"Adjust PartSize to fit in this limit", maxMultiParts)
|
||||||
|
}
|
||||||
|
mu.setErr(errors.New(msg))
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
var reader io.ReadSeeker
|
||||||
|
var nextChunkLen int
|
||||||
|
reader, nextChunkLen, err = mu.nextReader()
|
||||||
|
if err != nil && err != io.EOF {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if nextChunkLen == 0 && partNumber > 0 {
|
||||||
|
// No need to upload empty part, if file was empty to start
|
||||||
|
// with empty single part would of been created and never
|
||||||
|
// started multipart upload.
|
||||||
|
break
|
||||||
|
}
|
||||||
|
num := partNumber
|
||||||
|
ch <- chunk{partNumber: num, buffer: reader, size: mu.readerSize}
|
||||||
|
}
|
||||||
|
// Wait for all goroutines finish
|
||||||
|
close(ch)
|
||||||
|
mu.wg.Wait()
|
||||||
|
// Complete Multipart Upload
|
||||||
|
err = mu.complete()
|
||||||
|
if mu.getErr() != nil || err != nil {
|
||||||
|
_ = mu.abort()
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
Loading…
Reference in a new issue