fs: add UploadHeaders, DownloadHeaders to Update/Put/Open options

This commit is contained in:
Tim Gallant 2020-02-10 01:01:28 -08:00 committed by Nick Craig-Wood
parent 93caa459e3
commit 9bf3d3da4c
4 changed files with 52 additions and 31 deletions

View file

@ -71,7 +71,7 @@ func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int) (err
fs.Debugf(mc.src, "multi-thread copy: stream %d/%d (%d-%d) size %v starting", stream+1, mc.streams, start, end, fs.SizeSuffix(end-start)) fs.Debugf(mc.src, "multi-thread copy: stream %d/%d (%d-%d) size %v starting", stream+1, mc.streams, start, end, fs.SizeSuffix(end-start))
rc, err := NewReOpen(ctx, mc.src, nil, &fs.RangeOption{Start: start, End: end - 1}, fs.Config.LowLevelRetries) rc, err := NewReOpen(ctx, mc.src, fs.Config.LowLevelRetries, &fs.RangeOption{Start: start, End: end - 1})
if err != nil { if err != nil {
return errors.Wrap(err, "multpart copy: failed to open source") return errors.Wrap(err, "multpart copy: failed to open source")
} }

View file

@ -403,7 +403,11 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj
} }
} else { } else {
var in0 io.ReadCloser var in0 io.ReadCloser
in0, err = NewReOpen(ctx, src, hashOption, nil, fs.Config.LowLevelRetries) options := []fs.OpenOption{hashOption}
for _, option := range fs.Config.DownloadHeaders {
options = append(options, option)
}
in0, err = NewReOpen(ctx, src, fs.Config.LowLevelRetries, options...)
if err != nil { if err != nil {
err = errors.Wrap(err, "failed to open source object") err = errors.Wrap(err, "failed to open source object")
} else { } else {
@ -424,12 +428,16 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj
if src.Remote() != remote { if src.Remote() != remote {
wrappedSrc = NewOverrideRemote(src, remote) wrappedSrc = NewOverrideRemote(src, remote)
} }
options := []fs.OpenOption{hashOption}
for _, option := range fs.Config.UploadHeaders {
options = append(options, option)
}
if doUpdate { if doUpdate {
actionTaken = "Copied (replaced existing)" actionTaken = "Copied (replaced existing)"
err = dst.Update(ctx, in, wrappedSrc, hashOption) err = dst.Update(ctx, in, wrappedSrc, options...)
} else { } else {
actionTaken = "Copied (new)" actionTaken = "Copied (new)"
dst, err = f.Put(ctx, in, wrappedSrc, hashOption) dst, err = f.Put(ctx, in, wrappedSrc, options...)
} }
closeErr := in.Close() closeErr := in.Close()
if err == nil { if err == nil {

View file

@ -12,17 +12,16 @@ import (
// ReOpen is a wrapper for an object reader which reopens the stream on error // ReOpen is a wrapper for an object reader which reopens the stream on error
type ReOpen struct { type ReOpen struct {
ctx context.Context ctx context.Context
mu sync.Mutex // mutex to protect the below mu sync.Mutex // mutex to protect the below
src fs.Object // object to open src fs.Object // object to open
hashOption *fs.HashesOption // option to pass to initial open options []fs.OpenOption // option to pass to initial open
rangeOption *fs.RangeOption // option to pass to initial open rc io.ReadCloser // underlying stream
rc io.ReadCloser // underlying stream read int64 // number of bytes read from this stream
read int64 // number of bytes read from this stream maxTries int // maximum number of retries
maxTries int // maximum number of retries tries int // number of retries we've had so far in this stream
tries int // number of retries we've had so far in this stream err error // if this is set then Read/Close calls will return it
err error // if this is set then Read/Close calls will return it opened bool // if set then rc is valid and needs closing
opened bool // if set then rc is valid and needs closing
} }
var ( var (
@ -36,13 +35,12 @@ var (
// //
// If rangeOption is set then this will applied when reading from the // If rangeOption is set then this will applied when reading from the
// start, and updated on retries. // start, and updated on retries.
func NewReOpen(ctx context.Context, src fs.Object, hashOption *fs.HashesOption, rangeOption *fs.RangeOption, maxTries int) (rc io.ReadCloser, err error) { func NewReOpen(ctx context.Context, src fs.Object, maxTries int, options ...fs.OpenOption) (rc io.ReadCloser, err error) {
h := &ReOpen{ h := &ReOpen{
ctx: ctx, ctx: ctx,
src: src, src: src,
hashOption: hashOption, maxTries: maxTries,
rangeOption: rangeOption, options: options,
maxTries: maxTries,
} }
h.mu.Lock() h.mu.Lock()
defer h.mu.Unlock() defer h.mu.Unlock()
@ -57,20 +55,35 @@ func NewReOpen(ctx context.Context, src fs.Object, hashOption *fs.HashesOption,
// //
// we don't retry here as the Open() call will itself have low level retries // we don't retry here as the Open() call will itself have low level retries
func (h *ReOpen) open() error { func (h *ReOpen) open() error {
var optsArray [2]fs.OpenOption opts := []fs.OpenOption{}
var opts = optsArray[:0] var hashOption *fs.HashesOption
if h.read == 0 { var rangeOption *fs.RangeOption
if h.rangeOption != nil { for _, option := range h.options {
opts = append(opts, h.rangeOption) switch option.(type) {
case *fs.HashesOption:
hashOption = option.(*fs.HashesOption)
case *fs.RangeOption:
rangeOption = option.(*fs.RangeOption)
case *fs.HTTPOption:
opts = append(opts, option)
default:
if option.Mandatory() {
fs.Logf(h.src, "Unsupported mandatory option: %v", option)
}
} }
if h.hashOption != nil { }
if h.read == 0 {
if rangeOption != nil {
opts = append(opts, rangeOption)
}
if hashOption != nil {
// put hashOption on if reading from the start, ditch otherwise // put hashOption on if reading from the start, ditch otherwise
opts = append(opts, h.hashOption) opts = append(opts, hashOption)
} }
} else { } else {
if h.rangeOption != nil { if rangeOption != nil {
// range to the read point // range to the read point
opts = append(opts, &fs.RangeOption{Start: h.rangeOption.Start + h.read, End: h.rangeOption.End}) opts = append(opts, &fs.RangeOption{Start: rangeOption.Start + h.read, End: rangeOption.End})
} else { } else {
// seek to the read point // seek to the read point
opts = append(opts, &fs.SeekOption{Offset: h.read}) opts = append(opts, &fs.SeekOption{Offset: h.read})

View file

@ -74,7 +74,7 @@ func TestReOpen(t *testing.T) {
breaks: breaks, breaks: breaks,
} }
hashOption := &fs.HashesOption{Hashes: hash.NewHashSet(hash.MD5)} hashOption := &fs.HashesOption{Hashes: hash.NewHashSet(hash.MD5)}
return NewReOpen(context.Background(), src, hashOption, rangeOption, maxRetries) return NewReOpen(context.Background(), src, maxRetries, hashOption, rangeOption)
} }
t.Run("Basics", func(t *testing.T) { t.Run("Basics", func(t *testing.T) {