// Package hasher implements a checksum handling overlay backend package hasher import ( "context" "encoding/gob" "errors" "fmt" "io" "path" "strings" "sync" "time" "github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs/cache" "github.com/rclone/rclone/fs/config/configmap" "github.com/rclone/rclone/fs/config/configstruct" "github.com/rclone/rclone/fs/fspath" "github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/lib/kv" ) // Register with Fs func init() { fs.Register(&fs.RegInfo{ Name: "hasher", Description: "Better checksums for other remotes", NewFs: NewFs, MetadataInfo: &fs.MetadataInfo{ Help: `Any metadata supported by the underlying remote is read and written.`, }, CommandHelp: commandHelp, Options: []fs.Option{{ Name: "remote", Required: true, Help: "Remote to cache checksums for (e.g. myRemote:path).", }, { Name: "hashes", Default: fs.CommaSepList{"md5", "sha1"}, Advanced: false, Help: "Comma separated list of supported checksum types.", }, { Name: "max_age", Advanced: false, Default: fs.DurationOff, Help: "Maximum time to keep checksums in cache (0 = no cache, off = cache forever).", }, { Name: "auto_size", Advanced: true, Default: fs.SizeSuffix(0), Help: "Auto-update checksum for files smaller than this size (disabled by default).", }}, }) } // Options defines the configuration for this backend type Options struct { Remote string `config:"remote"` Hashes fs.CommaSepList `config:"hashes"` AutoSize fs.SizeSuffix `config:"auto_size"` MaxAge fs.Duration `config:"max_age"` } // Fs represents a wrapped fs.Fs type Fs struct { fs.Fs name string root string wrapper fs.Fs features *fs.Features opt *Options db *kv.DB // fingerprinting fpTime bool // true if using time in fingerprints fpHash hash.Type // hash type to use in fingerprints or None // hash types triaged by groups suppHashes hash.Set // all supported checksum types passHashes hash.Set // passed directly to the base without caching slowHashes hash.Set // passed to the base and then cached autoHashes hash.Set // calculated in-house and cached keepHashes hash.Set // checksums to keep in cache (slow + auto) } var warnExperimental sync.Once // NewFs constructs an Fs from the remote:path string func NewFs(ctx context.Context, fsname, rpath string, cmap configmap.Mapper) (fs.Fs, error) { if !kv.Supported() { return nil, errors.New("hasher is not supported on this OS") } warnExperimental.Do(func() { fs.Infof(nil, "Hasher is EXPERIMENTAL!") }) opt := &Options{} err := configstruct.Set(cmap, opt) if err != nil { return nil, err } if strings.HasPrefix(opt.Remote, fsname+":") { return nil, errors.New("can't point remote at itself") } remotePath := fspath.JoinRootPath(opt.Remote, rpath) baseFs, err := cache.Get(ctx, remotePath) if err != nil && err != fs.ErrorIsFile { return nil, fmt.Errorf("failed to derive base remote %q: %w", opt.Remote, err) } f := &Fs{ Fs: baseFs, name: fsname, root: rpath, opt: opt, } // Correct root if definitely pointing to a file if err == fs.ErrorIsFile { f.root = path.Dir(f.root) if f.root == "." || f.root == "/" { f.root = "" } } baseFeatures := baseFs.Features() f.fpTime = baseFs.Precision() != fs.ModTimeNotSupported if baseFeatures.SlowHash { f.slowHashes = f.Fs.Hashes() } else { f.passHashes = f.Fs.Hashes() f.fpHash = f.passHashes.GetOne() } f.suppHashes = f.passHashes f.suppHashes.Add(f.slowHashes.Array()...) for _, hashName := range opt.Hashes { var ht hash.Type if err := ht.Set(hashName); err != nil { return nil, fmt.Errorf("invalid token %q in hash string %q", hashName, opt.Hashes.String()) } if !f.slowHashes.Contains(ht) { f.autoHashes.Add(ht) } f.keepHashes.Add(ht) f.suppHashes.Add(ht) } fs.Debugf(f, "Groups by usage: cached %s, passed %s, auto %s, slow %s, supported %s", f.keepHashes, f.passHashes, f.autoHashes, f.slowHashes, f.suppHashes) var nilSet hash.Set if f.keepHashes == nilSet { return nil, errors.New("configured hash_names have nothing to keep in cache") } if f.opt.MaxAge > 0 { gob.Register(hashRecord{}) db, err := kv.Start(ctx, "hasher", f.Fs) if err != nil { return nil, err } f.db = db } stubFeatures := &fs.Features{ CanHaveEmptyDirectories: true, IsLocal: true, ReadMimeType: true, WriteMimeType: true, SetTier: true, GetTier: true, ReadMetadata: true, WriteMetadata: true, UserMetadata: true, PartialUploads: true, } f.features = stubFeatures.Fill(ctx, f).Mask(ctx, f.Fs).WrapsFs(f, f.Fs) cache.PinUntilFinalized(f.Fs, f) return f, err } // // Filesystem // // Name of the remote (as passed into NewFs) func (f *Fs) Name() string { return f.name } // Root of the remote (as passed into NewFs) func (f *Fs) Root() string { return f.root } // Features returns the optional features of this Fs func (f *Fs) Features() *fs.Features { return f.features } // Hashes returns the supported hash sets. func (f *Fs) Hashes() hash.Set { return f.suppHashes } // String returns a description of the FS // The "hasher::" prefix is a distinctive feature. func (f *Fs) String() string { return fmt.Sprintf("hasher::%s:%s", f.name, f.root) } // UnWrap returns the Fs that this Fs is wrapping func (f *Fs) UnWrap() fs.Fs { return f.Fs } // WrapFs returns the Fs that is wrapping this Fs func (f *Fs) WrapFs() fs.Fs { return f.wrapper } // SetWrapper sets the Fs that is wrapping this Fs func (f *Fs) SetWrapper(wrapper fs.Fs) { f.wrapper = wrapper } // Wrap base entries into hasher entries. func (f *Fs) wrapEntries(baseEntries fs.DirEntries) (hashEntries fs.DirEntries, err error) { hashEntries = baseEntries[:0] // work inplace for _, entry := range baseEntries { switch x := entry.(type) { case fs.Object: obj, err := f.wrapObject(x, nil) if err != nil { return nil, err } hashEntries = append(hashEntries, obj) default: hashEntries = append(hashEntries, entry) // trash in - trash out } } return hashEntries, nil } // List the objects and directories in dir into entries. func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) { if entries, err = f.Fs.List(ctx, dir); err != nil { return nil, err } return f.wrapEntries(entries) } // ListR lists the objects and directories recursively into out. func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) { return f.Fs.Features().ListR(ctx, dir, func(baseEntries fs.DirEntries) error { hashEntries, err := f.wrapEntries(baseEntries) if err != nil { return err } return callback(hashEntries) }) } // Purge a directory func (f *Fs) Purge(ctx context.Context, dir string) error { if do := f.Fs.Features().Purge; do != nil { if err := do(ctx, dir); err != nil { return err } err := f.db.Do(true, &kvPurge{ dir: path.Join(f.Fs.Root(), dir), }) if err != nil { fs.Errorf(f, "Failed to purge some hashes: %v", err) } return nil } return fs.ErrorCantPurge } // PutStream uploads to the remote path with undeterminate size. func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { if do := f.Fs.Features().PutStream; do != nil { _ = f.pruneHash(src.Remote()) oResult, err := do(ctx, in, src, options...) return f.wrapObject(oResult, err) } return nil, errors.New("PutStream not supported") } // PutUnchecked uploads the object, allowing duplicates. func (f *Fs) PutUnchecked(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { if do := f.Fs.Features().PutUnchecked; do != nil { _ = f.pruneHash(src.Remote()) oResult, err := do(ctx, in, src, options...) return f.wrapObject(oResult, err) } return nil, errors.New("PutUnchecked not supported") } // pruneHash deletes hash for a path func (f *Fs) pruneHash(remote string) error { return f.db.Do(true, &kvPrune{ key: path.Join(f.Fs.Root(), remote), }) } // CleanUp the trash in the Fs func (f *Fs) CleanUp(ctx context.Context) error { if do := f.Fs.Features().CleanUp; do != nil { return do(ctx) } return errors.New("not supported by underlying remote") } // About gets quota information from the Fs func (f *Fs) About(ctx context.Context) (*fs.Usage, error) { if do := f.Fs.Features().About; do != nil { return do(ctx) } return nil, errors.New("not supported by underlying remote") } // ChangeNotify calls the passed function with a path that has had changes. func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryType), pollIntervalChan <-chan time.Duration) { if do := f.Fs.Features().ChangeNotify; do != nil { do(ctx, notifyFunc, pollIntervalChan) } } // UserInfo returns info about the connected user func (f *Fs) UserInfo(ctx context.Context) (map[string]string, error) { if do := f.Fs.Features().UserInfo; do != nil { return do(ctx) } return nil, fs.ErrorNotImplemented } // Disconnect the current user func (f *Fs) Disconnect(ctx context.Context) error { if do := f.Fs.Features().Disconnect; do != nil { return do(ctx) } return fs.ErrorNotImplemented } // MergeDirs merges the contents of all the directories passed // in into the first one and rmdirs the other directories. func (f *Fs) MergeDirs(ctx context.Context, dirs []fs.Directory) error { if do := f.Fs.Features().MergeDirs; do != nil { return do(ctx, dirs) } return errors.New("MergeDirs not supported") } // DirSetModTime sets the directory modtime for dir func (f *Fs) DirSetModTime(ctx context.Context, dir string, modTime time.Time) error { if do := f.Fs.Features().DirSetModTime; do != nil { return do(ctx, dir, modTime) } return fs.ErrorNotImplemented } // DirCacheFlush resets the directory cache - used in testing // as an optional interface func (f *Fs) DirCacheFlush() { if do := f.Fs.Features().DirCacheFlush; do != nil { do() } } // PublicLink generates a public link to the remote path (usually readable by anyone) func (f *Fs) PublicLink(ctx context.Context, remote string, expire fs.Duration, unlink bool) (string, error) { if do := f.Fs.Features().PublicLink; do != nil { return do(ctx, remote, expire, unlink) } return "", errors.New("PublicLink not supported") } // Copy src to this remote using server-side copy operations. func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) { do := f.Fs.Features().Copy if do == nil { return nil, fs.ErrorCantCopy } o, ok := src.(*Object) if !ok { return nil, fs.ErrorCantCopy } oResult, err := do(ctx, o.Object, remote) return f.wrapObject(oResult, err) } // Move src to this remote using server-side move operations. func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object, error) { do := f.Fs.Features().Move if do == nil { return nil, fs.ErrorCantMove } o, ok := src.(*Object) if !ok { return nil, fs.ErrorCantMove } oResult, err := do(ctx, o.Object, remote) if err != nil { return nil, err } _ = f.db.Do(true, &kvMove{ src: path.Join(f.Fs.Root(), src.Remote()), dst: path.Join(f.Fs.Root(), remote), dir: false, fs: f, }) return f.wrapObject(oResult, nil) } // DirMove moves src, srcRemote to this remote at dstRemote using server-side move operations. func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string) error { do := f.Fs.Features().DirMove if do == nil { return fs.ErrorCantDirMove } srcFs, ok := src.(*Fs) if !ok { return fs.ErrorCantDirMove } err := do(ctx, srcFs.Fs, srcRemote, dstRemote) if err == nil { _ = f.db.Do(true, &kvMove{ src: path.Join(srcFs.Fs.Root(), srcRemote), dst: path.Join(f.Fs.Root(), dstRemote), dir: true, fs: f, }) } return err } // Shutdown the backend, closing any background tasks and any cached connections. func (f *Fs) Shutdown(ctx context.Context) (err error) { if f.db != nil { err = f.db.Stop(false) } if do := f.Fs.Features().Shutdown; do != nil { if err2 := do(ctx); err2 != nil { err = err2 } } return } // NewObject finds the Object at remote. func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) { o, err := f.Fs.NewObject(ctx, remote) return f.wrapObject(o, err) } // // Object // // Object represents a composite file wrapping one or more data chunks type Object struct { fs.Object f *Fs } // Wrap base object into hasher object func (f *Fs) wrapObject(o fs.Object, err error) (obj fs.Object, outErr error) { // log.Trace(o, "err=%v", err)("obj=%#v, outErr=%v", &obj, &outErr) if err != nil { return nil, err } if o == nil { return nil, fs.ErrorObjectNotFound } return &Object{Object: o, f: f}, nil } // Fs returns read only access to the Fs that this object is part of func (o *Object) Fs() fs.Info { return o.f } // UnWrap returns the wrapped Object func (o *Object) UnWrap() fs.Object { return o.Object } // Return a string version func (o *Object) String() string { if o == nil { return "" } return o.Object.String() } // ID returns the ID of the Object if possible func (o *Object) ID() string { if doer, ok := o.Object.(fs.IDer); ok { return doer.ID() } return "" } // GetTier returns the Tier of the Object if possible func (o *Object) GetTier() string { if doer, ok := o.Object.(fs.GetTierer); ok { return doer.GetTier() } return "" } // SetTier set the Tier of the Object if possible func (o *Object) SetTier(tier string) error { if doer, ok := o.Object.(fs.SetTierer); ok { return doer.SetTier(tier) } return errors.New("SetTier not supported") } // MimeType of an Object if known, "" otherwise func (o *Object) MimeType(ctx context.Context) string { if doer, ok := o.Object.(fs.MimeTyper); ok { return doer.MimeType(ctx) } return "" } // Metadata returns metadata for an object // // It should return nil if there is no Metadata func (o *Object) Metadata(ctx context.Context) (fs.Metadata, error) { do, ok := o.Object.(fs.Metadataer) if !ok { return nil, nil } return do.Metadata(ctx) } // Check the interfaces are satisfied var ( _ fs.Fs = (*Fs)(nil) _ fs.Purger = (*Fs)(nil) _ fs.Copier = (*Fs)(nil) _ fs.Mover = (*Fs)(nil) _ fs.DirMover = (*Fs)(nil) _ fs.Commander = (*Fs)(nil) _ fs.PutUncheckeder = (*Fs)(nil) _ fs.PutStreamer = (*Fs)(nil) _ fs.CleanUpper = (*Fs)(nil) _ fs.UnWrapper = (*Fs)(nil) _ fs.ListRer = (*Fs)(nil) _ fs.Abouter = (*Fs)(nil) _ fs.Wrapper = (*Fs)(nil) _ fs.MergeDirser = (*Fs)(nil) _ fs.DirSetModTimer = (*Fs)(nil) _ fs.DirCacheFlusher = (*Fs)(nil) _ fs.ChangeNotifier = (*Fs)(nil) _ fs.PublicLinker = (*Fs)(nil) _ fs.UserInfoer = (*Fs)(nil) _ fs.Disconnecter = (*Fs)(nil) _ fs.Shutdowner = (*Fs)(nil) _ fs.FullObject = (*Object)(nil) )