rclone/backend/union/upstream/upstream.go
Josh Soref d0888edc0a Spelling fixes
Fix spelling of: above, already, anonymous, associated,
authentication, bandwidth, because, between, blocks, calculate,
candidates, cautious, changelog, cleaner, clipboard, command,
completely, concurrently, considered, constructs, corrupt, current,
daemon, dependencies, deprecated, directory, dispatcher, download,
eligible, ellipsis, encrypter, endpoint, entrieslist, essentially,
existing writers, existing, expires, filesystem, flushing, frequently,
hierarchy, however, implementation, implements, inaccurate,
individually, insensitive, longer, maximum, metadata, modified,
multipart, namedirfirst, nextcloud, obscured, opened, optional,
owncloud, pacific, passphrase, password, permanently, persimmon,
positive, potato, protocol, quota, receiving, recommends, referring,
requires, revisited, satisfied, satisfies, satisfy, semver,
serialized, session, storage, strategies, stringlist, successful,
supported, surprise, temporarily, temporary, transactions, unneeded,
update, uploads, wrapped

Signed-off-by: Josh Soref <jsoref@users.noreply.github.com>
2020-10-14 15:21:31 +01:00

352 lines
8.2 KiB
Go

package upstream
import (
"context"
"io"
"math"
"path"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/cache"
)
var (
// ErrUsageFieldNotSupported stats the usage field is not supported by the backend
ErrUsageFieldNotSupported = errors.New("this usage field is not supported")
)
// Fs is a wrap of any fs and its configs
type Fs struct {
fs.Fs
RootFs fs.Fs
RootPath string
writable bool
creatable bool
usage *fs.Usage // Cache the usage
cacheTime time.Duration // cache duration
cacheExpiry int64 // usage cache expiry time
cacheMutex sync.RWMutex
cacheOnce sync.Once
cacheUpdate bool // if the cache is updating
}
// Directory describes a wrapped Directory
//
// This is a wrapped Directory which contains the upstream Fs
type Directory struct {
fs.Directory
f *Fs
}
// Object describes a wrapped Object
//
// This is a wrapped Object which contains the upstream Fs
type Object struct {
fs.Object
f *Fs
}
// Entry describe a wrapped fs.DirEntry interface with the
// information of upstream Fs
type Entry interface {
fs.DirEntry
UpstreamFs() *Fs
}
// New creates a new Fs based on the
// string formatted `type:root_path(:ro/:nc)`
func New(remote, root string, cacheTime time.Duration) (*Fs, error) {
_, configName, fsPath, err := fs.ParseRemote(remote)
if err != nil {
return nil, err
}
f := &Fs{
RootPath: root,
writable: true,
creatable: true,
cacheExpiry: time.Now().Unix(),
cacheTime: cacheTime,
usage: &fs.Usage{},
}
if strings.HasSuffix(fsPath, ":ro") {
f.writable = false
f.creatable = false
fsPath = fsPath[0 : len(fsPath)-3]
} else if strings.HasSuffix(fsPath, ":nc") {
f.writable = true
f.creatable = false
fsPath = fsPath[0 : len(fsPath)-3]
}
if configName != "local" {
fsPath = configName + ":" + fsPath
}
rFs, err := cache.Get(fsPath)
if err != nil && err != fs.ErrorIsFile {
return nil, err
}
f.RootFs = rFs
rootString := path.Join(fsPath, filepath.ToSlash(root))
myFs, err := cache.Get(rootString)
if err != nil && err != fs.ErrorIsFile {
return nil, err
}
f.Fs = myFs
cache.PinUntilFinalized(f.Fs, f)
return f, err
}
// WrapDirectory wraps an fs.Directory to include the info
// of the upstream Fs
func (f *Fs) WrapDirectory(e fs.Directory) *Directory {
if e == nil {
return nil
}
return &Directory{
Directory: e,
f: f,
}
}
// WrapObject wraps an fs.Object to include the info
// of the upstream Fs
func (f *Fs) WrapObject(o fs.Object) *Object {
if o == nil {
return nil
}
return &Object{
Object: o,
f: f,
}
}
// WrapEntry wraps an fs.DirEntry to include the info
// of the upstream Fs
func (f *Fs) WrapEntry(e fs.DirEntry) (Entry, error) {
switch e.(type) {
case fs.Object:
return f.WrapObject(e.(fs.Object)), nil
case fs.Directory:
return f.WrapDirectory(e.(fs.Directory)), nil
default:
return nil, errors.Errorf("unknown object type %T", e)
}
}
// UpstreamFs get the upstream Fs the entry is stored in
func (e *Directory) UpstreamFs() *Fs {
return e.f
}
// UpstreamFs get the upstream Fs the entry is stored in
func (o *Object) UpstreamFs() *Fs {
return o.f
}
// UnWrap returns the Object that this Object is wrapping or
// nil if it isn't wrapping anything
func (o *Object) UnWrap() fs.Object {
return o.Object
}
// IsCreatable return if the fs is allowed to create new objects
func (f *Fs) IsCreatable() bool {
return f.creatable
}
// IsWritable return if the fs is allowed to write
func (f *Fs) IsWritable() bool {
return f.writable
}
// Put in to the remote path with the modTime given of the given size
//
// May create the object even if it returns an error - if so
// will return the object and the error, otherwise will return
// nil and the error
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
o, err := f.Fs.Put(ctx, in, src, options...)
if err != nil {
return o, err
}
f.cacheMutex.Lock()
defer f.cacheMutex.Unlock()
size := src.Size()
if f.usage.Used != nil {
*f.usage.Used += size
}
if f.usage.Free != nil {
*f.usage.Free -= size
}
if f.usage.Objects != nil {
*f.usage.Objects++
}
return o, nil
}
// PutStream uploads to the remote path with the modTime given of indeterminate size
//
// May create the object even if it returns an error - if so
// will return the object and the error, otherwise will return
// nil and the error
func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
do := f.Features().PutStream
if do == nil {
return nil, fs.ErrorNotImplemented
}
o, err := do(ctx, in, src, options...)
if err != nil {
return o, err
}
f.cacheMutex.Lock()
defer f.cacheMutex.Unlock()
size := o.Size()
if f.usage.Used != nil {
*f.usage.Used += size
}
if f.usage.Free != nil {
*f.usage.Free -= size
}
if f.usage.Objects != nil {
*f.usage.Objects++
}
return o, nil
}
// Update in to the object with the modTime given of the given size
//
// When called from outside an Fs by rclone, src.Size() will always be >= 0.
// But for unknown-sized objects (indicated by src.Size() == -1), Upload should either
// return an error or update the object properly (rather than e.g. calling panic).
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
size := o.Size()
err := o.Object.Update(ctx, in, src, options...)
if err != nil {
return err
}
o.f.cacheMutex.Lock()
defer o.f.cacheMutex.Unlock()
delta := o.Size() - size
if delta <= 0 {
return nil
}
if o.f.usage.Used != nil {
*o.f.usage.Used += size
}
if o.f.usage.Free != nil {
*o.f.usage.Free -= size
}
return nil
}
// About gets quota information from the Fs
func (f *Fs) About(ctx context.Context) (*fs.Usage, error) {
if atomic.LoadInt64(&f.cacheExpiry) <= time.Now().Unix() {
err := f.updateUsage()
if err != nil {
return nil, ErrUsageFieldNotSupported
}
}
f.cacheMutex.RLock()
defer f.cacheMutex.RUnlock()
return f.usage, nil
}
// GetFreeSpace get the free space of the fs
func (f *Fs) GetFreeSpace() (int64, error) {
if atomic.LoadInt64(&f.cacheExpiry) <= time.Now().Unix() {
err := f.updateUsage()
if err != nil {
return math.MaxInt64, ErrUsageFieldNotSupported
}
}
f.cacheMutex.RLock()
defer f.cacheMutex.RUnlock()
if f.usage.Free == nil {
return math.MaxInt64, ErrUsageFieldNotSupported
}
return *f.usage.Free, nil
}
// GetUsedSpace get the used space of the fs
func (f *Fs) GetUsedSpace() (int64, error) {
if atomic.LoadInt64(&f.cacheExpiry) <= time.Now().Unix() {
err := f.updateUsage()
if err != nil {
return 0, ErrUsageFieldNotSupported
}
}
f.cacheMutex.RLock()
defer f.cacheMutex.RUnlock()
if f.usage.Used == nil {
return 0, ErrUsageFieldNotSupported
}
return *f.usage.Used, nil
}
// GetNumObjects get the number of objects of the fs
func (f *Fs) GetNumObjects() (int64, error) {
if atomic.LoadInt64(&f.cacheExpiry) <= time.Now().Unix() {
err := f.updateUsage()
if err != nil {
return 0, ErrUsageFieldNotSupported
}
}
f.cacheMutex.RLock()
defer f.cacheMutex.RUnlock()
if f.usage.Objects == nil {
return 0, ErrUsageFieldNotSupported
}
return *f.usage.Objects, nil
}
func (f *Fs) updateUsage() (err error) {
if do := f.RootFs.Features().About; do == nil {
return ErrUsageFieldNotSupported
}
done := false
f.cacheOnce.Do(func() {
f.cacheMutex.Lock()
err = f.updateUsageCore(false)
f.cacheMutex.Unlock()
done = true
})
if done {
return err
}
if !f.cacheUpdate {
f.cacheUpdate = true
go func() {
_ = f.updateUsageCore(true)
f.cacheUpdate = false
}()
}
return nil
}
func (f *Fs) updateUsageCore(lock bool) error {
// Run in background, should not be cancelled by user
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
usage, err := f.RootFs.Features().About(ctx)
if err != nil {
f.cacheUpdate = false
if errors.Cause(err) == fs.ErrorDirNotFound {
err = nil
}
return err
}
if lock {
f.cacheMutex.Lock()
defer f.cacheMutex.Unlock()
}
// Store usage
atomic.StoreInt64(&f.cacheExpiry, time.Now().Add(f.cacheTime).Unix())
f.usage = usage
return nil
}