s3: factor different listing versions into separate objects
This commit is contained in:
parent
7243918069
commit
a59fa2977d
1 changed files with 112 additions and 54 deletions
166
backend/s3/s3.go
166
backend/s3/s3.go
|
@ -2750,6 +2750,100 @@ func (f *Fs) updateRegionForBucket(ctx context.Context, bucket string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Common interface for bucket listers
|
||||
type bucketLister interface {
|
||||
List(ctx context.Context) (resp *s3.ListObjectsV2Output, versionIDs []*string, err error)
|
||||
URLEncodeListings(bool)
|
||||
}
|
||||
|
||||
// V1 bucket lister
|
||||
type v1List struct {
|
||||
f *Fs
|
||||
req s3.ListObjectsInput
|
||||
}
|
||||
|
||||
// Create a new V1 bucket lister
|
||||
func (f *Fs) newV1List(req *s3.ListObjectsV2Input) bucketLister {
|
||||
l := &v1List{
|
||||
f: f,
|
||||
}
|
||||
// Convert v2 req into v1 req
|
||||
structs.SetFrom(&l.req, req)
|
||||
return l
|
||||
}
|
||||
|
||||
// List a bucket with V1 listing
|
||||
func (ls *v1List) List(ctx context.Context) (resp *s3.ListObjectsV2Output, versionIDs []*string, err error) {
|
||||
respv1, err := ls.f.c.ListObjectsWithContext(ctx, &ls.req)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Set up the request for next time
|
||||
ls.req.Marker = respv1.NextMarker
|
||||
if aws.BoolValue(respv1.IsTruncated) && ls.req.Marker == nil {
|
||||
if len(respv1.Contents) == 0 {
|
||||
return nil, nil, errors.New("s3 protocol error: received listing v1 with IsTruncated set, no NextMarker and no Contents")
|
||||
}
|
||||
// Use the last Key received if no NextMarker and isTruncated
|
||||
ls.req.Marker = respv1.Contents[len(respv1.Contents)-1].Key
|
||||
|
||||
}
|
||||
|
||||
// If we are URL encoding then must decode the marker
|
||||
if ls.req.Marker != nil && ls.req.EncodingType != nil {
|
||||
*ls.req.Marker, err = url.QueryUnescape(*ls.req.Marker)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to URL decode Marker %q: %w", *ls.req.Marker, err)
|
||||
}
|
||||
}
|
||||
|
||||
// convert v1 resp into v2 resp
|
||||
resp = new(s3.ListObjectsV2Output)
|
||||
structs.SetFrom(resp, respv1)
|
||||
|
||||
return resp, nil, nil
|
||||
}
|
||||
|
||||
// URL Encode the listings
|
||||
func (ls *v1List) URLEncodeListings(encode bool) {
|
||||
if encode {
|
||||
ls.req.EncodingType = aws.String(s3.EncodingTypeUrl)
|
||||
} else {
|
||||
ls.req.EncodingType = nil
|
||||
}
|
||||
}
|
||||
|
||||
// V2 bucket lister
|
||||
type v2List struct {
|
||||
f *Fs
|
||||
req s3.ListObjectsV2Input
|
||||
}
|
||||
|
||||
// Create a new V2 bucket lister
|
||||
func (f *Fs) newV2List(req *s3.ListObjectsV2Input) bucketLister {
|
||||
return &v2List{
|
||||
f: f,
|
||||
req: *req,
|
||||
}
|
||||
}
|
||||
|
||||
// Do a V2 listing
|
||||
func (ls *v2List) List(ctx context.Context) (resp *s3.ListObjectsV2Output, versionIDs []*string, err error) {
|
||||
resp, err = ls.f.c.ListObjectsV2WithContext(ctx, &ls.req)
|
||||
ls.req.ContinuationToken = resp.NextContinuationToken
|
||||
return resp, nil, err
|
||||
}
|
||||
|
||||
// URL Encode the listings
|
||||
func (ls *v2List) URLEncodeListings(encode bool) {
|
||||
if encode {
|
||||
ls.req.EncodingType = aws.String(s3.EncodingTypeUrl)
|
||||
} else {
|
||||
ls.req.EncodingType = nil
|
||||
}
|
||||
}
|
||||
|
||||
// listFn is called from list to handle an object.
|
||||
type listFn func(remote string, object *s3.Object, isDirectory bool) error
|
||||
|
||||
|
@ -2760,7 +2854,6 @@ type listFn func(remote string, object *s3.Object, isDirectory bool) error
|
|||
//
|
||||
// Set recurse to read sub directories
|
||||
func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBucket bool, recurse bool, fn listFn) error {
|
||||
v1 := f.opt.ListVersion == 1
|
||||
if prefix != "" {
|
||||
prefix += "/"
|
||||
}
|
||||
|
@ -2771,7 +2864,6 @@ func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBuck
|
|||
if !recurse {
|
||||
delimiter = "/"
|
||||
}
|
||||
var continuationToken, startAfter *string
|
||||
// URL encode the listings so we can use control characters in object names
|
||||
// See: https://github.com/aws/aws-sdk-go/issues/1914
|
||||
//
|
||||
|
@ -2788,51 +2880,34 @@ func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBuck
|
|||
// So we enable only on providers we know supports it properly, all others can retry when a
|
||||
// XML Syntax error is detected.
|
||||
urlEncodeListings := f.opt.ListURLEncode.Value
|
||||
req := s3.ListObjectsV2Input{
|
||||
Bucket: &bucket,
|
||||
Delimiter: &delimiter,
|
||||
Prefix: &directory,
|
||||
MaxKeys: &f.opt.ListChunk,
|
||||
}
|
||||
if f.opt.RequesterPays {
|
||||
req.RequestPayer = aws.String(s3.RequestPayerRequester)
|
||||
}
|
||||
var listBucket bucketLister
|
||||
switch {
|
||||
case f.opt.ListVersion == 1:
|
||||
listBucket = f.newV1List(&req)
|
||||
default:
|
||||
listBucket = f.newV2List(&req)
|
||||
}
|
||||
for {
|
||||
// FIXME need to implement ALL loop
|
||||
req := s3.ListObjectsV2Input{
|
||||
Bucket: &bucket,
|
||||
ContinuationToken: continuationToken,
|
||||
Delimiter: &delimiter,
|
||||
Prefix: &directory,
|
||||
MaxKeys: &f.opt.ListChunk,
|
||||
StartAfter: startAfter,
|
||||
}
|
||||
if urlEncodeListings {
|
||||
req.EncodingType = aws.String(s3.EncodingTypeUrl)
|
||||
}
|
||||
if f.opt.RequesterPays {
|
||||
req.RequestPayer = aws.String(s3.RequestPayerRequester)
|
||||
}
|
||||
var resp *s3.ListObjectsV2Output
|
||||
var err error
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
if v1 {
|
||||
// Convert v2 req into v1 req
|
||||
var reqv1 s3.ListObjectsInput
|
||||
structs.SetFrom(&reqv1, &req)
|
||||
reqv1.Marker = continuationToken
|
||||
if startAfter != nil {
|
||||
reqv1.Marker = startAfter
|
||||
}
|
||||
var respv1 *s3.ListObjectsOutput
|
||||
respv1, err = f.c.ListObjectsWithContext(ctx, &reqv1)
|
||||
if err == nil && respv1 != nil {
|
||||
// convert v1 resp into v2 resp
|
||||
resp = new(s3.ListObjectsV2Output)
|
||||
structs.SetFrom(resp, respv1)
|
||||
resp.NextContinuationToken = respv1.NextMarker
|
||||
}
|
||||
} else {
|
||||
resp, err = f.c.ListObjectsV2WithContext(ctx, &req)
|
||||
}
|
||||
listBucket.URLEncodeListings(urlEncodeListings)
|
||||
resp, _, err = listBucket.List(ctx)
|
||||
if err != nil && !urlEncodeListings {
|
||||
if awsErr, ok := err.(awserr.RequestFailure); ok {
|
||||
if origErr := awsErr.OrigErr(); origErr != nil {
|
||||
if _, ok := origErr.(*xml.SyntaxError); ok {
|
||||
// Retry the listing with URL encoding as there were characters that XML can't encode
|
||||
urlEncodeListings = true
|
||||
req.EncodingType = aws.String(s3.EncodingTypeUrl)
|
||||
fs.Debugf(f, "Retrying listing because of characters which can't be XML encoded")
|
||||
return true, err
|
||||
}
|
||||
|
@ -2921,23 +2996,6 @@ func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBuck
|
|||
if !aws.BoolValue(resp.IsTruncated) {
|
||||
break
|
||||
}
|
||||
// Use NextContinuationToken if set, otherwise use last Key for StartAfter
|
||||
if resp.NextContinuationToken == nil || *resp.NextContinuationToken == "" {
|
||||
if len(resp.Contents) == 0 {
|
||||
return errors.New("s3 protocol error: received listing with IsTruncated set, no NextContinuationToken/NextMarker and no Contents")
|
||||
}
|
||||
continuationToken = nil
|
||||
startAfter = resp.Contents[len(resp.Contents)-1].Key
|
||||
} else {
|
||||
continuationToken = resp.NextContinuationToken
|
||||
startAfter = nil
|
||||
}
|
||||
if startAfter != nil && urlEncodeListings {
|
||||
*startAfter, err = url.QueryUnescape(*startAfter)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to URL decode StartAfter/NextMarker %q: %w", *continuationToken, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue