azurefiles: finish docs and implementation and add optional interfaces

- use rclone's http Transport
- fix handling of 0 length files
- combine into one file and remove unneeded abstraction
- make `chunk_size` and `upload_concurrency` settable
- make auth the same as azureblob
- set the Features correctly
- implement `--azurefiles-max-stream-size`
- remove arbitrary sleep on Mkdir
- implement `--header-upload`
- implement read and write MimeType for objects
- implement optional methods (see the interface sketch after this list)
    - About
    - Copy
    - DirMove
    - Move
    - OpenWriterAt
    - PutStream
- finish documentation
- disable build on plan9 and js
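As a rough sketch of what wiring up the optional methods amounts to (illustrative only, not part of the diff below), rclone backends conventionally assert the optional interfaces at compile time; the interface names here are from rclone's fs package:

// Compile-time checks that *Fs implements the optional interfaces.
var (
	_ fs.Fs             = (*Fs)(nil)
	_ fs.Abouter        = (*Fs)(nil) // About
	_ fs.Copier         = (*Fs)(nil) // Copy
	_ fs.DirMover       = (*Fs)(nil) // DirMove
	_ fs.Mover          = (*Fs)(nil) // Move
	_ fs.OpenWriterAter = (*Fs)(nil) // OpenWriterAt
	_ fs.PutStreamer    = (*Fs)(nil) // PutStream
)

These var declarations compile to nothing but fail the build if any listed method stops matching its interface.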

Fixes #365
Fixes #7378
Nick Craig-Wood 2023-11-15 09:37:57 +00:00
parent b5301e03a6
commit ddaf01ece9
15 changed files with 1994 additions and 723 deletions

File diff suppressed because it is too large

View file

@@ -1,3 +1,6 @@
//go:build !plan9 && !js
// +build !plan9,!js
package azurefiles
import (
@@ -41,7 +44,7 @@ func (f *Fs) InternalTestAuth(t *testing.T) {
name: "SASUrl",
options: &Options{
ShareName: shareName,
SASUrl: "",
SASURL: "",
}},
}

View file

@@ -1,3 +1,6 @@
//go:build !plan9 && !js
// +build !plan9,!js
package azurefiles
import (

View file

@@ -0,0 +1,7 @@
// Build for azurefiles for unsupported platforms to stop go complaining
// about "no buildable Go source files"
//go:build plan9 || js
// +build plan9 js
package azurefiles

View file

@@ -1,44 +0,0 @@
package azurefiles
import (
"context"
"time"
)
// Directory is a filesystem like directory provided by an Fs
type Directory struct {
common
}
// Items returns the count of items in this directory or this
// directory and subdirectories if known, -1 for unknown
//
// It is unknown since getting the count of items results in a
// network request
func (d *Directory) Items() int64 {
return -1
}
// ID returns empty string. Can be implemented as part of IDer
func (d *Directory) ID() string {
return ""
}
// Size returns the size of the file.
// This method is implemented because it is part of the [fs.DirEntry] interface
func (d *Directory) Size() int64 {
return 0
}
// ModTime returns the modification time of the object
//
// TODO: check whether FileLastWriteTime is what the clients of this API want. Maybe
// FileLastWriteTime does not get changed when directory contents are updated but consumers
// of this API expect d.ModTime to do so
func (d *Directory) ModTime(ctx context.Context) time.Time {
props, err := d.f.dirClient(d.remote).GetProperties(ctx, nil)
if err != nil {
return time.Now()
}
return *props.FileLastWriteTime
}

View file

@@ -1,292 +0,0 @@
package azurefiles
import (
"context"
"errors"
"fmt"
"io"
"log"
"path"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/directory"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/file"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/fileerror"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/hash"
)
const sleepDurationBetweenRecursiveMkdirPutCalls = time.Millisecond * 500
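// fourTbInBytes is 4 TiB (4 × 2^40 bytes), the maximum size of a file in an Azure file share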
const fourTbInBytes = 4398046511104
// NewObject finds the Object at remote. If it can't be found
// it returns the error fs.ErrorObjectNotFound.
//
// Does not return ErrorIsDir when a directory exists instead of a file, since the documentation
// for [rclone.fs.Fs.NewObject] requires no extra work to determine whether it is a directory
//
// Inspired by the azureblob store, this initiates a network request and returns an error if the object is not found.
func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
fileClient := f.fileClient(remote)
resp, err := fileClient.GetProperties(ctx, nil)
if fileerror.HasCode(err, fileerror.ParentNotFound, fileerror.ResourceNotFound) {
return nil, fs.ErrorObjectNotFound
} else if err != nil {
return nil, fmt.Errorf("unable to find object remote=%s : %w", remote, err)
}
ob := objectInstance(f, remote, *resp.ContentLength, resp.ContentMD5, *resp.FileLastWriteTime)
return &ob, nil
}
// Mkdir creates nested directories as indicated by test FsMkdirRmdirSubdir
// TODO: write custom test case where parent directories are created
// Mkdir creates the container if it doesn't exist
func (f *Fs) Mkdir(ctx context.Context, remote string) error {
return f.mkdirRelativeToRootOfShare(ctx, f.decodedFullPath(remote))
}
// rclone completes commands such as rclone copy localdir remote:parentcontainer/childcontainer
// where localdir is a tree of files and directories. The above command is expected to complete even
// when parentcontainer and childcontainer directories do not exist on the remote. The following
// code, with emphasis on fullPathRelativeToShareRoot, is written to handle such cases by recursively
// creating parent directories all the way to the root of the share
//
// When the path argument is an empty string, Windows and Linux return an error. However, this
// implementation does not return an error
func (f *Fs) mkdirRelativeToRootOfShare(ctx context.Context, fullPathRelativeToShareRoot string) error {
fp := fullPathRelativeToShareRoot
if fp == "" {
return nil
}
dirClient := f.newSubdirectoryClientFromEncodedPathRelativeToShareRoot(f.encodePath(fp))
// now := time.Now()
// smbProps := &file.SMBProperties{
// LastWriteTime: &now,
// }
// dirCreateOptions := &directory.CreateOptions{
// FileSMBProperties: smbProps,
// }
_, createDirErr := dirClient.Create(ctx, nil)
if fileerror.HasCode(createDirErr, fileerror.ParentNotFound) {
parentDir := path.Dir(fp)
if parentDir == fp {
log.Fatal("This will lead to infinite recursion since parent and remote are equal")
}
makeParentErr := f.mkdirRelativeToRootOfShare(ctx, parentDir)
if makeParentErr != nil {
return fmt.Errorf("could not make parent of %s : %w", fp, makeParentErr)
}
log.Printf("Mkdir: waiting for %s after making parent=%s", sleepDurationBetweenRecursiveMkdirPutCalls.String(), parentDir)
time.Sleep(sleepDurationBetweenRecursiveMkdirPutCalls)
return f.mkdirRelativeToRootOfShare(ctx, fp)
} else if fileerror.HasCode(createDirErr, fileerror.ResourceAlreadyExists) {
return nil
} else if createDirErr != nil {
return fmt.Errorf("unable to MkDir: %w", createDirErr)
}
return nil
}
// Rmdir deletes the directory at remote
//
// Returns an error if it isn't empty
func (f *Fs) Rmdir(ctx context.Context, remote string) error {
dirClient := f.dirClient(remote)
_, err := dirClient.Delete(ctx, nil)
if err != nil {
if fileerror.HasCode(err, fileerror.DirectoryNotEmpty) {
return fs.ErrorDirectoryNotEmpty
} else if fileerror.HasCode(err, fileerror.ResourceNotFound) {
return fs.ErrorDirNotFound
}
return fmt.Errorf("could not rmdir dir=\"%s\" : %w", remote, err)
}
return nil
}
// Put the object
//
// Copies the reader in to the new object. This new object is returned.
//
// The new object may have been created if an error is returned
// TODO: when file.Client.Create is being used, provide HTTP headers such as content type and content MD5
// TODO: maybe replace PUT with NewObject + Update
// TODO: what happens in case the file is created but there is a problem on upload
// TODO: what happens when a file already exists at the location
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
if src.Size() > fourTbInBytes {
return nil, fmt.Errorf("max supported file size is 4TB. provided size is %d", src.Size())
} else if src.Size() < 0 {
// TODO: what should happen when src.Size == 0
return nil, fmt.Errorf("src.Size is required to be a whole number : %d", src.Size())
}
fc := f.fileClient(src.Remote())
_, createErr := fc.Create(ctx, src.Size(), nil)
if fileerror.HasCode(createErr, fileerror.ParentNotFound) {
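// the parent directory is missing: create it, then retry the whole Put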
parentDir := path.Dir(src.Remote())
if mkDirErr := f.Mkdir(ctx, parentDir); mkDirErr != nil {
return nil, fmt.Errorf("unable to make parent directories : %w", mkDirErr)
}
log.Printf("Mkdir: waiting for %s after making parent=%s", sleepDurationBetweenRecursiveMkdirPutCalls.String(), parentDir)
time.Sleep(sleepDurationBetweenRecursiveMkdirPutCalls)
return f.Put(ctx, in, src, options...)
} else if createErr != nil {
return nil, fmt.Errorf("unable to create file : %w", createErr)
}
obj := &Object{
common: common{
f: f,
remote: src.Remote(),
},
}
if updateErr := obj.upload(ctx, in, src, true, options...); updateErr != nil {
err := fmt.Errorf("while executing update after creating file as part of fs.Put : %w", updateErr)
if _, delErr := fc.Delete(ctx, nil); delErr != nil {
return nil, errors.Join(delErr, updateErr)
}
return obj, err
}
return obj, nil
}
// Name of the remote (as passed into NewFs)
func (f *Fs) Name() string {
return f.name
}
// Root of the remote (as passed into NewFs)
func (f *Fs) Root() string {
return f.root
}
// String converts this Fs to a string
func (f *Fs) String() string {
return fmt.Sprintf("azurefiles root '%s'", f.root)
}
// Precision return the precision of this Fs
//
// One second. FileREST API times are in RFC1123 format, which in the linked example shows a precision of seconds
// Source: https://learn.microsoft.com/en-us/rest/api/storageservices/representation-of-date-time-values-in-headers
func (f *Fs) Precision() time.Duration {
return time.Second
}
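A minimal illustration (standard library only, not from this commit) of why one second is the right precision: Go's RFC1123 layout has no sub-second field, so any fractional second is dropped on the wire.

t := time.Date(2023, 11, 15, 9, 37, 57, 123456789, time.UTC)
s := t.Format(time.RFC1123) // "Wed, 15 Nov 2023 09:37:57 UTC"
parsed, _ := time.Parse(time.RFC1123, s)
fmt.Println(t.Sub(parsed)) // 123.456789ms lost in the round trip

Anything finer than a second therefore cannot survive a SetModTime/ModTime round trip through the API.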
// Hashes returns the supported hash sets.
//
// MD5: since it is listed as a header in the response for file properties
// Source: https://learn.microsoft.com/en-us/rest/api/storageservices/get-file-properties
func (f *Fs) Hashes() hash.Set {
return hash.NewHashSet(hash.MD5)
}
// Features returns the optional features of this Fs
//
// TODO: add features: public link, SlowModTime, SlowHash,
// ReadMetadata, WriteMetadata, UserMetadata, PutUnchecked, PutStream
// PartialUploads: Maybe????
// FileID and DirectoryID can be implemented. They are at least returned as part of the listing response
func (f *Fs) Features() *fs.Features {
return &fs.Features{
CanHaveEmptyDirectories: true,
// Copy: func(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
// return f.CopyFile(ctx, src, remote)
// },
}
}
// List the objects and directories in dir into entries. The entries can be
// returned in any order but should be for a complete directory.
//
// dir should be "" to list the root, and should not have trailing slashes.
//
// This should return ErrDirNotFound if the directory isn't found.
//
// TODO: handle case regarding "" and "/". I remember reading about them somewhere
func (f *Fs) List(ctx context.Context, remote string) (fs.DirEntries, error) {
var entries fs.DirEntries
subDirClient := f.dirClient(remote)
// Checking whether directory exists
_, err := subDirClient.GetProperties(ctx, nil)
if fileerror.HasCode(err, fileerror.ParentNotFound, fileerror.ResourceNotFound) {
return entries, fs.ErrorDirNotFound
} else if err != nil {
return entries, err
}
pager := subDirClient.NewListFilesAndDirectoriesPager(listFilesAndDirectoriesOptions)
for pager.More() {
resp, err := pager.NextPage(ctx)
if err != nil {
return entries, err
}
for _, dir := range resp.Segment.Directories {
de := &Directory{
common{f: f,
remote: path.Join(remote, f.decodePath(*dir.Name)),
properties: properties{
lastWriteTime: *dir.Properties.LastWriteTime,
}},
}
entries = append(entries, de)
}
for _, file := range resp.Segment.Files {
de := &Object{
common{f: f,
remote: path.Join(remote, f.decodePath(*file.Name)),
properties: properties{
contentLength: *file.Properties.ContentLength,
lastWriteTime: *file.Properties.LastWriteTime,
}},
}
entries = append(entries, de)
}
}
return entries, nil
}
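// encodedPath marks a path that has already been through f.opt.Enc.FromStandardPath (see encodePath below)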
type encodedPath string
func (f *Fs) decodedFullPath(decodedRemote string) string {
return path.Join(f.root, decodedRemote)
}
func (f *Fs) dirClient(decodedRemote string) *directory.Client {
fullPathDecoded := f.decodedFullPath(decodedRemote)
fullPathEncoded := f.encodePath(fullPathDecoded)
return f.newSubdirectoryClientFromEncodedPathRelativeToShareRoot(fullPathEncoded)
}
func (f *Fs) newSubdirectoryClientFromEncodedPathRelativeToShareRoot(p encodedPath) *directory.Client {
return f.shareRootDirClient.NewSubdirectoryClient(string(p))
}
func (f *Fs) fileClient(decodedRemote string) *file.Client {
fullPathDecoded := f.decodedFullPath(decodedRemote)
fullPathEncoded := f.encodePath(fullPathDecoded)
return f.fileClientFromEncodedPathRelativeToShareRoot(fullPathEncoded)
}
func (f *Fs) fileClientFromEncodedPathRelativeToShareRoot(p encodedPath) *file.Client {
return f.shareRootDirClient.NewFileClient(string(p))
}
func (f *Fs) encodePath(p string) encodedPath {
return encodedPath(f.opt.Enc.FromStandardPath(p))
}
func (f *Fs) decodePath(p string) string {
return f.opt.Enc.ToStandardPath(p)
}
// on 20231019 at 1324: work to be continued on fixing FAIL: TestIntegration/FsMkdir/FsPutFiles/FromRoot

View file

@@ -1,279 +0,0 @@
package azurefiles
import (
"context"
"crypto/md5"
"encoding/hex"
"fmt"
"io"
"log/slog"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/file"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/hash"
)
// TODO: maybe use this in the result of List, or replace all instances where object instances are created
func objectInstance(f *Fs, remote string, contentLength int64, md5Hash []byte, lwt time.Time) Object {
return Object{common: common{
f: f,
remote: remote,
properties: properties{
contentLength: contentLength,
md5Hash: md5Hash,
lastWriteTime: lwt,
},
}}
}
// Size of object in bytes
func (o *Object) Size() int64 {
return o.properties.contentLength
}
// Fs returns the parent Fs
func (o *Object) Fs() fs.Info {
return o.f
}
// Hash returns the MD5 of an object returning a lowercase hex string
//
// May make a network request because the [fs.List] method does not
// return MD5 hashes for DirEntry
func (o *Object) Hash(ctx context.Context, ty hash.Type) (string, error) {
if ty != hash.MD5 {
return "", hash.ErrUnsupported
}
if len(o.common.properties.md5Hash) == 0 {
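// MD5 not cached from a listing; fetch the file properties to obtain it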
props, err := o.fileClient().GetProperties(ctx, nil)
if err != nil {
return "", fmt.Errorf("unable to fetch properties to determine hash")
}
o.common.properties.md5Hash = props.ContentMD5
}
return hex.EncodeToString(o.common.properties.md5Hash), nil
}
// Storable returns a boolean showing whether this object is storable
func (o *Object) Storable() bool {
return true
}
// Object describes an Azure File Share File, not a Directory
type Object struct {
common
}
// TODO: decide whether these fields should have pointer types so that
// an unset value can be distinguished from a zero value
type properties struct {
contentLength int64
md5Hash []byte
lastWriteTime time.Time
}
func (o *Object) fileClient() *file.Client {
decodedFullPath := o.f.decodedFullPath(o.remote)
fullEncodedPath := o.f.encodePath(decodedFullPath)
return o.f.fileClientFromEncodedPathRelativeToShareRoot(fullEncodedPath)
}
// SetModTime sets the modification time
func (o *Object) SetModTime(ctx context.Context, t time.Time) error {
smbProps := file.SMBProperties{
LastWriteTime: &t,
}
setHeadersOptions := file.SetHTTPHeadersOptions{
SMBProperties: &smbProps,
}
_, err := o.fileClient().SetHTTPHeaders(ctx, &setHeadersOptions)
if err != nil {
return fmt.Errorf("unable to set modTime : %w", err)
}
o.lastWriteTime = t
return nil
}
// ModTime returns the modification time of the object
//
// Returns time.Now() if not present
// TODO: convert o.lastWriteTime to *time.Time so that one can know when it has
// been explicitly set
func (o *Object) ModTime(ctx context.Context) time.Time {
if o.lastWriteTime.Unix() <= 1 {
return time.Now()
}
return o.lastWriteTime
}
// Remove an object
func (o *Object) Remove(ctx context.Context) error {
// TODO: should the options for delete be non-nil? Depends on behaviour expected by consumers
if _, err := o.fileClient().Delete(ctx, nil); err != nil {
return fmt.Errorf("unable to delete remote=\"%s\" : %w", o.remote, err)
}
return nil
}
// Open an object for read
//
// TODO: check for mandatory options and the other options
func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadCloser, error) {
downloadStreamOptions := file.DownloadStreamOptions{}
for _, opt := range options {
switch v := opt.(type) {
case *fs.SeekOption:
httpRange := file.HTTPRange{
Offset: v.Offset,
}
downloadStreamOptions.Range = httpRange
case *fs.RangeOption:
var start *int64
var end *int64
if v.Start >= 0 {
start = &v.Start
}
if v.End >= 0 {
end = &v.End
}
fhr := file.HTTPRange{}
if start != nil && end != nil {
fhr.Offset = *start
fhr.Count = *end - *start + 1
} else if start != nil && end == nil {
fhr.Offset = *start
} else if start == nil && end != nil {
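// suffix range: the request asks for the final *end bytes, so offset from the end of the file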
fhr.Offset = o.contentLength - *end
}
downloadStreamOptions.Range = fhr
}
}
resp, err := o.fileClient().DownloadStream(ctx, &downloadStreamOptions)
if err != nil {
return nil, fmt.Errorf("could not open remote=\"%s\" : %w", o.remote, err)
}
return resp.Body, nil
}
func (o *Object) upload(ctx context.Context, in io.Reader, src fs.ObjectInfo, isDestNewlyCreated bool, options ...fs.OpenOption) error {
if src.Size() > fourTbInBytes {
return fmt.Errorf("max supported file size is 4TB. provided size is %d", src.Size())
} else if src.Size() < 0 {
return fmt.Errorf("files with unknown sizes are not supported")
}
fc := o.fileClient()
if !isDestNewlyCreated {
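// the destination already exists; make its length match the source before writing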
if src.Size() != o.Size() {
if _, resizeErr := fc.Resize(ctx, src.Size(), nil); resizeErr != nil {
return fmt.Errorf("unable to resize while trying to update. %w ", resizeErr)
}
}
}
var md5Hash []byte
hashToBeComputed := false
if hashStr, err := src.Hash(ctx, hash.MD5); err != nil || hashStr == "" {
hashToBeComputed = true
} else {
var decodeErr error
md5Hash, decodeErr = hex.DecodeString(hashStr)
if decodeErr != nil {
hashToBeComputed = true
msg := fmt.Sprintf("should not happen. Error while decoding hex encoded md5 '%s'. Error is %s",
hashStr, decodeErr.Error())
slog.Error(msg)
}
}
var uploadErr error
if hashToBeComputed {
md5Hash, uploadErr = uploadStreamAndComputeHash(ctx, fc, in, src, options...)
} else {
uploadErr = uploadStream(ctx, fc, in, src, options...)
}
if uploadErr != nil {
return fmt.Errorf("while uploading %s : %w", src.Remote(), uploadErr)
}
modTime := src.ModTime(ctx)
if err := uploadSizeHashLWT(ctx, fc, src.Size(), md5Hash, modTime); err != nil {
return fmt.Errorf("while setting size hash and last write time for %s : %w", src.Remote(), err)
}
o.properties.contentLength = src.Size()
o.properties.md5Hash = md5Hash
o.properties.lastWriteTime = modTime
return nil
}
// Update the object with the contents of the io.Reader, modTime, size and MD5 hash
// Does not create a new object
//
// TODO: implement options; understand the purpose of options
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
return o.upload(ctx, in, src, false, options...)
}
// cannot set modTime header here because setHTTPHeaders does not allow setting metadata
func uploadStream(ctx context.Context, fc *file.Client, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
// TODO: set concurrency level
uploadStreamOptions := file.UploadStreamOptions{
ChunkSize: chunkSize(options...),
}
if err := fc.UploadStream(ctx, in, &uploadStreamOptions); err != nil {
return fmt.Errorf("unable to upload. cannot upload stream : %w", err)
}
return nil
}
func uploadStreamAndComputeHash(ctx context.Context, fc *file.Client, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) ([]byte, error) {
hasher := md5.New()
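// every byte the upload reads from in is also written to the MD5 hasher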
teeReader := io.TeeReader(in, hasher)
err := uploadStream(ctx, fc, teeReader, src, options...)
if err != nil {
return []byte{}, err
}
return hasher.Sum(nil), nil
}
// the function is named with prefix 'upload' since it indicates that things will be modified on the server
func uploadSizeHashLWT(ctx context.Context, fc *file.Client, size int64, hash []byte, lwt time.Time) error {
smbProps := file.SMBProperties{
LastWriteTime: &lwt,
}
httpHeaders := &file.HTTPHeaders{
ContentMD5: hash,
}
_, err := fc.SetHTTPHeaders(ctx, &file.SetHTTPHeadersOptions{
FileContentLength: &size,
SMBProperties: &smbProps,
HTTPHeaders: httpHeaders,
})
if err != nil {
return fmt.Errorf("while setting size, hash, lastWriteTime : %w", err)
}
return nil
}
func chunkSize(options ...fs.OpenOption) int64 {
for _, option := range options {
if chunkOpt, ok := option.(*fs.ChunkOption); ok {
return chunkOpt.ChunkSize
}
}
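// no fs.ChunkOption supplied: fall back to 1 MiB (1048576 bytes)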
return 1048576
}
// Return a string version
func (o *Object) String() string {
if o == nil {
return "<nil>"
}
return o.common.String()
}