Update s3 library (again)

This commit is contained in:
Alexander Neumann 2016-01-08 21:01:06 +01:00
parent 6a56d5b87b
commit 1483e15e4e
8 changed files with 398 additions and 181 deletions

4
Godeps/Godeps.json generated
View file

@ -24,8 +24,8 @@
},
{
"ImportPath": "github.com/minio/minio-go",
"Comment": "v0.2.5-205-g38be406",
"Rev": "38be40605dc37d2d7ec06169218365b46ae33e4b"
"Comment": "v0.2.5-209-g77f35ea",
"Rev": "77f35ea56099f50b0425d0e2f3949773dae723c0"
},
{
"ImportPath": "github.com/pkg/sftp",

View file

@ -228,6 +228,9 @@ type Object struct {
currOffset int64
objectInfo ObjectInfo
// Keeps track of closed call.
isClosed bool
// Previous error saved for future calls.
prevErr error
}
@ -244,16 +247,16 @@ func (o *Object) Read(b []byte) (n int, err error) {
o.mutex.Lock()
defer o.mutex.Unlock()
// Previous prevErr is which was saved in previous operation.
if o.prevErr != nil || o.isClosed {
return 0, o.prevErr
}
// If current offset has reached Size limit, return EOF.
if o.currOffset >= o.objectInfo.Size {
return 0, io.EOF
}
// Previous prevErr is which was saved in previous operation.
if o.prevErr != nil {
return 0, o.prevErr
}
// Send current information over control channel to indicate we
// are ready.
reqMsg := readRequest{}
@ -297,7 +300,7 @@ func (o *Object) Stat() (ObjectInfo, error) {
o.mutex.Lock()
defer o.mutex.Unlock()
if o.prevErr != nil {
if o.prevErr != nil || o.isClosed {
return ObjectInfo{}, o.prevErr
}
@ -317,17 +320,17 @@ func (o *Object) ReadAt(b []byte, offset int64) (n int, err error) {
o.mutex.Lock()
defer o.mutex.Unlock()
// prevErr is which was saved in previous operation.
if o.prevErr != nil || o.isClosed {
return 0, o.prevErr
}
// If offset is negative and offset is greater than or equal to
// object size we return EOF.
if offset < 0 || offset >= o.objectInfo.Size {
return 0, io.EOF
}
// prevErr is which was saved in previous operation.
if o.prevErr != nil {
return 0, o.prevErr
}
// Send current information over control channel to indicate we
// are ready.
reqMsg := readRequest{}
@ -386,11 +389,11 @@ func (o *Object) Seek(offset int64, whence int) (n int64, err error) {
// Negative offset is valid for whence of '2'.
if offset < 0 && whence != 2 {
return 0, ErrInvalidArgument(fmt.Sprintf("Object: negative position not allowed for %d.", whence))
return 0, ErrInvalidArgument(fmt.Sprintf("Negative position not allowed for %d.", whence))
}
switch whence {
default:
return 0, ErrInvalidArgument(fmt.Sprintf("Object: invalid whence %d", whence))
return 0, ErrInvalidArgument(fmt.Sprintf("Invalid whence %d", whence))
case 0:
if offset > o.objectInfo.Size {
return 0, io.EOF
@ -410,7 +413,7 @@ func (o *Object) Seek(offset int64, whence int) (n int64, err error) {
}
// Seeking to negative position not allowed for whence.
if o.objectInfo.Size+offset < 0 {
return 0, ErrInvalidArgument(fmt.Sprintf("Object: Seeking at negative offset not allowed for %d", whence))
return 0, ErrInvalidArgument(fmt.Sprintf("Seeking at negative offset not allowed for %d", whence))
}
o.currOffset += offset
}
@ -428,17 +431,19 @@ func (o *Object) Close() (err error) {
o.mutex.Lock()
defer o.mutex.Unlock()
// prevErr is which was saved in previous operation.
if o.prevErr != nil {
// if already closed return an error.
if o.isClosed {
return o.prevErr
}
// Close successfully.
close(o.doneCh)
// Save this for any subsequent frivolous reads.
errMsg := "Object: Is already closed. Bad file descriptor."
// Save for future operations.
errMsg := "Object is already closed. Bad file descriptor."
o.prevErr = errors.New(errMsg)
// Save here that we closed done channel successfully.
o.isClosed = true
return nil
}

View file

@ -0,0 +1,167 @@
/*
* Minio Go Library for Amazon S3 Compatible Cloud Storage (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package minio
import (
"crypto/md5"
"crypto/sha256"
"hash"
"io"
"os"
)
// Verify if reader is *os.File
func isFile(reader io.Reader) (ok bool) {
_, ok = reader.(*os.File)
return
}
// Verify if reader is *minio.Object
func isObject(reader io.Reader) (ok bool) {
_, ok = reader.(*Object)
return
}
// Verify if reader is a generic ReaderAt
func isReadAt(reader io.Reader) (ok bool) {
_, ok = reader.(io.ReaderAt)
return
}
// hashCopyN - Calculates Md5sum and SHA256sum for upto partSize amount of bytes.
func (c Client) hashCopyN(writer io.ReadWriteSeeker, reader io.Reader, partSize int64) (md5Sum, sha256Sum []byte, size int64, err error) {
// MD5 and SHA256 hasher.
var hashMD5, hashSHA256 hash.Hash
// MD5 and SHA256 hasher.
hashMD5 = md5.New()
hashWriter := io.MultiWriter(writer, hashMD5)
if c.signature.isV4() {
hashSHA256 = sha256.New()
hashWriter = io.MultiWriter(writer, hashMD5, hashSHA256)
}
// Copies to input at writer.
size, err = io.CopyN(hashWriter, reader, partSize)
if err != nil {
// If not EOF return error right here.
if err != io.EOF {
return nil, nil, 0, err
}
}
// Seek back to beginning of input, any error fail right here.
if _, err := writer.Seek(0, 0); err != nil {
return nil, nil, 0, err
}
// Finalize md5shum and sha256 sum.
md5Sum = hashMD5.Sum(nil)
if c.signature.isV4() {
sha256Sum = hashSHA256.Sum(nil)
}
return md5Sum, sha256Sum, size, err
}
// getUploadID - fetch upload id if already present for an object name
// or initiate a new request to fetch a new upload id.
func (c Client) getUploadID(bucketName, objectName, contentType string) (uploadID string, isNew bool, err error) {
// Input validation.
if err := isValidBucketName(bucketName); err != nil {
return "", false, err
}
if err := isValidObjectName(objectName); err != nil {
return "", false, err
}
// Set content Type to default if empty string.
if contentType == "" {
contentType = "application/octet-stream"
}
// Find upload id for previous upload for an object.
uploadID, err = c.findUploadID(bucketName, objectName)
if err != nil {
return "", false, err
}
if uploadID == "" {
// Initiate multipart upload for an object.
initMultipartUploadResult, err := c.initiateMultipartUpload(bucketName, objectName, contentType)
if err != nil {
return "", false, err
}
// Save the new upload id.
uploadID = initMultipartUploadResult.UploadID
// Indicate that this is a new upload id.
isNew = true
}
return uploadID, isNew, nil
}
// computeHash - Calculates MD5 and SHA256 for an input read Seeker.
func (c Client) computeHash(reader io.ReadSeeker) (md5Sum, sha256Sum []byte, size int64, err error) {
// MD5 and SHA256 hasher.
var hashMD5, hashSHA256 hash.Hash
// MD5 and SHA256 hasher.
hashMD5 = md5.New()
hashWriter := io.MultiWriter(hashMD5)
if c.signature.isV4() {
hashSHA256 = sha256.New()
hashWriter = io.MultiWriter(hashMD5, hashSHA256)
}
size, err = io.Copy(hashWriter, reader)
if err != nil {
return nil, nil, 0, err
}
// Seek back reader to the beginning location.
if _, err := reader.Seek(0, 0); err != nil {
return nil, nil, 0, err
}
// Finalize md5shum and sha256 sum.
md5Sum = hashMD5.Sum(nil)
if c.signature.isV4() {
sha256Sum = hashSHA256.Sum(nil)
}
return md5Sum, sha256Sum, size, nil
}
// Fetch all parts info, including total uploaded size, maximum part
// size and max part number.
func (c Client) getPartsInfo(bucketName, objectName, uploadID string) (prtsInfo map[int]objectPart, totalSize int64, maxPrtSize int64, maxPrtNumber int, err error) {
// Fetch previously upload parts.
prtsInfo, err = c.listObjectParts(bucketName, objectName, uploadID)
if err != nil {
return nil, 0, 0, 0, err
}
// Peek through all the parts and calculate totalSize, maximum
// part size and last part number.
for _, prtInfo := range prtsInfo {
// Save previously uploaded size.
totalSize += prtInfo.Size
// Choose the maximum part size.
if prtInfo.Size >= maxPrtSize {
maxPrtSize = prtInfo.Size
}
// Choose the maximum part number.
if maxPrtNumber < prtInfo.PartNumber {
maxPrtNumber = prtInfo.PartNumber
}
}
return prtsInfo, totalSize, maxPrtSize, maxPrtNumber, nil
}

View file

@ -17,80 +17,14 @@
package minio
import (
"crypto/md5"
"crypto/sha256"
"encoding/hex"
"fmt"
"hash"
"io"
"io/ioutil"
"os"
"sort"
)
// getUploadID - fetch upload id if already present for an object name
// or initiate a new request to fetch a new upload id.
func (c Client) getUploadID(bucketName, objectName, contentType string) (string, error) {
// Input validation.
if err := isValidBucketName(bucketName); err != nil {
return "", err
}
if err := isValidObjectName(objectName); err != nil {
return "", err
}
// Set content Type to default if empty string.
if contentType == "" {
contentType = "application/octet-stream"
}
// Find upload id for previous upload for an object.
uploadID, err := c.findUploadID(bucketName, objectName)
if err != nil {
return "", err
}
if uploadID == "" {
// Initiate multipart upload for an object.
initMultipartUploadResult, err := c.initiateMultipartUpload(bucketName, objectName, contentType)
if err != nil {
return "", err
}
// Save the new upload id.
uploadID = initMultipartUploadResult.UploadID
}
return uploadID, nil
}
// computeHash - Calculates MD5 and SHA256 for an input read Seeker.
func (c Client) computeHash(reader io.ReadSeeker) (md5Sum, sha256Sum []byte, size int64, err error) {
// MD5 and SHA256 hasher.
var hashMD5, hashSHA256 hash.Hash
// MD5 and SHA256 hasher.
hashMD5 = md5.New()
hashWriter := io.MultiWriter(hashMD5)
if c.signature.isV4() {
hashSHA256 = sha256.New()
hashWriter = io.MultiWriter(hashMD5, hashSHA256)
}
size, err = io.Copy(hashWriter, reader)
if err != nil {
return nil, nil, 0, err
}
// Seek back reader to the beginning location.
if _, err := reader.Seek(0, 0); err != nil {
return nil, nil, 0, err
}
// Finalize md5shum and sha256 sum.
md5Sum = hashMD5.Sum(nil)
if c.signature.isV4() {
sha256Sum = hashSHA256.Sum(nil)
}
return md5Sum, sha256Sum, size, nil
}
// FPutObject - Create an object in a bucket, with contents from file at filePath.
func (c Client) FPutObject(bucketName, objectName, filePath, contentType string) (n int64, err error) {
// Input validation.
@ -194,7 +128,7 @@ func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileRe
// Get upload id for an object, initiates a new multipart request
// if it cannot find any previously partially uploaded object.
uploadID, err := c.getUploadID(bucketName, objectName, contentType)
uploadID, isNew, err := c.getUploadID(bucketName, objectName, contentType)
if err != nil {
return 0, err
}
@ -205,19 +139,19 @@ func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileRe
// Complete multipart upload.
var completeMultipartUpload completeMultipartUpload
// Fetch previously upload parts.
partsInfo, err := c.listObjectParts(bucketName, objectName, uploadID)
if err != nil {
return 0, err
}
// Previous maximum part size
var prevMaxPartSize int64
// Loop through all parts and fetch prevMaxPartSize.
for _, partInfo := range partsInfo {
// Choose the maximum part size.
if partInfo.Size >= prevMaxPartSize {
prevMaxPartSize = partInfo.Size
// A map of all uploaded parts.
var partsInfo = make(map[int]objectPart)
// If this session is a continuation of a previous session fetch all
// previously uploaded parts info.
if !isNew {
// Fetch previously upload parts and maximum part size.
partsInfo, _, prevMaxPartSize, _, err = c.getPartsInfo(bucketName, objectName, uploadID)
if err != nil {
return 0, err
}
}

View file

@ -18,11 +18,8 @@ package minio
import (
"bytes"
"crypto/md5"
"crypto/sha256"
"encoding/hex"
"encoding/xml"
"hash"
"io"
"io/ioutil"
"net/http"
@ -33,58 +30,6 @@ import (
"strings"
)
// Verify if reader is *os.File
func isFile(reader io.Reader) (ok bool) {
_, ok = reader.(*os.File)
return
}
// Verify if reader is *minio.Object
func isObject(reader io.Reader) (ok bool) {
_, ok = reader.(*Object)
return
}
// Verify if reader is a generic ReaderAt
func isReadAt(reader io.Reader) (ok bool) {
_, ok = reader.(io.ReaderAt)
return
}
// hashCopyN - Calculates Md5sum and SHA256sum for upto partSize amount of bytes.
func (c Client) hashCopyN(writer io.ReadWriteSeeker, reader io.Reader, partSize int64) (md5Sum, sha256Sum []byte, size int64, err error) {
// MD5 and SHA256 hasher.
var hashMD5, hashSHA256 hash.Hash
// MD5 and SHA256 hasher.
hashMD5 = md5.New()
hashWriter := io.MultiWriter(writer, hashMD5)
if c.signature.isV4() {
hashSHA256 = sha256.New()
hashWriter = io.MultiWriter(writer, hashMD5, hashSHA256)
}
// Copies to input at writer.
size, err = io.CopyN(hashWriter, reader, partSize)
if err != nil {
// If not EOF return error right here.
if err != io.EOF {
return nil, nil, 0, err
}
}
// Seek back to beginning of input, any error fail right here.
if _, err := writer.Seek(0, 0); err != nil {
return nil, nil, 0, err
}
// Finalize md5shum and sha256 sum.
md5Sum = hashMD5.Sum(nil)
if c.signature.isV4() {
sha256Sum = hashSHA256.Sum(nil)
}
return md5Sum, sha256Sum, size, err
}
// Comprehensive put object operation involving multipart resumable uploads.
//
// Following code handles these types of readers.
@ -130,7 +75,7 @@ func (c Client) putObjectMultipartStream(bucketName, objectName string, reader i
// getUploadID for an object, initiates a new multipart request
// if it cannot find any previously partially uploaded object.
uploadID, err := c.getUploadID(bucketName, objectName, contentType)
uploadID, isNew, err := c.getUploadID(bucketName, objectName, contentType)
if err != nil {
return 0, err
}
@ -141,18 +86,19 @@ func (c Client) putObjectMultipartStream(bucketName, objectName string, reader i
// Complete multipart upload.
var completeMultipartUpload completeMultipartUpload
// Fetch previously upload parts.
partsInfo, err := c.listObjectParts(bucketName, objectName, uploadID)
if err != nil {
return 0, err
}
// Previous maximum part size
var prevMaxPartSize int64
// Loop through all parts and calculate totalUploadedSize.
for _, partInfo := range partsInfo {
// Choose the maximum part size.
if partInfo.Size >= prevMaxPartSize {
prevMaxPartSize = partInfo.Size
// A map of all previously uploaded parts.
var partsInfo = make(map[int]objectPart)
// If This session is a continuation of a previous session fetch all
// previously uploaded parts info.
if !isNew {
// Fetch previously uploaded parts and maximum part size.
partsInfo, _, prevMaxPartSize, _, err = c.getPartsInfo(bucketName, objectName, uploadID)
if err != nil {
return 0, err
}
}
@ -204,6 +150,9 @@ func (c Client) putObjectMultipartStream(bucketName, objectName string, reader i
// Close the temporary file.
tmpFile.Close()
// Save successfully uploaded size.
totalUploadedSize += size
// If read error was an EOF, break out of the loop.
if rErr == io.EOF {
break
@ -223,8 +172,6 @@ func (c Client) putObjectMultipartStream(bucketName, objectName string, reader i
complPart.ETag = part.ETag
complPart.PartNumber = part.PartNumber
completeMultipartUpload.Parts = append(completeMultipartUpload.Parts, complPart)
// Save successfully uploaded size.
totalUploadedSize += part.Size
}
// Verify if partNumber is different than total list of parts.

View file

@ -46,7 +46,7 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read
// Get upload id for an object, initiates a new multipart request
// if it cannot find any previously partially uploaded object.
uploadID, err := c.getUploadID(bucketName, objectName, contentType)
uploadID, isNew, err := c.getUploadID(bucketName, objectName, contentType)
if err != nil {
return 0, err
}
@ -57,25 +57,21 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read
// Complete multipart upload.
var completeMultipartUpload completeMultipartUpload
// Fetch previously upload parts.
partsInfo, err := c.listObjectParts(bucketName, objectName, uploadID)
if err != nil {
return 0, err
}
// Previous maximum part size
var prevMaxPartSize int64
// Previous part number.
var prevPartNumber int
// Loop through all parts and calculate totalUploadedSize.
for _, partInfo := range partsInfo {
totalUploadedSize += partInfo.Size
// Choose the maximum part size.
if partInfo.Size >= prevMaxPartSize {
prevMaxPartSize = partInfo.Size
// A map of all uploaded parts.
var partsInfo = make(map[int]objectPart)
// Fetch all parts info previously uploaded.
if !isNew {
partsInfo, totalUploadedSize, prevMaxPartSize, prevPartNumber, err = c.getPartsInfo(bucketName, objectName, uploadID)
if err != nil {
return 0, err
}
// Save previous part number.
prevPartNumber = partInfo.PartNumber
}
// Calculate the optimal part size for a given file size.

View file

@ -31,6 +31,90 @@ import (
"github.com/minio/minio-go"
)
func TestGetObjectClosedTwiceV2(t *testing.T) {
if testing.Short() {
t.Skip("skipping functional tests for short runs")
}
// Seed random based on current time.
rand.Seed(time.Now().Unix())
// Connect and make sure bucket exists.
c, err := minio.New(
"s3.amazonaws.com",
os.Getenv("ACCESS_KEY"),
os.Getenv("SECRET_KEY"),
false,
)
if err != nil {
t.Fatal("Error:", err)
}
// Enable tracing, write to stderr.
// c.TraceOn(os.Stderr)
// Set user agent.
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
// Make a new bucket.
err = c.MakeBucket(bucketName, "private", "us-east-1")
if err != nil {
t.Fatal("Error:", err, bucketName)
}
// Generate data more than 32K
buf := make([]byte, rand.Intn(1<<20)+32*1024)
_, err = io.ReadFull(crand.Reader, buf)
if err != nil {
t.Fatal("Error:", err)
}
// Save the data
objectName := randString(60, rand.NewSource(time.Now().UnixNano()))
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
}
if n != int64(len(buf)) {
t.Fatalf("Error: number of bytes does not match, want %v, got %v\n", len(buf), n)
}
// Read the data back
r, err := c.GetObject(bucketName, objectName)
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
}
st, err := r.Stat()
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
}
if st.Size != int64(len(buf)) {
t.Fatalf("Error: number of bytes in stat does not match, want %v, got %v\n",
len(buf), st.Size)
}
if err := r.Close(); err != nil {
t.Fatal("Error:", err)
}
if err := r.Close(); err == nil {
t.Fatal("Error: object is already closed, should return error")
}
err = c.RemoveObject(bucketName, objectName)
if err != nil {
t.Fatal("Error: ", err)
}
err = c.RemoveBucket(bucketName)
if err != nil {
t.Fatal("Error:", err)
}
}
// Tests removing partially uploaded objects.
func TestRemovePartiallyUploadedV2(t *testing.T) {
if testing.Short() {

View file

@ -55,6 +55,90 @@ func randString(n int, src rand.Source) string {
return string(b[0:30])
}
func TestGetObjectClosedTwice(t *testing.T) {
if testing.Short() {
t.Skip("skipping functional tests for short runs")
}
// Seed random based on current time.
rand.Seed(time.Now().Unix())
// Connect and make sure bucket exists.
c, err := minio.New(
"s3.amazonaws.com",
os.Getenv("ACCESS_KEY"),
os.Getenv("SECRET_KEY"),
false,
)
if err != nil {
t.Fatal("Error:", err)
}
// Enable tracing, write to stderr.
// c.TraceOn(os.Stderr)
// Set user agent.
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()))
// Make a new bucket.
err = c.MakeBucket(bucketName, "private", "us-east-1")
if err != nil {
t.Fatal("Error:", err, bucketName)
}
// Generate data more than 32K
buf := make([]byte, rand.Intn(1<<20)+32*1024)
_, err = io.ReadFull(crand.Reader, buf)
if err != nil {
t.Fatal("Error:", err)
}
// Save the data
objectName := randString(60, rand.NewSource(time.Now().UnixNano()))
n, err := c.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
}
if n != int64(len(buf)) {
t.Fatalf("Error: number of bytes does not match, want %v, got %v\n", len(buf), n)
}
// Read the data back
r, err := c.GetObject(bucketName, objectName)
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
}
st, err := r.Stat()
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
}
if st.Size != int64(len(buf)) {
t.Fatalf("Error: number of bytes in stat does not match, want %v, got %v\n",
len(buf), st.Size)
}
if err := r.Close(); err != nil {
t.Fatal("Error:", err)
}
if err := r.Close(); err == nil {
t.Fatal("Error: object is already closed, should return error")
}
err = c.RemoveObject(bucketName, objectName)
if err != nil {
t.Fatal("Error: ", err)
}
err = c.RemoveBucket(bucketName)
if err != nil {
t.Fatal("Error:", err)
}
}
// Tests removing partially uploaded objects.
func TestRemovePartiallyUploaded(t *testing.T) {
if testing.Short() {