77e69b9cf3
Signed-off-by: Olivier Gambier <olivier@docker.com>
508 lines
14 KiB
Go
508 lines
14 KiB
Go
package s3
|
|
|
|
import (
|
|
"bytes"
|
|
"crypto/md5"
|
|
"encoding/base64"
|
|
"encoding/hex"
|
|
"encoding/xml"
|
|
"errors"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
)
|
|
|
|
// Multi represents an unfinished multipart upload.
|
|
//
|
|
// Multipart uploads allow sending big objects in smaller chunks.
|
|
// After all parts have been sent, the upload must be explicitly
|
|
// completed by calling Complete with the list of parts.
|
|
//
|
|
// See http://goo.gl/vJfTG for an overview of multipart uploads.
|
|
type Multi struct {
|
|
Bucket *Bucket
|
|
Key string
|
|
UploadId string
|
|
}
|
|
|
|
// That's the default. Here just for testing.
|
|
var listMultiMax = 1000
|
|
|
|
type listMultiResp struct {
|
|
NextKeyMarker string
|
|
NextUploadIdMarker string
|
|
IsTruncated bool
|
|
Upload []Multi
|
|
CommonPrefixes []string `xml:"CommonPrefixes>Prefix"`
|
|
}
|
|
|
|
// ListMulti returns the list of unfinished multipart uploads in b.
|
|
//
|
|
// The prefix parameter limits the response to keys that begin with the
|
|
// specified prefix. You can use prefixes to separate a bucket into different
|
|
// groupings of keys (to get the feeling of folders, for example).
|
|
//
|
|
// The delim parameter causes the response to group all of the keys that
|
|
// share a common prefix up to the next delimiter in a single entry within
|
|
// the CommonPrefixes field. You can use delimiters to separate a bucket
|
|
// into different groupings of keys, similar to how folders would work.
|
|
//
|
|
// See http://goo.gl/ePioY for details.
|
|
func (b *Bucket) ListMulti(prefix, delim string) (multis []*Multi, prefixes []string, err error) {
|
|
params := map[string][]string{
|
|
"uploads": {""},
|
|
"max-uploads": {strconv.FormatInt(int64(listMultiMax), 10)},
|
|
"prefix": {prefix},
|
|
"delimiter": {delim},
|
|
}
|
|
for attempt := attempts.Start(); attempt.Next(); {
|
|
req := &request{
|
|
method: "GET",
|
|
bucket: b.Name,
|
|
params: params,
|
|
}
|
|
var resp listMultiResp
|
|
err := b.S3.query(req, &resp)
|
|
if shouldRetry(err) && attempt.HasNext() {
|
|
continue
|
|
}
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
for i := range resp.Upload {
|
|
multi := &resp.Upload[i]
|
|
multi.Bucket = b
|
|
multis = append(multis, multi)
|
|
}
|
|
prefixes = append(prefixes, resp.CommonPrefixes...)
|
|
if !resp.IsTruncated {
|
|
return multis, prefixes, nil
|
|
}
|
|
params["key-marker"] = []string{resp.NextKeyMarker}
|
|
params["upload-id-marker"] = []string{resp.NextUploadIdMarker}
|
|
attempt = attempts.Start() // Last request worked.
|
|
}
|
|
panic("unreachable")
|
|
}
|
|
|
|
// Multi returns a multipart upload handler for the provided key
|
|
// inside b. If a multipart upload exists for key, it is returned,
|
|
// otherwise a new multipart upload is initiated with contType and perm.
|
|
func (b *Bucket) Multi(key, contType string, perm ACL, options Options) (*Multi, error) {
|
|
multis, _, err := b.ListMulti(key, "")
|
|
if err != nil && !hasCode(err, "NoSuchUpload") {
|
|
return nil, err
|
|
}
|
|
for _, m := range multis {
|
|
if m.Key == key {
|
|
return m, nil
|
|
}
|
|
}
|
|
return b.InitMulti(key, contType, perm, options)
|
|
}
|
|
|
|
// InitMulti initializes a new multipart upload at the provided
|
|
// key inside b and returns a value for manipulating it.
|
|
//
|
|
// See http://goo.gl/XP8kL for details.
|
|
func (b *Bucket) InitMulti(key string, contType string, perm ACL, options Options) (*Multi, error) {
|
|
headers := map[string][]string{
|
|
"Content-Type": {contType},
|
|
"Content-Length": {"0"},
|
|
"x-amz-acl": {string(perm)},
|
|
}
|
|
options.addHeaders(headers)
|
|
params := map[string][]string{
|
|
"uploads": {""},
|
|
}
|
|
req := &request{
|
|
method: "POST",
|
|
bucket: b.Name,
|
|
path: key,
|
|
headers: headers,
|
|
params: params,
|
|
}
|
|
var err error
|
|
var resp struct {
|
|
UploadId string `xml:"UploadId"`
|
|
}
|
|
for attempt := attempts.Start(); attempt.Next(); {
|
|
err = b.S3.query(req, &resp)
|
|
if !shouldRetry(err) {
|
|
break
|
|
}
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &Multi{Bucket: b, Key: key, UploadId: resp.UploadId}, nil
|
|
}
|
|
|
|
func (m *Multi) PutPartCopy(n int, options CopyOptions, source string) (*CopyObjectResult, Part, error) {
|
|
headers := map[string][]string{
|
|
"x-amz-copy-source": {url.QueryEscape(source)},
|
|
}
|
|
options.addHeaders(headers)
|
|
params := map[string][]string{
|
|
"uploadId": {m.UploadId},
|
|
"partNumber": {strconv.FormatInt(int64(n), 10)},
|
|
}
|
|
|
|
sourceBucket := m.Bucket.S3.Bucket(strings.TrimRight(strings.SplitAfterN(source, "/", 2)[0], "/"))
|
|
sourceMeta, err := sourceBucket.Head(strings.SplitAfterN(source, "/", 2)[1], nil)
|
|
if err != nil {
|
|
return nil, Part{}, err
|
|
}
|
|
|
|
for attempt := attempts.Start(); attempt.Next(); {
|
|
req := &request{
|
|
method: "PUT",
|
|
bucket: m.Bucket.Name,
|
|
path: m.Key,
|
|
headers: headers,
|
|
params: params,
|
|
}
|
|
resp := &CopyObjectResult{}
|
|
err = m.Bucket.S3.query(req, resp)
|
|
if shouldRetry(err) && attempt.HasNext() {
|
|
continue
|
|
}
|
|
if err != nil {
|
|
return nil, Part{}, err
|
|
}
|
|
if resp.ETag == "" {
|
|
return nil, Part{}, errors.New("part upload succeeded with no ETag")
|
|
}
|
|
return resp, Part{n, resp.ETag, sourceMeta.ContentLength}, nil
|
|
}
|
|
panic("unreachable")
|
|
}
|
|
|
|
// PutPart sends part n of the multipart upload, reading all the content from r.
|
|
// Each part, except for the last one, must be at least 5MB in size.
|
|
//
|
|
// See http://goo.gl/pqZer for details.
|
|
func (m *Multi) PutPart(n int, r io.ReadSeeker) (Part, error) {
|
|
partSize, _, md5b64, err := seekerInfo(r)
|
|
if err != nil {
|
|
return Part{}, err
|
|
}
|
|
return m.putPart(n, r, partSize, md5b64)
|
|
}
|
|
|
|
func (m *Multi) putPart(n int, r io.ReadSeeker, partSize int64, md5b64 string) (Part, error) {
|
|
headers := map[string][]string{
|
|
"Content-Length": {strconv.FormatInt(partSize, 10)},
|
|
"Content-MD5": {md5b64},
|
|
}
|
|
params := map[string][]string{
|
|
"uploadId": {m.UploadId},
|
|
"partNumber": {strconv.FormatInt(int64(n), 10)},
|
|
}
|
|
for attempt := attempts.Start(); attempt.Next(); {
|
|
_, err := r.Seek(0, 0)
|
|
if err != nil {
|
|
return Part{}, err
|
|
}
|
|
req := &request{
|
|
method: "PUT",
|
|
bucket: m.Bucket.Name,
|
|
path: m.Key,
|
|
headers: headers,
|
|
params: params,
|
|
payload: r,
|
|
}
|
|
err = m.Bucket.S3.prepare(req)
|
|
if err != nil {
|
|
return Part{}, err
|
|
}
|
|
resp, err := m.Bucket.S3.run(req, nil)
|
|
if shouldRetry(err) && attempt.HasNext() {
|
|
continue
|
|
}
|
|
if err != nil {
|
|
return Part{}, err
|
|
}
|
|
etag := resp.Header.Get("ETag")
|
|
if etag == "" {
|
|
return Part{}, errors.New("part upload succeeded with no ETag")
|
|
}
|
|
return Part{n, etag, partSize}, nil
|
|
}
|
|
panic("unreachable")
|
|
}
|
|
|
|
func seekerInfo(r io.ReadSeeker) (size int64, md5hex string, md5b64 string, err error) {
|
|
_, err = r.Seek(0, 0)
|
|
if err != nil {
|
|
return 0, "", "", err
|
|
}
|
|
digest := md5.New()
|
|
size, err = io.Copy(digest, r)
|
|
if err != nil {
|
|
return 0, "", "", err
|
|
}
|
|
sum := digest.Sum(nil)
|
|
md5hex = hex.EncodeToString(sum)
|
|
md5b64 = base64.StdEncoding.EncodeToString(sum)
|
|
return size, md5hex, md5b64, nil
|
|
}
|
|
|
|
type Part struct {
|
|
N int `xml:"PartNumber"`
|
|
ETag string
|
|
Size int64
|
|
}
|
|
|
|
type partSlice []Part
|
|
|
|
func (s partSlice) Len() int { return len(s) }
|
|
func (s partSlice) Less(i, j int) bool { return s[i].N < s[j].N }
|
|
func (s partSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
|
|
|
|
type listPartsResp struct {
|
|
NextPartNumberMarker string
|
|
IsTruncated bool
|
|
Part []Part
|
|
}
|
|
|
|
// That's the default. Here just for testing.
|
|
var listPartsMax = 1000
|
|
|
|
// Kept for backcompatability. See the documentation for ListPartsFull
|
|
func (m *Multi) ListParts() ([]Part, error) {
|
|
return m.ListPartsFull(0, listPartsMax)
|
|
}
|
|
|
|
// ListParts returns the list of previously uploaded parts in m,
|
|
// ordered by part number (Only parts with higher part numbers than
|
|
// partNumberMarker will be listed). Only up to maxParts parts will be
|
|
// returned.
|
|
//
|
|
// See http://goo.gl/ePioY for details.
|
|
func (m *Multi) ListPartsFull(partNumberMarker int, maxParts int) ([]Part, error) {
|
|
if maxParts > listPartsMax {
|
|
maxParts = listPartsMax
|
|
}
|
|
|
|
params := map[string][]string{
|
|
"uploadId": {m.UploadId},
|
|
"max-parts": {strconv.FormatInt(int64(maxParts), 10)},
|
|
"part-number-marker": {strconv.FormatInt(int64(partNumberMarker), 10)},
|
|
}
|
|
var parts partSlice
|
|
for attempt := attempts.Start(); attempt.Next(); {
|
|
req := &request{
|
|
method: "GET",
|
|
bucket: m.Bucket.Name,
|
|
path: m.Key,
|
|
params: params,
|
|
}
|
|
var resp listPartsResp
|
|
err := m.Bucket.S3.query(req, &resp)
|
|
if shouldRetry(err) && attempt.HasNext() {
|
|
continue
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
parts = append(parts, resp.Part...)
|
|
if !resp.IsTruncated {
|
|
sort.Sort(parts)
|
|
return parts, nil
|
|
}
|
|
params["part-number-marker"] = []string{resp.NextPartNumberMarker}
|
|
attempt = attempts.Start() // Last request worked.
|
|
}
|
|
panic("unreachable")
|
|
}
|
|
|
|
type ReaderAtSeeker interface {
|
|
io.ReaderAt
|
|
io.ReadSeeker
|
|
}
|
|
|
|
// PutAll sends all of r via a multipart upload with parts no larger
|
|
// than partSize bytes, which must be set to at least 5MB.
|
|
// Parts previously uploaded are either reused if their checksum
|
|
// and size match the new part, or otherwise overwritten with the
|
|
// new content.
|
|
// PutAll returns all the parts of m (reused or not).
|
|
func (m *Multi) PutAll(r ReaderAtSeeker, partSize int64) ([]Part, error) {
|
|
old, err := m.ListParts()
|
|
if err != nil && !hasCode(err, "NoSuchUpload") {
|
|
return nil, err
|
|
}
|
|
reuse := 0 // Index of next old part to consider reusing.
|
|
current := 1 // Part number of latest good part handled.
|
|
totalSize, err := r.Seek(0, 2)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
first := true // Must send at least one empty part if the file is empty.
|
|
var result []Part
|
|
NextSection:
|
|
for offset := int64(0); offset < totalSize || first; offset += partSize {
|
|
first = false
|
|
if offset+partSize > totalSize {
|
|
partSize = totalSize - offset
|
|
}
|
|
section := io.NewSectionReader(r, offset, partSize)
|
|
_, md5hex, md5b64, err := seekerInfo(section)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for reuse < len(old) && old[reuse].N <= current {
|
|
// Looks like this part was already sent.
|
|
part := &old[reuse]
|
|
etag := `"` + md5hex + `"`
|
|
if part.N == current && part.Size == partSize && part.ETag == etag {
|
|
// Checksum matches. Reuse the old part.
|
|
result = append(result, *part)
|
|
current++
|
|
continue NextSection
|
|
}
|
|
reuse++
|
|
}
|
|
|
|
// Part wasn't found or doesn't match. Send it.
|
|
part, err := m.putPart(current, section, partSize, md5b64)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
result = append(result, part)
|
|
current++
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
type completeUpload struct {
|
|
XMLName xml.Name `xml:"CompleteMultipartUpload"`
|
|
Parts completeParts `xml:"Part"`
|
|
}
|
|
|
|
type completePart struct {
|
|
PartNumber int
|
|
ETag string
|
|
}
|
|
|
|
type completeParts []completePart
|
|
|
|
func (p completeParts) Len() int { return len(p) }
|
|
func (p completeParts) Less(i, j int) bool { return p[i].PartNumber < p[j].PartNumber }
|
|
func (p completeParts) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
|
|
|
|
// We can't know in advance whether we'll have an Error or a
|
|
// CompleteMultipartUploadResult, so this structure is just a placeholder to
|
|
// know the name of the XML object.
|
|
type completeUploadResp struct {
|
|
XMLName xml.Name
|
|
InnerXML string `xml:",innerxml"`
|
|
}
|
|
|
|
// Complete assembles the given previously uploaded parts into the
|
|
// final object. This operation may take several minutes.
|
|
//
|
|
// See http://goo.gl/2Z7Tw for details.
|
|
func (m *Multi) Complete(parts []Part) error {
|
|
params := map[string][]string{
|
|
"uploadId": {m.UploadId},
|
|
}
|
|
c := completeUpload{}
|
|
for _, p := range parts {
|
|
c.Parts = append(c.Parts, completePart{p.N, p.ETag})
|
|
}
|
|
sort.Sort(c.Parts)
|
|
data, err := xml.Marshal(&c)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for attempt := attempts.Start(); attempt.Next(); {
|
|
req := &request{
|
|
method: "POST",
|
|
bucket: m.Bucket.Name,
|
|
path: m.Key,
|
|
params: params,
|
|
payload: bytes.NewReader(data),
|
|
}
|
|
var resp completeUploadResp
|
|
if m.Bucket.Region.Name == "generic" {
|
|
headers := make(http.Header)
|
|
headers.Add("Content-Length", strconv.FormatInt(int64(len(data)), 10))
|
|
req.headers = headers
|
|
}
|
|
err := m.Bucket.S3.query(req, &resp)
|
|
if shouldRetry(err) && attempt.HasNext() {
|
|
continue
|
|
}
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// A 200 error code does not guarantee that there were no errors (see
|
|
// http://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadComplete.html ),
|
|
// so first figure out what kind of XML "object" we are dealing with.
|
|
|
|
if resp.XMLName.Local == "Error" {
|
|
// S3.query does the unmarshalling for us, so we can't unmarshal
|
|
// again in a different struct... So we need to duct-tape back the
|
|
// original XML back together.
|
|
fullErrorXml := "<Error>" + resp.InnerXML + "</Error>"
|
|
s3err := &Error{}
|
|
|
|
if err := xml.Unmarshal([]byte(fullErrorXml), s3err); err != nil {
|
|
return err
|
|
}
|
|
|
|
return s3err
|
|
}
|
|
|
|
if resp.XMLName.Local == "CompleteMultipartUploadResult" {
|
|
// FIXME: One could probably add a CompleteFull method returning the
|
|
// actual contents of the CompleteMultipartUploadResult object.
|
|
return nil
|
|
}
|
|
|
|
return errors.New("Invalid XML struct returned: " + resp.XMLName.Local)
|
|
}
|
|
panic("unreachable")
|
|
}
|
|
|
|
// Abort deletes an unifinished multipart upload and any previously
|
|
// uploaded parts for it.
|
|
//
|
|
// After a multipart upload is aborted, no additional parts can be
|
|
// uploaded using it. However, if any part uploads are currently in
|
|
// progress, those part uploads might or might not succeed. As a result,
|
|
// it might be necessary to abort a given multipart upload multiple
|
|
// times in order to completely free all storage consumed by all parts.
|
|
//
|
|
// NOTE: If the described scenario happens to you, please report back to
|
|
// the goamz authors with details. In the future such retrying should be
|
|
// handled internally, but it's not clear what happens precisely (Is an
|
|
// error returned? Is the issue completely undetectable?).
|
|
//
|
|
// See http://goo.gl/dnyJw for details.
|
|
func (m *Multi) Abort() error {
|
|
params := map[string][]string{
|
|
"uploadId": {m.UploadId},
|
|
}
|
|
for attempt := attempts.Start(); attempt.Next(); {
|
|
req := &request{
|
|
method: "DELETE",
|
|
bucket: m.Bucket.Name,
|
|
path: m.Key,
|
|
params: params,
|
|
}
|
|
err := m.Bucket.S3.query(req, nil)
|
|
if shouldRetry(err) && attempt.HasNext() {
|
|
continue
|
|
}
|
|
return err
|
|
}
|
|
panic("unreachable")
|
|
}
|