local: improve atexit handler for resume

- report errors if any
- prevent double invocation
- prefer saved hash type when many are supported
This commit is contained in:
Ivan Andreev 2021-10-13 20:20:04 +03:00
parent b015012d8b
commit e65e046c21
4 changed files with 32 additions and 8 deletions

View file

@ -1140,6 +1140,14 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
return err
}
hashType := o.fs.Hashes().GetOne()
if resumeOpt.Hash != "" {
if err = hashType.Set(resumeOpt.Hash); err != nil {
return err
}
if !o.fs.Hashes().Contains(hashType) {
return fmt.Errorf("unsupported resume hash: %q", resumeOpt.Hash)
}
}
hasher, err = hash.NewMultiHasherTypes(hash.NewHashSet(hashType))
if err != nil {
return err
@ -1207,10 +1215,14 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
// Context for read so that we can handle io.copy being interrupted
ctxr, cancel := context.WithCancel(ctx)
// Create exit handler during Copy so that resume data can be written if interrupted
var atexitOnce sync.Once
atexitHandle := atexit.Register(func() {
// If OptionResume was passed, call SetID to prepare for future resumes
// ID is the number of bytes written to the destination
if resumeOpt != nil {
atexitOnce.Do(func() {
if resumeOpt == nil || hasher == nil {
return
}
// If OptionResume was passed, call SetID to prepare for future resumes
// ID is the number of bytes written to the destination
// Stops the copy so cache is consistent with remote
cacheingWg.Add(1)
cancel()
@ -1218,13 +1230,15 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
fs.Infof(o, "Updating resume cache")
fileInfo, _ := o.fs.lstat(o.path)
writtenStr := strconv.FormatInt(fileInfo.Size(), 10)
hashType := o.fs.Hashes().GetOne()
hashType := hasher.Hashes().GetOne()
hashState, err := hasher.GetHashState(hashType)
if err != nil {
return
if err == nil {
err = resumeOpt.SetID(ctx, writtenStr, hashType.String(), hashState)
}
_ = resumeOpt.SetID(ctx, writtenStr, hashType.String(), hashState)
}
if err != nil {
fs.Logf(o, "Updating resume cache failed: %v", err)
}
})
})
cr := readers.NewContextReader(ctxr, in)
_, err = io.Copy(out, cr)

View file

@ -229,6 +229,14 @@ func (m *MultiHasher) Write(p []byte) (n int, err error) {
return n, err
}
// Hashes returns accumulated hash types.
func (m *MultiHasher) Hashes() (set Set) {
for ht := range m.h {
set.Add(ht)
}
return
}
// Sums returns the sums of all accumulated hashes as hex encoded
// strings.
func (m *MultiHasher) Sums() map[Type]string {

View file

@ -240,6 +240,7 @@ func (o *HashesOption) Mandatory() bool {
type OptionResume struct {
ID string // resume this ID if set
Pos int64 // and resume from this position
Hash string
Src Object
F Fs
Remote string

View file

@ -31,6 +31,7 @@ func createResumeOpt(ctx context.Context, f fs.Fs, remote string, src fs.Object)
fs.Errorf(src, "Resume canceled: %v", resumeErr)
} else if position > int64(ci.ResumeCutoff) {
resumeOpt.Pos = position
resumeOpt.Hash = hashName
}
}
}