From a59fa2977d7e5b3747bf942c7fc5abe366cbcd72 Mon Sep 17 00:00:00 2001
From: Nick Craig-Wood
Date: Mon, 25 Jul 2022 15:56:17 +0100
Subject: [PATCH] s3: factor different listing versions into separate objects

---
 backend/s3/s3.go | 188 +++++++++++++++++++++++++++++++++--------------
 1 file changed, 134 insertions(+), 54 deletions(-)

diff --git a/backend/s3/s3.go b/backend/s3/s3.go
index 4d196719a..f6fc6e956 100644
--- a/backend/s3/s3.go
+++ b/backend/s3/s3.go
@@ -2750,6 +2750,122 @@ func (f *Fs) updateRegionForBucket(ctx context.Context, bucket string) error {
 	return nil
 }
 
+// Common interface for bucket listers
+//
+// Each call to List fetches the next page of the listing. The lister
+// carries the paging state in its own request between calls.
+type bucketLister interface {
+	// List returns the next page in the form of a V2 response
+	List(ctx context.Context) (resp *s3.ListObjectsV2Output, versionIDs []*string, err error)
+	// URLEncodeListings turns URL encoding of the listing on or off
+	URLEncodeListings(bool)
+}
+
+// V1 bucket lister
+type v1List struct {
+	f   *Fs
+	req s3.ListObjectsInput
+}
+
+// Create a new V1 bucket lister
+func (f *Fs) newV1List(req *s3.ListObjectsV2Input) bucketLister {
+	l := &v1List{
+		f: f,
+	}
+	// Convert v2 req into v1 req
+	structs.SetFrom(&l.req, req)
+	return l
+}
+
+// List a bucket with V1 listing
+//
+// On error the stored request is left untouched so the call can be
+// retried and will ask for the same page again.
+func (ls *v1List) List(ctx context.Context) (resp *s3.ListObjectsV2Output, versionIDs []*string, err error) {
+	respv1, err := ls.f.c.ListObjectsWithContext(ctx, &ls.req)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	// Work out the marker for the next page
+	marker := respv1.NextMarker
+	if aws.BoolValue(respv1.IsTruncated) && marker == nil {
+		if len(respv1.Contents) == 0 {
+			return nil, nil, errors.New("s3 protocol error: received listing v1 with IsTruncated set, no NextMarker and no Contents")
+		}
+		// Use the last Key received if no NextMarker and isTruncated
+		marker = respv1.Contents[len(respv1.Contents)-1].Key
+	}
+
+	// If we are URL encoding then must decode the marker
+	//
+	// Decode into a copy rather than in place, as marker may point at a
+	// string inside the response and that must not be modified.
+	if marker != nil && ls.req.EncodingType != nil {
+		decoded, decodeErr := url.QueryUnescape(*marker)
+		if decodeErr != nil {
+			return nil, nil, fmt.Errorf("failed to URL decode Marker %q: %w", *marker, decodeErr)
+		}
+		marker = &decoded
+	}
+	ls.req.Marker = marker
+
+	// convert v1 resp into v2 resp
+	resp = new(s3.ListObjectsV2Output)
+	structs.SetFrom(resp, respv1)
+
+	return resp, nil, nil
+}
+
+// URL Encode the listings
+func (ls *v1List) URLEncodeListings(encode bool) {
+	if encode {
+		ls.req.EncodingType = aws.String(s3.EncodingTypeUrl)
+	} else {
+		ls.req.EncodingType = nil
+	}
+}
+
+// V2 bucket lister
+type v2List struct {
+	f   *Fs
+	req s3.ListObjectsV2Input
+}
+
+// Create a new V2 bucket lister
+func (f *Fs) newV2List(req *s3.ListObjectsV2Input) bucketLister {
+	return &v2List{
+		f:   f,
+		req: *req,
+	}
+}
+
+// Do a V2 listing
+//
+// On error the stored request is left untouched so the call can be
+// retried and will ask for the same page again.
+func (ls *v2List) List(ctx context.Context) (resp *s3.ListObjectsV2Output, versionIDs []*string, err error) {
+	resp, err = ls.f.c.ListObjectsV2WithContext(ctx, &ls.req)
+	if err != nil {
+		return nil, nil, err
+	}
+	// A truncated page without a token would restart the listing from the beginning
+	if aws.BoolValue(resp.IsTruncated) && aws.StringValue(resp.NextContinuationToken) == "" {
+		return nil, nil, errors.New("s3 protocol error: received listing v2 with IsTruncated set and no NextContinuationToken")
+	}
+	ls.req.ContinuationToken = resp.NextContinuationToken
+	return resp, nil, nil
+}
+
+// URL Encode the listings
+func (ls *v2List) URLEncodeListings(encode bool) {
+	if encode {
+		ls.req.EncodingType = aws.String(s3.EncodingTypeUrl)
+	} else {
+		ls.req.EncodingType = nil
+	}
+}
+
 // listFn is called from list to handle an object.
 type listFn func(remote string, object *s3.Object, isDirectory bool) error
 
@@ -2760,7 +2876,6 @@ type listFn func(remote string, object *s3.Object, isDirectory bool) error
 //
 // Set recurse to read sub directories
 func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBucket bool, recurse bool, fn listFn) error {
-	v1 := f.opt.ListVersion == 1
 	if prefix != "" {
 		prefix += "/"
 	}
@@ -2771,7 +2886,6 @@ func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBuck
 	if !recurse {
 		delimiter = "/"
 	}
-	var continuationToken, startAfter *string
 	// URL encode the listings so we can use control characters in object names
 	// See: https://github.com/aws/aws-sdk-go/issues/1914
 	//
@@ -2788,51 +2902,34 @@ func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBuck
 	// So we enable only on providers we know supports it properly, all others can retry when a
 	// XML Syntax error is detected.
 	urlEncodeListings := f.opt.ListURLEncode.Value
+	req := s3.ListObjectsV2Input{
+		Bucket:    &bucket,
+		Delimiter: &delimiter,
+		Prefix:    &directory,
+		MaxKeys:   &f.opt.ListChunk,
+	}
+	if f.opt.RequesterPays {
+		req.RequestPayer = aws.String(s3.RequestPayerRequester)
+	}
+	var listBucket bucketLister
+	switch {
+	case f.opt.ListVersion == 1:
+		listBucket = f.newV1List(&req)
+	default:
+		listBucket = f.newV2List(&req)
+	}
 	for {
-		// FIXME need to implement ALL loop
-		req := s3.ListObjectsV2Input{
-			Bucket:            &bucket,
-			ContinuationToken: continuationToken,
-			Delimiter:         &delimiter,
-			Prefix:            &directory,
-			MaxKeys:           &f.opt.ListChunk,
-			StartAfter:        startAfter,
-		}
-		if urlEncodeListings {
-			req.EncodingType = aws.String(s3.EncodingTypeUrl)
-		}
-		if f.opt.RequesterPays {
-			req.RequestPayer = aws.String(s3.RequestPayerRequester)
-		}
 		var resp *s3.ListObjectsV2Output
 		var err error
 		err = f.pacer.Call(func() (bool, error) {
-			if v1 {
-				// Convert v2 req into v1 req
-				var reqv1 s3.ListObjectsInput
-				structs.SetFrom(&reqv1, &req)
-				reqv1.Marker = continuationToken
-				if startAfter != nil {
-					reqv1.Marker = startAfter
-				}
-				var respv1 *s3.ListObjectsOutput
-				respv1, err = f.c.ListObjectsWithContext(ctx, &reqv1)
-				if err == nil && respv1 != nil {
-					// convert v1 resp into v2 resp
-					resp = new(s3.ListObjectsV2Output)
-					structs.SetFrom(resp, respv1)
-					resp.NextContinuationToken = respv1.NextMarker
-				}
-			} else {
-				resp, err = f.c.ListObjectsV2WithContext(ctx, &req)
-			}
+			listBucket.URLEncodeListings(urlEncodeListings)
+			resp, _, err = listBucket.List(ctx)
 			if err != nil && !urlEncodeListings {
 				if awsErr, ok := err.(awserr.RequestFailure); ok {
 					if origErr := awsErr.OrigErr(); origErr != nil {
 						if _, ok := origErr.(*xml.SyntaxError); ok {
 							// Retry the listing with URL encoding as there were characters that XML can't encode
 							urlEncodeListings = true
-							req.EncodingType = aws.String(s3.EncodingTypeUrl)
 							fs.Debugf(f, "Retrying listing because of characters which can't be XML encoded")
 							return true, err
 						}
@@ -2921,23 +3018,6 @@ func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBuck
 		if !aws.BoolValue(resp.IsTruncated) {
 			break
 		}
-		// Use NextContinuationToken if set, otherwise use last Key for StartAfter
-		if resp.NextContinuationToken == nil || *resp.NextContinuationToken == "" {
-			if len(resp.Contents) == 0 {
-				return errors.New("s3 protocol error: received listing with IsTruncated set, no NextContinuationToken/NextMarker and no Contents")
-			}
-			continuationToken = nil
-			startAfter = resp.Contents[len(resp.Contents)-1].Key
-		} else {
-			continuationToken = resp.NextContinuationToken
-			startAfter = nil
-		}
-		if startAfter != nil && urlEncodeListings {
-			*startAfter, err = url.QueryUnescape(*startAfter)
-			if err != nil {
-				return fmt.Errorf("failed to URL decode StartAfter/NextMarker %q: %w", *continuationToken, err)
-			}
-		}
 	}
 	return nil
 }