union: refine implementation

This commit is contained in:
Max Sum 2020-01-28 00:14:29 +08:00 committed by Nick Craig-Wood
parent 266c200f8c
commit f0c17a72db
5 changed files with 49 additions and 61 deletions

View file

@ -59,8 +59,7 @@ func (d *Directory) candidates() []upstream.Entry {
// But for unknown-sized objects (indicated by src.Size() == -1), Upload should either
// return an error or update the object properly (rather than e.g. calling panic).
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
f := o.Fs().(*Fs)
entries, err := f.actionEntries(o.candidates()...)
entries, err := o.fs.actionEntries(o.candidates()...)
if err != nil {
return err
}
@ -106,8 +105,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
// Remove candidate objects selected by ACTION policy
func (o *Object) Remove(ctx context.Context) error {
f := o.Fs().(*Fs)
entries, err := f.actionEntries(o.candidates()...)
entries, err := o.fs.actionEntries(o.candidates()...)
if err != nil {
return err
}
@ -125,8 +123,7 @@ func (o *Object) Remove(ctx context.Context) error {
// SetModTime sets the metadata on the object to set the modification date
func (o *Object) SetModTime(ctx context.Context, t time.Time) error {
f := o.Fs().(*Fs)
entries, err := f.actionEntries(o.candidates()...)
entries, err := o.fs.actionEntries(o.candidates()...)
if err != nil {
return err
}

View file

@ -8,13 +8,6 @@ import (
// The Errors type wraps a slice of errors
type Errors []error
var (
// FilterNil returns the error directly
FilterNil = func(err error) error {
return err
}
)
// Map returns a copy of the error slice with all its errors modified
// according to the mapping function. If mapping returns nil,
// the error is dropped from the error slice with no replacement.
@ -32,34 +25,38 @@ func (e Errors) Map(mapping func(error) error) Errors {
return Errors(s[:i])
}
// Err returns a MultiError struct containing this Errors instance, or nil
// if there are zero errors contained.
func (e Errors) Err() error {
e = e.Map(FilterNil)
if len(e) == 0 {
return nil
}
return &MultiError{Errors: e}
// FilterNil returns the Errors without nil
func (e Errors) FilterNil() Errors {
ne := e.Map(func(err error) error {
return err
})
return ne
}
// MultiError type implements the error interface, and contains the
// Errors used to construct it.
type MultiError struct {
Errors Errors
// Err returns a error interface that filtered nil,
// or nil if no non-nil Error is presented.
func (e Errors) Err() error {
ne := e.FilterNil()
if len(ne) == 0 {
return nil
}
return ne
}
// Error returns a concatenated string of the contained errors
func (m *MultiError) Error() string {
func (e Errors) Error() string {
var buf bytes.Buffer
if len(m.Errors) == 1 {
if len(e) == 0 {
buf.WriteString("no error")
}
if len(e) == 1 {
buf.WriteString("1 error: ")
} else {
fmt.Fprintf(&buf, "%d errors: ", len(m.Errors))
fmt.Fprintf(&buf, "%d errors: ", len(e))
}
for i, err := range m.Errors {
for i, err := range e {
if i != 0 {
buf.WriteString("; ")
}

View file

@ -3,7 +3,6 @@ package policy
import (
"context"
"math/rand"
"time"
"github.com/rclone/rclone/backend/union/upstream"
"github.com/rclone/rclone/fs"
@ -20,12 +19,10 @@ type Rand struct {
}
func (p *Rand) rand(upstreams []*upstream.Fs) *upstream.Fs {
rand.Seed(time.Now().Unix())
return upstreams[rand.Intn(len(upstreams))]
}
func (p *Rand) randEntries(entries []upstream.Entry) upstream.Entry {
rand.Seed(time.Now().Unix())
return entries[rand.Intn(len(entries))]
}

View file

@ -530,7 +530,7 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e
}
entriess[i] = uEntries
})
if len(errs) == len(errs.Map(FilterNil)) {
if len(errs) == len(errs.FilterNil()) {
errs = errs.Map(func(e error) error {
if errors.Cause(e) == fs.ErrorDirNotFound {
return nil

View file

@ -20,23 +20,17 @@ var (
ErrUsageFieldNotSupported = errors.New("this usage field is not supported")
)
const (
unInitilized uint32 = iota
initilizing
normal
updating
)
// Fs is a wrap of any fs and its configs
type Fs struct {
fs.Fs
writable bool
creatable bool
usage *fs.Usage // Cache the usage
cacheMutex sync.RWMutex
cacheExpiry int64 // usage cache expiry time
usage *fs.Usage // Cache the usage
cacheTime time.Duration // cache duration
cacheState uint32 // if the cache is updating
cacheExpiry int64 // usage cache expiry time
cacheMutex sync.RWMutex
cacheOnce sync.Once
cacheUpdate bool // if the cache is updating
}
// Directory describes a wrapped Directory
@ -279,42 +273,45 @@ func (f *Fs) GetUsedSpace() (int64, error) {
return *f.usage.Used, nil
}
func (f *Fs) updateUsage() error {
func (f *Fs) updateUsage() (err error) {
if do := f.Fs.Features().About; do == nil {
return ErrUsageFieldNotSupported
}
if atomic.LoadUint32(&f.cacheState) == unInitilized {
done := false
f.cacheOnce.Do(func() {
f.cacheMutex.Lock()
defer f.cacheMutex.Unlock()
if !atomic.CompareAndSwapUint32(&f.cacheState, unInitilized, initilizing) {
return f.updateUsage()
}
return f.updateUsageCore(false)
err = f.updateUsageCore(false)
f.cacheMutex.Unlock()
done = true
})
if done {
return err
}
if atomic.CompareAndSwapUint32(&f.cacheState, normal, updating) {
if !f.cacheUpdate {
f.cacheUpdate = true
go func() {
_ = f.updateUsageCore(true)
f.cacheUpdate = false
}()
}
return nil
}
func (f *Fs) updateUsageCore(lock bool) error {
defer func() {
atomic.StoreInt64(&f.cacheExpiry, time.Now().Add(f.cacheTime).Unix())
atomic.StoreUint32(&f.cacheState, normal)
}()
if lock {
f.cacheMutex.Lock()
defer f.cacheMutex.Unlock()
}
// Run in background, should not be cancelled by user
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
usage, err := f.Features().About(ctx)
if err != nil {
f.cacheUpdate = false
return err
}
if lock {
f.cacheMutex.Lock()
defer f.cacheMutex.Unlock()
}
// Store usage
atomic.StoreInt64(&f.cacheExpiry, time.Now().Add(f.cacheTime).Unix())
f.usage = usage
return nil
}