From c85fbebce6f7166350c79e11fae763c8264ef865 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Thu, 16 Jun 2022 22:55:45 +0100 Subject: [PATCH] s3: simplify PutObject code to use the Request.SetStreamingBody method In this commit e5974ac4b0a847e8 s3: use PutObject from the aws SDK to upload single part objects rclone was made to upload objects to s3 using PUT requests rather than using signed uploads. However this change missed the fact that there is a supported way to do this in the SDK using the SetStreamingBody method on the Request. This therefore reverts a lot of the previous commit to do with making an unsigned connection and other complication and uses the SDK facility. --- backend/s3/s3.go | 105 ++++++++++++++--------------------------------- 1 file changed, 31 insertions(+), 74 deletions(-) diff --git a/backend/s3/s3.go b/backend/s3/s3.go index dd894db18..4e2a15cde 100644 --- a/backend/s3/s3.go +++ b/backend/s3/s3.go @@ -12,6 +12,7 @@ import ( "errors" "fmt" "io" + "io/ioutil" "net/http" "net/url" "path" @@ -32,7 +33,6 @@ import ( "github.com/aws/aws-sdk-go/aws/endpoints" "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/aws/session" - v4 "github.com/aws/aws-sdk-go/aws/signer/v4" "github.com/aws/aws-sdk-go/service/s3" "github.com/ncw/swift/v2" "github.com/rclone/rclone/fs" @@ -2044,7 +2044,6 @@ type Fs struct { ctx context.Context // global context for reading config features *fs.Features // optional features c *s3.S3 // the connection to the s3 server - cu *s3.S3 // unsigned connection to the s3 server for PutObject ses *session.Session // the s3 session rootBucket string // bucket part of root (if any) rootDirectory string // directory part of root (if any) @@ -2181,11 +2180,7 @@ func getClient(ctx context.Context, opt *Options) *http.Client { } // s3Connection makes a connection to s3 -// -// If unsignedBody is set then the connection is configured for -// unsigned bodies which is necessary for PutObject if we don't want -// it to seek -func s3Connection(ctx context.Context, opt *Options, client *http.Client) (*s3.S3, *s3.S3, *session.Session, error) { +func s3Connection(ctx context.Context, opt *Options, client *http.Client) (*s3.S3, *session.Session, error) { ci := fs.GetConfig(ctx) // Make the auth v := credentials.Value{ @@ -2202,7 +2197,7 @@ func s3Connection(ctx context.Context, opt *Options, client *http.Client) (*s3.S // start a new AWS session awsSession, err := session.NewSession() if err != nil { - return nil, nil, nil, fmt.Errorf("NewSession: %w", err) + return nil, nil, fmt.Errorf("NewSession: %w", err) } // first provider to supply a credential set "wins" @@ -2242,9 +2237,9 @@ func s3Connection(ctx context.Context, opt *Options, client *http.Client) (*s3.S // if no access key/secret and iam is explicitly disabled then fall back to anon interaction cred = credentials.AnonymousCredentials case v.AccessKeyID == "": - return nil, nil, nil, errors.New("access_key_id not found") + return nil, nil, errors.New("access_key_id not found") case v.SecretAccessKey == "": - return nil, nil, nil, errors.New("secret_access_key not found") + return nil, nil, errors.New("secret_access_key not found") } if opt.Region == "" { @@ -2283,36 +2278,25 @@ func s3Connection(ctx context.Context, opt *Options, client *http.Client) (*s3.S // (from the shared config file) if the passed-in Options.Config.Credentials is nil. awsSessionOpts.Config.Credentials = nil } - // Setting this stops PutObject reading the body twice and seeking - // We add our own Content-MD5 for data protection - awsSessionOpts.Config.S3DisableContentMD5Validation = aws.Bool(true) ses, err := session.NewSessionWithOptions(awsSessionOpts) if err != nil { - return nil, nil, nil, err + return nil, nil, err } - newC := func(unsignedBody bool) *s3.S3 { - c := s3.New(ses) - if opt.V2Auth || opt.Region == "other-v2-signature" { - fs.Debugf(nil, "Using v2 auth") - signer := func(req *request.Request) { - // Ignore AnonymousCredentials object - if req.Config.Credentials == credentials.AnonymousCredentials { - return - } - sign(v.AccessKeyID, v.SecretAccessKey, req.HTTPRequest) + c := s3.New(ses) + if opt.V2Auth || opt.Region == "other-v2-signature" { + fs.Debugf(nil, "Using v2 auth") + signer := func(req *request.Request) { + // Ignore AnonymousCredentials object + if req.Config.Credentials == credentials.AnonymousCredentials { + return } - c.Handlers.Sign.Clear() - c.Handlers.Sign.PushBackNamed(corehandlers.BuildContentLengthHandler) - c.Handlers.Sign.PushBack(signer) - } else if unsignedBody { - // If the body is unsigned then tell the signer that we aren't signing the payload - c.Handlers.Sign.Clear() - c.Handlers.Sign.PushBackNamed(corehandlers.BuildContentLengthHandler) - c.Handlers.Sign.PushBackNamed(v4.BuildNamedHandler("v4.SignRequestHandler.WithUnsignedPayload", v4.WithUnsignedPayload)) + sign(v.AccessKeyID, v.SecretAccessKey, req.HTTPRequest) } - return c + c.Handlers.Sign.Clear() + c.Handlers.Sign.PushBackNamed(corehandlers.BuildContentLengthHandler) + c.Handlers.Sign.PushBack(signer) } - return newC(false), newC(true), ses, nil + return c, ses, nil } func checkUploadChunkSize(cs fs.SizeSuffix) error { @@ -2501,7 +2485,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e opt.SSECustomerKeyMD5 = base64.StdEncoding.EncodeToString(md5sumBinary[:]) } srv := getClient(ctx, opt) - c, cu, ses, err := s3Connection(ctx, opt, srv) + c, ses, err := s3Connection(ctx, opt, srv) if err != nil { return nil, err } @@ -2519,7 +2503,6 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e ci: ci, ctx: ctx, c: c, - cu: cu, ses: ses, pacer: pc, cache: bucket.NewCache(), @@ -2643,12 +2626,11 @@ func (f *Fs) updateRegionForBucket(ctx context.Context, bucket string) error { // Make a new session with the new region oldRegion := f.opt.Region f.opt.Region = region - c, cu, ses, err := s3Connection(f.ctx, &f.opt, f.srv) + c, ses, err := s3Connection(f.ctx, &f.opt, f.srv) if err != nil { return fmt.Errorf("creating new session failed: %w", err) } f.c = c - f.cu = cu f.ses = ses fs.Logf(f, "Switched region to %q from %q", region, oldRegion) @@ -4143,48 +4125,23 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si return etag, nil } -// unWrapAwsError unwraps AWS errors, looking for a non AWS error -// -// It returns true if one was found and the error, or false and the -// error passed in. -func unWrapAwsError(err error) (found bool, outErr error) { - if awsErr, ok := err.(awserr.Error); ok { - var origErrs []error - if batchErr, ok := awsErr.(awserr.BatchError); ok { - origErrs = batchErr.OrigErrs() - } else { - origErrs = []error{awsErr.OrigErr()} - } - for _, origErr := range origErrs { - found, newErr := unWrapAwsError(origErr) - if found { - return found, newErr - } - } - return false, err - } - return true, err -} - // Upload a single part using PutObject func (o *Object) uploadSinglepartPutObject(ctx context.Context, req *s3.PutObjectInput, size int64, in io.Reader) (etag string, lastModified time.Time, err error) { - req.Body = readers.NewFakeSeeker(in, size) - var resp *s3.PutObjectOutput + r, resp := o.fs.c.PutObjectRequest(req) + if req.ContentLength != nil && *req.ContentLength == 0 { + // Can't upload zero length files like this for some reason + r.Body = bytes.NewReader([]byte{}) + } else { + r.SetStreamingBody(ioutil.NopCloser(in)) + } + r.SetContext(ctx) + r.HTTPRequest.Header.Set("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD") + err = o.fs.pacer.CallNoRetry(func() (bool, error) { - resp, err = o.fs.cu.PutObject(req) + err := r.Send() return o.fs.shouldRetry(ctx, err) }) if err != nil { - // Return the underlying error if we have a Serialization error if possible - // - // Serialization errors are synthesized locally in the SDK (not returned from the - // server). We'll get one if the SDK attempts a retry, however the FakeSeeker will - // remember the previous error from Read and return that. - if do, ok := err.(awserr.Error); ok && do.Code() == request.ErrCodeSerialization { - if found, newErr := unWrapAwsError(err); found { - err = newErr - } - } return etag, lastModified, err } lastModified = time.Now()