distribution/registry/storage/driver/s3/s3.go
Stephen J Day af0c2625e0 Allow configuration of chunksize parameter
The code using values from the yaml package wasn't careful enought with the
possible incoming types. Turns out, it is just an int but we've made this
section somewhat bulletproof in case that package changes the behavior.

This code likely never worked. The configuration system should be decoupled
from the object instantiation.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
2015-05-06 11:04:50 -07:00

741 lines
19 KiB
Go

// Package s3 provides a storagedriver.StorageDriver implementation to
// store blobs in Amazon S3 cloud storage.
//
// This package leverages the AdRoll/goamz client library for interfacing with
// s3.
//
// Because s3 is a key, value store the Stat call does not support last modification
// time for directories (directories are an abstraction for key, value stores)
//
// Keep in mind that s3 guarantees only eventual consistency, so do not assume
// that a successful write will mean immediate access to the data written (although
// in most regions a new object put has guaranteed read after write). The only true
// guarantee is that once you call Stat and receive a certain file size, that much of
// the file is already accessible.
package s3
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"net/http"
"reflect"
"strconv"
"strings"
"time"
"github.com/AdRoll/goamz/aws"
"github.com/AdRoll/goamz/s3"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/base"
"github.com/docker/distribution/registry/storage/driver/factory"
)
const driverName = "s3"
// minChunkSize defines the minimum multipart upload chunk size
// S3 API requires multipart upload chunks to be at least 5MB
const minChunkSize = 5 << 20
const defaultChunkSize = 2 * minChunkSize
// listMax is the largest amount of objects you can request from S3 in a list call
const listMax = 1000
//DriverParameters A struct that encapsulates all of the driver parameters after all values have been set
type DriverParameters struct {
AccessKey string
SecretKey string
Bucket string
Region aws.Region
Encrypt bool
Secure bool
V4Auth bool
ChunkSize int64
RootDirectory string
}
func init() {
factory.Register(driverName, &s3DriverFactory{})
}
// s3DriverFactory implements the factory.StorageDriverFactory interface
type s3DriverFactory struct{}
func (factory *s3DriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
return FromParameters(parameters)
}
type driver struct {
S3 *s3.S3
Bucket *s3.Bucket
ChunkSize int64
Encrypt bool
RootDirectory string
}
type baseEmbed struct {
base.Base
}
// Driver is a storagedriver.StorageDriver implementation backed by Amazon S3
// Objects are stored at absolute keys in the provided bucket.
type Driver struct {
baseEmbed
}
// FromParameters constructs a new Driver with a given parameters map
// Required parameters:
// - accesskey
// - secretkey
// - region
// - bucket
// - encrypt
func FromParameters(parameters map[string]interface{}) (*Driver, error) {
// Providing no values for these is valid in case the user is authenticating
// with an IAM on an ec2 instance (in which case the instance credentials will
// be summoned when GetAuth is called)
accessKey, ok := parameters["accesskey"]
if !ok {
accessKey = ""
}
secretKey, ok := parameters["secretkey"]
if !ok {
secretKey = ""
}
regionName, ok := parameters["region"]
if !ok || fmt.Sprint(regionName) == "" {
return nil, fmt.Errorf("No region parameter provided")
}
region := aws.GetRegion(fmt.Sprint(regionName))
if region.Name == "" {
return nil, fmt.Errorf("Invalid region provided: %v", region)
}
bucket, ok := parameters["bucket"]
if !ok || fmt.Sprint(bucket) == "" {
return nil, fmt.Errorf("No bucket parameter provided")
}
encryptBool := false
encrypt, ok := parameters["encrypt"]
if ok {
encryptBool, ok = encrypt.(bool)
if !ok {
return nil, fmt.Errorf("The encrypt parameter should be a boolean")
}
}
secureBool := true
secure, ok := parameters["secure"]
if ok {
secureBool, ok = secure.(bool)
if !ok {
return nil, fmt.Errorf("The secure parameter should be a boolean")
}
}
v4AuthBool := false
v4Auth, ok := parameters["v4auth"]
if ok {
v4AuthBool, ok = v4Auth.(bool)
if !ok {
return nil, fmt.Errorf("The v4auth parameter should be a boolean")
}
}
chunkSize := int64(defaultChunkSize)
chunkSizeParam, ok := parameters["chunksize"]
if ok {
switch v := chunkSizeParam.(type) {
case string:
vv, err := strconv.ParseInt(v, 0, 64)
if err != nil {
return nil, fmt.Errorf("chunksize parameter must be an integer, %v invalid", chunkSizeParam)
}
chunkSize = vv
case int64:
chunkSize = v
case int, uint, int32, uint32, uint64:
chunkSize = reflect.ValueOf(v).Convert(reflect.TypeOf(chunkSize)).Int()
default:
return nil, fmt.Errorf("invalid valud for chunksize: %#v", chunkSizeParam)
}
if chunkSize <= minChunkSize {
return nil, fmt.Errorf("The chunksize %#v parameter should be a number that is larger than or equal to %d", chunkSize, minChunkSize)
}
}
rootDirectory, ok := parameters["rootdirectory"]
if !ok {
rootDirectory = ""
}
params := DriverParameters{
fmt.Sprint(accessKey),
fmt.Sprint(secretKey),
fmt.Sprint(bucket),
region,
encryptBool,
secureBool,
v4AuthBool,
chunkSize,
fmt.Sprint(rootDirectory),
}
return New(params)
}
// New constructs a new Driver with the given AWS credentials, region, encryption flag, and
// bucketName
func New(params DriverParameters) (*Driver, error) {
auth, err := aws.GetAuth(params.AccessKey, params.SecretKey, "", time.Time{})
if err != nil {
return nil, err
}
if !params.Secure {
params.Region.S3Endpoint = strings.Replace(params.Region.S3Endpoint, "https", "http", 1)
}
s3obj := s3.New(auth, params.Region)
bucket := s3obj.Bucket(params.Bucket)
if params.V4Auth {
s3obj.Signature = aws.V4Signature
} else {
if params.Region.Name == "eu-central-1" {
return nil, fmt.Errorf("The eu-central-1 region only works with v4 authentication")
}
}
// Validate that the given credentials have at least read permissions in the
// given bucket scope.
if _, err := bucket.List(strings.TrimRight(params.RootDirectory, "/"), "", "", 1); err != nil {
return nil, err
}
// TODO Currently multipart uploads have no timestamps, so this would be unwise
// if you initiated a new s3driver while another one is running on the same bucket.
// multis, _, err := bucket.ListMulti("", "")
// if err != nil {
// return nil, err
// }
// for _, multi := range multis {
// err := multi.Abort()
// //TODO appropriate to do this error checking?
// if err != nil {
// return nil, err
// }
// }
d := &driver{
S3: s3obj,
Bucket: bucket,
ChunkSize: params.ChunkSize,
Encrypt: params.Encrypt,
RootDirectory: params.RootDirectory,
}
return &Driver{
baseEmbed: baseEmbed{
Base: base.Base{
StorageDriver: d,
},
},
}, nil
}
// Implement the storagedriver.StorageDriver interface
// GetContent retrieves the content stored at "path" as a []byte.
func (d *driver) GetContent(path string) ([]byte, error) {
content, err := d.Bucket.Get(d.s3Path(path))
if err != nil {
return nil, parseError(path, err)
}
return content, nil
}
// PutContent stores the []byte content at a location designated by "path".
func (d *driver) PutContent(path string, contents []byte) error {
return parseError(path, d.Bucket.Put(d.s3Path(path), contents, d.getContentType(), getPermissions(), d.getOptions()))
}
// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a
// given byte offset.
func (d *driver) ReadStream(path string, offset int64) (io.ReadCloser, error) {
headers := make(http.Header)
headers.Add("Range", "bytes="+strconv.FormatInt(offset, 10)+"-")
resp, err := d.Bucket.GetResponseWithHeaders(d.s3Path(path), headers)
if err != nil {
if s3Err, ok := err.(*s3.Error); ok && s3Err.Code == "InvalidRange" {
return ioutil.NopCloser(bytes.NewReader(nil)), nil
}
return nil, parseError(path, err)
}
return resp.Body, nil
}
// WriteStream stores the contents of the provided io.Reader at a
// location designated by the given path. The driver will know it has
// received the full contents when the reader returns io.EOF. The number
// of successfully READ bytes will be returned, even if an error is
// returned. May be used to resume writing a stream by providing a nonzero
// offset. Offsets past the current size will write from the position
// beyond the end of the file.
func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (totalRead int64, err error) {
partNumber := 1
bytesRead := 0
var putErrChan chan error
parts := []s3.Part{}
var part s3.Part
multi, err := d.Bucket.InitMulti(d.s3Path(path), d.getContentType(), getPermissions(), d.getOptions())
if err != nil {
return 0, err
}
buf := make([]byte, d.ChunkSize)
zeroBuf := make([]byte, d.ChunkSize)
// We never want to leave a dangling multipart upload, our only consistent state is
// when there is a whole object at path. This is in order to remain consistent with
// the stat call.
//
// Note that if the machine dies before executing the defer, we will be left with a dangling
// multipart upload, which will eventually be cleaned up, but we will lose all of the progress
// made prior to the machine crashing.
defer func() {
if putErrChan != nil {
if putErr := <-putErrChan; putErr != nil {
err = putErr
}
}
if len(parts) > 0 {
if multi == nil {
// Parts should be empty if the multi is not initialized
panic("Unreachable")
} else {
if multi.Complete(parts) != nil {
multi.Abort()
}
}
}
}()
// Fills from 0 to total from current
fromSmallCurrent := func(total int64) error {
current, err := d.ReadStream(path, 0)
if err != nil {
return err
}
bytesRead = 0
for int64(bytesRead) < total {
//The loop should very rarely enter a second iteration
nn, err := current.Read(buf[bytesRead:total])
bytesRead += nn
if err != nil {
if err != io.EOF {
return err
}
break
}
}
return nil
}
// Fills from parameter to chunkSize from reader
fromReader := func(from int64) error {
bytesRead = 0
for from+int64(bytesRead) < d.ChunkSize {
nn, err := reader.Read(buf[from+int64(bytesRead):])
totalRead += int64(nn)
bytesRead += nn
if err != nil {
if err != io.EOF {
return err
}
break
}
}
if putErrChan == nil {
putErrChan = make(chan error)
} else {
if putErr := <-putErrChan; putErr != nil {
putErrChan = nil
return putErr
}
}
go func(bytesRead int, from int64, buf []byte) {
// parts and partNumber are safe, because this function is the only one modifying them and we
// force it to be executed serially.
if bytesRead > 0 {
part, putErr := multi.PutPart(int(partNumber), bytes.NewReader(buf[0:int64(bytesRead)+from]))
if putErr != nil {
putErrChan <- putErr
}
parts = append(parts, part)
partNumber++
}
putErrChan <- nil
}(bytesRead, from, buf)
buf = make([]byte, d.ChunkSize)
return nil
}
if offset > 0 {
resp, err := d.Bucket.Head(d.s3Path(path), nil)
if err != nil {
if s3Err, ok := err.(*s3.Error); !ok || s3Err.Code != "NoSuchKey" {
return 0, err
}
}
currentLength := int64(0)
if err == nil {
currentLength = resp.ContentLength
}
if currentLength >= offset {
if offset < d.ChunkSize {
// chunkSize > currentLength >= offset
if err = fromSmallCurrent(offset); err != nil {
return totalRead, err
}
if err = fromReader(offset); err != nil {
return totalRead, err
}
if totalRead+offset < d.ChunkSize {
return totalRead, nil
}
} else {
// currentLength >= offset >= chunkSize
_, part, err = multi.PutPartCopy(partNumber,
s3.CopyOptions{CopySourceOptions: "bytes=0-" + strconv.FormatInt(offset-1, 10)},
d.Bucket.Name+"/"+d.s3Path(path))
if err != nil {
return 0, err
}
parts = append(parts, part)
partNumber++
}
} else {
// Fills between parameters with 0s but only when to - from <= chunkSize
fromZeroFillSmall := func(from, to int64) error {
bytesRead = 0
for from+int64(bytesRead) < to {
nn, err := bytes.NewReader(zeroBuf).Read(buf[from+int64(bytesRead) : to])
bytesRead += nn
if err != nil {
return err
}
}
return nil
}
// Fills between parameters with 0s, making new parts
fromZeroFillLarge := func(from, to int64) error {
bytesRead64 := int64(0)
for to-(from+bytesRead64) >= d.ChunkSize {
part, err := multi.PutPart(int(partNumber), bytes.NewReader(zeroBuf))
if err != nil {
return err
}
bytesRead64 += d.ChunkSize
parts = append(parts, part)
partNumber++
}
return fromZeroFillSmall(0, (to-from)%d.ChunkSize)
}
// currentLength < offset
if currentLength < d.ChunkSize {
if offset < d.ChunkSize {
// chunkSize > offset > currentLength
if err = fromSmallCurrent(currentLength); err != nil {
return totalRead, err
}
if err = fromZeroFillSmall(currentLength, offset); err != nil {
return totalRead, err
}
if err = fromReader(offset); err != nil {
return totalRead, err
}
if totalRead+offset < d.ChunkSize {
return totalRead, nil
}
} else {
// offset >= chunkSize > currentLength
if err = fromSmallCurrent(currentLength); err != nil {
return totalRead, err
}
if err = fromZeroFillSmall(currentLength, d.ChunkSize); err != nil {
return totalRead, err
}
part, err = multi.PutPart(int(partNumber), bytes.NewReader(buf))
if err != nil {
return totalRead, err
}
parts = append(parts, part)
partNumber++
//Zero fill from chunkSize up to offset, then some reader
if err = fromZeroFillLarge(d.ChunkSize, offset); err != nil {
return totalRead, err
}
if err = fromReader(offset % d.ChunkSize); err != nil {
return totalRead, err
}
if totalRead+(offset%d.ChunkSize) < d.ChunkSize {
return totalRead, nil
}
}
} else {
// offset > currentLength >= chunkSize
_, part, err = multi.PutPartCopy(partNumber,
s3.CopyOptions{},
d.Bucket.Name+"/"+d.s3Path(path))
if err != nil {
return 0, err
}
parts = append(parts, part)
partNumber++
//Zero fill from currentLength up to offset, then some reader
if err = fromZeroFillLarge(currentLength, offset); err != nil {
return totalRead, err
}
if err = fromReader((offset - currentLength) % d.ChunkSize); err != nil {
return totalRead, err
}
if totalRead+((offset-currentLength)%d.ChunkSize) < d.ChunkSize {
return totalRead, nil
}
}
}
}
for {
if err = fromReader(0); err != nil {
return totalRead, err
}
if int64(bytesRead) < d.ChunkSize {
break
}
}
return totalRead, nil
}
// Stat retrieves the FileInfo for the given path, including the current size
// in bytes and the creation time.
func (d *driver) Stat(path string) (storagedriver.FileInfo, error) {
listResponse, err := d.Bucket.List(d.s3Path(path), "", "", 1)
if err != nil {
return nil, err
}
fi := storagedriver.FileInfoFields{
Path: path,
}
if len(listResponse.Contents) == 1 {
if listResponse.Contents[0].Key != d.s3Path(path) {
fi.IsDir = true
} else {
fi.IsDir = false
fi.Size = listResponse.Contents[0].Size
timestamp, err := time.Parse(time.RFC3339Nano, listResponse.Contents[0].LastModified)
if err != nil {
return nil, err
}
fi.ModTime = timestamp
}
} else if len(listResponse.CommonPrefixes) == 1 {
fi.IsDir = true
} else {
return nil, storagedriver.PathNotFoundError{Path: path}
}
return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
}
// List returns a list of the objects that are direct descendants of the given path.
func (d *driver) List(path string) ([]string, error) {
if path != "/" && path[len(path)-1] != '/' {
path = path + "/"
}
// This is to cover for the cases when the rootDirectory of the driver is either "" or "/".
// In those cases, there is no root prefix to replace and we must actually add a "/" to all
// results in order to keep them as valid paths as recognized by storagedriver.PathRegexp
prefix := ""
if d.s3Path("") == "" {
prefix = "/"
}
listResponse, err := d.Bucket.List(d.s3Path(path), "/", "", listMax)
if err != nil {
return nil, err
}
files := []string{}
directories := []string{}
for {
for _, key := range listResponse.Contents {
files = append(files, strings.Replace(key.Key, d.s3Path(""), prefix, 1))
}
for _, commonPrefix := range listResponse.CommonPrefixes {
directories = append(directories, strings.Replace(commonPrefix[0:len(commonPrefix)-1], d.s3Path(""), prefix, 1))
}
if listResponse.IsTruncated {
listResponse, err = d.Bucket.List(d.s3Path(path), "/", listResponse.NextMarker, listMax)
if err != nil {
return nil, err
}
} else {
break
}
}
return append(files, directories...), nil
}
// Move moves an object stored at sourcePath to destPath, removing the original
// object.
func (d *driver) Move(sourcePath string, destPath string) error {
/* This is terrible, but aws doesn't have an actual move. */
_, err := d.Bucket.PutCopy(d.s3Path(destPath), getPermissions(),
s3.CopyOptions{Options: d.getOptions(), ContentType: d.getContentType()}, d.Bucket.Name+"/"+d.s3Path(sourcePath))
if err != nil {
return parseError(sourcePath, err)
}
return d.Delete(sourcePath)
}
// Delete recursively deletes all objects stored at "path" and its subpaths.
func (d *driver) Delete(path string) error {
listResponse, err := d.Bucket.List(d.s3Path(path), "", "", listMax)
if err != nil || len(listResponse.Contents) == 0 {
return storagedriver.PathNotFoundError{Path: path}
}
s3Objects := make([]s3.Object, listMax)
for len(listResponse.Contents) > 0 {
for index, key := range listResponse.Contents {
s3Objects[index].Key = key.Key
}
err := d.Bucket.DelMulti(s3.Delete{Quiet: false, Objects: s3Objects[0:len(listResponse.Contents)]})
if err != nil {
return nil
}
listResponse, err = d.Bucket.List(d.s3Path(path), "", "", listMax)
if err != nil {
return err
}
}
return nil
}
// URLFor returns a URL which may be used to retrieve the content stored at the given path.
// May return an UnsupportedMethodErr in certain StorageDriver implementations.
func (d *driver) URLFor(path string, options map[string]interface{}) (string, error) {
methodString := "GET"
method, ok := options["method"]
if ok {
methodString, ok = method.(string)
if !ok || (methodString != "GET" && methodString != "HEAD") {
return "", storagedriver.ErrUnsupportedMethod
}
}
expiresTime := time.Now().Add(20 * time.Minute)
expires, ok := options["expiry"]
if ok {
et, ok := expires.(time.Time)
if ok {
expiresTime = et
}
}
return d.Bucket.SignedURLWithMethod(methodString, d.s3Path(path), expiresTime, nil, nil), nil
}
func (d *driver) s3Path(path string) string {
return strings.TrimLeft(strings.TrimRight(d.RootDirectory, "/")+path, "/")
}
// S3BucketKey returns the s3 bucket key for the given storage driver path.
func (d *Driver) S3BucketKey(path string) string {
return d.StorageDriver.(*driver).s3Path(path)
}
func parseError(path string, err error) error {
if s3Err, ok := err.(*s3.Error); ok && s3Err.Code == "NoSuchKey" {
return storagedriver.PathNotFoundError{Path: path}
}
return err
}
func hasCode(err error, code string) bool {
s3err, ok := err.(*aws.Error)
return ok && s3err.Code == code
}
func (d *driver) getOptions() s3.Options {
return s3.Options{SSE: d.Encrypt}
}
func getPermissions() s3.ACL {
return s3.Private
}
func (d *driver) getContentType() string {
return "application/octet-stream"
}