From 7453b7d5f3db8cc3d4feb17e81efe23b0b8eb751 Mon Sep 17 00:00:00 2001
From: Nick Craig-Wood <nick@craig-wood.com>
Date: Fri, 8 Sep 2023 15:30:04 +0100
Subject: [PATCH] hdfs: fix retry "replication in progress" errors when
 uploading

Before this change uploaded files could return the error "replication
in progress".

This error is harmless though and means the Close should be retried
which is what this patch does.
---
 backend/hdfs/fs.go     |  9 +++++++++
 backend/hdfs/object.go | 20 +++++++++++++++++++-
 2 files changed, 28 insertions(+), 1 deletion(-)

diff --git a/backend/hdfs/fs.go b/backend/hdfs/fs.go
index 7d48ffed6..8ba9d2366 100644
--- a/backend/hdfs/fs.go
+++ b/backend/hdfs/fs.go
@@ -21,6 +21,7 @@ import (
 	"github.com/rclone/rclone/fs/config/configmap"
 	"github.com/rclone/rclone/fs/config/configstruct"
 	"github.com/rclone/rclone/fs/hash"
+	"github.com/rclone/rclone/lib/pacer"
 )
 
 // Fs represents a HDFS server
@@ -31,8 +32,15 @@ type Fs struct {
 	opt      Options        // options for this backend
 	ci       *fs.ConfigInfo // global config
 	client   *hdfs.Client
+	pacer    *fs.Pacer // pacer for API calls
 }
 
+const (
+	minSleep      = 20 * time.Millisecond
+	maxSleep      = 10 * time.Second
+	decayConstant = 2 // bigger for slower decay, exponential
+)
+
 // copy-paste from https://github.com/colinmarc/hdfs/blob/master/cmd/hdfs/kerberos.go
 func getKerberosClient() (*krb.Client, error) {
 	configPath := os.Getenv("KRB5_CONFIG")
@@ -114,6 +122,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
 		opt:    *opt,
 		ci:     fs.GetConfig(ctx),
 		client: client,
+		pacer:  fs.NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
 	}
 
 	f.features = (&fs.Features{
diff --git a/backend/hdfs/object.go b/backend/hdfs/object.go
index 31c8282f8..a2c2c1087 100644
--- a/backend/hdfs/object.go
+++ b/backend/hdfs/object.go
@@ -5,10 +5,12 @@ package hdfs
 
 import (
 	"context"
+	"errors"
 	"io"
 	"path"
 	"time"
 
+	"github.com/colinmarc/hdfs/v2"
 	"github.com/rclone/rclone/fs"
 	"github.com/rclone/rclone/fs/hash"
 	"github.com/rclone/rclone/lib/readers"
@@ -141,7 +143,23 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
 		return err
 	}
 
-	err = out.Close()
+	// If the datanodes have acknowledged all writes but not yet
+	// to the namenode, FileWriter.Close can return ErrReplicating
+	// (wrapped in an os.PathError). This indicates that all data
+	// has been written, but the lease is still open for the file.
+	//
+	// It is safe in this case to either ignore the error (and let
+	// the lease expire on its own) or to call Close multiple
+	// times until it completes without an error. The Java client,
+	// for context, always chooses to retry, with exponential
+	// backoff.
+	err = o.fs.pacer.Call(func() (bool, error) {
+		err := out.Close()
+		if err == nil {
+			return false, nil
+		}
+		return errors.Is(err, hdfs.ErrReplicating), err
+	})
 	if err != nil {
 		cleanup()
 		return err