First pass at cleanup for PR merge

This commit is contained in:
Andrey Kostov 2014-12-23 00:24:45 +02:00
parent a32e6125e0
commit d296a3d2c0
2 changed files with 39 additions and 16 deletions

View file

@ -1,3 +1,17 @@
// Package s3 provides a storagedriver.StorageDriver implementation to
// store blobs in Amazon S3 cloud storage.
//
// This package leverages the crowdmob/goamz client library for interfacing with
// s3.
//
// Because s3 is a key, value store the Stat call does not support last modification
// time for directories (directories are an abstraction for key, value stores)
//
// Keep in mind that s3 guarantees only eventual consistency, so do not assume
// that a successful write will mean immediate access to the data written (although
// in most regions a new object put has guaranteed read after write). The only true
// guarantee is that once you call Stat and receive a certain file size, that much of
// the file is already accessible.
package s3 package s3
import ( import (
@ -53,6 +67,9 @@ type Driver struct {
// - bucket // - bucket
// - encrypt // - encrypt
func FromParameters(parameters map[string]interface{}) (*Driver, error) { func FromParameters(parameters map[string]interface{}) (*Driver, error) {
// Providing no values for these is valid in case the user is authenticating
// with an IAM on an ec2 instance (in which case the instance credentials will
// be summoned when GetAuth is called)
accessKey, _ := parameters["accesskey"] accessKey, _ := parameters["accesskey"]
secretKey, _ := parameters["secretkey"] secretKey, _ := parameters["secretkey"]
@ -75,9 +92,9 @@ func FromParameters(parameters map[string]interface{}) (*Driver, error) {
return nil, fmt.Errorf("No encrypt parameter provided") return nil, fmt.Errorf("No encrypt parameter provided")
} }
encryptBool, err := strconv.ParseBool(fmt.Sprint(encrypt)) encryptBool, ok := encrypt.(bool)
if err != nil { if !ok {
return nil, fmt.Errorf("Unable to parse the encrypt parameter: %v", err) return nil, fmt.Errorf("The encrypt parameter should be a boolean")
} }
rootDirectory, ok := parameters["rootdirectory"] rootDirectory, ok := parameters["rootdirectory"]
@ -170,8 +187,13 @@ func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) {
return resp.Body, nil return resp.Body, nil
} }
// WriteStream stores the contents of the provided io.ReadCloser at a location // WriteStream stores the contents of the provided io.ReadCloser at a
// designated by the given path. // location designated by the given path. The driver will know it has
// received the full contents when the reader returns io.EOF. The number
// of successfully written bytes will be returned, even if an error is
// returned. May be used to resume writing a stream by providing a nonzero
// offset. Offsets past the current size will write from the position
// beyond the end of the file.
func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (totalRead int64, err error) { func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (totalRead int64, err error) {
if !storagedriver.PathRegexp.MatchString(path) { if !storagedriver.PathRegexp.MatchString(path) {
return 0, storagedriver.InvalidPathError{Path: path} return 0, storagedriver.InvalidPathError{Path: path}
@ -563,10 +585,6 @@ func (d *Driver) s3Path(path string) string {
return strings.TrimLeft(d.rootDirectory+path, "/") return strings.TrimLeft(d.rootDirectory+path, "/")
} }
func (d *Driver) fullPath(path string) string {
return d.rootDirectory + path
}
func parseError(path string, err error) error { func parseError(path string, err error) error {
if s3Err, ok := err.(*s3.Error); ok && s3Err.Code == "NoSuchKey" { if s3Err, ok := err.(*s3.Error); ok && s3Err.Code == "NoSuchKey" {
return storagedriver.PathNotFoundError{Path: path} return storagedriver.PathNotFoundError{Path: path}

View file

@ -687,6 +687,8 @@ func (suite *DriverSuite) TestStatCall(c *check.C) {
c.Assert(fi.Size(), check.Equals, int64(0)) c.Assert(fi.Size(), check.Equals, int64(0))
c.Assert(fi.IsDir(), check.Equals, true) c.Assert(fi.IsDir(), check.Equals, true)
// Directories do not need to support ModTime, since key-value stores
// cannot support it efficiently.
// if start.After(fi.ModTime()) { // if start.After(fi.ModTime()) {
// c.Errorf("modtime %s before file created (%v)", fi.ModTime(), start) // c.Errorf("modtime %s before file created (%v)", fi.ModTime(), start)
// } // }
@ -785,6 +787,9 @@ func (suite *DriverSuite) TestEventualConsistency(c *check.C) {
fi, err := suite.StorageDriver.Stat(filename) fi, err := suite.StorageDriver.Stat(filename)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
// We are most concerned with being able to read data as soon as Stat declares
// it is uploaded. This is the strongest guarantee that some drivers (that guarantee
// at best eventual consistency) absolutely need to provide.
if fi.Size() == offset+chunkSize { if fi.Size() == offset+chunkSize {
reader, err := suite.StorageDriver.ReadStream(filename, offset) reader, err := suite.StorageDriver.ReadStream(filename, offset)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
@ -801,6 +806,10 @@ func (suite *DriverSuite) TestEventualConsistency(c *check.C) {
} }
} }
if misswrites > 0 {
c.Log("There were " + string(misswrites) + " occurences of a write not being instantly available.")
}
c.Assert(misswrites, check.Not(check.Equals), 1024) c.Assert(misswrites, check.Not(check.Equals), 1024)
} }
@ -957,13 +966,9 @@ func (suite *DriverSuite) testFileStreams(c *check.C, size int64) {
tf.Sync() tf.Sync()
tf.Seek(0, os.SEEK_SET) tf.Seek(0, os.SEEK_SET)
totalRead := int64(0)
for totalRead < size {
nn, err := suite.StorageDriver.WriteStream(filename, 0, tf) nn, err := suite.StorageDriver.WriteStream(filename, 0, tf)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
totalRead += nn c.Assert(nn, check.Equals, size)
}
c.Assert(totalRead, check.Equals, size)
reader, err := suite.StorageDriver.ReadStream(filename, 0) reader, err := suite.StorageDriver.ReadStream(filename, 0)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)