d0bc83d8e4
both oss and gcs driver were missing the context parameter that is required to satisfy the storagedriver.FileWriter interface. Signed-off-by: Flavian Missi <fmissi@redhat.com>
714 lines
19 KiB
Go
714 lines
19 KiB
Go
// Package oss provides a storagedriver.StorageDriver implementation to
|
|
// store blobs in Aliyun OSS cloud storage.
|
|
//
|
|
// This package leverages the denverdino/aliyungo client library for interfacing with
|
|
// oss.
|
|
//
|
|
// Because OSS is a key-value store, the Stat call does not support last modification
|
|
// time for directories (directories are an abstraction over key-value stores).
|
|
//
|
|
//go:build include_oss
|
|
// +build include_oss
|
|
|
|
package oss
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"reflect"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/denverdino/aliyungo/oss"
|
|
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
|
|
"github.com/distribution/distribution/v3/registry/storage/driver/base"
|
|
"github.com/distribution/distribution/v3/registry/storage/driver/factory"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
// driverName is the name this driver is registered under with the storage
// driver factory and the value returned by (*driver).Name.
const driverName = "oss"

const (
	// minChunkSize defines the minimum multipart upload chunk size.
	// The OSS API requires multipart upload chunks to be at least 5MB.
	minChunkSize = 5 << 20

	// defaultChunkSize is the chunk size used when no "chunksize" parameter
	// is supplied to FromParameters.
	defaultChunkSize = 2 * minChunkSize

	// defaultTimeout is a 2 minute timeout per chunk.
	defaultTimeout = 2 * time.Minute

	// listMax is the largest amount of objects you can request from OSS in
	// a list call.
	listMax = 1000
)
|
|
|
|
// DriverParameters A struct that encapsulates all of the driver parameters after all values have been set
|
|
type DriverParameters struct {
|
|
AccessKeyID string
|
|
AccessKeySecret string
|
|
Bucket string
|
|
Region oss.Region
|
|
Internal bool
|
|
Encrypt bool
|
|
Secure bool
|
|
ChunkSize int64
|
|
RootDirectory string
|
|
Endpoint string
|
|
EncryptionKeyID string
|
|
}
|
|
|
|
func init() {
|
|
factory.Register(driverName, &ossDriverFactory{})
|
|
}
|
|
|
|
// ossDriverFactory implements the factory.StorageDriverFactory interface
|
|
type ossDriverFactory struct{}
|
|
|
|
func (factory *ossDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
|
|
return FromParameters(parameters)
|
|
}
|
|
|
|
type driver struct {
|
|
Client *oss.Client
|
|
Bucket *oss.Bucket
|
|
ChunkSize int64
|
|
Encrypt bool
|
|
RootDirectory string
|
|
EncryptionKeyID string
|
|
}
|
|
|
|
type baseEmbed struct {
|
|
base.Base
|
|
}
|
|
|
|
// Driver is a storagedriver.StorageDriver implementation backed by Aliyun OSS
|
|
// Objects are stored at absolute keys in the provided bucket.
|
|
type Driver struct {
|
|
baseEmbed
|
|
}
|
|
|
|
// FromParameters constructs a new Driver with a given parameters map
|
|
// Required parameters:
|
|
// - accesskey
|
|
// - secretkey
|
|
// - region
|
|
// - bucket
|
|
// - encrypt
|
|
func FromParameters(parameters map[string]interface{}) (*Driver, error) {
|
|
// Providing no values for these is valid in case the user is authenticating
|
|
|
|
accessKey, ok := parameters["accesskeyid"]
|
|
if !ok {
|
|
return nil, fmt.Errorf("No accesskeyid parameter provided")
|
|
}
|
|
secretKey, ok := parameters["accesskeysecret"]
|
|
if !ok {
|
|
return nil, fmt.Errorf("No accesskeysecret parameter provided")
|
|
}
|
|
|
|
regionName, ok := parameters["region"]
|
|
if !ok || fmt.Sprint(regionName) == "" {
|
|
return nil, fmt.Errorf("No region parameter provided")
|
|
}
|
|
|
|
bucket, ok := parameters["bucket"]
|
|
if !ok || fmt.Sprint(bucket) == "" {
|
|
return nil, fmt.Errorf("No bucket parameter provided")
|
|
}
|
|
|
|
internalBool := false
|
|
internal, ok := parameters["internal"]
|
|
if ok {
|
|
internalBool, ok = internal.(bool)
|
|
if !ok {
|
|
return nil, fmt.Errorf("The internal parameter should be a boolean")
|
|
}
|
|
}
|
|
|
|
encryptBool := false
|
|
encrypt, ok := parameters["encrypt"]
|
|
if ok {
|
|
encryptBool, ok = encrypt.(bool)
|
|
if !ok {
|
|
return nil, fmt.Errorf("The encrypt parameter should be a boolean")
|
|
}
|
|
}
|
|
|
|
encryptionKeyID, ok := parameters["encryptionkeyid"]
|
|
if !ok {
|
|
encryptionKeyID = ""
|
|
}
|
|
|
|
secureBool := true
|
|
secure, ok := parameters["secure"]
|
|
if ok {
|
|
secureBool, ok = secure.(bool)
|
|
if !ok {
|
|
return nil, fmt.Errorf("The secure parameter should be a boolean")
|
|
}
|
|
}
|
|
|
|
chunkSize := int64(defaultChunkSize)
|
|
chunkSizeParam, ok := parameters["chunksize"]
|
|
if ok {
|
|
switch v := chunkSizeParam.(type) {
|
|
case string:
|
|
vv, err := strconv.ParseInt(v, 0, 64)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("chunksize parameter must be an integer, %v invalid", chunkSizeParam)
|
|
}
|
|
chunkSize = vv
|
|
case int64:
|
|
chunkSize = v
|
|
case int, uint, int32, uint32, uint64:
|
|
chunkSize = reflect.ValueOf(v).Convert(reflect.TypeOf(chunkSize)).Int()
|
|
default:
|
|
return nil, fmt.Errorf("invalid valud for chunksize: %#v", chunkSizeParam)
|
|
}
|
|
|
|
if chunkSize < minChunkSize {
|
|
return nil, fmt.Errorf("The chunksize %#v parameter should be a number that is larger than or equal to %d", chunkSize, minChunkSize)
|
|
}
|
|
}
|
|
|
|
rootDirectory, ok := parameters["rootdirectory"]
|
|
if !ok {
|
|
rootDirectory = ""
|
|
}
|
|
|
|
endpoint, ok := parameters["endpoint"]
|
|
if !ok {
|
|
endpoint = ""
|
|
}
|
|
|
|
params := DriverParameters{
|
|
AccessKeyID: fmt.Sprint(accessKey),
|
|
AccessKeySecret: fmt.Sprint(secretKey),
|
|
Bucket: fmt.Sprint(bucket),
|
|
Region: oss.Region(fmt.Sprint(regionName)),
|
|
ChunkSize: chunkSize,
|
|
RootDirectory: fmt.Sprint(rootDirectory),
|
|
Encrypt: encryptBool,
|
|
Secure: secureBool,
|
|
Internal: internalBool,
|
|
Endpoint: fmt.Sprint(endpoint),
|
|
EncryptionKeyID: fmt.Sprint(encryptionKeyID),
|
|
}
|
|
|
|
return New(params)
|
|
}
|
|
|
|
// New constructs a new Driver with the given Aliyun credentials, region, encryption flag, and
|
|
// bucketName
|
|
func New(params DriverParameters) (*Driver, error) {
|
|
client := oss.NewOSSClient(params.Region, params.Internal, params.AccessKeyID, params.AccessKeySecret, params.Secure)
|
|
client.SetEndpoint(params.Endpoint)
|
|
bucket := client.Bucket(params.Bucket)
|
|
client.SetDebug(false)
|
|
|
|
// Validate that the given credentials have at least read permissions in the
|
|
// given bucket scope.
|
|
if _, err := bucket.List(strings.TrimRight(params.RootDirectory, "/"), "", "", 1); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// TODO(tg123): Currently multipart uploads have no timestamps, so this would be unwise
|
|
// if you initiated a new OSS client while another one is running on the same bucket.
|
|
|
|
d := &driver{
|
|
Client: client,
|
|
Bucket: bucket,
|
|
ChunkSize: params.ChunkSize,
|
|
Encrypt: params.Encrypt,
|
|
RootDirectory: params.RootDirectory,
|
|
EncryptionKeyID: params.EncryptionKeyID,
|
|
}
|
|
|
|
return &Driver{
|
|
baseEmbed: baseEmbed{
|
|
Base: base.Base{
|
|
StorageDriver: d,
|
|
},
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
// Implement the storagedriver.StorageDriver interface
|
|
|
|
func (d *driver) Name() string {
|
|
return driverName
|
|
}
|
|
|
|
// GetContent retrieves the content stored at "path" as a []byte.
|
|
func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
|
|
content, err := d.Bucket.Get(d.ossPath(path))
|
|
if err != nil {
|
|
return nil, parseError(path, err)
|
|
}
|
|
return content, nil
|
|
}
|
|
|
|
// PutContent stores the []byte content at a location designated by "path".
|
|
func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error {
|
|
return parseError(path, d.Bucket.Put(d.ossPath(path), contents, d.getContentType(), getPermissions(), d.getOptions()))
|
|
}
|
|
|
|
// Reader retrieves an io.ReadCloser for the content stored at "path" with a
|
|
// given byte offset.
|
|
func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
|
|
headers := make(http.Header)
|
|
headers.Add("Range", "bytes="+strconv.FormatInt(offset, 10)+"-")
|
|
|
|
resp, err := d.Bucket.GetResponseWithHeaders(d.ossPath(path), headers)
|
|
if err != nil {
|
|
return nil, parseError(path, err)
|
|
}
|
|
|
|
// Due to Aliyun OSS API, status 200 and whole object will be return instead of an
|
|
// InvalidRange error when range is invalid.
|
|
//
|
|
// OSS sever will always return http.StatusPartialContent if range is acceptable.
|
|
if resp.StatusCode != http.StatusPartialContent {
|
|
resp.Body.Close()
|
|
return io.NopCloser(bytes.NewReader(nil)), nil
|
|
}
|
|
|
|
return resp.Body, nil
|
|
}
|
|
|
|
// Writer returns a FileWriter which will store the content written to it
|
|
// at the location designated by "path" after the call to Commit.
|
|
func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) {
|
|
key := d.ossPath(path)
|
|
if !append {
|
|
// TODO (brianbland): cancel other uploads at this path
|
|
multi, err := d.Bucket.InitMulti(key, d.getContentType(), getPermissions(), d.getOptions())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return d.newWriter(key, multi, nil), nil
|
|
}
|
|
multis, _, err := d.Bucket.ListMulti(key, "")
|
|
if err != nil {
|
|
return nil, parseError(path, err)
|
|
}
|
|
for _, multi := range multis {
|
|
if key != multi.Key {
|
|
continue
|
|
}
|
|
parts, err := multi.ListParts()
|
|
if err != nil {
|
|
return nil, parseError(path, err)
|
|
}
|
|
var multiSize int64
|
|
for _, part := range parts {
|
|
multiSize += part.Size
|
|
}
|
|
return d.newWriter(key, multi, parts), nil
|
|
}
|
|
return nil, storagedriver.PathNotFoundError{Path: path}
|
|
}
|
|
|
|
// Stat retrieves the FileInfo for the given path, including the current size
|
|
// in bytes and the creation time.
|
|
func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
|
|
listResponse, err := d.Bucket.List(d.ossPath(path), "", "", 1)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
fi := storagedriver.FileInfoFields{
|
|
Path: path,
|
|
}
|
|
|
|
if len(listResponse.Contents) == 1 {
|
|
if listResponse.Contents[0].Key != d.ossPath(path) {
|
|
fi.IsDir = true
|
|
} else {
|
|
fi.IsDir = false
|
|
fi.Size = listResponse.Contents[0].Size
|
|
|
|
timestamp, err := time.Parse(time.RFC3339Nano, listResponse.Contents[0].LastModified)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
fi.ModTime = timestamp
|
|
}
|
|
} else if len(listResponse.CommonPrefixes) == 1 {
|
|
fi.IsDir = true
|
|
} else {
|
|
return nil, storagedriver.PathNotFoundError{Path: path}
|
|
}
|
|
|
|
return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
|
|
}
|
|
|
|
// List returns a list of the objects that are direct descendants of the given path.
|
|
func (d *driver) List(ctx context.Context, opath string) ([]string, error) {
|
|
path := opath
|
|
if path != "/" && opath[len(path)-1] != '/' {
|
|
path = path + "/"
|
|
}
|
|
|
|
// This is to cover for the cases when the rootDirectory of the driver is either "" or "/".
|
|
// In those cases, there is no root prefix to replace and we must actually add a "/" to all
|
|
// results in order to keep them as valid paths as recognized by storagedriver.PathRegexp
|
|
prefix := ""
|
|
if d.ossPath("") == "" {
|
|
prefix = "/"
|
|
}
|
|
|
|
ossPath := d.ossPath(path)
|
|
listResponse, err := d.Bucket.List(ossPath, "/", "", listMax)
|
|
if err != nil {
|
|
return nil, parseError(opath, err)
|
|
}
|
|
|
|
files := []string{}
|
|
directories := []string{}
|
|
|
|
for {
|
|
for _, key := range listResponse.Contents {
|
|
files = append(files, strings.Replace(key.Key, d.ossPath(""), prefix, 1))
|
|
}
|
|
|
|
for _, commonPrefix := range listResponse.CommonPrefixes {
|
|
directories = append(directories, strings.Replace(commonPrefix[0:len(commonPrefix)-1], d.ossPath(""), prefix, 1))
|
|
}
|
|
|
|
if listResponse.IsTruncated {
|
|
listResponse, err = d.Bucket.List(ossPath, "/", listResponse.NextMarker, listMax)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
|
|
// This is to cover for the cases when the first key equal to ossPath.
|
|
if len(files) > 0 && files[0] == strings.Replace(ossPath, d.ossPath(""), prefix, 1) {
|
|
files = files[1:]
|
|
}
|
|
|
|
if opath != "/" {
|
|
if len(files) == 0 && len(directories) == 0 {
|
|
// Treat empty response as missing directory, since we don't actually
|
|
// have directories in s3.
|
|
return nil, storagedriver.PathNotFoundError{Path: opath}
|
|
}
|
|
}
|
|
|
|
return append(files, directories...), nil
|
|
}
|
|
|
|
const maxConcurrency = 10
|
|
|
|
// Move moves an object stored at sourcePath to destPath, removing the original
|
|
// object.
|
|
func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
|
|
logrus.Infof("Move from %s to %s", d.ossPath(sourcePath), d.ossPath(destPath))
|
|
err := d.Bucket.CopyLargeFileInParallel(d.ossPath(sourcePath), d.ossPath(destPath),
|
|
d.getContentType(),
|
|
getPermissions(),
|
|
d.getOptions(),
|
|
maxConcurrency)
|
|
if err != nil {
|
|
logrus.Errorf("Failed for move from %s to %s: %v", d.ossPath(sourcePath), d.ossPath(destPath), err)
|
|
return parseError(sourcePath, err)
|
|
}
|
|
|
|
return d.Delete(ctx, sourcePath)
|
|
}
|
|
|
|
// Delete recursively deletes all objects stored at "path" and its subpaths.
|
|
func (d *driver) Delete(ctx context.Context, path string) error {
|
|
ossPath := d.ossPath(path)
|
|
listResponse, err := d.Bucket.List(ossPath, "", "", listMax)
|
|
if err != nil || len(listResponse.Contents) == 0 {
|
|
return storagedriver.PathNotFoundError{Path: path}
|
|
}
|
|
|
|
ossObjects := make([]oss.Object, listMax)
|
|
|
|
for len(listResponse.Contents) > 0 {
|
|
numOssObjects := len(listResponse.Contents)
|
|
for index, key := range listResponse.Contents {
|
|
// Stop if we encounter a key that is not a subpath (so that deleting "/a" does not delete "/ab").
|
|
if len(key.Key) > len(ossPath) && (key.Key)[len(ossPath)] != '/' {
|
|
numOssObjects = index
|
|
break
|
|
}
|
|
ossObjects[index].Key = key.Key
|
|
}
|
|
|
|
err := d.Bucket.DelMulti(oss.Delete{Quiet: false, Objects: ossObjects[0:numOssObjects]})
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
|
|
if numOssObjects < len(listResponse.Contents) {
|
|
return nil
|
|
}
|
|
|
|
listResponse, err = d.Bucket.List(d.ossPath(path), "", "", listMax)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// URLFor returns a URL which may be used to retrieve the content stored at the given path.
|
|
// May return an UnsupportedMethodErr in certain StorageDriver implementations.
|
|
func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
|
|
methodString := http.MethodGet
|
|
method, ok := options["method"]
|
|
if ok {
|
|
methodString, ok = method.(string)
|
|
if !ok || (methodString != http.MethodGet) {
|
|
return "", storagedriver.ErrUnsupportedMethod{}
|
|
}
|
|
}
|
|
|
|
expiresTime := time.Now().Add(20 * time.Minute)
|
|
|
|
expires, ok := options["expiry"]
|
|
if ok {
|
|
et, ok := expires.(time.Time)
|
|
if ok {
|
|
expiresTime = et
|
|
}
|
|
}
|
|
logrus.Infof("methodString: %s, expiresTime: %v", methodString, expiresTime)
|
|
signedURL := d.Bucket.SignedURLWithMethod(methodString, d.ossPath(path), expiresTime, nil, nil)
|
|
logrus.Infof("signed URL: %s", signedURL)
|
|
return signedURL, nil
|
|
}
|
|
|
|
// Walk traverses a filesystem defined within driver, starting
|
|
// from the given path, calling f on each file
|
|
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
|
|
return storagedriver.WalkFallback(ctx, d, path, f)
|
|
}
|
|
|
|
func (d *driver) ossPath(path string) string {
|
|
return strings.TrimLeft(strings.TrimRight(d.RootDirectory, "/")+path, "/")
|
|
}
|
|
|
|
func parseError(path string, err error) error {
|
|
if ossErr, ok := err.(*oss.Error); ok && ossErr.StatusCode == http.StatusNotFound && (ossErr.Code == "NoSuchKey" || ossErr.Code == "") {
|
|
return storagedriver.PathNotFoundError{Path: path}
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func hasCode(err error, code string) bool {
|
|
ossErr, ok := err.(*oss.Error)
|
|
return ok && ossErr.Code == code
|
|
}
|
|
|
|
func (d *driver) getOptions() oss.Options {
|
|
return oss.Options{
|
|
ServerSideEncryption: d.Encrypt,
|
|
ServerSideEncryptionKeyID: d.EncryptionKeyID,
|
|
}
|
|
}
|
|
|
|
func (d *driver) getCopyOptions() oss.CopyOptions {
|
|
return oss.CopyOptions{
|
|
ServerSideEncryption: d.Encrypt,
|
|
ServerSideEncryptionKeyID: d.EncryptionKeyID,
|
|
}
|
|
}
|
|
|
|
func getPermissions() oss.ACL {
|
|
return oss.Private
|
|
}
|
|
|
|
func (d *driver) getContentType() string {
|
|
return "application/octet-stream"
|
|
}
|
|
|
|
// writer attempts to upload parts to S3 in a buffered fashion where the last
|
|
// part is at least as large as the chunksize, so the multipart upload could be
|
|
// cleanly resumed in the future. This is violated if Close is called after less
|
|
// than a full chunk is written.
|
|
type writer struct {
|
|
driver *driver
|
|
key string
|
|
multi *oss.Multi
|
|
parts []oss.Part
|
|
size int64
|
|
readyPart []byte
|
|
pendingPart []byte
|
|
closed bool
|
|
committed bool
|
|
cancelled bool
|
|
}
|
|
|
|
func (d *driver) newWriter(key string, multi *oss.Multi, parts []oss.Part) storagedriver.FileWriter {
|
|
var size int64
|
|
for _, part := range parts {
|
|
size += part.Size
|
|
}
|
|
return &writer{
|
|
driver: d,
|
|
key: key,
|
|
multi: multi,
|
|
parts: parts,
|
|
size: size,
|
|
}
|
|
}
|
|
|
|
func (w *writer) Write(p []byte) (int, error) {
|
|
if w.closed {
|
|
return 0, fmt.Errorf("already closed")
|
|
} else if w.committed {
|
|
return 0, fmt.Errorf("already committed")
|
|
} else if w.cancelled {
|
|
return 0, fmt.Errorf("already cancelled")
|
|
}
|
|
|
|
// If the last written part is smaller than minChunkSize, we need to make a
|
|
// new multipart upload :sadface:
|
|
if len(w.parts) > 0 && int(w.parts[len(w.parts)-1].Size) < minChunkSize {
|
|
err := w.multi.Complete(w.parts)
|
|
if err != nil {
|
|
w.multi.Abort()
|
|
return 0, err
|
|
}
|
|
|
|
multi, err := w.driver.Bucket.InitMulti(w.key, w.driver.getContentType(), getPermissions(), w.driver.getOptions())
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
w.multi = multi
|
|
|
|
// If the entire written file is smaller than minChunkSize, we need to make
|
|
// a new part from scratch :double sad face:
|
|
if w.size < minChunkSize {
|
|
contents, err := w.driver.Bucket.Get(w.key)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
w.parts = nil
|
|
w.readyPart = contents
|
|
} else {
|
|
// Otherwise we can use the old file as the new first part
|
|
_, part, err := multi.PutPartCopy(1, w.driver.getCopyOptions(), w.driver.Bucket.Name+"/"+w.key)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
w.parts = []oss.Part{part}
|
|
}
|
|
}
|
|
|
|
var n int
|
|
|
|
for len(p) > 0 {
|
|
// If no parts are ready to write, fill up the first part
|
|
if neededBytes := int(w.driver.ChunkSize) - len(w.readyPart); neededBytes > 0 {
|
|
if len(p) >= neededBytes {
|
|
w.readyPart = append(w.readyPart, p[:neededBytes]...)
|
|
n += neededBytes
|
|
p = p[neededBytes:]
|
|
} else {
|
|
w.readyPart = append(w.readyPart, p...)
|
|
n += len(p)
|
|
p = nil
|
|
}
|
|
}
|
|
|
|
if neededBytes := int(w.driver.ChunkSize) - len(w.pendingPart); neededBytes > 0 {
|
|
if len(p) >= neededBytes {
|
|
w.pendingPart = append(w.pendingPart, p[:neededBytes]...)
|
|
n += neededBytes
|
|
p = p[neededBytes:]
|
|
err := w.flushPart()
|
|
if err != nil {
|
|
w.size += int64(n)
|
|
return n, err
|
|
}
|
|
} else {
|
|
w.pendingPart = append(w.pendingPart, p...)
|
|
n += len(p)
|
|
p = nil
|
|
}
|
|
}
|
|
}
|
|
w.size += int64(n)
|
|
return n, nil
|
|
}
|
|
|
|
func (w *writer) Size() int64 {
|
|
return w.size
|
|
}
|
|
|
|
func (w *writer) Close() error {
|
|
if w.closed {
|
|
return fmt.Errorf("already closed")
|
|
}
|
|
w.closed = true
|
|
return w.flushPart()
|
|
}
|
|
|
|
func (w *writer) Cancel(ctx context.Context) error {
|
|
if w.closed {
|
|
return fmt.Errorf("already closed")
|
|
} else if w.committed {
|
|
return fmt.Errorf("already committed")
|
|
}
|
|
w.cancelled = true
|
|
err := w.multi.Abort()
|
|
return err
|
|
}
|
|
|
|
func (w *writer) Commit() error {
|
|
if w.closed {
|
|
return fmt.Errorf("already closed")
|
|
} else if w.committed {
|
|
return fmt.Errorf("already committed")
|
|
} else if w.cancelled {
|
|
return fmt.Errorf("already cancelled")
|
|
}
|
|
err := w.flushPart()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
w.committed = true
|
|
err = w.multi.Complete(w.parts)
|
|
if err != nil {
|
|
w.multi.Abort()
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// flushPart flushes buffers to write a part to S3.
|
|
// Only called by Write (with both buffers full) and Close/Commit (always)
|
|
func (w *writer) flushPart() error {
|
|
if len(w.readyPart) == 0 && len(w.pendingPart) == 0 {
|
|
// nothing to write
|
|
return nil
|
|
}
|
|
if len(w.pendingPart) < int(w.driver.ChunkSize) {
|
|
// closing with a small pending part
|
|
// combine ready and pending to avoid writing a small part
|
|
w.readyPart = append(w.readyPart, w.pendingPart...)
|
|
w.pendingPart = nil
|
|
}
|
|
|
|
part, err := w.multi.PutPart(len(w.parts)+1, bytes.NewReader(w.readyPart))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
w.parts = append(w.parts, part)
|
|
w.readyPart = w.pendingPart
|
|
w.pendingPart = nil
|
|
return nil
|
|
}
|