cache,chunker,combine,compress,crypt,hasher,union: implement MkdirMetadata and related Features

This commit is contained in:
Nick Craig-Wood 2024-02-27 11:04:38 +00:00
parent 0297542f6b
commit 39db8caff1
9 changed files with 315 additions and 81 deletions

View file

@ -18,8 +18,9 @@ func TestIntegration(t *testing.T) {
fstests.Run(t, &fstests.Opt{
RemoteName: "TestCache:",
NilObject: (*cache.Object)(nil),
UnimplementableFsMethods: []string{"PublicLink", "OpenWriterAt", "OpenChunkWriter"},
UnimplementableFsMethods: []string{"PublicLink", "OpenWriterAt", "OpenChunkWriter", "DirSetModTime", "MkdirMetadata"},
UnimplementableObjectMethods: []string{"MimeType", "ID", "GetTier", "SetTier", "Metadata"},
UnimplementableDirectoryMethods: []string{"Metadata", "SetMetadata", "SetModTime"},
SkipInvalidUTF8: true, // invalid UTF-8 confuses the cache
})
}

View file

@ -345,6 +345,11 @@ func NewFs(ctx context.Context, name, rpath string, m configmap.Mapper) (fs.Fs,
BucketBased: true,
CanHaveEmptyDirectories: true,
ServerSideAcrossConfigs: true,
ReadDirMetadata: true,
WriteDirMetadata: true,
WriteDirSetModTime: true,
UserDirMetadata: true,
DirModTimeUpdatesOnWrite: true,
}).Fill(ctx, f).Mask(ctx, baseFs).WrapsFs(f, baseFs)
f.features.Disable("ListR") // Recursive listing may cause chunker skip files
@ -821,8 +826,7 @@ func (f *Fs) processEntries(ctx context.Context, origEntries fs.DirEntries, dirP
}
case fs.Directory:
isSubdir[entry.Remote()] = true
wrapDir := fs.NewDirCopy(ctx, entry)
wrapDir.SetRemote(entry.Remote())
wrapDir := fs.NewDirWrapper(entry.Remote(), entry)
tempEntries = append(tempEntries, wrapDir)
default:
if f.opt.FailHard {
@ -1571,6 +1575,14 @@ func (f *Fs) Mkdir(ctx context.Context, dir string) error {
return f.base.Mkdir(ctx, dir)
}
// MkdirMetadata makes the root directory of the Fs object
func (f *Fs) MkdirMetadata(ctx context.Context, dir string, metadata fs.Metadata) (fs.Directory, error) {
if do := f.base.Features().MkdirMetadata; do != nil {
return do(ctx, dir, metadata)
}
return nil, fs.ErrorNotImplemented
}
// Rmdir removes the directory (container, bucket) if empty
//
// Return an error if it doesn't exist or isn't empty
@ -2557,6 +2569,7 @@ var (
_ fs.Mover = (*Fs)(nil)
_ fs.DirMover = (*Fs)(nil)
_ fs.DirSetModTimer = (*Fs)(nil)
_ fs.MkdirMetadataer = (*Fs)(nil)
_ fs.PutUncheckeder = (*Fs)(nil)
_ fs.PutStreamer = (*Fs)(nil)
_ fs.CleanUpper = (*Fs)(nil)

View file

@ -233,6 +233,11 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (outFs fs
ReadMetadata: true,
WriteMetadata: true,
UserMetadata: true,
ReadDirMetadata: true,
WriteDirMetadata: true,
WriteDirSetModTime: true,
UserDirMetadata: true,
DirModTimeUpdatesOnWrite: true,
PartialUploads: true,
}).Fill(ctx, f)
canMove := true
@ -440,6 +445,32 @@ func (f *Fs) Mkdir(ctx context.Context, dir string) error {
return u.f.Mkdir(ctx, uRemote)
}
// MkdirMetadata makes the root directory of the Fs object
func (f *Fs) MkdirMetadata(ctx context.Context, dir string, metadata fs.Metadata) (fs.Directory, error) {
u, uRemote, err := f.findUpstream(dir)
if err != nil {
return nil, err
}
do := u.f.Features().MkdirMetadata
if do == nil {
return nil, fs.ErrorNotImplemented
}
newDir, err := do(ctx, uRemote, metadata)
if err != nil {
return nil, err
}
entries := fs.DirEntries{newDir}
entries, err = u.wrapEntries(ctx, entries)
if err != nil {
return nil, err
}
newDir, ok := entries[0].(fs.Directory)
if !ok {
return nil, fmt.Errorf("internal error: expecting %T to be fs.Directory", entries[0])
}
return newDir, nil
}
// purge the upstream or fallback to a slow way
func (u *upstream) purge(ctx context.Context, dir string) (err error) {
if do := u.f.Features().Purge; do != nil {
@ -755,12 +786,11 @@ func (u *upstream) wrapEntries(ctx context.Context, entries fs.DirEntries) (fs.D
case fs.Object:
entries[i] = u.newObject(x)
case fs.Directory:
newDir := fs.NewDirCopy(ctx, x)
newPath, err := u.pathAdjustment.do(newDir.Remote())
newPath, err := u.pathAdjustment.do(x.Remote())
if err != nil {
return nil, err
}
newDir.SetRemote(newPath)
newDir := fs.NewDirWrapper(newPath, x)
entries[i] = newDir
default:
return nil, fmt.Errorf("unknown entry type %T", entry)
@ -1116,6 +1146,7 @@ var (
_ fs.PutUncheckeder = (*Fs)(nil)
_ fs.MergeDirser = (*Fs)(nil)
_ fs.DirSetModTimer = (*Fs)(nil)
_ fs.MkdirMetadataer = (*Fs)(nil)
_ fs.CleanUpper = (*Fs)(nil)
_ fs.OpenWriterAter = (*Fs)(nil)
_ fs.FullObject = (*Object)(nil)

View file

@ -194,6 +194,11 @@ func NewFs(ctx context.Context, name, rpath string, m configmap.Mapper) (fs.Fs,
ReadMetadata: true,
WriteMetadata: true,
UserMetadata: true,
ReadDirMetadata: true,
WriteDirMetadata: true,
WriteDirSetModTime: true,
UserDirMetadata: true,
DirModTimeUpdatesOnWrite: true,
PartialUploads: true,
}).Fill(ctx, f).Mask(ctx, wrappedFs).WrapsFs(f, wrappedFs)
// We support reading MIME types no matter the wrapped fs
@ -784,6 +789,14 @@ func (f *Fs) Mkdir(ctx context.Context, dir string) error {
return f.Fs.Mkdir(ctx, dir)
}
// MkdirMetadata makes the root directory of the Fs object
func (f *Fs) MkdirMetadata(ctx context.Context, dir string, metadata fs.Metadata) (fs.Directory, error) {
if do := f.Fs.Features().MkdirMetadata; do != nil {
return do(ctx, dir, metadata)
}
return nil, fs.ErrorNotImplemented
}
// Rmdir removes the directory (container, bucket) if empty
//
// Return an error if it doesn't exist or isn't empty
@ -1506,6 +1519,7 @@ var (
_ fs.Mover = (*Fs)(nil)
_ fs.DirMover = (*Fs)(nil)
_ fs.DirSetModTimer = (*Fs)(nil)
_ fs.MkdirMetadataer = (*Fs)(nil)
_ fs.PutStreamer = (*Fs)(nil)
_ fs.CleanUpper = (*Fs)(nil)
_ fs.UnWrapper = (*Fs)(nil)

View file

@ -275,6 +275,11 @@ func NewFs(ctx context.Context, name, rpath string, m configmap.Mapper) (fs.Fs,
ReadMetadata: true,
WriteMetadata: true,
UserMetadata: true,
ReadDirMetadata: true,
WriteDirMetadata: true,
WriteDirSetModTime: true,
UserDirMetadata: true,
DirModTimeUpdatesOnWrite: true,
PartialUploads: true,
}).Fill(ctx, f).Mask(ctx, wrappedFs).WrapsFs(f, wrappedFs)
@ -520,6 +525,25 @@ func (f *Fs) Mkdir(ctx context.Context, dir string) error {
return f.Fs.Mkdir(ctx, f.cipher.EncryptDirName(dir))
}
// MkdirMetadata makes the root directory of the Fs object
func (f *Fs) MkdirMetadata(ctx context.Context, dir string, metadata fs.Metadata) (fs.Directory, error) {
do := f.Fs.Features().MkdirMetadata
if do == nil {
return nil, fs.ErrorNotImplemented
}
newDir, err := do(ctx, f.cipher.EncryptDirName(dir), metadata)
if err != nil {
return nil, err
}
var entries = make(fs.DirEntries, 0, 1)
f.addDir(ctx, &entries, newDir)
newDir, ok := entries[0].(fs.Directory)
if !ok {
return nil, fmt.Errorf("internal error: expecting %T to be fs.Directory", entries[0])
}
return newDir, nil
}
// DirSetModTime sets the directory modtime for dir
func (f *Fs) DirSetModTime(ctx context.Context, dir string, modTime time.Time) error {
do := f.Fs.Features().DirSetModTime
@ -770,7 +794,7 @@ func (f *Fs) MergeDirs(ctx context.Context, dirs []fs.Directory) error {
}
out := make([]fs.Directory, len(dirs))
for i, dir := range dirs {
out[i] = fs.NewDirCopy(ctx, dir).SetRemote(f.cipher.EncryptDirName(dir.Remote()))
out[i] = fs.NewDirWrapper(f.cipher.EncryptDirName(dir.Remote()), dir)
}
return do(ctx, out)
}
@ -1006,14 +1030,14 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
// newDir returns a dir with the Name decrypted
func (f *Fs) newDir(ctx context.Context, dir fs.Directory) fs.Directory {
newDir := fs.NewDirCopy(ctx, dir)
remote := dir.Remote()
decryptedRemote, err := f.cipher.DecryptDirName(remote)
if err != nil {
fs.Debugf(remote, "Undecryptable dir name: %v", err)
} else {
newDir.SetRemote(decryptedRemote)
remote = decryptedRemote
}
newDir := fs.NewDirWrapper(remote, dir)
return newDir
}
@ -1217,6 +1241,7 @@ var (
_ fs.Wrapper = (*Fs)(nil)
_ fs.MergeDirser = (*Fs)(nil)
_ fs.DirSetModTimer = (*Fs)(nil)
_ fs.MkdirMetadataer = (*Fs)(nil)
_ fs.DirCacheFlusher = (*Fs)(nil)
_ fs.ChangeNotifier = (*Fs)(nil)
_ fs.PublicLinker = (*Fs)(nil)

View file

@ -173,6 +173,11 @@ func NewFs(ctx context.Context, fsname, rpath string, cmap configmap.Mapper) (fs
ReadMetadata: true,
WriteMetadata: true,
UserMetadata: true,
ReadDirMetadata: true,
WriteDirMetadata: true,
WriteDirSetModTime: true,
UserDirMetadata: true,
DirModTimeUpdatesOnWrite: true,
PartialUploads: true,
}
f.features = stubFeatures.Fill(ctx, f).Mask(ctx, f.Fs).WrapsFs(f, f.Fs)
@ -349,6 +354,14 @@ func (f *Fs) DirSetModTime(ctx context.Context, dir string, modTime time.Time) e
return fs.ErrorNotImplemented
}
// MkdirMetadata makes the root directory of the Fs object
func (f *Fs) MkdirMetadata(ctx context.Context, dir string, metadata fs.Metadata) (fs.Directory, error) {
if do := f.Fs.Features().MkdirMetadata; do != nil {
return do(ctx, dir, metadata)
}
return nil, fs.ErrorNotImplemented
}
// DirCacheFlush resets the directory cache - used in testing
// as an optional interface
func (f *Fs) DirCacheFlush() {
@ -539,6 +552,7 @@ var (
_ fs.Wrapper = (*Fs)(nil)
_ fs.MergeDirser = (*Fs)(nil)
_ fs.DirSetModTimer = (*Fs)(nil)
_ fs.MkdirMetadataer = (*Fs)(nil)
_ fs.DirCacheFlusher = (*Fs)(nil)
_ fs.ChangeNotifier = (*Fs)(nil)
_ fs.PublicLinker = (*Fs)(nil)

View file

@ -27,6 +27,7 @@ type Object struct {
// This is a wrapped object contains all candidates
type Directory struct {
*upstream.Directory
fs *Fs // what this directory is part of
cd []upstream.Entry
}
@ -227,7 +228,56 @@ func (d *Directory) Size() (s int64) {
return s
}
// SetMetadata sets metadata for an DirEntry
//
// It should return fs.ErrorNotImplemented if it can't set metadata
func (d *Directory) SetMetadata(ctx context.Context, metadata fs.Metadata) error {
entries, err := d.fs.actionEntries(d.candidates()...)
if err != nil {
return err
}
var wg sync.WaitGroup
errs := Errors(make([]error, len(entries)))
multithread(len(entries), func(i int) {
if d, ok := entries[i].(*upstream.Directory); ok {
err := d.SetMetadata(ctx, metadata)
if err != nil {
errs[i] = fmt.Errorf("%s: %w", d.UpstreamFs().Name(), err)
}
} else {
errs[i] = fs.ErrorIsFile
}
})
wg.Wait()
return errs.Err()
}
// SetModTime sets the metadata on the DirEntry to set the modification date
//
// If there is any other metadata it does not overwrite it.
func (d *Directory) SetModTime(ctx context.Context, t time.Time) error {
entries, err := d.fs.actionEntries(d.candidates()...)
if err != nil {
return err
}
var wg sync.WaitGroup
errs := Errors(make([]error, len(entries)))
multithread(len(entries), func(i int) {
if d, ok := entries[i].(*upstream.Directory); ok {
err := d.SetModTime(ctx, t)
if err != nil {
errs[i] = fmt.Errorf("%s: %w", d.UpstreamFs().Name(), err)
}
} else {
errs[i] = fs.ErrorIsFile
}
})
wg.Wait()
return errs.Err()
}
// Check the interfaces are satisfied
var (
_ fs.FullObject = (*Object)(nil)
_ fs.FullDirectory = (*Directory)(nil)
)

View file

@ -95,6 +95,7 @@ func (f *Fs) wrapEntries(entries ...upstream.Entry) (entry, error) {
case *upstream.Directory:
return &Directory{
Directory: e,
fs: f,
cd: entries,
}, nil
default:
@ -182,6 +183,51 @@ func (f *Fs) Mkdir(ctx context.Context, dir string) error {
return err
}
// MkdirMetadata makes the root directory of the Fs object
func (f *Fs) MkdirMetadata(ctx context.Context, dir string, metadata fs.Metadata) (fs.Directory, error) {
upstreams, err := f.create(ctx, dir)
if err != nil {
return nil, err
}
errs := Errors(make([]error, len(upstreams)))
entries := make([]upstream.Entry, len(upstreams))
multithread(len(upstreams), func(i int) {
u := upstreams[i]
if do := u.Features().MkdirMetadata; do != nil {
newDir, err := do(ctx, dir, metadata)
if err != nil {
errs[i] = fmt.Errorf("%s: %w", upstreams[i].Name(), err)
} else {
entries[i], err = u.WrapEntry(newDir)
if err != nil {
errs[i] = fmt.Errorf("%s: %w", upstreams[i].Name(), err)
}
}
} else {
// Just do Mkdir on upstreams which don't support MkdirMetadata
err := u.Mkdir(ctx, dir)
if err != nil {
errs[i] = fmt.Errorf("%s: %w", upstreams[i].Name(), err)
}
}
})
err = errs.Err()
if err != nil {
return nil, err
}
entry, err := f.wrapEntries(entries...)
if err != nil {
return nil, err
}
newDir, ok := entry.(fs.Directory)
if !ok {
return nil, fmt.Errorf("internal error: expecting %T to be an fs.Directory", entry)
}
return newDir, nil
}
// Purge all files in the directory
//
// Implement this if you have a way of deleting all the files
@ -933,6 +979,11 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
ReadMetadata: true,
WriteMetadata: true,
UserMetadata: true,
ReadDirMetadata: true,
WriteDirMetadata: true,
WriteDirSetModTime: true,
UserDirMetadata: true,
DirModTimeUpdatesOnWrite: true,
PartialUploads: true,
}).Fill(ctx, f)
canMove, slowHash := true, false
@ -1009,6 +1060,7 @@ var (
_ fs.Mover = (*Fs)(nil)
_ fs.DirMover = (*Fs)(nil)
_ fs.DirSetModTimer = (*Fs)(nil)
_ fs.MkdirMetadataer = (*Fs)(nil)
_ fs.DirCacheFlusher = (*Fs)(nil)
_ fs.ChangeNotifier = (*Fs)(nil)
_ fs.Abouter = (*Fs)(nil)

View file

@ -322,6 +322,39 @@ func (o *Object) Metadata(ctx context.Context) (fs.Metadata, error) {
return do.Metadata(ctx)
}
// Metadata returns metadata for an DirEntry
//
// It should return nil if there is no Metadata
func (e *Directory) Metadata(ctx context.Context) (fs.Metadata, error) {
do, ok := e.Directory.(fs.Metadataer)
if !ok {
return nil, nil
}
return do.Metadata(ctx)
}
// SetMetadata sets metadata for an DirEntry
//
// It should return fs.ErrorNotImplemented if it can't set metadata
func (e *Directory) SetMetadata(ctx context.Context, metadata fs.Metadata) error {
do, ok := e.Directory.(fs.SetMetadataer)
if !ok {
return fs.ErrorNotImplemented
}
return do.SetMetadata(ctx, metadata)
}
// SetModTime sets the metadata on the DirEntry to set the modification date
//
// If there is any other metadata it does not overwrite it.
func (e *Directory) SetModTime(ctx context.Context, t time.Time) error {
do, ok := e.Directory.(fs.SetModTimer)
if !ok {
return fs.ErrorNotImplemented
}
return do.SetModTime(ctx, t)
}
// Writeback writes the object back and returns a new object
//
// If it returns nil, nil then the original object is OK
@ -458,4 +491,5 @@ func (f *Fs) updateUsageCore(lock bool) error {
// Check the interfaces are satisfied
var (
_ fs.FullObject = (*Object)(nil)
_ fs.FullDirectory = (*Directory)(nil)
)