diff --git a/backend/union/union.go b/backend/union/union.go index 967916ea5..88e8cd715 100644 --- a/backend/union/union.go +++ b/backend/union/union.go @@ -153,7 +153,11 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op if err != nil { return err } - // Get mutliple reader + if len(entries) == 1 { + obj := entries[0].(*upstream.Object) + return obj.Update(ctx, in, src, options...) + } + // Get multiple reader readers := make([]io.Reader, len(entries)) writers := make([]io.Writer, len(entries)) errs := make([]error, len(entries) + 1) @@ -161,9 +165,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op r, w := io.Pipe() bw := bufio.NewWriter(w) readers[i], writers[i] = r, bw - defer func () { - w.Close() - }() + defer w.Close() } go func() { mw := io.MultiWriter(writers...) @@ -515,8 +517,7 @@ func (f *Fs) ChangeNotify(ctx context.Context, fn func(string, fs.EntryType), ch // as an optional interface func (f *Fs) DirCacheFlush() { multithread(len(f.upstreams), func(i int){ - do := f.upstreams[i].Features().DirCacheFlush; - if do != nil { + if do := f.upstreams[i].Features().DirCacheFlush; do != nil { do() } }) @@ -524,75 +525,63 @@ func (f *Fs) DirCacheFlush() { func (f *Fs) put(ctx context.Context, in io.Reader, src fs.ObjectInfo, stream bool, options ...fs.OpenOption) (fs.Object, error) { srcPath := src.Remote() - u, err := f.search(ctx, srcPath) - upstreams := []*upstream.Fs{u} - if err != nil || len(upstreams) == 0 { - upstreams, err = f.create(ctx, parentDir(srcPath)) + upstreams, err := f.create(ctx, parentDir(srcPath)) + if err != nil { + return nil, err + } + if len(upstreams) == 1 { + u := upstreams[0] + var o fs.Object + var err error + if stream { + o, err = u.Features().PutStream(ctx, in, src, options...) + } else { + o, err = u.Put(ctx, in, src, options...) + } if err != nil { return nil, err } + e, err := f.wrapEntries(u.WrapObject(o)) + return e.(*Object), err } - // Get mutliple reader - readers := make([]*io.PipeReader, len(upstreams)) - writers := make([]*io.PipeWriter, len(upstreams)) - errs := make([]error, len(upstreams) + 1) - for i := range upstreams { + // Get multiple reader + readers := make([]io.Reader, len(upstreams)) + writers := make([]io.Writer, len(upstreams)) + for i := range writers { r, w := io.Pipe() - readers[i], writers[i] = r, w + bw := bufio.NewWriter(w) + readers[i], writers[i] = r, bw + defer w.Close() } go func() { - bufw := make([]io.Writer, len(writers)) - for i, w := range writers { - bufw[i] = bufio.NewWriter(w) - } - mw := io.MultiWriter(bufw...) - _, err := io.Copy(mw, in) - if err != nil { - errs[len(upstreams)] = err - } - for _, bw := range bufw { + mw := io.MultiWriter(writers...) + io.Copy(mw, in) + for _, bw := range writers { bw.(*bufio.Writer).Flush() } }() // Multi-threading - var wg sync.WaitGroup - objs := make([]*upstream.Object, len(upstreams)) - for i, u := range upstreams { - wg.Add(1) - i, u := i, u // Closure - go func() { - defer wg.Done() - var o fs.Object - var err error - if stream { - o, err = u.Features().PutStream(ctx, readers[i], src, options...) - } else { - o, err = u.Put(ctx, readers[i], src, options...) - } - if err != nil { - errs[i] = err - return - } - objs[i] = u.WrapObject(o) - }() - } - wg.Wait() - for _, w := range writers { - w.Close() - } - var entries []upstream.Entry - for i, o := range objs { - if errs[i] != nil { - err = errs[i] - continue + objs := make([]upstream.Entry, len(upstreams)) + errs := make([]error, len(upstreams)) + multithread(len(upstreams), func(i int) { + u := upstreams[i] + var o fs.Object + if stream { + o, errs[i] = u.Features().PutStream(ctx, readers[i], src, options...) + } else { + o, errs[i] = u.Put(ctx, readers[i], src, options...) + } + if errs[i] != nil { + return + } + objs[i] = u.WrapObject(o) + }) + for _, err := range errs { + if err != nil { + return nil, err } - entries = append(entries, o) } - if len(entries) == 0 { - return nil, err - } - // Get an object for future operation - e, err := f.wrapEntries(entries...) + e, err := f.wrapEntries(objs...) return e.(*Object), err } @@ -602,7 +591,15 @@ func (f *Fs) put(ctx context.Context, in io.Reader, src fs.ObjectInfo, stream bo // will return the object and the error, otherwise will return // nil and the error func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { - return f.put(ctx, in, src, false, options...) + o, err := f.NewObject(ctx, src.Remote()) + switch err { + case nil: + return o, o.Update(ctx, in, src, options...) + case fs.ErrorObjectNotFound: + return f.put(ctx, in, src, false, options...) + default: + return nil, err + } } // PutStream uploads to the remote path with the modTime given of indeterminate size @@ -611,7 +608,15 @@ func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options . // will return the object and the error, otherwise will return // nil and the error func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { - return f.put(ctx, in, src, true, options...) + o, err := f.NewObject(ctx, src.Remote()) + switch err { + case nil: + return o, o.Update(ctx, in, src, options...) + case fs.ErrorObjectNotFound: + return f.put(ctx, in, src, true, options...) + default: + return nil, err + } } // About gets quota information from the Fs