forked from TrueCloudLab/rclone
s3: simplify PutObject code to use the Request.SetStreamingBody method
In this commit
e5974ac4b0
s3: use PutObject from the aws SDK to upload single part objects
rclone was made to upload objects to s3 using PUT requests rather than
using signed uploads.
However this change missed the fact that there is a supported way to
do this in the SDK using the SetStreamingBody method on the Request.
This therefore reverts a lot of the previous commit to do with making
an unsigned connection and other complication and uses the SDK
facility.
This commit is contained in:
parent
e59801c69b
commit
c85fbebce6
1 changed files with 31 additions and 74 deletions
105
backend/s3/s3.go
105
backend/s3/s3.go
|
@ -12,6 +12,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"path"
|
||||
|
@ -32,7 +33,6 @@ import (
|
|||
"github.com/aws/aws-sdk-go/aws/endpoints"
|
||||
"github.com/aws/aws-sdk-go/aws/request"
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
v4 "github.com/aws/aws-sdk-go/aws/signer/v4"
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
"github.com/ncw/swift/v2"
|
||||
"github.com/rclone/rclone/fs"
|
||||
|
@ -2044,7 +2044,6 @@ type Fs struct {
|
|||
ctx context.Context // global context for reading config
|
||||
features *fs.Features // optional features
|
||||
c *s3.S3 // the connection to the s3 server
|
||||
cu *s3.S3 // unsigned connection to the s3 server for PutObject
|
||||
ses *session.Session // the s3 session
|
||||
rootBucket string // bucket part of root (if any)
|
||||
rootDirectory string // directory part of root (if any)
|
||||
|
@ -2181,11 +2180,7 @@ func getClient(ctx context.Context, opt *Options) *http.Client {
|
|||
}
|
||||
|
||||
// s3Connection makes a connection to s3
|
||||
//
|
||||
// If unsignedBody is set then the connection is configured for
|
||||
// unsigned bodies which is necessary for PutObject if we don't want
|
||||
// it to seek
|
||||
func s3Connection(ctx context.Context, opt *Options, client *http.Client) (*s3.S3, *s3.S3, *session.Session, error) {
|
||||
func s3Connection(ctx context.Context, opt *Options, client *http.Client) (*s3.S3, *session.Session, error) {
|
||||
ci := fs.GetConfig(ctx)
|
||||
// Make the auth
|
||||
v := credentials.Value{
|
||||
|
@ -2202,7 +2197,7 @@ func s3Connection(ctx context.Context, opt *Options, client *http.Client) (*s3.S
|
|||
// start a new AWS session
|
||||
awsSession, err := session.NewSession()
|
||||
if err != nil {
|
||||
return nil, nil, nil, fmt.Errorf("NewSession: %w", err)
|
||||
return nil, nil, fmt.Errorf("NewSession: %w", err)
|
||||
}
|
||||
|
||||
// first provider to supply a credential set "wins"
|
||||
|
@ -2242,9 +2237,9 @@ func s3Connection(ctx context.Context, opt *Options, client *http.Client) (*s3.S
|
|||
// if no access key/secret and iam is explicitly disabled then fall back to anon interaction
|
||||
cred = credentials.AnonymousCredentials
|
||||
case v.AccessKeyID == "":
|
||||
return nil, nil, nil, errors.New("access_key_id not found")
|
||||
return nil, nil, errors.New("access_key_id not found")
|
||||
case v.SecretAccessKey == "":
|
||||
return nil, nil, nil, errors.New("secret_access_key not found")
|
||||
return nil, nil, errors.New("secret_access_key not found")
|
||||
}
|
||||
|
||||
if opt.Region == "" {
|
||||
|
@ -2283,36 +2278,25 @@ func s3Connection(ctx context.Context, opt *Options, client *http.Client) (*s3.S
|
|||
// (from the shared config file) if the passed-in Options.Config.Credentials is nil.
|
||||
awsSessionOpts.Config.Credentials = nil
|
||||
}
|
||||
// Setting this stops PutObject reading the body twice and seeking
|
||||
// We add our own Content-MD5 for data protection
|
||||
awsSessionOpts.Config.S3DisableContentMD5Validation = aws.Bool(true)
|
||||
ses, err := session.NewSessionWithOptions(awsSessionOpts)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
newC := func(unsignedBody bool) *s3.S3 {
|
||||
c := s3.New(ses)
|
||||
if opt.V2Auth || opt.Region == "other-v2-signature" {
|
||||
fs.Debugf(nil, "Using v2 auth")
|
||||
signer := func(req *request.Request) {
|
||||
// Ignore AnonymousCredentials object
|
||||
if req.Config.Credentials == credentials.AnonymousCredentials {
|
||||
return
|
||||
}
|
||||
sign(v.AccessKeyID, v.SecretAccessKey, req.HTTPRequest)
|
||||
c := s3.New(ses)
|
||||
if opt.V2Auth || opt.Region == "other-v2-signature" {
|
||||
fs.Debugf(nil, "Using v2 auth")
|
||||
signer := func(req *request.Request) {
|
||||
// Ignore AnonymousCredentials object
|
||||
if req.Config.Credentials == credentials.AnonymousCredentials {
|
||||
return
|
||||
}
|
||||
c.Handlers.Sign.Clear()
|
||||
c.Handlers.Sign.PushBackNamed(corehandlers.BuildContentLengthHandler)
|
||||
c.Handlers.Sign.PushBack(signer)
|
||||
} else if unsignedBody {
|
||||
// If the body is unsigned then tell the signer that we aren't signing the payload
|
||||
c.Handlers.Sign.Clear()
|
||||
c.Handlers.Sign.PushBackNamed(corehandlers.BuildContentLengthHandler)
|
||||
c.Handlers.Sign.PushBackNamed(v4.BuildNamedHandler("v4.SignRequestHandler.WithUnsignedPayload", v4.WithUnsignedPayload))
|
||||
sign(v.AccessKeyID, v.SecretAccessKey, req.HTTPRequest)
|
||||
}
|
||||
return c
|
||||
c.Handlers.Sign.Clear()
|
||||
c.Handlers.Sign.PushBackNamed(corehandlers.BuildContentLengthHandler)
|
||||
c.Handlers.Sign.PushBack(signer)
|
||||
}
|
||||
return newC(false), newC(true), ses, nil
|
||||
return c, ses, nil
|
||||
}
|
||||
|
||||
func checkUploadChunkSize(cs fs.SizeSuffix) error {
|
||||
|
@ -2501,7 +2485,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
|
|||
opt.SSECustomerKeyMD5 = base64.StdEncoding.EncodeToString(md5sumBinary[:])
|
||||
}
|
||||
srv := getClient(ctx, opt)
|
||||
c, cu, ses, err := s3Connection(ctx, opt, srv)
|
||||
c, ses, err := s3Connection(ctx, opt, srv)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -2519,7 +2503,6 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
|
|||
ci: ci,
|
||||
ctx: ctx,
|
||||
c: c,
|
||||
cu: cu,
|
||||
ses: ses,
|
||||
pacer: pc,
|
||||
cache: bucket.NewCache(),
|
||||
|
@ -2643,12 +2626,11 @@ func (f *Fs) updateRegionForBucket(ctx context.Context, bucket string) error {
|
|||
// Make a new session with the new region
|
||||
oldRegion := f.opt.Region
|
||||
f.opt.Region = region
|
||||
c, cu, ses, err := s3Connection(f.ctx, &f.opt, f.srv)
|
||||
c, ses, err := s3Connection(f.ctx, &f.opt, f.srv)
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating new session failed: %w", err)
|
||||
}
|
||||
f.c = c
|
||||
f.cu = cu
|
||||
f.ses = ses
|
||||
|
||||
fs.Logf(f, "Switched region to %q from %q", region, oldRegion)
|
||||
|
@ -4143,48 +4125,23 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
|
|||
return etag, nil
|
||||
}
|
||||
|
||||
// unWrapAwsError unwraps AWS errors, looking for a non AWS error
|
||||
//
|
||||
// It returns true if one was found and the error, or false and the
|
||||
// error passed in.
|
||||
func unWrapAwsError(err error) (found bool, outErr error) {
|
||||
if awsErr, ok := err.(awserr.Error); ok {
|
||||
var origErrs []error
|
||||
if batchErr, ok := awsErr.(awserr.BatchError); ok {
|
||||
origErrs = batchErr.OrigErrs()
|
||||
} else {
|
||||
origErrs = []error{awsErr.OrigErr()}
|
||||
}
|
||||
for _, origErr := range origErrs {
|
||||
found, newErr := unWrapAwsError(origErr)
|
||||
if found {
|
||||
return found, newErr
|
||||
}
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
return true, err
|
||||
}
|
||||
|
||||
// Upload a single part using PutObject
|
||||
func (o *Object) uploadSinglepartPutObject(ctx context.Context, req *s3.PutObjectInput, size int64, in io.Reader) (etag string, lastModified time.Time, err error) {
|
||||
req.Body = readers.NewFakeSeeker(in, size)
|
||||
var resp *s3.PutObjectOutput
|
||||
r, resp := o.fs.c.PutObjectRequest(req)
|
||||
if req.ContentLength != nil && *req.ContentLength == 0 {
|
||||
// Can't upload zero length files like this for some reason
|
||||
r.Body = bytes.NewReader([]byte{})
|
||||
} else {
|
||||
r.SetStreamingBody(ioutil.NopCloser(in))
|
||||
}
|
||||
r.SetContext(ctx)
|
||||
r.HTTPRequest.Header.Set("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD")
|
||||
|
||||
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
|
||||
resp, err = o.fs.cu.PutObject(req)
|
||||
err := r.Send()
|
||||
return o.fs.shouldRetry(ctx, err)
|
||||
})
|
||||
if err != nil {
|
||||
// Return the underlying error if we have a Serialization error if possible
|
||||
//
|
||||
// Serialization errors are synthesized locally in the SDK (not returned from the
|
||||
// server). We'll get one if the SDK attempts a retry, however the FakeSeeker will
|
||||
// remember the previous error from Read and return that.
|
||||
if do, ok := err.(awserr.Error); ok && do.Code() == request.ErrCodeSerialization {
|
||||
if found, newErr := unWrapAwsError(err); found {
|
||||
err = newErr
|
||||
}
|
||||
}
|
||||
return etag, lastModified, err
|
||||
}
|
||||
lastModified = time.Now()
|
||||
|
|
Loading…
Reference in a new issue