Update s3 library again

This commit is contained in:
Alexander Neumann 2016-01-02 14:37:08 +01:00
parent a850041cf0
commit 0237b0d972
14 changed files with 505 additions and 147 deletions

4
Godeps/Godeps.json generated
View file

@ -24,8 +24,8 @@
}, },
{ {
"ImportPath": "github.com/minio/minio-go", "ImportPath": "github.com/minio/minio-go",
"Comment": "v0.2.5-187-gad1597d", "Comment": "v0.2.5-195-gf30b6ca",
"Rev": "ad1597d864f56f608f8a1694ae9b5970fef57eb6" "Rev": "f30b6ca90bfda7578f6a11b7ba6af2eae7b0510c"
}, },
{ {
"ImportPath": "github.com/pkg/sftp", "ImportPath": "github.com/pkg/sftp",

View file

@ -3,7 +3,7 @@ go:
- 1.5.1 - 1.5.1
script: script:
- go vet ./... - go vet ./...
- go test -race -v ./... - go test -test.short -race -v ./...
notifications: notifications:
slack: slack:
secure: HrOX2k6F/sEl6Rr4m5vHOdJCIwV42be0kz1Jy/WSMvrl/fQ8YkldKviLeWh4aWt1kclsYhNQ4FqGML+RIZYsdOqej4fAw9Vi5pZkI1MzPJq0UjrtMqkqzvD90eDGQYCKwaXjEIN8cohwJeb6X0B0HKAd9sqJW5GH5SwnhH5WWP8= secure: HrOX2k6F/sEl6Rr4m5vHOdJCIwV42be0kz1Jy/WSMvrl/fQ8YkldKviLeWh4aWt1kclsYhNQ4FqGML+RIZYsdOqej4fAw9Vi5pZkI1MzPJq0UjrtMqkqzvD90eDGQYCKwaXjEIN8cohwJeb6X0B0HKAd9sqJW5GH5SwnhH5WWP8=

View file

@ -71,20 +71,20 @@ type ObjectMultipartStat struct {
Err error Err error
} }
// partMetadata - container for each partMetadata. // partData - container for each part.
type partMetadata struct { type partData struct {
MD5Sum []byte MD5Sum []byte
Sha256Sum []byte Sha256Sum []byte
ReadCloser io.ReadCloser ReadCloser io.ReadCloser
Size int64 Size int64
Number int // partMetadata number. Number int // partData number.
// Error // Error
Err error Err error
} }
// putObjectMetadata - container for each single PUT operation. // putObjectData - container for each single PUT operation.
type putObjectMetadata struct { type putObjectData struct {
MD5Sum []byte MD5Sum []byte
Sha256Sum []byte Sha256Sum []byte
ReadCloser io.ReadCloser ReadCloser io.ReadCloser

View file

@ -218,6 +218,16 @@ func ErrInvalidObjectName(message string) error {
} }
} }
// ErrInvalidParts - invalid number of parts.
func ErrInvalidParts(expectedParts, uploadedParts int) error {
msg := fmt.Sprintf("Unexpected number of parts found Want %d, Got %d", expectedParts, uploadedParts)
return ErrorResponse{
Code: "InvalidParts",
Message: msg,
RequestID: "minio",
}
}
// ErrInvalidObjectPrefix - invalid object prefix response is // ErrInvalidObjectPrefix - invalid object prefix response is
// similar to object name response. // similar to object name response.
var ErrInvalidObjectPrefix = ErrInvalidObjectName var ErrInvalidObjectPrefix = ErrInvalidObjectName

View file

@ -93,7 +93,7 @@ func (c Client) FPutObject(bucketName, objectName, filePath, contentType string)
// NOTE: Google Cloud Storage multipart Put is not compatible with Amazon S3 APIs. // NOTE: Google Cloud Storage multipart Put is not compatible with Amazon S3 APIs.
// Current implementation will only upload a maximum of 5GiB to Google Cloud Storage servers. // Current implementation will only upload a maximum of 5GiB to Google Cloud Storage servers.
if isGoogleEndpoint(c.endpointURL) { if isGoogleEndpoint(c.endpointURL) {
if fileSize <= -1 || fileSize > int64(maxSinglePutObjectSize) { if fileSize > int64(maxSinglePutObjectSize) {
return 0, ErrorResponse{ return 0, ErrorResponse{
Code: "NotImplemented", Code: "NotImplemented",
Message: fmt.Sprintf("Invalid Content-Length %d for file uploads to Google Cloud Storage.", fileSize), Message: fmt.Sprintf("Invalid Content-Length %d for file uploads to Google Cloud Storage.", fileSize),
@ -108,7 +108,7 @@ func (c Client) FPutObject(bucketName, objectName, filePath, contentType string)
// NOTE: S3 doesn't allow anonymous multipart requests. // NOTE: S3 doesn't allow anonymous multipart requests.
if isAmazonEndpoint(c.endpointURL) && c.anonymous { if isAmazonEndpoint(c.endpointURL) && c.anonymous {
if fileSize <= -1 || fileSize > int64(maxSinglePutObjectSize) { if fileSize > int64(maxSinglePutObjectSize) {
return 0, ErrorResponse{ return 0, ErrorResponse{
Code: "NotImplemented", Code: "NotImplemented",
Message: fmt.Sprintf("For anonymous requests Content-Length cannot be %d.", fileSize), Message: fmt.Sprintf("For anonymous requests Content-Length cannot be %d.", fileSize),
@ -121,14 +121,11 @@ func (c Client) FPutObject(bucketName, objectName, filePath, contentType string)
return n, err return n, err
} }
// Large file upload is initiated for uploads for input data size // Small object upload is initiated for uploads for input data size smaller than 5MiB.
// if its greater than 5MiB or data size is negative. if fileSize < minimumPartSize {
if fileSize >= minimumPartSize || fileSize < 0 { return c.putSmallObject(bucketName, objectName, fileData, fileSize, contentType)
n, err := c.fputLargeObject(bucketName, objectName, fileData, fileSize, contentType)
return n, err
} }
n, err := c.putSmallObject(bucketName, objectName, fileData, fileSize, contentType) return c.fputLargeObject(bucketName, objectName, fileData, fileSize, contentType)
return n, err
} }
// computeHash - calculates MD5 and Sha256 for an input read Seeker. // computeHash - calculates MD5 and Sha256 for an input read Seeker.
@ -192,7 +189,6 @@ func (c Client) fputLargeObject(bucketName, objectName string, fileData *os.File
var prevMaxPartSize int64 var prevMaxPartSize int64
// Loop through all parts and calculate totalUploadedSize. // Loop through all parts and calculate totalUploadedSize.
for _, partInfo := range partsInfo { for _, partInfo := range partsInfo {
totalUploadedSize += partInfo.Size
// Choose the maximum part size. // Choose the maximum part size.
if partInfo.Size >= prevMaxPartSize { if partInfo.Size >= prevMaxPartSize {
prevMaxPartSize = partInfo.Size prevMaxPartSize = partInfo.Size
@ -206,11 +202,14 @@ func (c Client) fputLargeObject(bucketName, objectName string, fileData *os.File
partSize = prevMaxPartSize partSize = prevMaxPartSize
} }
// Part number always starts with '1'. // Part number always starts with '0'.
partNumber := 1 partNumber := 0
// Loop through until EOF. // Loop through until EOF.
for totalUploadedSize < fileSize { for totalUploadedSize < fileSize {
// Increment part number.
partNumber++
// Get a section reader on a particular offset. // Get a section reader on a particular offset.
sectionReader := io.NewSectionReader(fileData, totalUploadedSize, partSize) sectionReader := io.NewSectionReader(fileData, totalUploadedSize, partSize)
@ -221,7 +220,7 @@ func (c Client) fputLargeObject(bucketName, objectName string, fileData *os.File
} }
// Save all the part metadata. // Save all the part metadata.
partMdata := partMetadata{ prtData := partData{
ReadCloser: ioutil.NopCloser(sectionReader), ReadCloser: ioutil.NopCloser(sectionReader),
Size: size, Size: size,
MD5Sum: md5Sum, MD5Sum: md5Sum,
@ -229,31 +228,26 @@ func (c Client) fputLargeObject(bucketName, objectName string, fileData *os.File
Number: partNumber, // Part number to be uploaded. Number: partNumber, // Part number to be uploaded.
} }
// If part number already uploaded, move to the next one. // If part not uploaded proceed to upload.
if isPartUploaded(objectPart{ if !isPartUploaded(objectPart{
ETag: hex.EncodeToString(partMdata.MD5Sum), ETag: hex.EncodeToString(prtData.MD5Sum),
PartNumber: partMdata.Number, PartNumber: prtData.Number,
}, partsInfo) { }, partsInfo) {
// Close the read closer.
partMdata.ReadCloser.Close()
continue
}
// Upload the part. // Upload the part.
objPart, err := c.uploadPart(bucketName, objectName, uploadID, partMdata) objPart, err := c.uploadPart(bucketName, objectName, uploadID, prtData)
if err != nil { if err != nil {
partMdata.ReadCloser.Close() prtData.ReadCloser.Close()
return totalUploadedSize, err return totalUploadedSize, err
} }
// Save successfully uploaded part metadata.
partsInfo[prtData.Number] = objPart
}
// Close the read closer for temporary file.
prtData.ReadCloser.Close()
// Save successfully uploaded size. // Save successfully uploaded size.
totalUploadedSize += partMdata.Size totalUploadedSize += prtData.Size
// Save successfully uploaded part metadata.
partsInfo[partMdata.Number] = objPart
// Increment to next part number.
partNumber++
} }
// if totalUploadedSize is different than the file 'size'. Do not complete the request throw an error. // if totalUploadedSize is different than the file 'size'. Do not complete the request throw an error.
@ -269,6 +263,11 @@ func (c Client) fputLargeObject(bucketName, objectName string, fileData *os.File
completeMultipartUpload.Parts = append(completeMultipartUpload.Parts, complPart) completeMultipartUpload.Parts = append(completeMultipartUpload.Parts, complPart)
} }
// If partNumber is different than total list of parts, error out.
if partNumber != len(completeMultipartUpload.Parts) {
return totalUploadedSize, ErrInvalidParts(partNumber, len(completeMultipartUpload.Parts))
}
// Sort all completed parts. // Sort all completed parts.
sort.Sort(completedParts(completeMultipartUpload.Parts)) sort.Sort(completedParts(completeMultipartUpload.Parts))
_, err = c.completeMultipartUpload(bucketName, objectName, uploadID, completeMultipartUpload) _, err = c.completeMultipartUpload(bucketName, objectName, uploadID, completeMultipartUpload)

View file

@ -365,9 +365,12 @@ func (c Client) getObject(bucketName, objectName string, offset, length int64) (
return nil, ObjectStat{}, HTTPRespToErrorResponse(resp, bucketName, objectName) return nil, ObjectStat{}, HTTPRespToErrorResponse(resp, bucketName, objectName)
} }
} }
// trim off the odd double quotes.
md5sum := strings.Trim(resp.Header.Get("ETag"), "\"") // Trim off the odd double quotes from ETag in the beginning and end.
// parse the date. md5sum := strings.TrimPrefix(resp.Header.Get("ETag"), "\"")
md5sum = strings.TrimSuffix(md5sum, "\"")
// Parse the date.
date, err := time.Parse(http.TimeFormat, resp.Header.Get("Last-Modified")) date, err := time.Parse(http.TimeFormat, resp.Header.Get("Last-Modified"))
if err != nil { if err != nil {
msg := "Last-Modified time format not recognized. " + reportIssue msg := "Last-Modified time format not recognized. " + reportIssue
@ -379,6 +382,7 @@ func (c Client) getObject(bucketName, objectName string, offset, length int64) (
AmzBucketRegion: resp.Header.Get("x-amz-bucket-region"), AmzBucketRegion: resp.Header.Get("x-amz-bucket-region"),
} }
} }
// Get content-type.
contentType := strings.TrimSpace(resp.Header.Get("Content-Type")) contentType := strings.TrimSpace(resp.Header.Get("Content-Type"))
if contentType == "" { if contentType == "" {
contentType = "application/octet-stream" contentType = "application/octet-stream"

View file

@ -20,6 +20,7 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"net/url" "net/url"
"strings"
) )
// ListBuckets list all buckets owned by this authenticated user. // ListBuckets list all buckets owned by this authenticated user.
@ -393,6 +394,9 @@ func (c Client) listObjectParts(bucketName, objectName, uploadID string) (partsI
} }
// Append to parts info. // Append to parts info.
for _, part := range listObjPartsResult.ObjectParts { for _, part := range listObjPartsResult.ObjectParts {
// Trim off the odd double quotes from ETag in the beginning and end.
part.ETag = strings.TrimPrefix(part.ETag, "\"")
part.ETag = strings.TrimSuffix(part.ETag, "\"")
partsInfo[part.PartNumber] = part partsInfo[part.PartNumber] = part
} }
// Keep part number marker, for the next iteration. // Keep part number marker, for the next iteration.

View file

@ -17,11 +17,14 @@
package minio package minio
import ( import (
"bytes"
"crypto/md5" "crypto/md5"
"crypto/sha256" "crypto/sha256"
"errors" "errors"
"fmt"
"hash" "hash"
"io" "io"
"io/ioutil"
"sort" "sort"
) )
@ -34,9 +37,187 @@ func (c Client) PutObjectPartial(bucketName, objectName string, data ReadAtClose
if err := isValidObjectName(objectName); err != nil { if err := isValidObjectName(objectName); err != nil {
return 0, err return 0, err
} }
// Input size negative should return error.
if size < 0 {
return 0, ErrInvalidArgument("Input file size cannot be negative.")
}
// Input size bigger than 5TiB should fail.
if size > int64(maxMultipartPutObjectSize) {
return 0, ErrInvalidArgument("Input file size is bigger than the supported maximum of 5TiB.")
}
// Cleanup any previously left stale files, as the function exits. // NOTE: Google Cloud Storage does not implement Amazon S3 Compatible multipart PUT.
defer cleanupStaleTempfiles("multiparts$-putobject-partial") // So we fall back to single PUT operation with the maximum limit of 5GiB.
if isGoogleEndpoint(c.endpointURL) {
if size > int64(maxSinglePutObjectSize) {
return 0, ErrorResponse{
Code: "NotImplemented",
Message: fmt.Sprintf("Invalid Content-Length %d for file uploads to Google Cloud Storage.", size),
Key: objectName,
BucketName: bucketName,
}
}
// Do not compute MD5 for Google Cloud Storage. Uploads upto 5GiB in size.
n, err := c.putPartialNoChksum(bucketName, objectName, data, size, contentType)
return n, err
}
// NOTE: S3 doesn't allow anonymous multipart requests.
if isAmazonEndpoint(c.endpointURL) && c.anonymous {
if size > int64(maxSinglePutObjectSize) {
return 0, ErrorResponse{
Code: "NotImplemented",
Message: fmt.Sprintf("For anonymous requests Content-Length cannot be %d.", size),
Key: objectName,
BucketName: bucketName,
}
}
// Do not compute MD5 for anonymous requests to Amazon S3. Uploads upto 5GiB in size.
n, err := c.putPartialAnonymous(bucketName, objectName, data, size, contentType)
return n, err
}
// Small file upload is initiated for uploads for input data size smaller than 5MiB.
if size < minimumPartSize {
n, err = c.putPartialSmallObject(bucketName, objectName, data, size, contentType)
return n, err
}
n, err = c.putPartialLargeObject(bucketName, objectName, data, size, contentType)
return n, err
}
// putNoChecksumPartial special function used Google Cloud Storage. This special function
// is used for Google Cloud Storage since Google's multipart API is not S3 compatible.
func (c Client) putPartialNoChksum(bucketName, objectName string, data ReadAtCloser, size int64, contentType string) (n int64, err error) {
// Input validation.
if err := isValidBucketName(bucketName); err != nil {
return 0, err
}
if err := isValidObjectName(objectName); err != nil {
return 0, err
}
if size > maxPartSize {
return 0, ErrEntityTooLarge(size, bucketName, objectName)
}
// Create a new pipe to stage the reads.
reader, writer := io.Pipe()
// readAtOffset to carry future offsets.
var readAtOffset int64
// readAt defaults to reading at 5MiB buffer.
readAtBuffer := make([]byte, 1024*1024*5)
// Initiate a routine to start writing.
go func() {
for {
readAtSize, rerr := data.ReadAt(readAtBuffer, readAtOffset)
if rerr != nil {
if rerr != io.EOF {
writer.CloseWithError(rerr)
return
}
}
writeSize, werr := writer.Write(readAtBuffer[:readAtSize])
if werr != nil {
writer.CloseWithError(werr)
return
}
if readAtSize != writeSize {
writer.CloseWithError(errors.New("Something really bad happened here. " + reportIssue))
return
}
readAtOffset += int64(writeSize)
if rerr == io.EOF {
writer.Close()
return
}
}
}()
// For anonymous requests, we will not calculate sha256 and md5sum.
putObjData := putObjectData{
MD5Sum: nil,
Sha256Sum: nil,
ReadCloser: reader,
Size: size,
ContentType: contentType,
}
// Execute put object.
st, err := c.putObject(bucketName, objectName, putObjData)
if err != nil {
return 0, err
}
if st.Size != size {
return 0, ErrUnexpectedEOF(st.Size, size, bucketName, objectName)
}
return size, nil
}
// putAnonymousPartial is a special function for uploading content as anonymous request.
// This special function is necessary since Amazon S3 doesn't allow anonymous multipart uploads.
func (c Client) putPartialAnonymous(bucketName, objectName string, data ReadAtCloser, size int64, contentType string) (n int64, err error) {
// Input validation.
if err := isValidBucketName(bucketName); err != nil {
return 0, err
}
if err := isValidObjectName(objectName); err != nil {
return 0, err
}
return c.putPartialNoChksum(bucketName, objectName, data, size, contentType)
}
// putSmallObjectPartial uploads files smaller than 5MiB.
func (c Client) putPartialSmallObject(bucketName, objectName string, data ReadAtCloser, size int64, contentType string) (n int64, err error) {
// Input validation.
if err := isValidBucketName(bucketName); err != nil {
return 0, err
}
if err := isValidObjectName(objectName); err != nil {
return 0, err
}
// readAt defaults to reading at 5MiB buffer.
readAtBuffer := make([]byte, size)
readAtSize, err := data.ReadAt(readAtBuffer, 0)
if err != nil {
if err != io.EOF {
return 0, err
}
}
if int64(readAtSize) != size {
return 0, ErrUnexpectedEOF(int64(readAtSize), size, bucketName, objectName)
}
// Construct a new PUT object metadata.
putObjData := putObjectData{
MD5Sum: sumMD5(readAtBuffer),
Sha256Sum: sum256(readAtBuffer),
ReadCloser: ioutil.NopCloser(bytes.NewReader(readAtBuffer)),
Size: size,
ContentType: contentType,
}
// Single part use case, use putObject directly.
st, err := c.putObject(bucketName, objectName, putObjData)
if err != nil {
return 0, err
}
if st.Size != size {
return 0, ErrUnexpectedEOF(st.Size, size, bucketName, objectName)
}
return size, nil
}
// putPartialLargeObject uploads files bigger than 5MiB.
func (c Client) putPartialLargeObject(bucketName, objectName string, data ReadAtCloser, size int64, contentType string) (n int64, err error) {
// Input validation.
if err := isValidBucketName(bucketName); err != nil {
return 0, err
}
if err := isValidObjectName(objectName); err != nil {
return 0, err
}
// getUploadID for an object, initiates a new multipart request // getUploadID for an object, initiates a new multipart request
// if it cannot find any previously partially uploaded object. // if it cannot find any previously partially uploaded object.
@ -139,7 +320,7 @@ func (c Client) PutObjectPartial(bucketName, objectName string, data ReadAtClose
} }
// Save all the part metadata. // Save all the part metadata.
partMdata := partMetadata{ prtData := partData{
ReadCloser: tmpFile, ReadCloser: tmpFile,
MD5Sum: hashMD5.Sum(nil), MD5Sum: hashMD5.Sum(nil),
Size: totalReadPartSize, Size: totalReadPartSize,
@ -147,25 +328,25 @@ func (c Client) PutObjectPartial(bucketName, objectName string, data ReadAtClose
// Signature version '4'. // Signature version '4'.
if c.signature.isV4() { if c.signature.isV4() {
partMdata.Sha256Sum = hashSha256.Sum(nil) prtData.Sha256Sum = hashSha256.Sum(nil)
} }
// Current part number to be uploaded. // Current part number to be uploaded.
partMdata.Number = partNumber prtData.Number = partNumber
// execute upload part. // execute upload part.
objPart, err := c.uploadPart(bucketName, objectName, uploadID, partMdata) objPart, err := c.uploadPart(bucketName, objectName, uploadID, prtData)
if err != nil { if err != nil {
// Close the read closer. // Close the read closer.
partMdata.ReadCloser.Close() prtData.ReadCloser.Close()
return totalUploadedSize, err return totalUploadedSize, err
} }
// Save successfully uploaded size. // Save successfully uploaded size.
totalUploadedSize += partMdata.Size totalUploadedSize += prtData.Size
// Save successfully uploaded part metadata. // Save successfully uploaded part metadata.
partsInfo[partMdata.Number] = objPart partsInfo[prtData.Number] = objPart
// Move to next part. // Move to next part.
partNumber++ partNumber++

View file

@ -50,8 +50,8 @@ func (a completedParts) Less(i, j int) bool { return a[i].PartNumber < a[j].Part
// - For size input as -1 PutObject does a multipart Put operation until input stream reaches EOF. // - For size input as -1 PutObject does a multipart Put operation until input stream reaches EOF.
// Maximum object size that can be uploaded through this operation will be 5TiB. // Maximum object size that can be uploaded through this operation will be 5TiB.
// //
// NOTE: Google Cloud Storage multipart Put is not compatible with Amazon S3 APIs. // NOTE: Google Cloud Storage does not implement Amazon S3 Compatible multipart PUT.
// Current implementation will only upload a maximum of 5GiB to Google Cloud Storage servers. // So we fall back to single PUT operation with the maximum limit of 5GiB.
// //
// NOTE: For anonymous requests Amazon S3 doesn't allow multipart upload. So we fall back to single PUT operation. // NOTE: For anonymous requests Amazon S3 doesn't allow multipart upload. So we fall back to single PUT operation.
func (c Client) PutObject(bucketName, objectName string, data io.Reader, size int64, contentType string) (n int64, err error) { func (c Client) PutObject(bucketName, objectName string, data io.Reader, size int64, contentType string) (n int64, err error) {
@ -63,8 +63,8 @@ func (c Client) PutObject(bucketName, objectName string, data io.Reader, size in
return 0, err return 0, err
} }
// NOTE: Google Cloud Storage multipart Put is not compatible with Amazon S3 APIs. // NOTE: Google Cloud Storage does not implement Amazon S3 Compatible multipart PUT.
// Current implementation will only upload a maximum of 5GiB to Google Cloud Storage servers. // So we fall back to single PUT operation with the maximum limit of 5GiB.
if isGoogleEndpoint(c.endpointURL) { if isGoogleEndpoint(c.endpointURL) {
if size <= -1 { if size <= -1 {
return 0, ErrorResponse{ return 0, ErrorResponse{
@ -114,7 +114,7 @@ func (c Client) putNoChecksum(bucketName, objectName string, data io.Reader, siz
return 0, ErrEntityTooLarge(size, bucketName, objectName) return 0, ErrEntityTooLarge(size, bucketName, objectName)
} }
// For anonymous requests, we will not calculate sha256 and md5sum. // For anonymous requests, we will not calculate sha256 and md5sum.
putObjMetadata := putObjectMetadata{ putObjData := putObjectData{
MD5Sum: nil, MD5Sum: nil,
Sha256Sum: nil, Sha256Sum: nil,
ReadCloser: ioutil.NopCloser(data), ReadCloser: ioutil.NopCloser(data),
@ -122,9 +122,13 @@ func (c Client) putNoChecksum(bucketName, objectName string, data io.Reader, siz
ContentType: contentType, ContentType: contentType,
} }
// Execute put object. // Execute put object.
if _, err := c.putObject(bucketName, objectName, putObjMetadata); err != nil { st, err := c.putObject(bucketName, objectName, putObjData)
if err != nil {
return 0, err return 0, err
} }
if st.Size != size {
return 0, ErrUnexpectedEOF(st.Size, size, bucketName, objectName)
}
return size, nil return size, nil
} }
@ -160,7 +164,7 @@ func (c Client) putSmallObject(bucketName, objectName string, data io.Reader, si
return 0, ErrUnexpectedEOF(int64(len(dataBytes)), size, bucketName, objectName) return 0, ErrUnexpectedEOF(int64(len(dataBytes)), size, bucketName, objectName)
} }
// Construct a new PUT object metadata. // Construct a new PUT object metadata.
putObjMetadata := putObjectMetadata{ putObjData := putObjectData{
MD5Sum: sumMD5(dataBytes), MD5Sum: sumMD5(dataBytes),
Sha256Sum: sum256(dataBytes), Sha256Sum: sum256(dataBytes),
ReadCloser: ioutil.NopCloser(bytes.NewReader(dataBytes)), ReadCloser: ioutil.NopCloser(bytes.NewReader(dataBytes)),
@ -168,9 +172,13 @@ func (c Client) putSmallObject(bucketName, objectName string, data io.Reader, si
ContentType: contentType, ContentType: contentType,
} }
// Single part use case, use putObject directly. // Single part use case, use putObject directly.
if _, err := c.putObject(bucketName, objectName, putObjMetadata); err != nil { st, err := c.putObject(bucketName, objectName, putObjData)
if err != nil {
return 0, err return 0, err
} }
if st.Size != size {
return 0, ErrUnexpectedEOF(st.Size, size, bucketName, objectName)
}
return size, nil return size, nil
} }
@ -189,12 +197,13 @@ func (c Client) hashCopy(writer io.ReadWriteSeeker, data io.Reader, partSize int
// Copies to input at writer. // Copies to input at writer.
size, err = io.CopyN(hashWriter, data, partSize) size, err = io.CopyN(hashWriter, data, partSize)
if err != nil { if err != nil {
// If not EOF return error right here.
if err != io.EOF { if err != io.EOF {
return nil, nil, 0, err return nil, nil, 0, err
} }
} }
// Seek back to beginning of input. // Seek back to beginning of input, any error fail right here.
if _, err := writer.Seek(0, 0); err != nil { if _, err := writer.Seek(0, 0); err != nil {
return nil, nil, 0, err return nil, nil, 0, err
} }
@ -204,7 +213,7 @@ func (c Client) hashCopy(writer io.ReadWriteSeeker, data io.Reader, partSize int
if c.signature.isV4() { if c.signature.isV4() {
sha256Sum = hashSha256.Sum(nil) sha256Sum = hashSha256.Sum(nil)
} }
return md5Sum, sha256Sum, size, nil return md5Sum, sha256Sum, size, err
} }
// putLargeObject uploads files bigger than 5 mega bytes. // putLargeObject uploads files bigger than 5 mega bytes.
@ -217,9 +226,6 @@ func (c Client) putLargeObject(bucketName, objectName string, data io.Reader, si
return 0, err return 0, err
} }
// Cleanup any previously left stale files, as the function exits.
defer cleanupStaleTempfiles("multiparts$-putobject")
// getUploadID for an object, initiates a new multipart request // getUploadID for an object, initiates a new multipart request
// if it cannot find any previously partially uploaded object. // if it cannot find any previously partially uploaded object.
uploadID, err := c.getUploadID(bucketName, objectName, contentType) uploadID, err := c.getUploadID(bucketName, objectName, contentType)
@ -242,7 +248,6 @@ func (c Client) putLargeObject(bucketName, objectName string, data io.Reader, si
var prevMaxPartSize int64 var prevMaxPartSize int64
// Loop through all parts and calculate totalUploadedSize. // Loop through all parts and calculate totalUploadedSize.
for _, partInfo := range partsInfo { for _, partInfo := range partsInfo {
totalUploadedSize += partInfo.Size
// Choose the maximum part size. // Choose the maximum part size.
if partInfo.Size >= prevMaxPartSize { if partInfo.Size >= prevMaxPartSize {
prevMaxPartSize = partInfo.Size prevMaxPartSize = partInfo.Size
@ -256,15 +261,13 @@ func (c Client) putLargeObject(bucketName, objectName string, data io.Reader, si
partSize = prevMaxPartSize partSize = prevMaxPartSize
} }
// Part number always starts with '1'. // Part number always starts with '0'.
partNumber := 1 partNumber := 0
// Loop through until EOF. // Loop through until EOF.
for { for {
// We have reached EOF, break out. // Increment part number.
if totalUploadedSize == size { partNumber++
break
}
// Initialize a new temporary file. // Initialize a new temporary file.
tmpFile, err := newTempFile("multiparts$-putobject") tmpFile, err := newTempFile("multiparts$-putobject")
@ -273,15 +276,15 @@ func (c Client) putLargeObject(bucketName, objectName string, data io.Reader, si
} }
// Calculates MD5 and Sha256 sum while copying partSize bytes into tmpFile. // Calculates MD5 and Sha256 sum while copying partSize bytes into tmpFile.
md5Sum, sha256Sum, size, err := c.hashCopy(tmpFile, data, partSize) md5Sum, sha256Sum, size, rErr := c.hashCopy(tmpFile, data, partSize)
if err != nil { if rErr != nil {
if err != io.EOF { if rErr != io.EOF {
return 0, err return 0, rErr
} }
} }
// Save all the part metadata. // Save all the part metadata.
partMdata := partMetadata{ prtData := partData{
ReadCloser: tmpFile, ReadCloser: tmpFile,
Size: size, Size: size,
MD5Sum: md5Sum, MD5Sum: md5Sum,
@ -289,39 +292,28 @@ func (c Client) putLargeObject(bucketName, objectName string, data io.Reader, si
Number: partNumber, // Current part number to be uploaded. Number: partNumber, // Current part number to be uploaded.
} }
// If part number already uploaded, move to the next one. // If part not uploaded proceed to upload.
if isPartUploaded(objectPart{ if !isPartUploaded(objectPart{
ETag: hex.EncodeToString(partMdata.MD5Sum), ETag: hex.EncodeToString(prtData.MD5Sum),
PartNumber: partNumber, PartNumber: partNumber,
}, partsInfo) { }, partsInfo) {
// Close the read closer.
partMdata.ReadCloser.Close()
continue
}
// execute upload part. // execute upload part.
objPart, err := c.uploadPart(bucketName, objectName, uploadID, partMdata) objPart, err := c.uploadPart(bucketName, objectName, uploadID, prtData)
if err != nil { if err != nil {
// Close the read closer. // Close the read closer.
partMdata.ReadCloser.Close() prtData.ReadCloser.Close()
return totalUploadedSize, err return 0, err
} }
// Save successfully uploaded size.
totalUploadedSize += partMdata.Size
// Save successfully uploaded part metadata. // Save successfully uploaded part metadata.
partsInfo[partMdata.Number] = objPart partsInfo[prtData.Number] = objPart
// Move to next part.
partNumber++
} }
// If size is greater than zero verify totalWritten. // Close the read closer.
// if totalWritten is different than the input 'size', do not complete the request throw an error. prtData.ReadCloser.Close()
if size > 0 {
if totalUploadedSize != size { // If read error was an EOF, break out of the loop.
return totalUploadedSize, ErrUnexpectedEOF(totalUploadedSize, size, bucketName, objectName) if rErr == io.EOF {
break
} }
} }
@ -331,6 +323,21 @@ func (c Client) putLargeObject(bucketName, objectName string, data io.Reader, si
complPart.ETag = part.ETag complPart.ETag = part.ETag
complPart.PartNumber = part.PartNumber complPart.PartNumber = part.PartNumber
completeMultipartUpload.Parts = append(completeMultipartUpload.Parts, complPart) completeMultipartUpload.Parts = append(completeMultipartUpload.Parts, complPart)
// Save successfully uploaded size.
totalUploadedSize += part.Size
}
// If size is greater than zero verify totalUploadedSize. if totalUploadedSize is
// different than the input 'size', do not complete the request throw an error.
if size > 0 {
if totalUploadedSize != size {
return totalUploadedSize, ErrUnexpectedEOF(totalUploadedSize, size, bucketName, objectName)
}
}
// If partNumber is different than total list of parts, error out.
if partNumber != len(completeMultipartUpload.Parts) {
return totalUploadedSize, ErrInvalidParts(partNumber, len(completeMultipartUpload.Parts))
} }
// Sort all completed parts. // Sort all completed parts.
@ -346,7 +353,7 @@ func (c Client) putLargeObject(bucketName, objectName string, data io.Reader, si
// putObject - add an object to a bucket. // putObject - add an object to a bucket.
// NOTE: You must have WRITE permissions on a bucket to add an object to it. // NOTE: You must have WRITE permissions on a bucket to add an object to it.
func (c Client) putObject(bucketName, objectName string, putObjMetadata putObjectMetadata) (ObjectStat, error) { func (c Client) putObject(bucketName, objectName string, putObjData putObjectData) (ObjectStat, error) {
// Input validation. // Input validation.
if err := isValidBucketName(bucketName); err != nil { if err := isValidBucketName(bucketName); err != nil {
return ObjectStat{}, err return ObjectStat{}, err
@ -355,23 +362,23 @@ func (c Client) putObject(bucketName, objectName string, putObjMetadata putObjec
return ObjectStat{}, err return ObjectStat{}, err
} }
if strings.TrimSpace(putObjMetadata.ContentType) == "" { if strings.TrimSpace(putObjData.ContentType) == "" {
putObjMetadata.ContentType = "application/octet-stream" putObjData.ContentType = "application/octet-stream"
} }
// Set headers. // Set headers.
customHeader := make(http.Header) customHeader := make(http.Header)
customHeader.Set("Content-Type", putObjMetadata.ContentType) customHeader.Set("Content-Type", putObjData.ContentType)
// Populate request metadata. // Populate request metadata.
reqMetadata := requestMetadata{ reqMetadata := requestMetadata{
bucketName: bucketName, bucketName: bucketName,
objectName: objectName, objectName: objectName,
customHeader: customHeader, customHeader: customHeader,
contentBody: putObjMetadata.ReadCloser, contentBody: putObjData.ReadCloser,
contentLength: putObjMetadata.Size, contentLength: putObjData.Size,
contentSha256Bytes: putObjMetadata.Sha256Sum, contentSha256Bytes: putObjData.Sha256Sum,
contentMD5Bytes: putObjMetadata.MD5Sum, contentMD5Bytes: putObjData.MD5Sum,
} }
// Initiate new request. // Initiate new request.
req, err := c.newRequest("PUT", reqMetadata) req, err := c.newRequest("PUT", reqMetadata)
@ -389,11 +396,15 @@ func (c Client) putObject(bucketName, objectName string, putObjMetadata putObjec
return ObjectStat{}, HTTPRespToErrorResponse(resp, bucketName, objectName) return ObjectStat{}, HTTPRespToErrorResponse(resp, bucketName, objectName)
} }
} }
var metadata ObjectStat var metadata ObjectStat
// Trim off the odd double quotes from ETag. // Trim off the odd double quotes from ETag in the beginning and end.
metadata.ETag = strings.Trim(resp.Header.Get("ETag"), "\"") metadata.ETag = strings.TrimPrefix(resp.Header.Get("ETag"), "\"")
metadata.ETag = strings.TrimSuffix(metadata.ETag, "\"")
// A success here means data was written to server successfully. // A success here means data was written to server successfully.
metadata.Size = putObjMetadata.Size metadata.Size = putObjData.Size
// Return here.
return metadata, nil return metadata, nil
} }
@ -452,7 +463,7 @@ func (c Client) initiateMultipartUpload(bucketName, objectName, contentType stri
} }
// uploadPart uploads a part in a multipart upload. // uploadPart uploads a part in a multipart upload.
func (c Client) uploadPart(bucketName, objectName, uploadID string, uploadingPart partMetadata) (objectPart, error) { func (c Client) uploadPart(bucketName, objectName, uploadID string, uploadingPart partData) (objectPart, error) {
// Input validation. // Input validation.
if err := isValidBucketName(bucketName); err != nil { if err := isValidBucketName(bucketName); err != nil {
return objectPart{}, err return objectPart{}, err
@ -496,8 +507,11 @@ func (c Client) uploadPart(bucketName, objectName, uploadID string, uploadingPar
} }
// Once successfully uploaded, return completed part. // Once successfully uploaded, return completed part.
objPart := objectPart{} objPart := objectPart{}
objPart.Size = uploadingPart.Size
objPart.PartNumber = uploadingPart.Number objPart.PartNumber = uploadingPart.Number
objPart.ETag = resp.Header.Get("ETag") // Trim off the odd double quotes from ETag in the beginning and end.
objPart.ETag = strings.TrimPrefix(resp.Header.Get("ETag"), "\"")
objPart.ETag = strings.TrimSuffix(objPart.ETag, "\"")
return objPart, nil return objPart, nil
} }

View file

@ -103,9 +103,6 @@ type objectPart struct {
// Size of the uploaded part data. // Size of the uploaded part data.
Size int64 Size int64
// Error
Err error
} }
// listObjectPartsResult container for ListObjectParts response. // listObjectPartsResult container for ListObjectParts response.

View file

@ -73,7 +73,12 @@ func (c Client) StatObject(bucketName, objectName string) (ObjectStat, error) {
return ObjectStat{}, HTTPRespToErrorResponse(resp, bucketName, objectName) return ObjectStat{}, HTTPRespToErrorResponse(resp, bucketName, objectName)
} }
} }
md5sum := strings.Trim(resp.Header.Get("ETag"), "\"") // trim off the odd double quotes
// Trim off the odd double quotes from ETag in the beginning and end.
md5sum := strings.TrimPrefix(resp.Header.Get("ETag"), "\"")
md5sum = strings.TrimSuffix(md5sum, "\"")
// Parse content length.
size, err := strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64) size, err := strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64)
if err != nil { if err != nil {
return ObjectStat{}, ErrorResponse{ return ObjectStat{}, ErrorResponse{

View file

@ -54,6 +54,142 @@ func randString(n int, src rand.Source) string {
return string(b[0:30]) return string(b[0:30])
} }
func TestResumableFPutObject(t *testing.T) {
if testing.Short() {
t.Skip("skipping resumable tests with short runs")
}
// Seed random based on current time.
rand.Seed(time.Now().Unix())
// Connect and make sure bucket exists.
c, err := minio.New(
"play.minio.io:9002",
"Q3AM3UQ867SPQQA43P2F",
"zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG",
false,
)
if err != nil {
t.Fatal("Error:", err)
}
// Set user agent.
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Enable tracing, write to stdout.
// c.TraceOn(nil)
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
// make a new bucket.
err = c.MakeBucket(bucketName, "private", "us-east-1")
if err != nil {
t.Fatal("Error:", err, bucketName)
}
file, err := ioutil.TempFile(os.TempDir(), "resumable")
if err != nil {
t.Fatal("Error:", err)
}
n, _ := io.CopyN(file, crand.Reader, 11*1024*1024)
if n != int64(11*1024*1024) {
t.Fatalf("Error: number of bytes does not match, want %v, got %v\n", 11*1024*1024, n)
}
objectName := bucketName + "-resumable"
n, err = c.FPutObject(bucketName, objectName, file.Name(), "application/octet-stream")
if err != nil {
t.Fatal("Error:", err)
}
if n != int64(11*1024*1024) {
t.Fatalf("Error: number of bytes does not match, want %v, got %v\n", 11*1024*1024, n)
}
// Close the file pro-actively for windows.
file.Close()
err = c.RemoveObject(bucketName, objectName)
if err != nil {
t.Fatal("Error: ", err)
}
err = c.RemoveBucket(bucketName)
if err != nil {
t.Fatal("Error:", err)
}
err = os.Remove(file.Name())
if err != nil {
t.Fatal("Error:", err)
}
}
func TestResumablePutObject(t *testing.T) {
if testing.Short() {
t.Skip("skipping resumable tests with short runs")
}
// Seed random based on current time.
rand.Seed(time.Now().Unix())
// Connect and make sure bucket exists.
c, err := minio.New(
"play.minio.io:9002",
"Q3AM3UQ867SPQQA43P2F",
"zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG",
false,
)
if err != nil {
t.Fatal("Error:", err)
}
// Set user agent.
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Enable tracing, write to stdout.
// c.TraceOn(nil)
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
// make a new bucket.
err = c.MakeBucket(bucketName, "private", "us-east-1")
if err != nil {
t.Fatal("Error:", err, bucketName)
}
// generate 11MB
buf := make([]byte, 11*1024*1024)
_, err = io.ReadFull(crand.Reader, buf)
if err != nil {
t.Fatal("Error:", err)
}
objectName := bucketName + "-resumable"
reader := bytes.NewReader(buf)
n, err := c.PutObject(bucketName, objectName, reader, int64(reader.Len()), "application/octet-stream")
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
}
if n != int64(len(buf)) {
t.Fatalf("Error: number of bytes does not match, want %v, got %v\n", len(buf), n)
}
err = c.RemoveObject(bucketName, objectName)
if err != nil {
t.Fatal("Error: ", err)
}
err = c.RemoveBucket(bucketName)
if err != nil {
t.Fatal("Error:", err)
}
}
func TestGetObjectPartialFunctional(t *testing.T) { func TestGetObjectPartialFunctional(t *testing.T) {
// Seed random based on current time. // Seed random based on current time.
rand.Seed(time.Now().Unix()) rand.Seed(time.Now().Unix())
@ -177,6 +313,14 @@ func TestGetObjectPartialFunctional(t *testing.T) {
t.Fatal("Error:", err, len(buf6)) t.Fatal("Error:", err, len(buf6))
} }
} }
err = c.RemoveObject(bucketName, objectName)
if err != nil {
t.Fatal("Error: ", err)
}
err = c.RemoveBucket(bucketName)
if err != nil {
t.Fatal("Error:", err)
}
} }
func TestFunctional(t *testing.T) { func TestFunctional(t *testing.T) {
@ -271,14 +415,26 @@ func TestFunctional(t *testing.T) {
// generate data // generate data
buf := make([]byte, rand.Intn(1<<19)) buf := make([]byte, rand.Intn(1<<19))
reader := bytes.NewReader(buf) _, err = io.ReadFull(crand.Reader, buf)
if err != nil {
t.Fatal("Error: ", err)
}
n, err := c.PutObject(bucketName, objectName, reader, int64(reader.Len()), "") n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), int64(len(buf)), "")
if err != nil { if err != nil {
t.Fatal("Error: ", err) t.Fatal("Error: ", err)
} }
if n != int64(len(buf)) { if n != int64(len(buf)) {
t.Fatal("Error: bad length ", n, reader.Len()) t.Fatal("Error: bad length ", n, len(buf))
}
n, err = c.PutObject(bucketName, objectName+"-nolength", bytes.NewReader(buf), -1, "binary/octet-stream")
if err != nil {
t.Fatal("Error:", err, bucketName, objectName+"-nolength")
}
if n != int64(len(buf)) {
t.Fatalf("Error: number of bytes does not match, want %v, got %v\n", len(buf), n)
} }
newReader, _, err := c.GetObject(bucketName, objectName) newReader, _, err := c.GetObject(bucketName, objectName)
@ -333,6 +489,10 @@ func TestFunctional(t *testing.T) {
t.Fatal("Error: ", err) t.Fatal("Error: ", err)
} }
buf = make([]byte, rand.Intn(1<<20)) buf = make([]byte, rand.Intn(1<<20))
_, err = io.ReadFull(crand.Reader, buf)
if err != nil {
t.Fatal("Error: ", err)
}
req, err := http.NewRequest("PUT", presignedPutURL, bytes.NewReader(buf)) req, err := http.NewRequest("PUT", presignedPutURL, bytes.NewReader(buf))
if err != nil { if err != nil {
t.Fatal("Error: ", err) t.Fatal("Error: ", err)
@ -365,25 +525,25 @@ func TestFunctional(t *testing.T) {
if err != nil { if err != nil {
t.Fatal("Error: ", err) t.Fatal("Error: ", err)
} }
err = c.RemoveObject(bucketName, objectName+"-nolength")
if err != nil {
t.Fatal("Error: ", err)
}
err = c.RemoveObject(bucketName, objectName+"-presigned") err = c.RemoveObject(bucketName, objectName+"-presigned")
if err != nil { if err != nil {
t.Fatal("Error: ", err) t.Fatal("Error: ", err)
} }
err = c.RemoveBucket(bucketName) err = c.RemoveBucket(bucketName)
if err != nil { if err != nil {
t.Fatal("Error:", err) t.Fatal("Error:", err)
} }
err = c.RemoveBucket("bucket1") err = c.RemoveBucket("bucket1")
if err == nil { if err == nil {
t.Fatal("Error:") t.Fatal("Error:")
} }
if err.Error() != "The specified bucket does not exist." { if err.Error() != "The specified bucket does not exist." {
t.Fatal("Error: ", err) t.Fatal("Error: ", err)
} }
if err = os.Remove(fileName); err != nil { if err = os.Remove(fileName); err != nil {
t.Fatal("Error: ", err) t.Fatal("Error: ", err)
} }

View file

@ -27,7 +27,7 @@ build_script:
- golint github.com/minio/minio-go... - golint github.com/minio/minio-go...
- deadcode - deadcode
- go test - go test
- go test -race - go test -test.short -race
# to disable automatic tests # to disable automatic tests
test: off test: off

View file

@ -19,7 +19,6 @@ package minio
import ( import (
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath"
"sync" "sync"
) )
@ -42,21 +41,6 @@ func newTempFile(prefix string) (*tempFile, error) {
}, nil }, nil
} }
// cleanupStaleTempFiles - cleanup any stale files present in temp directory at a prefix.
func cleanupStaleTempfiles(prefix string) error {
globPath := filepath.Join(os.TempDir(), prefix) + "*"
staleFiles, err := filepath.Glob(globPath)
if err != nil {
return err
}
for _, staleFile := range staleFiles {
if err := os.Remove(staleFile); err != nil {
return err
}
}
return nil
}
// Close - closer wrapper to close and remove temporary file. // Close - closer wrapper to close and remove temporary file.
func (t *tempFile) Close() error { func (t *tempFile) Close() error {
t.mutex.Lock() t.mutex.Lock()