Compare commits

...
Sign in to create a new pull request.

3 commits

Author SHA1 Message Date
Nick Craig-Wood
1bdeabfa46 hdfs: retry the create too FIXME 2024-06-18 14:36:40 +01:00
Nick Craig-Wood
656ef8952f hdfs: retry the io.Copy to see if that helps FIXME DO NOT MERGE 2024-06-15 17:34:49 +01:00
Nick Craig-Wood
010ed973df hdfs: add debugging to try to debug #7827 FIXME DO NOT MERGE 2024-06-15 09:49:45 +01:00

View file

@ -48,6 +48,7 @@ func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error {
realpath := o.fs.realpath(o.Remote())
err := o.fs.client.Chtimes(realpath, modTime, modTime)
if err != nil {
fs.Errorf(o, "SetModTime: ChTimes(%q, %v, %v) returned error: %v", realpath, modTime, modTime, err)
return err
}
o.modTime = modTime
@ -113,31 +114,49 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
err := o.fs.client.MkdirAll(dirname, 0755)
if err != nil {
fs.Errorf(o, "update: MkdirAll(%q, 0755) returned error: %v", dirname, err)
return err
}
_, err = o.fs.client.Stat(realpath)
if err == nil {
fs.Errorf(o, "update: Stat(%q) returned error: %v", realpath, err)
err = o.fs.client.Remove(realpath)
if err != nil {
fs.Errorf(o, "update: Remove(%q) returned error: %v", realpath, err)
return err
}
}
out, err := o.fs.client.Create(realpath)
if err != nil {
return err
}
cleanup := func() {
rerr := o.fs.client.Remove(realpath)
if rerr != nil {
fs.Errorf(o, "update: cleanup: Remove(%q) returned error: %v", realpath, err)
fs.Errorf(o.fs, "failed to remove [%v]: %v", realpath, rerr)
}
}
_, err = io.Copy(out, in)
var out *hdfs.FileWriter
err = o.fs.pacer.Call(func() (bool, error) {
if out != nil {
_ = out.Close()
out = nil
}
out, err = o.fs.client.Create(realpath)
if err != nil {
fs.Errorf(o, "update: Create(%q) returned error: %v", realpath, err)
return false, err
}
_, err = io.Copy(out, in)
if err == nil {
return false, nil
}
return errors.Is(err, io.ErrUnexpectedEOF), err
})
if err != nil {
fs.Errorf(o, "update: io.Copy returned error: %v", err)
cleanup()
return err
}
@ -160,21 +179,25 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
return errors.Is(err, hdfs.ErrReplicating), err
})
if err != nil {
fs.Errorf(o, "update: Close(%#v) returned error: %v", out, err)
cleanup()
return err
}
info, err := o.fs.client.Stat(realpath)
if err != nil {
fs.Errorf(o, "update: Stat#2(%q) returned error: %v", realpath, err)
return err
}
err = o.SetModTime(ctx, src.ModTime(ctx))
if err != nil {
fs.Errorf(o, "update: SetModTime(%v) returned error: %v", src.ModTime(ctx), err)
return err
}
o.size = info.Size()
fs.Errorf(o, "update: returned no error")
return nil
}