rclone/backend/union/entry.go
Nick Craig-Wood 5dd51e6149 union: fix deadlock when one part of a multi-upload fails
Before this fix, rclone would deadlock when uploading two files at
once, if one errored. This caused the other file to block in the multi
reader and never complete.

This fix drains the input buffer on error which allows the other
upload to complete.

See: https://forum.rclone.org/t/union-with-create-policy-all-copy-stuck-when-first-union-fails/29601
2022-03-09 11:30:55 +00:00

166 lines
4.1 KiB
Go

package union
import (
"context"
"fmt"
"io"
"io/ioutil"
"sync"
"time"
"github.com/rclone/rclone/backend/union/upstream"
"github.com/rclone/rclone/fs"
)
// Object describes a union Object
//
// This is a wrapped object which returns the Union Fs as its parent
type Object struct {
*upstream.Object
fs *Fs // what this object is part of
co []upstream.Entry
}
// Directory describes a union Directory
//
// This is a wrapped object contains all candidates
type Directory struct {
*upstream.Directory
cd []upstream.Entry
}
type entry interface {
upstream.Entry
candidates() []upstream.Entry
}
// UnWrap returns the Object that this Object is wrapping or
// nil if it isn't wrapping anything
func (o *Object) UnWrap() *upstream.Object {
return o.Object
}
// Fs returns the union Fs as the parent
func (o *Object) Fs() fs.Info {
return o.fs
}
func (o *Object) candidates() []upstream.Entry {
return o.co
}
func (d *Directory) candidates() []upstream.Entry {
return d.cd
}
// Update in to the object with the modTime given of the given size
//
// When called from outside an Fs by rclone, src.Size() will always be >= 0.
// But for unknown-sized objects (indicated by src.Size() == -1), Upload should either
// return an error or update the object properly (rather than e.g. calling panic).
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
entries, err := o.fs.actionEntries(o.candidates()...)
if err == fs.ErrorPermissionDenied {
// There are no candidates in this object which can be written to
// So attempt to create a new object instead
newO, err := o.fs.put(ctx, in, src, false, options...)
if err != nil {
return err
}
// Update current object
*o = *newO.(*Object)
return nil
} else if err != nil {
return err
}
if len(entries) == 1 {
obj := entries[0].(*upstream.Object)
return obj.Update(ctx, in, src, options...)
}
// Multi-threading
readers, errChan := multiReader(len(entries), in)
errs := Errors(make([]error, len(entries)+1))
multithread(len(entries), func(i int) {
if o, ok := entries[i].(*upstream.Object); ok {
err := o.Update(ctx, readers[i], src, options...)
if err != nil {
errs[i] = fmt.Errorf("%s: %w", o.UpstreamFs().Name(), err)
if len(entries) > 1 {
// Drain the input buffer to allow other uploads to continue
_, _ = io.Copy(ioutil.Discard, readers[i])
}
}
} else {
errs[i] = fs.ErrorNotAFile
}
})
errs[len(entries)] = <-errChan
return errs.Err()
}
// Remove candidate objects selected by ACTION policy
func (o *Object) Remove(ctx context.Context) error {
entries, err := o.fs.actionEntries(o.candidates()...)
if err != nil {
return err
}
errs := Errors(make([]error, len(entries)))
multithread(len(entries), func(i int) {
if o, ok := entries[i].(*upstream.Object); ok {
err := o.Remove(ctx)
if err != nil {
errs[i] = fmt.Errorf("%s: %w", o.UpstreamFs().Name(), err)
}
} else {
errs[i] = fs.ErrorNotAFile
}
})
return errs.Err()
}
// SetModTime sets the metadata on the object to set the modification date
func (o *Object) SetModTime(ctx context.Context, t time.Time) error {
entries, err := o.fs.actionEntries(o.candidates()...)
if err != nil {
return err
}
var wg sync.WaitGroup
errs := Errors(make([]error, len(entries)))
multithread(len(entries), func(i int) {
if o, ok := entries[i].(*upstream.Object); ok {
err := o.SetModTime(ctx, t)
if err != nil {
errs[i] = fmt.Errorf("%s: %w", o.UpstreamFs().Name(), err)
}
} else {
errs[i] = fs.ErrorNotAFile
}
})
wg.Wait()
return errs.Err()
}
// ModTime returns the modification date of the directory
// It returns the latest ModTime of all candidates
func (d *Directory) ModTime(ctx context.Context) (t time.Time) {
entries := d.candidates()
times := make([]time.Time, len(entries))
multithread(len(entries), func(i int) {
times[i] = entries[i].ModTime(ctx)
})
for _, ti := range times {
if t.Before(ti) {
t = ti
}
}
return t
}
// Size returns the size of the directory
// It returns the sum of all candidates
func (d *Directory) Size() (s int64) {
for _, e := range d.candidates() {
s += e.Size()
}
return s
}