S3 driver: Attempt HeadObject on Stat first, fail over to List

Stat always calls ListObjects when stat-ing S3 key.
Unfortauntely ListObjects is not a free call - both in terms of egress
and actual AWS costs (likely because of the egress).

This changes the behaviour of Stat such that we always attempt the
HeadObject call first and only ever fall through to ListObjects if the
HeadObject returns an AWS API error.

Note, that the official docs mention that the only error returned by
HEAD is NoSuchKey; experiments show that this is demonstrably wrong and
the AWS docs are simply outdated at the time of this commit.

HeadObject actually returns the following errors:
* NotFound: if the queried key does not exist
* NotFound: if the queried key contains subkeys i.e. it's a prefix
* BucketRegionError: if the bucket does not exist
* Forbidden: if Head operation is not allows via IAM/ACLs

Co-authored-by: Cory Snider <corhere@gmail.com>
Co-authored-by: Sebastiaan van Stijn <github@gone.nl>
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
Signed-off-by: Milos Gajdos <milosthegajdos@gmail.com>
This commit is contained in:
Milos Gajdos 2024-07-13 18:38:17 +01:00
parent 54cf4165d4
commit a18cc8a656
No known key found for this signature in database

View file

@ -763,37 +763,76 @@ func (d *driver) Writer(ctx context.Context, path string, appendMode bool) (stor
return nil, storagedriver.PathNotFoundError{Path: path} return nil, storagedriver.PathNotFoundError{Path: path}
} }
// Stat retrieves the FileInfo for the given path, including the current size func (d *driver) statHead(ctx context.Context, path string) (*storagedriver.FileInfoFields, error) {
// in bytes and the creation time. resp, err := d.S3.HeadObjectWithContext(ctx, &s3.HeadObjectInput{
func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) { Bucket: aws.String(d.Bucket),
Key: aws.String(d.s3Path(path)),
})
if err != nil {
return nil, err
}
return &storagedriver.FileInfoFields{
Path: path,
IsDir: false,
Size: *resp.ContentLength,
ModTime: *resp.LastModified,
}, nil
}
func (d *driver) statList(ctx context.Context, path string) (*storagedriver.FileInfoFields, error) {
s3Path := d.s3Path(path)
resp, err := d.S3.ListObjectsV2WithContext(ctx, &s3.ListObjectsV2Input{ resp, err := d.S3.ListObjectsV2WithContext(ctx, &s3.ListObjectsV2Input{
Bucket: aws.String(d.Bucket), Bucket: aws.String(d.Bucket),
Prefix: aws.String(d.s3Path(path)), Prefix: aws.String(s3Path),
MaxKeys: aws.Int64(1), MaxKeys: aws.Int64(1),
}) })
if err != nil { if err != nil {
return nil, err return nil, err
} }
fi := storagedriver.FileInfoFields{
Path: path,
}
if len(resp.Contents) == 1 { if len(resp.Contents) == 1 {
if *resp.Contents[0].Key != d.s3Path(path) { if *resp.Contents[0].Key != s3Path {
fi.IsDir = true return &storagedriver.FileInfoFields{
} else { Path: path,
fi.IsDir = false IsDir: true,
fi.Size = *resp.Contents[0].Size }, nil
fi.ModTime = *resp.Contents[0].LastModified }
return &storagedriver.FileInfoFields{
Path: path,
Size: *resp.Contents[0].Size,
ModTime: *resp.Contents[0].LastModified,
}, nil
}
if len(resp.CommonPrefixes) == 1 {
return &storagedriver.FileInfoFields{
Path: path,
IsDir: true,
}, nil
} }
} else if len(resp.CommonPrefixes) == 1 {
fi.IsDir = true
} else {
return nil, storagedriver.PathNotFoundError{Path: path} return nil, storagedriver.PathNotFoundError{Path: path}
} }
return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil // Stat retrieves the FileInfo for the given path, including the current size
// in bytes and the creation time.
func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
fi, err := d.statHead(ctx, path)
if err != nil {
// For AWS errors, we fail over to ListObjects:
// Though the official docs https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_Errors
// are slightly outdated, the HeadObject actually returns NotFound error
// if querying a key which doesn't exist or a key which has nested keys
// and Forbidden if IAM/ACL permissions do not allow Head but allow List.
var awsErr awserr.Error
if errors.As(err, &awsErr) {
fi, err := d.statList(ctx, path)
if err != nil {
return nil, parseError(path, err)
}
return storagedriver.FileInfoInternal{FileInfoFields: *fi}, nil
}
// For non-AWS errors, return the error directly
return nil, err
}
return storagedriver.FileInfoInternal{FileInfoFields: *fi}, nil
} }
// List returns a list of the objects that are direct descendants of the given path. // List returns a list of the objects that are direct descendants of the given path.