2014-12-22 22:24:45 +00:00
// Package s3 provides a storagedriver.StorageDriver implementation to
// store blobs in Amazon S3 cloud storage.
//
2016-01-28 23:48:49 +00:00
// This package leverages the docker/goamz client library for interfacing with
2016-01-22 02:17:53 +00:00
// S3. It is intended to be deprecated in favor of the s3-aws driver
// implementation.
2014-12-22 22:24:45 +00:00
//
2016-01-22 02:17:53 +00:00
// Because S3 is a key, value store the Stat call does not support last modification
2014-12-22 22:24:45 +00:00
// time for directories (directories are an abstraction for key, value stores)
//
2016-01-22 02:17:53 +00:00
// Keep in mind that S3 guarantees only read-after-write consistency for new
// objects, but no read-after-update or list-after-write consistency.
2014-10-24 23:37:25 +00:00
package s3
import (
"bytes"
2014-10-29 01:15:40 +00:00
"fmt"
2014-10-24 23:37:25 +00:00
"io"
2014-12-19 17:16:51 +00:00
"io/ioutil"
2014-10-24 23:37:25 +00:00
"net/http"
2015-04-22 21:31:34 +00:00
"reflect"
2014-10-24 23:37:25 +00:00
"strconv"
2014-12-19 17:16:51 +00:00
"strings"
2015-04-22 22:07:18 +00:00
"sync"
2014-12-19 17:16:51 +00:00
"time"
2014-10-24 23:37:25 +00:00
2015-04-24 03:07:32 +00:00
"github.com/Sirupsen/logrus"
2016-01-21 00:40:58 +00:00
"github.com/docker/goamz/aws"
"github.com/docker/goamz/s3"
2015-04-27 22:58:58 +00:00
"github.com/docker/distribution/context"
2016-01-21 00:40:58 +00:00
"github.com/docker/distribution/registry/client/transport"
2015-02-11 02:14:23 +00:00
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/base"
"github.com/docker/distribution/registry/storage/driver/factory"
2014-10-24 23:37:25 +00:00
)
2016-01-22 02:17:53 +00:00
const driverName = "s3goamz"
2014-10-29 01:15:40 +00:00
2014-10-29 19:14:19 +00:00
// minChunkSize defines the minimum multipart upload chunk size
// S3 API requires multipart upload chunks to be at least 5MB
2015-01-24 00:46:43 +00:00
const minChunkSize = 5 << 20
const defaultChunkSize = 2 * minChunkSize
2014-10-24 23:37:25 +00:00
2014-12-19 17:16:51 +00:00
// listMax is the largest amount of objects you can request from S3 in a list call
const listMax = 1000
2014-10-24 23:37:25 +00:00
2015-01-07 10:18:42 +00:00
//DriverParameters A struct that encapsulates all of the driver parameters after all values have been set
type DriverParameters struct {
AccessKey string
SecretKey string
Bucket string
Region aws . Region
Encrypt bool
Secure bool
V4Auth bool
2015-01-24 00:46:43 +00:00
ChunkSize int64
2015-01-07 10:18:42 +00:00
RootDirectory string
2016-01-28 23:48:49 +00:00
StorageClass s3 . StorageClass
2016-01-21 00:40:58 +00:00
UserAgent string
2015-01-07 10:18:42 +00:00
}
2014-10-29 01:15:40 +00:00
func init ( ) {
2014-11-17 23:44:07 +00:00
factory . Register ( driverName , & s3DriverFactory { } )
2014-10-29 01:15:40 +00:00
}
2014-10-29 19:14:19 +00:00
// s3DriverFactory implements the factory.StorageDriverFactory interface
2014-10-29 01:15:40 +00:00
type s3DriverFactory struct { }
2014-12-19 17:16:51 +00:00
func ( factory * s3DriverFactory ) Create ( parameters map [ string ] interface { } ) ( storagedriver . StorageDriver , error ) {
2014-10-29 01:15:40 +00:00
return FromParameters ( parameters )
}
2015-02-04 00:54:52 +00:00
type driver struct {
2014-12-19 17:16:51 +00:00
S3 * s3 . S3
Bucket * s3 . Bucket
2015-01-24 00:46:43 +00:00
ChunkSize int64
2014-12-19 17:16:51 +00:00
Encrypt bool
2015-01-24 00:46:43 +00:00
RootDirectory string
2016-01-28 23:48:49 +00:00
StorageClass s3 . StorageClass
2015-04-22 22:07:18 +00:00
pool sync . Pool // pool []byte buffers used for WriteStream
zeros [ ] byte // shared, zero-valued buffer used for WriteStream
2014-10-24 23:37:25 +00:00
}
2015-02-04 00:54:52 +00:00
type baseEmbed struct {
base . Base
}
// Driver is a storagedriver.StorageDriver implementation backed by Amazon S3
// Objects are stored at absolute keys in the provided bucket.
type Driver struct {
baseEmbed
}
2014-11-17 23:44:07 +00:00
// FromParameters constructs a new Driver with a given parameters map
2014-10-29 01:15:40 +00:00
// Required parameters:
// - accesskey
// - secretkey
// - region
// - bucket
// - encrypt
2014-12-19 17:16:51 +00:00
func FromParameters ( parameters map [ string ] interface { } ) ( * Driver , error ) {
2014-12-22 22:24:45 +00:00
// Providing no values for these is valid in case the user is authenticating
// with an IAM on an ec2 instance (in which case the instance credentials will
// be summoned when GetAuth is called)
2016-02-03 03:30:48 +00:00
accessKey := parameters [ "accesskey" ]
if accessKey == nil {
2015-01-23 23:50:55 +00:00
accessKey = ""
}
2016-02-03 03:30:48 +00:00
secretKey := parameters [ "secretkey" ]
if secretKey == nil {
2015-01-23 23:50:55 +00:00
secretKey = ""
}
2014-10-29 01:15:40 +00:00
2016-02-03 03:30:48 +00:00
regionName := parameters [ "region" ]
if regionName == nil || fmt . Sprint ( regionName ) == "" {
2014-10-29 01:15:40 +00:00
return nil , fmt . Errorf ( "No region parameter provided" )
}
2014-12-19 17:16:51 +00:00
region := aws . GetRegion ( fmt . Sprint ( regionName ) )
2014-10-29 01:15:40 +00:00
if region . Name == "" {
2014-11-13 01:19:19 +00:00
return nil , fmt . Errorf ( "Invalid region provided: %v" , region )
2014-10-29 01:15:40 +00:00
}
2016-02-03 03:30:48 +00:00
bucket := parameters [ "bucket" ]
if bucket == nil || fmt . Sprint ( bucket ) == "" {
2014-10-29 01:15:40 +00:00
return nil , fmt . Errorf ( "No bucket parameter provided" )
}
2015-01-07 09:51:29 +00:00
encryptBool := false
2016-02-03 03:30:48 +00:00
encrypt := parameters [ "encrypt" ]
switch encrypt := encrypt . ( type ) {
case string :
b , err := strconv . ParseBool ( encrypt )
if err != nil {
2014-12-30 17:31:12 +00:00
return nil , fmt . Errorf ( "The encrypt parameter should be a boolean" )
}
2016-02-03 03:30:48 +00:00
encryptBool = b
case bool :
encryptBool = encrypt
case nil :
// do nothing
default :
return nil , fmt . Errorf ( "The encrypt parameter should be a boolean" )
2014-10-29 01:15:40 +00:00
}
2014-12-19 17:16:51 +00:00
2015-01-07 09:45:31 +00:00
secureBool := true
2016-02-03 03:30:48 +00:00
secure := parameters [ "secure" ]
switch secure := secure . ( type ) {
case string :
b , err := strconv . ParseBool ( secure )
if err != nil {
2014-12-29 20:29:54 +00:00
return nil , fmt . Errorf ( "The secure parameter should be a boolean" )
}
2016-02-03 03:30:48 +00:00
secureBool = b
case bool :
secureBool = secure
case nil :
// do nothing
default :
return nil , fmt . Errorf ( "The secure parameter should be a boolean" )
2014-12-29 20:29:54 +00:00
}
2015-02-06 01:41:04 +00:00
v4AuthBool := false
2016-02-03 03:30:48 +00:00
v4Auth := parameters [ "v4auth" ]
switch v4Auth := v4Auth . ( type ) {
case string :
b , err := strconv . ParseBool ( v4Auth )
if err != nil {
2015-01-07 09:45:31 +00:00
return nil , fmt . Errorf ( "The v4auth parameter should be a boolean" )
}
2016-02-03 03:30:48 +00:00
v4AuthBool = b
case bool :
v4AuthBool = v4Auth
case nil :
// do nothing
default :
return nil , fmt . Errorf ( "The v4auth parameter should be a boolean" )
2015-01-07 09:45:31 +00:00
}
2015-01-24 00:46:43 +00:00
chunkSize := int64 ( defaultChunkSize )
2016-02-03 03:30:48 +00:00
chunkSizeParam := parameters [ "chunksize" ]
switch v := chunkSizeParam . ( type ) {
case string :
vv , err := strconv . ParseInt ( v , 0 , 64 )
if err != nil {
return nil , fmt . Errorf ( "chunksize parameter must be an integer, %v invalid" , chunkSizeParam )
2015-04-22 21:31:34 +00:00
}
2016-02-03 03:30:48 +00:00
chunkSize = vv
case int64 :
chunkSize = v
case int , uint , int32 , uint32 , uint64 :
chunkSize = reflect . ValueOf ( v ) . Convert ( reflect . TypeOf ( chunkSize ) ) . Int ( )
case nil :
// do nothing
default :
return nil , fmt . Errorf ( "invalid value for chunksize: %#v" , chunkSizeParam )
}
2015-04-22 21:31:34 +00:00
2016-02-03 03:30:48 +00:00
if chunkSize < minChunkSize {
return nil , fmt . Errorf ( "The chunksize %#v parameter should be a number that is larger than or equal to %d" , chunkSize , minChunkSize )
2015-01-24 00:46:43 +00:00
}
2016-02-03 03:30:48 +00:00
rootDirectory := parameters [ "rootdirectory" ]
if rootDirectory == nil {
2014-12-30 17:31:12 +00:00
rootDirectory = ""
2014-12-19 17:16:51 +00:00
}
2016-01-28 23:48:49 +00:00
storageClass := s3 . StandardStorage
2016-02-03 03:30:48 +00:00
storageClassParam := parameters [ "storageclass" ]
if storageClassParam != nil {
2016-01-28 23:48:49 +00:00
storageClassString , ok := storageClassParam . ( string )
if ! ok {
return nil , fmt . Errorf ( "The storageclass parameter must be one of %v, %v invalid" , [ ] s3 . StorageClass { s3 . StandardStorage , s3 . ReducedRedundancy } , storageClassParam )
}
// All valid storage class parameters are UPPERCASE, so be a bit more flexible here
storageClassCasted := s3 . StorageClass ( strings . ToUpper ( storageClassString ) )
if storageClassCasted != s3 . StandardStorage && storageClassCasted != s3 . ReducedRedundancy {
return nil , fmt . Errorf ( "The storageclass parameter must be one of %v, %v invalid" , [ ] s3 . StorageClass { s3 . StandardStorage , s3 . ReducedRedundancy } , storageClassParam )
}
storageClass = storageClassCasted
}
2016-02-03 03:30:48 +00:00
userAgent := parameters [ "useragent" ]
if userAgent == nil {
2016-01-21 00:40:58 +00:00
userAgent = ""
}
2015-01-07 10:18:42 +00:00
params := DriverParameters {
fmt . Sprint ( accessKey ) ,
fmt . Sprint ( secretKey ) ,
fmt . Sprint ( bucket ) ,
region ,
encryptBool ,
secureBool ,
v4AuthBool ,
2015-01-24 00:46:43 +00:00
chunkSize ,
2015-01-07 10:18:42 +00:00
fmt . Sprint ( rootDirectory ) ,
2016-01-28 23:48:49 +00:00
storageClass ,
2016-01-21 00:40:58 +00:00
fmt . Sprint ( userAgent ) ,
2015-01-07 10:18:42 +00:00
}
return New ( params )
2014-10-29 01:15:40 +00:00
}
2014-11-17 23:44:07 +00:00
// New constructs a new Driver with the given AWS credentials, region, encryption flag, and
2014-10-29 19:14:19 +00:00
// bucketName
2015-01-07 10:18:42 +00:00
func New ( params DriverParameters ) ( * Driver , error ) {
auth , err := aws . GetAuth ( params . AccessKey , params . SecretKey , "" , time . Time { } )
2014-12-19 17:16:51 +00:00
if err != nil {
2015-05-22 23:45:45 +00:00
return nil , fmt . Errorf ( "unable to resolve aws credentials, please ensure that 'accesskey' and 'secretkey' are properly set or the credentials are available in $HOME/.aws/credentials: %v" , err )
2014-12-19 17:16:51 +00:00
}
2015-01-07 10:18:42 +00:00
if ! params . Secure {
params . Region . S3Endpoint = strings . Replace ( params . Region . S3Endpoint , "https" , "http" , 1 )
2014-12-29 20:29:54 +00:00
}
2015-01-07 10:18:42 +00:00
s3obj := s3 . New ( auth , params . Region )
2016-01-21 00:40:58 +00:00
if params . UserAgent != "" {
s3obj . Client = & http . Client {
Transport : transport . NewTransport ( http . DefaultTransport ,
transport . NewHeaderRequestModifier ( http . Header {
http . CanonicalHeaderKey ( "User-Agent" ) : [ ] string { params . UserAgent } ,
} ) ,
) ,
}
}
2014-10-24 23:37:25 +00:00
2015-01-07 10:18:42 +00:00
if params . V4Auth {
2015-01-07 09:45:31 +00:00
s3obj . Signature = aws . V4Signature
} else {
2015-01-07 10:18:42 +00:00
if params . Region . Name == "eu-central-1" {
2015-01-07 09:45:31 +00:00
return nil , fmt . Errorf ( "The eu-central-1 region only works with v4 authentication" )
}
}
2014-10-24 23:37:25 +00:00
2016-01-21 00:40:58 +00:00
bucket := s3obj . Bucket ( params . Bucket )
2014-12-21 06:48:42 +00:00
// TODO Currently multipart uploads have no timestamps, so this would be unwise
// if you initiated a new s3driver while another one is running on the same bucket.
// multis, _, err := bucket.ListMulti("", "")
// if err != nil {
// return nil, err
// }
// for _, multi := range multis {
// err := multi.Abort()
// //TODO appropriate to do this error checking?
// if err != nil {
// return nil, err
// }
// }
2014-12-19 17:16:51 +00:00
2015-02-04 00:54:52 +00:00
d := & driver {
2015-01-24 00:46:43 +00:00
S3 : s3obj ,
Bucket : bucket ,
ChunkSize : params . ChunkSize ,
Encrypt : params . Encrypt ,
2015-02-04 00:54:52 +00:00
RootDirectory : params . RootDirectory ,
2016-01-28 23:48:49 +00:00
StorageClass : params . StorageClass ,
2015-04-22 22:07:18 +00:00
zeros : make ( [ ] byte , params . ChunkSize ) ,
}
d . pool . New = func ( ) interface { } {
return make ( [ ] byte , d . ChunkSize )
2015-02-04 00:54:52 +00:00
}
return & Driver {
baseEmbed : baseEmbed {
Base : base . Base {
StorageDriver : d ,
} ,
} ,
} , nil
2014-10-24 23:37:25 +00:00
}
2014-10-29 19:14:19 +00:00
// Implement the storagedriver.StorageDriver interface
2015-04-23 00:30:01 +00:00
func ( d * driver ) Name ( ) string {
return driverName
}
2014-11-17 23:44:07 +00:00
// GetContent retrieves the content stored at "path" as a []byte.
2015-04-27 22:58:58 +00:00
func ( d * driver ) GetContent ( ctx context . Context , path string ) ( [ ] byte , error ) {
2014-12-19 17:16:51 +00:00
content , err := d . Bucket . Get ( d . s3Path ( path ) )
2014-11-19 01:41:48 +00:00
if err != nil {
2014-12-19 17:16:51 +00:00
return nil , parseError ( path , err )
2014-11-19 01:41:48 +00:00
}
return content , nil
2014-10-24 23:37:25 +00:00
}
2014-11-17 23:44:07 +00:00
// PutContent stores the []byte content at a location designated by "path".
2015-04-27 22:58:58 +00:00
func ( d * driver ) PutContent ( ctx context . Context , path string , contents [ ] byte ) error {
2014-12-19 17:16:51 +00:00
return parseError ( path , d . Bucket . Put ( d . s3Path ( path ) , contents , d . getContentType ( ) , getPermissions ( ) , d . getOptions ( ) ) )
2014-10-24 23:37:25 +00:00
}
2014-11-17 23:44:07 +00:00
// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a
// given byte offset.
2015-04-27 22:58:58 +00:00
func ( d * driver ) ReadStream ( ctx context . Context , path string , offset int64 ) ( io . ReadCloser , error ) {
2014-10-24 23:37:25 +00:00
headers := make ( http . Header )
2014-12-03 03:01:00 +00:00
headers . Add ( "Range" , "bytes=" + strconv . FormatInt ( offset , 10 ) + "-" )
2014-10-24 23:37:25 +00:00
2014-12-19 17:16:51 +00:00
resp , err := d . Bucket . GetResponseWithHeaders ( d . s3Path ( path ) , headers )
2014-11-19 01:41:48 +00:00
if err != nil {
2014-12-19 17:16:51 +00:00
if s3Err , ok := err . ( * s3 . Error ) ; ok && s3Err . Code == "InvalidRange" {
return ioutil . NopCloser ( bytes . NewReader ( nil ) ) , nil
}
return nil , parseError ( path , err )
2014-10-24 23:37:25 +00:00
}
2014-11-19 01:41:48 +00:00
return resp . Body , nil
2014-10-24 23:37:25 +00:00
}
2014-12-23 08:54:01 +00:00
// WriteStream stores the contents of the provided io.Reader at a
2014-12-22 22:24:45 +00:00
// location designated by the given path. The driver will know it has
// received the full contents when the reader returns io.EOF. The number
2014-12-23 08:54:01 +00:00
// of successfully READ bytes will be returned, even if an error is
2014-12-22 22:24:45 +00:00
// returned. May be used to resume writing a stream by providing a nonzero
// offset. Offsets past the current size will write from the position
// beyond the end of the file.
2015-04-27 22:58:58 +00:00
func ( d * driver ) WriteStream ( ctx context . Context , path string , offset int64 , reader io . Reader ) ( totalRead int64 , err error ) {
2014-10-24 23:37:25 +00:00
partNumber := 1
2014-12-19 17:20:07 +00:00
bytesRead := 0
2015-01-27 01:51:59 +00:00
var putErrChan chan error
2014-12-19 17:16:51 +00:00
parts := [ ] s3 . Part { }
var part s3 . Part
2015-04-28 21:06:24 +00:00
done := make ( chan struct { } ) // stopgap to free up waiting goroutines
2014-12-19 17:16:51 +00:00
multi , err := d . Bucket . InitMulti ( d . s3Path ( path ) , d . getContentType ( ) , getPermissions ( ) , d . getOptions ( ) )
2014-10-24 23:37:25 +00:00
if err != nil {
2014-12-19 17:16:51 +00:00
return 0 , err
2014-10-24 23:37:25 +00:00
}
2015-04-22 22:07:18 +00:00
buf := d . getbuf ( )
2014-12-19 17:16:51 +00:00
// We never want to leave a dangling multipart upload, our only consistent state is
// when there is a whole object at path. This is in order to remain consistent with
// the stat call.
//
// Note that if the machine dies before executing the defer, we will be left with a dangling
// multipart upload, which will eventually be cleaned up, but we will lose all of the progress
// made prior to the machine crashing.
defer func ( ) {
2015-01-27 01:51:59 +00:00
if putErrChan != nil {
if putErr := <- putErrChan ; putErr != nil {
err = putErr
}
}
2014-12-19 17:16:51 +00:00
if len ( parts ) > 0 {
2014-12-19 17:20:07 +00:00
if multi == nil {
// Parts should be empty if the multi is not initialized
panic ( "Unreachable" )
} else {
if multi . Complete ( parts ) != nil {
multi . Abort ( )
}
2014-12-19 17:16:51 +00:00
}
}
2015-04-22 22:07:18 +00:00
d . putbuf ( buf ) // needs to be here to pick up new buf value
2015-04-28 21:06:24 +00:00
close ( done ) // free up any waiting goroutines
2014-12-19 17:16:51 +00:00
} ( )
2014-10-24 23:37:25 +00:00
2014-12-19 17:20:07 +00:00
// Fills from 0 to total from current
fromSmallCurrent := func ( total int64 ) error {
2015-04-27 22:58:58 +00:00
current , err := d . ReadStream ( ctx , path , 0 )
2014-12-19 17:16:51 +00:00
if err != nil {
2014-12-19 17:20:07 +00:00
return err
2014-12-19 17:16:51 +00:00
}
2014-12-19 17:20:07 +00:00
bytesRead = 0
for int64 ( bytesRead ) < total {
//The loop should very rarely enter a second iteration
2014-12-20 08:32:48 +00:00
nn , err := current . Read ( buf [ bytesRead : total ] )
2014-12-19 22:18:27 +00:00
bytesRead += nn
if err != nil {
2014-12-20 08:32:48 +00:00
if err != io . EOF {
return err
}
break
2014-12-19 17:16:51 +00:00
}
2014-12-19 17:20:07 +00:00
}
return nil
}
// Fills from parameter to chunkSize from reader
fromReader := func ( from int64 ) error {
bytesRead = 0
2015-01-24 00:46:43 +00:00
for from + int64 ( bytesRead ) < d . ChunkSize {
2014-12-20 08:32:48 +00:00
nn , err := reader . Read ( buf [ from + int64 ( bytesRead ) : ] )
2014-12-19 17:20:07 +00:00
totalRead += int64 ( nn )
bytesRead += nn
2014-12-19 22:18:27 +00:00
if err != nil {
2014-12-20 08:32:48 +00:00
if err != io . EOF {
2014-12-19 22:18:27 +00:00
return err
}
break
2014-12-19 17:20:07 +00:00
}
}
2015-01-27 01:51:59 +00:00
if putErrChan == nil {
putErrChan = make ( chan error )
} else {
if putErr := <- putErrChan ; putErr != nil {
putErrChan = nil
return putErr
2014-12-20 08:32:48 +00:00
}
2014-12-19 17:20:07 +00:00
}
2015-01-27 01:51:59 +00:00
go func ( bytesRead int , from int64 , buf [ ] byte ) {
2015-04-22 22:07:18 +00:00
defer d . putbuf ( buf ) // this buffer gets dropped after this call
2015-04-24 03:07:32 +00:00
// DRAGONS(stevvooe): There are few things one might want to know
// about this section. First, the putErrChan is expecting an error
// and a nil or just a nil to come through the channel. This is
// covered by the silly defer below. The other aspect is the s3
// retry backoff to deal with RequestTimeout errors. Even though
// the underlying s3 library should handle it, it doesn't seem to
// be part of the shouldRetry function (see AdRoll/goamz/s3).
defer func ( ) {
2015-04-28 21:06:24 +00:00
select {
case putErrChan <- nil : // for some reason, we do this no matter what.
case <- done :
return // ensure we don't leak the goroutine
}
2015-04-24 03:07:32 +00:00
} ( )
if bytesRead <= 0 {
return
}
var err error
var part s3 . Part
loop :
for retries := 0 ; retries < 5 ; retries ++ {
part , err = multi . PutPart ( int ( partNumber ) , bytes . NewReader ( buf [ 0 : int64 ( bytesRead ) + from ] ) )
if err == nil {
break // success!
}
// NOTE(stevvooe): This retry code tries to only retry under
// conditions where the s3 package does not. We may add s3
// error codes to the below if we see others bubble up in the
// application. Right now, the most troubling is
// RequestTimeout, which seems to only triggered when a tcp
// connection to s3 slows to a crawl. If the RequestTimeout
// ends up getting added to the s3 library and we don't see
// other errors, this retry loop can be removed.
switch err := err . ( type ) {
case * s3 . Error :
switch err . Code {
case "RequestTimeout" :
// allow retries on only this error.
default :
break loop
}
2015-01-27 01:51:59 +00:00
}
2015-04-24 03:07:32 +00:00
backoff := 100 * time . Millisecond * time . Duration ( retries + 1 )
logrus . Errorf ( "error putting part, retrying after %v: %v" , err , backoff . String ( ) )
time . Sleep ( backoff )
2015-01-27 01:51:59 +00:00
}
2015-04-24 03:07:32 +00:00
if err != nil {
logrus . Errorf ( "error putting part, aborting: %v" , err )
2015-04-28 21:06:24 +00:00
select {
case putErrChan <- err :
case <- done :
return // don't leak the goroutine
}
2015-04-24 03:07:32 +00:00
}
// parts and partNumber are safe, because this function is the
// only one modifying them and we force it to be executed
// serially.
parts = append ( parts , part )
partNumber ++
2015-01-27 01:51:59 +00:00
} ( bytesRead , from , buf )
2015-04-22 22:07:18 +00:00
buf = d . getbuf ( ) // use a new buffer for the next call
2014-12-19 17:20:07 +00:00
return nil
}
if offset > 0 {
resp , err := d . Bucket . Head ( d . s3Path ( path ) , nil )
if err != nil {
if s3Err , ok := err . ( * s3 . Error ) ; ! ok || s3Err . Code != "NoSuchKey" {
2014-12-19 17:16:51 +00:00
return 0 , err
}
2014-12-19 17:20:07 +00:00
}
2014-12-19 17:16:51 +00:00
2014-12-19 17:20:07 +00:00
currentLength := int64 ( 0 )
if err == nil {
currentLength = resp . ContentLength
}
2014-12-19 17:16:51 +00:00
2014-12-19 17:20:07 +00:00
if currentLength >= offset {
2015-01-24 00:46:43 +00:00
if offset < d . ChunkSize {
2014-12-19 17:20:07 +00:00
// chunkSize > currentLength >= offset
if err = fromSmallCurrent ( offset ) ; err != nil {
return totalRead , err
}
if err = fromReader ( offset ) ; err != nil {
return totalRead , err
}
2014-12-20 08:32:48 +00:00
2015-01-24 00:46:43 +00:00
if totalRead + offset < d . ChunkSize {
2014-12-20 08:32:48 +00:00
return totalRead , nil
}
2014-12-19 17:20:07 +00:00
} else {
// currentLength >= offset >= chunkSize
_ , part , err = multi . PutPartCopy ( partNumber ,
s3 . CopyOptions { CopySourceOptions : "bytes=0-" + strconv . FormatInt ( offset - 1 , 10 ) } ,
d . Bucket . Name + "/" + d . s3Path ( path ) )
if err != nil {
return 0 , err
}
2014-12-19 17:16:51 +00:00
2014-12-20 08:32:48 +00:00
parts = append ( parts , part )
partNumber ++
2014-12-19 17:16:51 +00:00
}
} else {
2014-12-19 17:20:07 +00:00
// Fills between parameters with 0s but only when to - from <= chunkSize
fromZeroFillSmall := func ( from , to int64 ) error {
bytesRead = 0
for from + int64 ( bytesRead ) < to {
2015-04-22 22:07:18 +00:00
nn , err := bytes . NewReader ( d . zeros ) . Read ( buf [ from + int64 ( bytesRead ) : to ] )
2014-12-19 17:20:07 +00:00
bytesRead += nn
2014-12-19 22:18:27 +00:00
if err != nil {
2014-12-19 17:20:07 +00:00
return err
}
}
return nil
2014-12-19 17:16:51 +00:00
}
2014-12-19 17:20:07 +00:00
// Fills between parameters with 0s, making new parts
fromZeroFillLarge := func ( from , to int64 ) error {
bytesRead64 := int64 ( 0 )
2015-01-24 00:46:43 +00:00
for to - ( from + bytesRead64 ) >= d . ChunkSize {
2015-04-22 22:07:18 +00:00
part , err := multi . PutPart ( int ( partNumber ) , bytes . NewReader ( d . zeros ) )
2014-12-19 17:20:07 +00:00
if err != nil {
return err
}
2015-01-24 00:46:43 +00:00
bytesRead64 += d . ChunkSize
2014-12-19 17:20:07 +00:00
parts = append ( parts , part )
partNumber ++
}
2015-01-24 00:46:43 +00:00
return fromZeroFillSmall ( 0 , ( to - from ) % d . ChunkSize )
2014-12-19 17:20:07 +00:00
}
// currentLength < offset
2015-01-24 00:46:43 +00:00
if currentLength < d . ChunkSize {
if offset < d . ChunkSize {
2014-12-19 17:20:07 +00:00
// chunkSize > offset > currentLength
if err = fromSmallCurrent ( currentLength ) ; err != nil {
return totalRead , err
}
if err = fromZeroFillSmall ( currentLength , offset ) ; err != nil {
return totalRead , err
}
if err = fromReader ( offset ) ; err != nil {
return totalRead , err
}
2015-01-24 00:46:43 +00:00
if totalRead + offset < d . ChunkSize {
2014-12-19 17:20:07 +00:00
return totalRead , nil
}
} else {
// offset >= chunkSize > currentLength
if err = fromSmallCurrent ( currentLength ) ; err != nil {
return totalRead , err
}
2015-01-24 00:46:43 +00:00
if err = fromZeroFillSmall ( currentLength , d . ChunkSize ) ; err != nil {
2014-12-19 17:20:07 +00:00
return totalRead , err
}
part , err = multi . PutPart ( int ( partNumber ) , bytes . NewReader ( buf ) )
if err != nil {
return totalRead , err
}
parts = append ( parts , part )
partNumber ++
//Zero fill from chunkSize up to offset, then some reader
2015-01-24 00:46:43 +00:00
if err = fromZeroFillLarge ( d . ChunkSize , offset ) ; err != nil {
2014-12-19 17:20:07 +00:00
return totalRead , err
}
2015-01-24 00:46:43 +00:00
if err = fromReader ( offset % d . ChunkSize ) ; err != nil {
2014-12-19 17:20:07 +00:00
return totalRead , err
}
2015-01-24 00:46:43 +00:00
if totalRead + ( offset % d . ChunkSize ) < d . ChunkSize {
2014-12-19 17:20:07 +00:00
return totalRead , nil
}
}
} else {
// offset > currentLength >= chunkSize
_ , part , err = multi . PutPartCopy ( partNumber ,
2015-01-07 09:45:31 +00:00
s3 . CopyOptions { } ,
2014-12-19 17:20:07 +00:00
d . Bucket . Name + "/" + d . s3Path ( path ) )
if err != nil {
return 0 , err
}
parts = append ( parts , part )
partNumber ++
//Zero fill from currentLength up to offset, then some reader
if err = fromZeroFillLarge ( currentLength , offset ) ; err != nil {
return totalRead , err
}
2015-01-24 00:46:43 +00:00
if err = fromReader ( ( offset - currentLength ) % d . ChunkSize ) ; err != nil {
2014-12-19 17:20:07 +00:00
return totalRead , err
}
2015-01-24 00:46:43 +00:00
if totalRead + ( ( offset - currentLength ) % d . ChunkSize ) < d . ChunkSize {
2014-12-19 17:20:07 +00:00
return totalRead , nil
}
}
2014-12-19 17:16:51 +00:00
}
2014-10-24 23:37:25 +00:00
}
for {
2014-12-20 08:32:48 +00:00
if err = fromReader ( 0 ) ; err != nil {
2014-12-19 17:16:51 +00:00
return totalRead , err
}
2014-10-24 23:37:25 +00:00
2015-01-24 00:46:43 +00:00
if int64 ( bytesRead ) < d . ChunkSize {
2014-12-19 17:16:51 +00:00
break
2014-10-24 23:37:25 +00:00
}
}
2014-12-19 17:16:51 +00:00
return totalRead , nil
2014-10-24 23:37:25 +00:00
}
2014-12-19 17:16:51 +00:00
// Stat retrieves the FileInfo for the given path, including the current size
// in bytes and the creation time.
2015-04-27 22:58:58 +00:00
func ( d * driver ) Stat ( ctx context . Context , path string ) ( storagedriver . FileInfo , error ) {
2014-12-19 17:16:51 +00:00
listResponse , err := d . Bucket . List ( d . s3Path ( path ) , "" , "" , 1 )
2014-10-24 23:37:25 +00:00
if err != nil {
2014-12-19 17:16:51 +00:00
return nil , err
2014-10-24 23:37:25 +00:00
}
2014-12-19 17:16:51 +00:00
fi := storagedriver . FileInfoFields {
Path : path ,
2014-10-24 23:37:25 +00:00
}
2014-12-19 17:16:51 +00:00
if len ( listResponse . Contents ) == 1 {
if listResponse . Contents [ 0 ] . Key != d . s3Path ( path ) {
fi . IsDir = true
} else {
fi . IsDir = false
fi . Size = listResponse . Contents [ 0 ] . Size
timestamp , err := time . Parse ( time . RFC3339Nano , listResponse . Contents [ 0 ] . LastModified )
if err != nil {
return nil , err
}
fi . ModTime = timestamp
}
} else if len ( listResponse . CommonPrefixes ) == 1 {
fi . IsDir = true
} else {
return nil , storagedriver . PathNotFoundError { Path : path }
}
return storagedriver . FileInfoInternal { FileInfoFields : fi } , nil
2014-10-24 23:37:25 +00:00
}
2014-12-19 17:16:51 +00:00
// List returns a list of the objects that are direct descendants of the given path.
2015-12-08 19:02:40 +00:00
func ( d * driver ) List ( ctx context . Context , opath string ) ( [ ] string , error ) {
path := opath
2014-12-19 17:16:51 +00:00
if path != "/" && path [ len ( path ) - 1 ] != '/' {
2014-11-07 20:58:48 +00:00
path = path + "/"
2014-11-04 00:20:38 +00:00
}
2015-02-20 00:31:34 +00:00
// This is to cover for the cases when the rootDirectory of the driver is either "" or "/".
// In those cases, there is no root prefix to replace and we must actually add a "/" to all
// results in order to keep them as valid paths as recognized by storagedriver.PathRegexp
prefix := ""
if d . s3Path ( "" ) == "" {
prefix = "/"
}
2014-12-19 17:16:51 +00:00
listResponse , err := d . Bucket . List ( d . s3Path ( path ) , "/" , "" , listMax )
2014-10-24 23:37:25 +00:00
if err != nil {
2015-12-08 19:02:40 +00:00
return nil , parseError ( opath , err )
2015-11-24 22:23:12 +00:00
}
2014-10-24 23:37:25 +00:00
files := [ ] string { }
directories := [ ] string { }
2014-11-04 00:20:38 +00:00
for {
2014-10-24 23:37:25 +00:00
for _ , key := range listResponse . Contents {
2015-02-20 00:31:34 +00:00
files = append ( files , strings . Replace ( key . Key , d . s3Path ( "" ) , prefix , 1 ) )
2014-10-24 23:37:25 +00:00
}
for _ , commonPrefix := range listResponse . CommonPrefixes {
2015-02-20 00:31:34 +00:00
directories = append ( directories , strings . Replace ( commonPrefix [ 0 : len ( commonPrefix ) - 1 ] , d . s3Path ( "" ) , prefix , 1 ) )
2014-10-24 23:37:25 +00:00
}
2014-11-04 00:20:38 +00:00
if listResponse . IsTruncated {
2014-12-19 17:16:51 +00:00
listResponse , err = d . Bucket . List ( d . s3Path ( path ) , "/" , listResponse . NextMarker , listMax )
2014-11-04 00:20:38 +00:00
if err != nil {
return nil , err
}
2014-10-24 23:37:25 +00:00
} else {
2014-11-04 00:20:38 +00:00
break
2014-10-24 23:37:25 +00:00
}
}
2015-12-08 19:02:40 +00:00
if opath != "/" {
if len ( files ) == 0 && len ( directories ) == 0 {
// Treat empty response as missing directory, since we don't actually
// have directories in s3.
return nil , storagedriver . PathNotFoundError { Path : opath }
}
}
2014-10-24 23:37:25 +00:00
return append ( files , directories ... ) , nil
}
2014-11-17 23:44:07 +00:00
// Move moves an object stored at sourcePath to destPath, removing the original
// object.
2015-04-27 22:58:58 +00:00
func ( d * driver ) Move ( ctx context . Context , sourcePath string , destPath string ) error {
2014-10-24 23:37:25 +00:00
/* This is terrible, but aws doesn't have an actual move. */
2014-12-19 17:16:51 +00:00
_ , err := d . Bucket . PutCopy ( d . s3Path ( destPath ) , getPermissions ( ) ,
s3 . CopyOptions { Options : d . getOptions ( ) , ContentType : d . getContentType ( ) } , d . Bucket . Name + "/" + d . s3Path ( sourcePath ) )
2014-10-24 23:37:25 +00:00
if err != nil {
2014-12-19 17:16:51 +00:00
return parseError ( sourcePath , err )
2014-10-24 23:37:25 +00:00
}
2015-04-27 22:58:58 +00:00
return d . Delete ( ctx , sourcePath )
2014-10-24 23:37:25 +00:00
}
2014-11-17 23:44:07 +00:00
// Delete recursively deletes all objects stored at "path" and its subpaths.
2015-04-27 22:58:58 +00:00
func ( d * driver ) Delete ( ctx context . Context , path string ) error {
2014-12-19 17:16:51 +00:00
listResponse , err := d . Bucket . List ( d . s3Path ( path ) , "" , "" , listMax )
2014-10-24 23:37:25 +00:00
if err != nil || len ( listResponse . Contents ) == 0 {
2014-11-13 01:19:19 +00:00
return storagedriver . PathNotFoundError { Path : path }
2014-10-24 23:37:25 +00:00
}
2014-12-19 17:16:51 +00:00
s3Objects := make ( [ ] s3 . Object , listMax )
2014-10-24 23:37:25 +00:00
for len ( listResponse . Contents ) > 0 {
for index , key := range listResponse . Contents {
s3Objects [ index ] . Key = key . Key
}
2014-11-13 01:19:19 +00:00
err := d . Bucket . DelMulti ( s3 . Delete { Quiet : false , Objects : s3Objects [ 0 : len ( listResponse . Contents ) ] } )
2014-10-24 23:37:25 +00:00
if err != nil {
return nil
}
2014-12-19 17:16:51 +00:00
listResponse , err = d . Bucket . List ( d . s3Path ( path ) , "" , "" , listMax )
2014-10-24 23:37:25 +00:00
if err != nil {
return err
}
}
return nil
}
2015-01-07 16:31:38 +00:00
// URLFor returns a URL which may be used to retrieve the content stored at the given path.
// May return an UnsupportedMethodErr in certain StorageDriver implementations.
2015-04-27 22:58:58 +00:00
func ( d * driver ) URLFor ( ctx context . Context , path string , options map [ string ] interface { } ) ( string , error ) {
2015-01-14 19:31:11 +00:00
methodString := "GET"
method , ok := options [ "method" ]
if ok {
methodString , ok = method . ( string )
if ! ok || ( methodString != "GET" && methodString != "HEAD" ) {
2015-11-02 21:23:53 +00:00
return "" , storagedriver . ErrUnsupportedMethod { }
2015-01-14 19:31:11 +00:00
}
}
2015-01-09 01:10:32 +00:00
expiresTime := time . Now ( ) . Add ( 20 * time . Minute )
2015-01-09 01:45:21 +00:00
expires , ok := options [ "expiry" ]
2015-01-09 01:10:32 +00:00
if ok {
et , ok := expires . ( time . Time )
if ok {
expiresTime = et
}
}
2015-01-14 19:31:11 +00:00
return d . Bucket . SignedURLWithMethod ( methodString , d . s3Path ( path ) , expiresTime , nil , nil ) , nil
2015-01-07 16:31:38 +00:00
}
2015-02-04 00:54:52 +00:00
func ( d * driver ) s3Path ( path string ) string {
2015-01-24 00:46:43 +00:00
return strings . TrimLeft ( strings . TrimRight ( d . RootDirectory , "/" ) + path , "/" )
2014-12-19 17:16:51 +00:00
}
2014-10-24 23:37:25 +00:00
2015-04-06 23:23:31 +00:00
// S3BucketKey returns the s3 bucket key for the given storage driver path.
func ( d * Driver ) S3BucketKey ( path string ) string {
return d . StorageDriver . ( * driver ) . s3Path ( path )
}
2014-12-19 17:16:51 +00:00
func parseError ( path string , err error ) error {
if s3Err , ok := err . ( * s3 . Error ) ; ok && s3Err . Code == "NoSuchKey" {
return storagedriver . PathNotFoundError { Path : path }
2014-10-24 23:37:25 +00:00
}
2014-12-19 17:16:51 +00:00
return err
2014-10-24 23:37:25 +00:00
}
func hasCode ( err error , code string ) bool {
s3err , ok := err . ( * aws . Error )
return ok && s3err . Code == code
}
2015-02-04 00:54:52 +00:00
func ( d * driver ) getOptions ( ) s3 . Options {
2016-01-28 23:48:49 +00:00
return s3 . Options {
SSE : d . Encrypt ,
StorageClass : d . StorageClass ,
}
2014-10-24 23:37:25 +00:00
}
2014-10-26 17:00:53 +00:00
func getPermissions ( ) s3 . ACL {
2014-10-24 23:37:25 +00:00
return s3 . Private
}
2015-02-04 00:54:52 +00:00
func ( d * driver ) getContentType ( ) string {
2014-10-24 23:37:25 +00:00
return "application/octet-stream"
}
2015-04-22 22:07:18 +00:00
// getbuf returns a buffer from the driver's pool with length d.ChunkSize.
func ( d * driver ) getbuf ( ) [ ] byte {
return d . pool . Get ( ) . ( [ ] byte )
}
func ( d * driver ) putbuf ( p [ ] byte ) {
copy ( p , d . zeros )
d . pool . Put ( p )
}