diff --git a/backend/local/local.go b/backend/local/local.go index 314fcdea0..1e9eae482 100644 --- a/backend/local/local.go +++ b/backend/local/local.go @@ -1140,6 +1140,14 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op return err } hashType := o.fs.Hashes().GetOne() + if resumeOpt.Hash != "" { + if err = hashType.Set(resumeOpt.Hash); err != nil { + return err + } + if !o.fs.Hashes().Contains(hashType) { + return fmt.Errorf("unsupported resume hash: %q", resumeOpt.Hash) + } + } hasher, err = hash.NewMultiHasherTypes(hash.NewHashSet(hashType)) if err != nil { return err @@ -1207,10 +1215,14 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op // Context for read so that we can handle io.copy being interrupted ctxr, cancel := context.WithCancel(ctx) // Create exit handler during Copy so that resume data can be written if interrupted + var atexitOnce sync.Once atexitHandle := atexit.Register(func() { - // If OptionResume was passed, call SetID to prepare for future resumes - // ID is the number of bytes written to the destination - if resumeOpt != nil { + atexitOnce.Do(func() { + if resumeOpt == nil || hasher == nil { + return + } + // If OptionResume was passed, call SetID to prepare for future resumes + // ID is the number of bytes written to the destination // Stops the copy so cache is consistent with remote cacheingWg.Add(1) cancel() @@ -1218,13 +1230,15 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op fs.Infof(o, "Updating resume cache") fileInfo, _ := o.fs.lstat(o.path) writtenStr := strconv.FormatInt(fileInfo.Size(), 10) - hashType := o.fs.Hashes().GetOne() + hashType := hasher.Hashes().GetOne() hashState, err := hasher.GetHashState(hashType) - if err != nil { - return + if err == nil { + err = resumeOpt.SetID(ctx, writtenStr, hashType.String(), hashState) } - _ = resumeOpt.SetID(ctx, writtenStr, hashType.String(), hashState) - } + if err != nil { + fs.Logf(o, "Updating resume cache failed: %v", err) + } + }) }) cr := readers.NewContextReader(ctxr, in) _, err = io.Copy(out, cr) diff --git a/fs/hash/hash.go b/fs/hash/hash.go index e87ac1929..9ef3c6eeb 100644 --- a/fs/hash/hash.go +++ b/fs/hash/hash.go @@ -229,6 +229,14 @@ func (m *MultiHasher) Write(p []byte) (n int, err error) { return n, err } +// Hashes returns accumulated hash types. +func (m *MultiHasher) Hashes() (set Set) { + for ht := range m.h { + set.Add(ht) + } + return +} + // Sums returns the sums of all accumulated hashes as hex encoded // strings. func (m *MultiHasher) Sums() map[Type]string { diff --git a/fs/open_options.go b/fs/open_options.go index da7762353..29e43bb97 100644 --- a/fs/open_options.go +++ b/fs/open_options.go @@ -240,6 +240,7 @@ func (o *HashesOption) Mandatory() bool { type OptionResume struct { ID string // resume this ID if set Pos int64 // and resume from this position + Hash string Src Object F Fs Remote string diff --git a/fs/operations/resume.go b/fs/operations/resume.go index 7c0eb7e28..49bb99571 100644 --- a/fs/operations/resume.go +++ b/fs/operations/resume.go @@ -31,6 +31,7 @@ func createResumeOpt(ctx context.Context, f fs.Fs, remote string, src fs.Object) fs.Errorf(src, "Resume canceled: %v", resumeErr) } else if position > int64(ci.ResumeCutoff) { resumeOpt.Pos = position + resumeOpt.Hash = hashName } } }