Rework retry logic when copying objects

* Fix off by one retry logic - fixes #406
  * Retry any retriable errors
  * Restructure code
This commit is contained in:
Nick Craig-Wood 2016-06-18 10:55:58 +01:00
parent cc6a776034
commit 1b4370bde1

View file

@ -202,59 +202,64 @@ func Copy(f Fs, dst, src Object) {
maxTries := Config.LowLevelRetries maxTries := Config.LowLevelRetries
tries := 0 tries := 0
doUpdate := dst != nil doUpdate := dst != nil
var err, inErr error var err error
tryAgain: var actionTaken string
// Try server side copy first - if has optional interface and for {
// is same underlying remote // Try server side copy first - if has optional interface and
actionTaken := "Copied (server side copy)" // is same underlying remote
if fCopy, ok := f.(Copier); ok && src.Fs().Name() == f.Name() { actionTaken = "Copied (server side copy)"
var newDst Object if fCopy, ok := f.(Copier); ok && src.Fs().Name() == f.Name() {
newDst, err = fCopy.Copy(src, src.Remote()) var newDst Object
if err == nil { newDst, err = fCopy.Copy(src, src.Remote())
dst = newDst if err == nil {
} dst = newDst
} else { }
err = ErrorCantCopy
}
// If can't server side copy, do it manually
if err == ErrorCantCopy {
var in0 io.ReadCloser
in0, err = src.Open()
if err != nil {
Stats.Error()
ErrorLog(src, "Failed to open: %v", err)
return
}
// On big files add a buffer
if src.Size() > 10<<20 {
in0, _ = newAsyncReader(in0, 4, 4<<20)
}
in := NewAccount(in0, src) // account the transfer
if doUpdate {
actionTaken = "Copied (updated existing)"
err = dst.Update(in, src)
} else { } else {
actionTaken = "Copied (new)" err = ErrorCantCopy
dst, err = f.Put(in, src) }
// If can't server side copy, do it manually
if err == ErrorCantCopy {
var in0 io.ReadCloser
in0, err = src.Open()
if err != nil {
err = errors.Wrap(err, "failed to open source object")
} else {
// On big files add a buffer
if src.Size() > 10<<20 {
in0, _ = newAsyncReader(in0, 4, 4<<20)
}
in := NewAccount(in0, src) // account the transfer
if doUpdate {
actionTaken = "Copied (updated existing)"
err = dst.Update(in, src)
} else {
actionTaken = "Copied (new)"
dst, err = f.Put(in, src)
}
closeErr := in.Close()
if err == nil {
err = closeErr
}
}
} }
inErr = in.Close()
}
// Retry if err returned a retry error
if IsRetryError(err) && tries < maxTries {
tries++ tries++
Log(src, "Received error: %v - low level retry %d/%d", err, tries, maxTries) if tries >= maxTries {
if removeFailedCopy(dst) { break
// If we removed dst, then nil it out and note we are not updating
dst = nil
doUpdate = false
} }
goto tryAgain // Retry if err returned a retry error
} if IsRetryError(err) || ShouldRetry(err) {
if err == nil { Log(src, "Received error: %v - low level retry %d/%d", err, tries, maxTries)
err = inErr if removeFailedCopy(dst) {
// If we removed dst, then nil it out and note we are not updating
dst = nil
doUpdate = false
}
continue
}
// otherwise finish
break
} }
if err != nil { if err != nil {
Stats.Error() Stats.Error()