diff --git a/backend/union/entry.go b/backend/union/entry.go index ef0953b6b..2cd16c5f0 100644 --- a/backend/union/entry.go +++ b/backend/union/entry.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/pkg/errors" "github.com/rclone/rclone/backend/union/upstream" "github.com/rclone/rclone/fs" ) @@ -70,7 +71,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op // Get multiple reader readers := make([]io.Reader, len(entries)) writers := make([]io.Writer, len(entries)) - errs := make([]error, len(entries)+1) + errs := Errors(make([]error, len(entries)+1)) for i := range entries { r, w := io.Pipe() bw := bufio.NewWriter(w) @@ -79,26 +80,23 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op } go func() { mw := io.MultiWriter(writers...) - _, errs[len(entries)] = io.Copy(mw, in) - for _, bw := range writers { - bw.(*bufio.Writer).Flush() + es := make([]error, len(writers)+1) + _, es[len(es)-1] = io.Copy(mw, in) + for i, bw := range writers { + es[i] = bw.(*bufio.Writer).Flush() } + errs[len(entries)] = Errors(es).Err() }() // Multi-threading multithread(len(entries), func(i int) { if o, ok := entries[i].(*upstream.Object); ok { - errs[i] = o.Update(ctx, readers[i], src, options...) + err := o.Update(ctx, readers[i], src, options...) + errs[i] = errors.Wrap(err, o.UpstreamFs().Name()) } else { errs[i] = fs.ErrorNotAFile } }) - // Get an object for future operation - for _, err := range errs { - if err != nil { - return err - } - } - return nil + return errs.Err() } // Remove candidate objects selected by ACTION policy @@ -108,20 +106,16 @@ func (o *Object) Remove(ctx context.Context) error { if err != nil { return err } - errs := make([]error, len(entries)) + errs := Errors(make([]error, len(entries))) multithread(len(entries), func(i int) { if o, ok := entries[i].(*upstream.Object); ok { - errs[i] = o.Remove(ctx) + err := o.Remove(ctx) + errs[i] = errors.Wrap(err, o.UpstreamFs().Name()) } else { errs[i] = fs.ErrorNotAFile } }) - for _, err := range errs { - if err != nil { - return err - } - } - return nil + return errs.Err() } // SetModTime sets the metadata on the object to set the modification date @@ -132,21 +126,17 @@ func (o *Object) SetModTime(ctx context.Context, t time.Time) error { return err } var wg sync.WaitGroup - errs := make([]error, len(entries)) + errs := Errors(make([]error, len(entries))) multithread(len(entries), func(i int) { if o, ok := entries[i].(*upstream.Object); ok { - errs[i] = o.SetModTime(ctx, t) + err := o.SetModTime(ctx, t) + errs[i] = errors.Wrap(err, o.UpstreamFs().Name()) } else { errs[i] = fs.ErrorNotAFile } }) wg.Wait() - for _, err := range errs { - if err != nil { - return err - } - } - return nil + return errs.Err() } // ModTime returns the modification date of the directory diff --git a/backend/union/errors.go b/backend/union/errors.go new file mode 100644 index 000000000..d829d509e --- /dev/null +++ b/backend/union/errors.go @@ -0,0 +1,71 @@ +package union + +import ( + "bytes" + "fmt" +) + +// The Errors type wraps a slice of errors +type Errors []error + +var ( + // FilterNil returns the error directly + FilterNil = func(err error) error { + return err + } +) + +// Map returns a copy of the error slice with all its errors modified +// according to the mapping function. If mapping returns nil, +// the error is dropped from the error slice with no replacement. +func (e Errors) Map(mapping func(error) error) Errors { + s := make([]error, len(e)) + i := 0 + for _, err := range e { + nerr := mapping(err) + if nerr == nil { + continue + } + s[i] = nerr + i++ + } + return Errors(s[:i]) +} + +// Err returns a MultiError struct containing this Errors instance, or nil +// if there are zero errors contained. +func (e Errors) Err() error { + e = e.Map(FilterNil) + if len(e) == 0 { + return nil + } + + return &MultiError{Errors: e} +} + +// MultiError type implements the error interface, and contains the +// Errors used to construct it. +type MultiError struct { + Errors Errors +} + +// Error returns a concatenated string of the contained errors +func (m *MultiError) Error() string { + var buf bytes.Buffer + + if len(m.Errors) == 1 { + buf.WriteString("1 error: ") + } else { + fmt.Fprintf(&buf, "%d errors: ", len(m.Errors)) + } + + for i, err := range m.Errors { + if i != 0 { + buf.WriteString("; ") + } + + buf.WriteString(err.Error()) + } + + return buf.String() +} diff --git a/backend/union/union.go b/backend/union/union.go index 85177ae30..10d74bb9e 100644 --- a/backend/union/union.go +++ b/backend/union/union.go @@ -126,16 +126,12 @@ func (f *Fs) Rmdir(ctx context.Context, dir string) error { if err != nil { return err } - errs := make([]error, len(upstreams)) + errs := Errors(make([]error, len(upstreams))) multithread(len(upstreams), func(i int) { - errs[i] = upstreams[i].Rmdir(ctx, dir) + err := upstreams[i].Rmdir(ctx, dir) + errs[i] = errors.Wrap(err, upstreams[i].Name()) }) - for _, err := range errs { - if err != nil { - return err - } - } - return nil + return errs.Err() } // Hashes returns hash.HashNone to indicate remote hashing is unavailable @@ -150,16 +146,12 @@ func (f *Fs) Mkdir(ctx context.Context, dir string) error { if err != nil { return err } - errs := make([]error, len(upstreams)) + errs := Errors(make([]error, len(upstreams))) multithread(len(upstreams), func(i int) { - errs[i] = upstreams[i].Mkdir(ctx, dir) + err := upstreams[i].Mkdir(ctx, dir) + errs[i] = errors.Wrap(err, upstreams[i].Name()) }) - for _, err := range errs { - if err != nil { - return err - } - } - return nil + return errs.Err() } // Purge all files in the root and the root directory @@ -178,16 +170,12 @@ func (f *Fs) Purge(ctx context.Context) error { if err != nil { return err } - errs := make([]error, len(upstreams)) + errs := Errors(make([]error, len(upstreams))) multithread(len(upstreams), func(i int) { - errs[i] = upstreams[i].Features().Purge(ctx) + err := upstreams[i].Features().Purge(ctx) + errs[i] = errors.Wrap(err, upstreams[i].Name()) }) - for _, err := range errs { - if err != nil { - return err - } - } - return nil + return errs.Err() } // Copy src to this remote using server side copy operations. @@ -247,17 +235,17 @@ func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object, } } objs := make([]*upstream.Object, len(entries)) - errs := make([]error, len(entries)) + errs := Errors(make([]error, len(entries))) multithread(len(entries), func(i int) { u := entries[i].UpstreamFs() o, ok := entries[i].(*upstream.Object) if !ok { - errs[i] = fs.ErrorNotAFile + errs[i] = errors.Wrap(fs.ErrorNotAFile, u.Name()) return } mo, err := u.Features().Move(ctx, o, remote) if err != nil || mo == nil { - errs[i] = err + errs[i] = errors.Wrap(err, u.Name()) return } objs[i] = u.WrapObject(mo) @@ -272,12 +260,7 @@ func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object, if err != nil { return nil, err } - for _, err := range errs { - if err != nil { - return e.(*Object), err - } - } - return e.(*Object), nil + return e.(*Object), errs.Err() } // DirMove moves src, srcRemote to this remote at dstRemote @@ -303,17 +286,13 @@ func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string return fs.ErrorCantDirMove } } - errs := make([]error, len(upstreams)) + errs := Errors(make([]error, len(upstreams))) multithread(len(upstreams), func(i int) { u := upstreams[i] - errs[i] = u.Features().DirMove(ctx, u, srcRemote, dstRemote) + err := u.Features().DirMove(ctx, u, srcRemote, dstRemote) + errs[i] = errors.Wrap(err, u.Name()) }) - for _, err := range errs { - if err != nil { - return err - } - } - return nil + return errs.Err() } // ChangeNotify calls the passed function with a path @@ -379,6 +358,7 @@ func (f *Fs) put(ctx context.Context, in io.Reader, src fs.ObjectInfo, stream bo e, err := f.wrapEntries(u.WrapObject(o)) return e.(*Object), err } + errs := Errors(make([]error, len(upstreams)+1)) // Get multiple reader readers := make([]io.Reader, len(upstreams)) writers := make([]io.Writer, len(upstreams)) @@ -386,35 +366,42 @@ func (f *Fs) put(ctx context.Context, in io.Reader, src fs.ObjectInfo, stream bo r, w := io.Pipe() bw := bufio.NewWriter(w) readers[i], writers[i] = r, bw - defer w.Close() + defer func() { + err := w.Close() + if err != nil { + panic(err) + } + }() } go func() { mw := io.MultiWriter(writers...) - io.Copy(mw, in) - for _, bw := range writers { - bw.(*bufio.Writer).Flush() + es := make([]error, len(writers)+1) + _, es[len(es)-1] = io.Copy(mw, in) + for i, bw := range writers { + es[i] = bw.(*bufio.Writer).Flush() } + errs[len(upstreams)] = Errors(es).Err() }() // Multi-threading objs := make([]upstream.Entry, len(upstreams)) - errs := make([]error, len(upstreams)) multithread(len(upstreams), func(i int) { u := upstreams[i] var o fs.Object + var err error if stream { - o, errs[i] = u.Features().PutStream(ctx, readers[i], src, options...) + o, err = u.Features().PutStream(ctx, readers[i], src, options...) } else { - o, errs[i] = u.Put(ctx, readers[i], src, options...) + o, err = u.Put(ctx, readers[i], src, options...) } - if errs[i] != nil { + if err != nil { + errs[i] = errors.Wrap(err, u.Name()) return } objs[i] = u.WrapObject(o) }) - for _, err := range errs { - if err != nil { - return nil, err - } + err = errs.Err() + if err != nil { + return nil, err } e, err := f.wrapEntries(objs...) return e.(*Object), err @@ -514,12 +501,12 @@ func (f *Fs) About(ctx context.Context) (*fs.Usage, error) { // found. func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) { entriess := make([][]upstream.Entry, len(f.upstreams)) - errs := make([]error, len(f.upstreams)) + errs := Errors(make([]error, len(f.upstreams))) multithread(len(f.upstreams), func(i int) { u := f.upstreams[i] entries, err := u.List(ctx, dir) if err != nil { - errs[i] = err + errs[i] = errors.Wrap(err, u.Name()) return } uEntries := make([]upstream.Entry, len(entries)) @@ -528,18 +515,17 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e } entriess[i] = uEntries }) - found := false - for _, err := range errs { - if err == fs.ErrorDirNotFound { - continue + if len(errs) == len(errs.Map(FilterNil)) { + errs = errs.Map(func(e error) error { + if errors.Cause(e) == fs.ErrorDirNotFound { + return nil + } + return e + }) + if len(errs) == 0 { + return nil, fs.ErrorDirNotFound } - if err != nil { - return nil, err - } - found = true - } - if !found { - return nil, fs.ErrorDirNotFound + return nil, errs.Err() } return f.mergeDirEntries(entriess) } @@ -547,12 +533,12 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e // NewObject creates a new remote union file object func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) { objs := make([]*upstream.Object, len(f.upstreams)) - errs := make([]error, len(f.upstreams)) + errs := Errors(make([]error, len(f.upstreams))) multithread(len(f.upstreams), func(i int) { u := f.upstreams[i] o, err := u.NewObject(ctx, remote) if err != nil && err != fs.ErrorObjectNotFound { - errs[i] = err + errs[i] = errors.Wrap(err, u.Name()) return } objs[i] = u.WrapObject(o) @@ -570,12 +556,7 @@ func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) { if err != nil { return nil, err } - for _, err := range errs { - if err != nil { - return e.(*Object), err - } - } - return e.(*Object), nil + return e.(*Object), errs.Err() } // Precision is the greatest Precision of all upstreams @@ -661,7 +642,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { } upstreams := make([]*upstream.Fs, len(opt.Upstreams)) - errs := make([]error, len(opt.Upstreams)) + errs := Errors(make([]error, len(opt.Upstreams))) multithread(len(opt.Upstreams), func(i int) { u := opt.Upstreams[i] upstreams[i], errs[i] = upstream.New(u, root, time.Duration(opt.CacheTime)*time.Second)