s3: empty directory markers further work #3453
- Report correct feature flag - Fix test failures due to that - don't output the root directory marker - Don't create the directory marker if it is the bucket or root - Create directories when uploading files
This commit is contained in:
parent
b6a95c70e9
commit
74652bf318
2 changed files with 128 additions and 43 deletions
168
backend/s3/s3.go
168
backend/s3/s3.go
|
@ -2200,10 +2200,10 @@ cheaper egress for data downloaded through the CloudFront network.`,
|
||||||
Name: "directory_markers",
|
Name: "directory_markers",
|
||||||
Default: false,
|
Default: false,
|
||||||
Advanced: true,
|
Advanced: true,
|
||||||
Help: `Upload an empty object with a trailing slash in name when new directory is created
|
Help: `Upload an empty object with a trailing slash when a new directory is created
|
||||||
|
|
||||||
Empty folders are unsupported for bucket based remotes, this option creates an empty
|
Empty folders are unsupported for bucket based remotes, this option creates an empty
|
||||||
object named "/", to persist folder.
|
object ending with "/", to persist the folder.
|
||||||
`,
|
`,
|
||||||
}, {
|
}, {
|
||||||
Name: "use_multipart_etag",
|
Name: "use_multipart_etag",
|
||||||
|
@ -3092,6 +3092,9 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
|
||||||
if opt.Provider == "IDrive" {
|
if opt.Provider == "IDrive" {
|
||||||
f.features.SetTier = false
|
f.features.SetTier = false
|
||||||
}
|
}
|
||||||
|
if opt.DirectoryMarkers {
|
||||||
|
f.features.CanHaveEmptyDirectories = true
|
||||||
|
}
|
||||||
// f.listMultipartUploads()
|
// f.listMultipartUploads()
|
||||||
|
|
||||||
if f.rootBucket != "" && f.rootDirectory != "" && !opt.NoHeadObject && !strings.HasSuffix(root, "/") {
|
if f.rootBucket != "" && f.rootDirectory != "" && !opt.NoHeadObject && !strings.HasSuffix(root, "/") {
|
||||||
|
@ -3584,6 +3587,7 @@ func (f *Fs) list(ctx context.Context, opt listOpt, fn listFn) error {
|
||||||
default:
|
default:
|
||||||
listBucket = f.newV2List(&req)
|
listBucket = f.newV2List(&req)
|
||||||
}
|
}
|
||||||
|
foundItems := 0
|
||||||
for {
|
for {
|
||||||
var resp *s3.ListObjectsV2Output
|
var resp *s3.ListObjectsV2Output
|
||||||
var err error
|
var err error
|
||||||
|
@ -3625,6 +3629,7 @@ func (f *Fs) list(ctx context.Context, opt listOpt, fn listFn) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if !opt.recurse {
|
if !opt.recurse {
|
||||||
|
foundItems += len(resp.CommonPrefixes)
|
||||||
for _, commonPrefix := range resp.CommonPrefixes {
|
for _, commonPrefix := range resp.CommonPrefixes {
|
||||||
if commonPrefix.Prefix == nil {
|
if commonPrefix.Prefix == nil {
|
||||||
fs.Logf(f, "Nil common prefix received")
|
fs.Logf(f, "Nil common prefix received")
|
||||||
|
@ -3657,6 +3662,7 @@ func (f *Fs) list(ctx context.Context, opt listOpt, fn listFn) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
foundItems += len(resp.Contents)
|
||||||
for i, object := range resp.Contents {
|
for i, object := range resp.Contents {
|
||||||
remote := aws.StringValue(object.Key)
|
remote := aws.StringValue(object.Key)
|
||||||
if urlEncodeListings {
|
if urlEncodeListings {
|
||||||
|
@ -3672,18 +3678,28 @@ func (f *Fs) list(ctx context.Context, opt listOpt, fn listFn) error {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
remote = remote[len(opt.prefix):]
|
remote = remote[len(opt.prefix):]
|
||||||
isDirectory := remote == "" || strings.HasSuffix(remote, "/")
|
isDirectory := (remote == "" || strings.HasSuffix(remote, "/")) && object.Size != nil && *object.Size == 0
|
||||||
if opt.addBucket {
|
if opt.addBucket {
|
||||||
remote = bucket.Join(opt.bucket, remote)
|
remote = bucket.Join(opt.bucket, remote)
|
||||||
}
|
}
|
||||||
// is this a directory marker?
|
// is this a directory marker?
|
||||||
if isDirectory && object.Size != nil && *object.Size == 0 && !opt.noSkipMarkers {
|
if isDirectory {
|
||||||
continue // skip directory marker
|
if opt.noSkipMarkers {
|
||||||
|
// process directory markers as files
|
||||||
|
isDirectory = false
|
||||||
|
} else {
|
||||||
|
// Don't insert the root directory
|
||||||
|
if remote == opt.directory {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// process directory markers as directories
|
||||||
|
remote = strings.TrimRight(remote, "/")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if versionIDs != nil {
|
if versionIDs != nil {
|
||||||
err = fn(remote, object, versionIDs[i], false)
|
err = fn(remote, object, versionIDs[i], isDirectory)
|
||||||
} else {
|
} else {
|
||||||
err = fn(remote, object, nil, false)
|
err = fn(remote, object, nil, isDirectory)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == errEndList {
|
if err == errEndList {
|
||||||
|
@ -3696,6 +3712,20 @@ func (f *Fs) list(ctx context.Context, opt listOpt, fn listFn) error {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if f.opt.DirectoryMarkers && foundItems == 0 && opt.directory != "" {
|
||||||
|
// Determine whether the directory exists or not by whether it has a marker
|
||||||
|
req := s3.HeadObjectInput{
|
||||||
|
Bucket: &opt.bucket,
|
||||||
|
Key: &opt.directory,
|
||||||
|
}
|
||||||
|
_, err := f.headObject(ctx, &req)
|
||||||
|
if err != nil {
|
||||||
|
if err == fs.ErrorObjectNotFound {
|
||||||
|
return fs.ErrorDirNotFound
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3886,6 +3916,52 @@ func (f *Fs) bucketExists(ctx context.Context, bucket string) (bool, error) {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Create directory marker file and parents
|
||||||
|
func (f *Fs) createDirectoryMarker(ctx context.Context, bucket, dir string) error {
|
||||||
|
if !f.opt.DirectoryMarkers || bucket == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Object to be uploaded
|
||||||
|
o := &Object{
|
||||||
|
fs: f,
|
||||||
|
meta: map[string]string{
|
||||||
|
metaMtime: swift.TimeToFloatString(time.Now()),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
_, bucketPath := f.split(dir)
|
||||||
|
// Don't create the directory marker if it is the bucket or at the very root
|
||||||
|
if bucketPath == "" {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
o.remote = dir + "/"
|
||||||
|
|
||||||
|
// Check to see if object already exists
|
||||||
|
_, err := o.headObject(ctx)
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Upload it if not
|
||||||
|
fs.Debugf(o, "Creating directory marker")
|
||||||
|
content := io.Reader(strings.NewReader(""))
|
||||||
|
_, err = f.Put(ctx, content, o)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("creating directory marker failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now check parent directory exists
|
||||||
|
dir = path.Dir(dir)
|
||||||
|
if dir == "/" || dir == "." {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Mkdir creates the bucket if it doesn't exist
|
// Mkdir creates the bucket if it doesn't exist
|
||||||
func (f *Fs) Mkdir(ctx context.Context, dir string) error {
|
func (f *Fs) Mkdir(ctx context.Context, dir string) error {
|
||||||
bucket, _ := f.split(dir)
|
bucket, _ := f.split(dir)
|
||||||
|
@ -3893,23 +3969,17 @@ func (f *Fs) Mkdir(ctx context.Context, dir string) error {
|
||||||
if e != nil {
|
if e != nil {
|
||||||
return e
|
return e
|
||||||
}
|
}
|
||||||
// Create directory marker file
|
return f.createDirectoryMarker(ctx, bucket, dir)
|
||||||
if f.opt.DirectoryMarkers && bucket != "" && dir != "" {
|
}
|
||||||
markerFilePath := fmt.Sprintf("%s/", dir)
|
|
||||||
markerFileContent := io.Reader(strings.NewReader(""))
|
// mkdirParent creates the parent bucket/directory if it doesn't exist
|
||||||
markerFileObject := &Object{
|
func (f *Fs) mkdirParent(ctx context.Context, remote string) error {
|
||||||
fs: f,
|
remote = strings.TrimRight(remote, "/")
|
||||||
remote: markerFilePath,
|
dir := path.Dir(remote)
|
||||||
meta: map[string]string{
|
if dir == "/" || dir == "." {
|
||||||
metaMtime: swift.TimeToFloatString(time.Now()),
|
dir = ""
|
||||||
},
|
|
||||||
}
|
|
||||||
_, e := f.Put(ctx, markerFileContent, markerFileObject)
|
|
||||||
if e != nil {
|
|
||||||
return e
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return nil
|
return f.Mkdir(ctx, dir)
|
||||||
}
|
}
|
||||||
|
|
||||||
// makeBucket creates the bucket if it doesn't exist
|
// makeBucket creates the bucket if it doesn't exist
|
||||||
|
@ -3952,12 +4022,15 @@ func (f *Fs) Rmdir(ctx context.Context, dir string) error {
|
||||||
bucket, directory := f.split(dir)
|
bucket, directory := f.split(dir)
|
||||||
// Remove directory marker file
|
// Remove directory marker file
|
||||||
if f.opt.DirectoryMarkers && bucket != "" && dir != "" {
|
if f.opt.DirectoryMarkers && bucket != "" && dir != "" {
|
||||||
markerFilePath := fmt.Sprintf("%s/", dir)
|
o := &Object{
|
||||||
markerFileObject := &Object{
|
|
||||||
fs: f,
|
fs: f,
|
||||||
remote: markerFilePath,
|
remote: dir + "/",
|
||||||
|
}
|
||||||
|
fs.Debugf(o, "Removing directory marker")
|
||||||
|
err := o.Remove(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("removing directory marker failed: %w", err)
|
||||||
}
|
}
|
||||||
_ = markerFileObject.Remove(ctx)
|
|
||||||
}
|
}
|
||||||
if bucket == "" || directory != "" {
|
if bucket == "" || directory != "" {
|
||||||
return nil
|
return nil
|
||||||
|
@ -4157,7 +4230,7 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object,
|
||||||
return nil, errNotWithVersionAt
|
return nil, errNotWithVersionAt
|
||||||
}
|
}
|
||||||
dstBucket, dstPath := f.split(remote)
|
dstBucket, dstPath := f.split(remote)
|
||||||
err := f.makeBucket(ctx, dstBucket)
|
err := f.mkdirParent(ctx, remote)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -4784,22 +4857,26 @@ func (o *Object) headObject(ctx context.Context) (resp *s3.HeadObjectOutput, err
|
||||||
Key: &bucketPath,
|
Key: &bucketPath,
|
||||||
VersionId: o.versionID,
|
VersionId: o.versionID,
|
||||||
}
|
}
|
||||||
if o.fs.opt.RequesterPays {
|
return o.fs.headObject(ctx, &req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *Fs) headObject(ctx context.Context, req *s3.HeadObjectInput) (resp *s3.HeadObjectOutput, err error) {
|
||||||
|
if f.opt.RequesterPays {
|
||||||
req.RequestPayer = aws.String(s3.RequestPayerRequester)
|
req.RequestPayer = aws.String(s3.RequestPayerRequester)
|
||||||
}
|
}
|
||||||
if o.fs.opt.SSECustomerAlgorithm != "" {
|
if f.opt.SSECustomerAlgorithm != "" {
|
||||||
req.SSECustomerAlgorithm = &o.fs.opt.SSECustomerAlgorithm
|
req.SSECustomerAlgorithm = &f.opt.SSECustomerAlgorithm
|
||||||
}
|
}
|
||||||
if o.fs.opt.SSECustomerKey != "" {
|
if f.opt.SSECustomerKey != "" {
|
||||||
req.SSECustomerKey = &o.fs.opt.SSECustomerKey
|
req.SSECustomerKey = &f.opt.SSECustomerKey
|
||||||
}
|
}
|
||||||
if o.fs.opt.SSECustomerKeyMD5 != "" {
|
if f.opt.SSECustomerKeyMD5 != "" {
|
||||||
req.SSECustomerKeyMD5 = &o.fs.opt.SSECustomerKeyMD5
|
req.SSECustomerKeyMD5 = &f.opt.SSECustomerKeyMD5
|
||||||
}
|
}
|
||||||
err = o.fs.pacer.Call(func() (bool, error) {
|
err = f.pacer.Call(func() (bool, error) {
|
||||||
var err error
|
var err error
|
||||||
resp, err = o.fs.c.HeadObjectWithContext(ctx, &req)
|
resp, err = f.c.HeadObjectWithContext(ctx, req)
|
||||||
return o.fs.shouldRetry(ctx, err)
|
return f.shouldRetry(ctx, err)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if awsErr, ok := err.(awserr.RequestFailure); ok {
|
if awsErr, ok := err.(awserr.RequestFailure); ok {
|
||||||
|
@ -4809,7 +4886,9 @@ func (o *Object) headObject(ctx context.Context) (resp *s3.HeadObjectOutput, err
|
||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
o.fs.cache.MarkOK(bucket)
|
if req.Bucket != nil {
|
||||||
|
f.cache.MarkOK(*req.Bucket)
|
||||||
|
}
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5458,9 +5537,12 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
||||||
return errNotWithVersionAt
|
return errNotWithVersionAt
|
||||||
}
|
}
|
||||||
bucket, bucketPath := o.split()
|
bucket, bucketPath := o.split()
|
||||||
err := o.fs.makeBucket(ctx, bucket)
|
// Create parent dir/bucket if not saving directory marker
|
||||||
if err != nil {
|
if !strings.HasSuffix(o.remote, "/") {
|
||||||
return err
|
err := o.fs.mkdirParent(ctx, o.remote)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
modTime := src.ModTime(ctx)
|
modTime := src.ModTime(ctx)
|
||||||
size := src.Size()
|
size := src.Size()
|
||||||
|
|
|
@ -167,6 +167,9 @@ backends:
|
||||||
- backend: "s3"
|
- backend: "s3"
|
||||||
remote: "TestS3:"
|
remote: "TestS3:"
|
||||||
fastlist: true
|
fastlist: true
|
||||||
|
- backend: "s3"
|
||||||
|
remote: "TestS3,directory_markers:"
|
||||||
|
fastlist: true
|
||||||
- backend: "s3"
|
- backend: "s3"
|
||||||
remote: "TestS3Minio:"
|
remote: "TestS3Minio:"
|
||||||
fastlist: true
|
fastlist: true
|
||||||
|
|
Loading…
Reference in a new issue