diff --git a/backend/union/entry.go b/backend/union/entry.go index 2cd16c5f0..6b6a84b41 100644 --- a/backend/union/entry.go +++ b/backend/union/entry.go @@ -1,165 +1,170 @@ -package union - -import ( - "bufio" - "context" - "io" - "sync" - "time" - - "github.com/pkg/errors" - "github.com/rclone/rclone/backend/union/upstream" - "github.com/rclone/rclone/fs" -) - -// Object describes a union Object -// -// This is a wrapped object which returns the Union Fs as its parent -type Object struct { - *upstream.Object - fs *Fs // what this object is part of - co []upstream.Entry -} - -// Directory describes a union Directory -// -// This is a wrapped object contains all candidates -type Directory struct { - *upstream.Directory - cd []upstream.Entry -} - -type entry interface { - upstream.Entry - candidates() []upstream.Entry -} - -// UnWrap returns the Object that this Object is wrapping or -// nil if it isn't wrapping anything -func (o *Object) UnWrap() *upstream.Object { - return o.Object -} - -// Fs returns the union Fs as the parent -func (o *Object) Fs() fs.Info { - return o.fs -} - -func (o *Object) candidates() []upstream.Entry { - return o.co -} - -func (d *Directory) candidates() []upstream.Entry { - return d.cd -} - -// Update in to the object with the modTime given of the given size -// -// When called from outside a Fs by rclone, src.Size() will always be >= 0. -// But for unknown-sized objects (indicated by src.Size() == -1), Upload should either -// return an error or update the object properly (rather than e.g. calling panic). -func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { - f := o.Fs().(*Fs) - entries, err := f.actionEntries(o.candidates()...) - if err != nil { - return err - } - if len(entries) == 1 { - obj := entries[0].(*upstream.Object) - return obj.Update(ctx, in, src, options...) - } - // Get multiple reader - readers := make([]io.Reader, len(entries)) - writers := make([]io.Writer, len(entries)) - errs := Errors(make([]error, len(entries)+1)) - for i := range entries { - r, w := io.Pipe() - bw := bufio.NewWriter(w) - readers[i], writers[i] = r, bw - defer w.Close() - } - go func() { - mw := io.MultiWriter(writers...) - es := make([]error, len(writers)+1) - _, es[len(es)-1] = io.Copy(mw, in) - for i, bw := range writers { - es[i] = bw.(*bufio.Writer).Flush() - } - errs[len(entries)] = Errors(es).Err() - }() - // Multi-threading - multithread(len(entries), func(i int) { - if o, ok := entries[i].(*upstream.Object); ok { - err := o.Update(ctx, readers[i], src, options...) - errs[i] = errors.Wrap(err, o.UpstreamFs().Name()) - } else { - errs[i] = fs.ErrorNotAFile - } - }) - return errs.Err() -} - -// Remove candidate objects selected by ACTION policy -func (o *Object) Remove(ctx context.Context) error { - f := o.Fs().(*Fs) - entries, err := f.actionEntries(o.candidates()...) - if err != nil { - return err - } - errs := Errors(make([]error, len(entries))) - multithread(len(entries), func(i int) { - if o, ok := entries[i].(*upstream.Object); ok { - err := o.Remove(ctx) - errs[i] = errors.Wrap(err, o.UpstreamFs().Name()) - } else { - errs[i] = fs.ErrorNotAFile - } - }) - return errs.Err() -} - -// SetModTime sets the metadata on the object to set the modification date -func (o *Object) SetModTime(ctx context.Context, t time.Time) error { - f := o.Fs().(*Fs) - entries, err := f.actionEntries(o.candidates()...) - if err != nil { - return err - } - var wg sync.WaitGroup - errs := Errors(make([]error, len(entries))) - multithread(len(entries), func(i int) { - if o, ok := entries[i].(*upstream.Object); ok { - err := o.SetModTime(ctx, t) - errs[i] = errors.Wrap(err, o.UpstreamFs().Name()) - } else { - errs[i] = fs.ErrorNotAFile - } - }) - wg.Wait() - return errs.Err() -} - -// ModTime returns the modification date of the directory -// It returns the latest ModTime of all candidates -func (d *Directory) ModTime(ctx context.Context) (t time.Time) { - entries := d.candidates() - times := make([]time.Time, len(entries)) - multithread(len(entries), func(i int) { - times[i] = entries[i].ModTime(ctx) - }) - for _, ti := range times { - if t.Before(ti) { - t = ti - } - } - return t -} - -// Size returns the size of the directory -// It returns the sum of all candidates -func (d *Directory) Size() (s int64) { - for _, e := range d.candidates() { - s += e.Size() - } - return s -} +package union + +import ( + "bufio" + "context" + "io" + "sync" + "time" + + "github.com/pkg/errors" + "github.com/rclone/rclone/backend/union/upstream" + "github.com/rclone/rclone/fs" +) + +// Object describes a union Object +// +// This is a wrapped object which returns the Union Fs as its parent +type Object struct { + *upstream.Object + fs *Fs // what this object is part of + co []upstream.Entry +} + +// Directory describes a union Directory +// +// This is a wrapped object contains all candidates +type Directory struct { + *upstream.Directory + cd []upstream.Entry +} + +type entry interface { + upstream.Entry + candidates() []upstream.Entry +} + +// UnWrap returns the Object that this Object is wrapping or +// nil if it isn't wrapping anything +func (o *Object) UnWrap() *upstream.Object { + return o.Object +} + +// Fs returns the union Fs as the parent +func (o *Object) Fs() fs.Info { + return o.fs +} + +func (o *Object) candidates() []upstream.Entry { + return o.co +} + +func (d *Directory) candidates() []upstream.Entry { + return d.cd +} + +// Update in to the object with the modTime given of the given size +// +// When called from outside a Fs by rclone, src.Size() will always be >= 0. +// But for unknown-sized objects (indicated by src.Size() == -1), Upload should either +// return an error or update the object properly (rather than e.g. calling panic). +func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { + f := o.Fs().(*Fs) + entries, err := f.actionEntries(o.candidates()...) + if err != nil { + return err + } + if len(entries) == 1 { + obj := entries[0].(*upstream.Object) + return obj.Update(ctx, in, src, options...) + } + // Get multiple reader + readers := make([]io.Reader, len(entries)) + writers := make([]io.Writer, len(entries)) + errs := Errors(make([]error, len(entries)+1)) + for i := range entries { + r, w := io.Pipe() + bw := bufio.NewWriter(w) + readers[i], writers[i] = r, bw + defer func() { + err := w.Close() + if err != nil { + panic(err) + } + }() + } + go func() { + mw := io.MultiWriter(writers...) + es := make([]error, len(writers)+1) + _, es[len(es)-1] = io.Copy(mw, in) + for i, bw := range writers { + es[i] = bw.(*bufio.Writer).Flush() + } + errs[len(entries)] = Errors(es).Err() + }() + // Multi-threading + multithread(len(entries), func(i int) { + if o, ok := entries[i].(*upstream.Object); ok { + err := o.Update(ctx, readers[i], src, options...) + errs[i] = errors.Wrap(err, o.UpstreamFs().Name()) + } else { + errs[i] = fs.ErrorNotAFile + } + }) + return errs.Err() +} + +// Remove candidate objects selected by ACTION policy +func (o *Object) Remove(ctx context.Context) error { + f := o.Fs().(*Fs) + entries, err := f.actionEntries(o.candidates()...) + if err != nil { + return err + } + errs := Errors(make([]error, len(entries))) + multithread(len(entries), func(i int) { + if o, ok := entries[i].(*upstream.Object); ok { + err := o.Remove(ctx) + errs[i] = errors.Wrap(err, o.UpstreamFs().Name()) + } else { + errs[i] = fs.ErrorNotAFile + } + }) + return errs.Err() +} + +// SetModTime sets the metadata on the object to set the modification date +func (o *Object) SetModTime(ctx context.Context, t time.Time) error { + f := o.Fs().(*Fs) + entries, err := f.actionEntries(o.candidates()...) + if err != nil { + return err + } + var wg sync.WaitGroup + errs := Errors(make([]error, len(entries))) + multithread(len(entries), func(i int) { + if o, ok := entries[i].(*upstream.Object); ok { + err := o.SetModTime(ctx, t) + errs[i] = errors.Wrap(err, o.UpstreamFs().Name()) + } else { + errs[i] = fs.ErrorNotAFile + } + }) + wg.Wait() + return errs.Err() +} + +// ModTime returns the modification date of the directory +// It returns the latest ModTime of all candidates +func (d *Directory) ModTime(ctx context.Context) (t time.Time) { + entries := d.candidates() + times := make([]time.Time, len(entries)) + multithread(len(entries), func(i int) { + times[i] = entries[i].ModTime(ctx) + }) + for _, ti := range times { + if t.Before(ti) { + t = ti + } + } + return t +} + +// Size returns the size of the directory +// It returns the sum of all candidates +func (d *Directory) Size() (s int64) { + for _, e := range d.candidates() { + s += e.Size() + } + return s +} diff --git a/backend/union/upstream/upstream.go b/backend/union/upstream/upstream.go index 39581c3e7..e6f73a09c 100644 --- a/backend/union/upstream/upstream.go +++ b/backend/union/upstream/upstream.go @@ -292,7 +292,9 @@ func (f *Fs) updateUsage() error { return f.updateUsageCore(false) } if atomic.CompareAndSwapUint32(&f.cacheState, normal, updating) { - go f.updateUsageCore(true) + go func() { + _ = f.updateUsageCore(true) + }() } return nil }