forked from TrueCloudLab/rclone
85f9bd1abf
Before this change we errored out if one upstream errored in Purge or About. This change checks for fs.ErrorDirNotFound and skips that backend in this case.
351 lines
8.1 KiB
Go
351 lines
8.1 KiB
Go
package upstream
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"math"
|
|
"path"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/pkg/errors"
|
|
"github.com/rclone/rclone/fs"
|
|
"github.com/rclone/rclone/fs/cache"
|
|
)
|
|
|
|
var (
|
|
// ErrUsageFieldNotSupported stats the usage field is not supported by the backend
|
|
ErrUsageFieldNotSupported = errors.New("this usage field is not supported")
|
|
)
|
|
|
|
// Fs is a wrap of any fs and its configs
|
|
type Fs struct {
|
|
fs.Fs
|
|
RootFs fs.Fs
|
|
RootPath string
|
|
writable bool
|
|
creatable bool
|
|
usage *fs.Usage // Cache the usage
|
|
cacheTime time.Duration // cache duration
|
|
cacheExpiry int64 // usage cache expiry time
|
|
cacheMutex sync.RWMutex
|
|
cacheOnce sync.Once
|
|
cacheUpdate bool // if the cache is updating
|
|
}
|
|
|
|
// Directory describes a wrapped Directory
|
|
//
|
|
// This is a wrapped Directory which contains the upstream Fs
|
|
type Directory struct {
|
|
fs.Directory
|
|
f *Fs
|
|
}
|
|
|
|
// Object describes a wrapped Object
|
|
//
|
|
// This is a wrapped Object which contains the upstream Fs
|
|
type Object struct {
|
|
fs.Object
|
|
f *Fs
|
|
}
|
|
|
|
// Entry describe a warpped fs.DirEntry interface with the
|
|
// information of upstream Fs
|
|
type Entry interface {
|
|
fs.DirEntry
|
|
UpstreamFs() *Fs
|
|
}
|
|
|
|
// New creates a new Fs based on the
|
|
// string formatted `type:root_path(:ro/:nc)`
|
|
func New(remote, root string, cacheTime time.Duration) (*Fs, error) {
|
|
_, configName, fsPath, err := fs.ParseRemote(remote)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
f := &Fs{
|
|
RootPath: root,
|
|
writable: true,
|
|
creatable: true,
|
|
cacheExpiry: time.Now().Unix(),
|
|
cacheTime: cacheTime,
|
|
usage: &fs.Usage{},
|
|
}
|
|
if strings.HasSuffix(fsPath, ":ro") {
|
|
f.writable = false
|
|
f.creatable = false
|
|
fsPath = fsPath[0 : len(fsPath)-3]
|
|
} else if strings.HasSuffix(fsPath, ":nc") {
|
|
f.writable = true
|
|
f.creatable = false
|
|
fsPath = fsPath[0 : len(fsPath)-3]
|
|
}
|
|
if configName != "local" {
|
|
fsPath = configName + ":" + fsPath
|
|
}
|
|
rFs, err := cache.Get(fsPath)
|
|
if err != nil && err != fs.ErrorIsFile {
|
|
return nil, err
|
|
}
|
|
f.RootFs = rFs
|
|
rootString := path.Join(fsPath, filepath.ToSlash(root))
|
|
myFs, err := cache.Get(rootString)
|
|
if err != nil && err != fs.ErrorIsFile {
|
|
return nil, err
|
|
}
|
|
f.Fs = myFs
|
|
return f, err
|
|
}
|
|
|
|
// WrapDirectory wraps an fs.Directory to include the info
|
|
// of the upstream Fs
|
|
func (f *Fs) WrapDirectory(e fs.Directory) *Directory {
|
|
if e == nil {
|
|
return nil
|
|
}
|
|
return &Directory{
|
|
Directory: e,
|
|
f: f,
|
|
}
|
|
}
|
|
|
|
// WrapObject wraps an fs.Object to include the info
|
|
// of the upstream Fs
|
|
func (f *Fs) WrapObject(o fs.Object) *Object {
|
|
if o == nil {
|
|
return nil
|
|
}
|
|
return &Object{
|
|
Object: o,
|
|
f: f,
|
|
}
|
|
}
|
|
|
|
// WrapEntry wraps an fs.DirEntry to include the info
|
|
// of the upstream Fs
|
|
func (f *Fs) WrapEntry(e fs.DirEntry) (Entry, error) {
|
|
switch e.(type) {
|
|
case fs.Object:
|
|
return f.WrapObject(e.(fs.Object)), nil
|
|
case fs.Directory:
|
|
return f.WrapDirectory(e.(fs.Directory)), nil
|
|
default:
|
|
return nil, errors.Errorf("unknown object type %T", e)
|
|
}
|
|
}
|
|
|
|
// UpstreamFs get the upstream Fs the entry is stored in
|
|
func (e *Directory) UpstreamFs() *Fs {
|
|
return e.f
|
|
}
|
|
|
|
// UpstreamFs get the upstream Fs the entry is stored in
|
|
func (o *Object) UpstreamFs() *Fs {
|
|
return o.f
|
|
}
|
|
|
|
// UnWrap returns the Object that this Object is wrapping or
|
|
// nil if it isn't wrapping anything
|
|
func (o *Object) UnWrap() fs.Object {
|
|
return o.Object
|
|
}
|
|
|
|
// IsCreatable return if the fs is allowed to create new objects
|
|
func (f *Fs) IsCreatable() bool {
|
|
return f.creatable
|
|
}
|
|
|
|
// IsWritable return if the fs is allowed to write
|
|
func (f *Fs) IsWritable() bool {
|
|
return f.writable
|
|
}
|
|
|
|
// Put in to the remote path with the modTime given of the given size
|
|
//
|
|
// May create the object even if it returns an error - if so
|
|
// will return the object and the error, otherwise will return
|
|
// nil and the error
|
|
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
|
o, err := f.Fs.Put(ctx, in, src, options...)
|
|
if err != nil {
|
|
return o, err
|
|
}
|
|
f.cacheMutex.Lock()
|
|
defer f.cacheMutex.Unlock()
|
|
size := src.Size()
|
|
if f.usage.Used != nil {
|
|
*f.usage.Used += size
|
|
}
|
|
if f.usage.Free != nil {
|
|
*f.usage.Free -= size
|
|
}
|
|
if f.usage.Objects != nil {
|
|
*f.usage.Objects++
|
|
}
|
|
return o, nil
|
|
}
|
|
|
|
// PutStream uploads to the remote path with the modTime given of indeterminate size
|
|
//
|
|
// May create the object even if it returns an error - if so
|
|
// will return the object and the error, otherwise will return
|
|
// nil and the error
|
|
func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
|
do := f.Features().PutStream
|
|
if do == nil {
|
|
return nil, fs.ErrorNotImplemented
|
|
}
|
|
o, err := do(ctx, in, src, options...)
|
|
if err != nil {
|
|
return o, err
|
|
}
|
|
f.cacheMutex.Lock()
|
|
defer f.cacheMutex.Unlock()
|
|
size := o.Size()
|
|
if f.usage.Used != nil {
|
|
*f.usage.Used += size
|
|
}
|
|
if f.usage.Free != nil {
|
|
*f.usage.Free -= size
|
|
}
|
|
if f.usage.Objects != nil {
|
|
*f.usage.Objects++
|
|
}
|
|
return o, nil
|
|
}
|
|
|
|
// Update in to the object with the modTime given of the given size
|
|
//
|
|
// When called from outside an Fs by rclone, src.Size() will always be >= 0.
|
|
// But for unknown-sized objects (indicated by src.Size() == -1), Upload should either
|
|
// return an error or update the object properly (rather than e.g. calling panic).
|
|
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
|
|
size := o.Size()
|
|
err := o.Object.Update(ctx, in, src, options...)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
o.f.cacheMutex.Lock()
|
|
defer o.f.cacheMutex.Unlock()
|
|
delta := o.Size() - size
|
|
if delta <= 0 {
|
|
return nil
|
|
}
|
|
if o.f.usage.Used != nil {
|
|
*o.f.usage.Used += size
|
|
}
|
|
if o.f.usage.Free != nil {
|
|
*o.f.usage.Free -= size
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// About gets quota information from the Fs
|
|
func (f *Fs) About(ctx context.Context) (*fs.Usage, error) {
|
|
if atomic.LoadInt64(&f.cacheExpiry) <= time.Now().Unix() {
|
|
err := f.updateUsage()
|
|
if err != nil {
|
|
return nil, ErrUsageFieldNotSupported
|
|
}
|
|
}
|
|
f.cacheMutex.RLock()
|
|
defer f.cacheMutex.RUnlock()
|
|
return f.usage, nil
|
|
}
|
|
|
|
// GetFreeSpace get the free space of the fs
|
|
func (f *Fs) GetFreeSpace() (int64, error) {
|
|
if atomic.LoadInt64(&f.cacheExpiry) <= time.Now().Unix() {
|
|
err := f.updateUsage()
|
|
if err != nil {
|
|
return math.MaxInt64, ErrUsageFieldNotSupported
|
|
}
|
|
}
|
|
f.cacheMutex.RLock()
|
|
defer f.cacheMutex.RUnlock()
|
|
if f.usage.Free == nil {
|
|
return math.MaxInt64, ErrUsageFieldNotSupported
|
|
}
|
|
return *f.usage.Free, nil
|
|
}
|
|
|
|
// GetUsedSpace get the used space of the fs
|
|
func (f *Fs) GetUsedSpace() (int64, error) {
|
|
if atomic.LoadInt64(&f.cacheExpiry) <= time.Now().Unix() {
|
|
err := f.updateUsage()
|
|
if err != nil {
|
|
return 0, ErrUsageFieldNotSupported
|
|
}
|
|
}
|
|
f.cacheMutex.RLock()
|
|
defer f.cacheMutex.RUnlock()
|
|
if f.usage.Used == nil {
|
|
return 0, ErrUsageFieldNotSupported
|
|
}
|
|
return *f.usage.Used, nil
|
|
}
|
|
|
|
// GetNumObjects get the number of objects of the fs
|
|
func (f *Fs) GetNumObjects() (int64, error) {
|
|
if atomic.LoadInt64(&f.cacheExpiry) <= time.Now().Unix() {
|
|
err := f.updateUsage()
|
|
if err != nil {
|
|
return 0, ErrUsageFieldNotSupported
|
|
}
|
|
}
|
|
f.cacheMutex.RLock()
|
|
defer f.cacheMutex.RUnlock()
|
|
if f.usage.Objects == nil {
|
|
return 0, ErrUsageFieldNotSupported
|
|
}
|
|
return *f.usage.Objects, nil
|
|
}
|
|
|
|
func (f *Fs) updateUsage() (err error) {
|
|
if do := f.RootFs.Features().About; do == nil {
|
|
return ErrUsageFieldNotSupported
|
|
}
|
|
done := false
|
|
f.cacheOnce.Do(func() {
|
|
f.cacheMutex.Lock()
|
|
err = f.updateUsageCore(false)
|
|
f.cacheMutex.Unlock()
|
|
done = true
|
|
})
|
|
if done {
|
|
return err
|
|
}
|
|
if !f.cacheUpdate {
|
|
f.cacheUpdate = true
|
|
go func() {
|
|
_ = f.updateUsageCore(true)
|
|
f.cacheUpdate = false
|
|
}()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (f *Fs) updateUsageCore(lock bool) error {
|
|
// Run in background, should not be cancelled by user
|
|
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
|
defer cancel()
|
|
usage, err := f.RootFs.Features().About(ctx)
|
|
if err != nil {
|
|
f.cacheUpdate = false
|
|
if errors.Cause(err) == fs.ErrorDirNotFound {
|
|
err = nil
|
|
}
|
|
return err
|
|
}
|
|
if lock {
|
|
f.cacheMutex.Lock()
|
|
defer f.cacheMutex.Unlock()
|
|
}
|
|
// Store usage
|
|
atomic.StoreInt64(&f.cacheExpiry, time.Now().Add(f.cacheTime).Unix())
|
|
f.usage = usage
|
|
return nil
|
|
}
|