From 2a4345ca4b7d5d4fa725f04ffeebcd1d368dd2f1 Mon Sep 17 00:00:00 2001 From: Arthur Baars Date: Tue, 19 Jan 2016 14:40:00 +0000 Subject: [PATCH] StorageDriver: GCS: retry all api calls Signed-off-by: Arthur Baars --- registry/storage/driver/gcs/gcs.go | 64 +++++++++++++++++++++++------- 1 file changed, 50 insertions(+), 14 deletions(-) diff --git a/registry/storage/driver/gcs/gcs.go b/registry/storage/driver/gcs/gcs.go index dd4573b89..0e3480f22 100644 --- a/registry/storage/driver/gcs/gcs.go +++ b/registry/storage/driver/gcs/gcs.go @@ -206,7 +206,7 @@ func (d *driver) ReadStream(context ctx.Context, path string, offset int64) (io. } if res.StatusCode == http.StatusRequestedRangeNotSatisfiable { res.Body.Close() - obj, err := storage.StatObject(d.context(context), d.bucket, name) + obj, err := storageStatObject(d.context(context), d.bucket, name) if err != nil { return nil, err } @@ -287,7 +287,7 @@ func (d *driver) WriteStream(context ctx.Context, path string, offset int64, rea } // wc was closed succesfully, so the temporary part exists, schedule it for deletion at the end // of the function - defer storage.DeleteObject(gcsContext, d.bucket, partName) + defer storageDeleteObject(gcsContext, d.bucket, partName) req := &storageapi.ComposeRequest{ Destination: &storageapi.Object{Bucket: obj.Bucket, Name: obj.Name, ContentType: obj.ContentType}, @@ -386,7 +386,7 @@ func (d *driver) Stat(context ctx.Context, path string) (storagedriver.FileInfo, var fi storagedriver.FileInfoFields //try to get as file gcsContext := d.context(context) - obj, err := storage.StatObject(gcsContext, d.bucket, d.pathToKey(path)) + obj, err := storageStatObject(gcsContext, d.bucket, d.pathToKey(path)) if err == nil { fi = storagedriver.FileInfoFields{ Path: path, @@ -404,7 +404,7 @@ func (d *driver) Stat(context ctx.Context, path string) (storagedriver.FileInfo, query.Prefix = dirpath query.MaxResults = 1 - objects, err := storage.ListObjects(gcsContext, d.bucket, query) + objects, err := storageListObjects(gcsContext, d.bucket, query) if err != nil { return nil, err } @@ -432,7 +432,7 @@ func (d *driver) List(context ctx.Context, path string) ([]string, error) { query.Prefix = d.pathToDirKey(path) list := make([]string, 0, 64) for { - objects, err := storage.ListObjects(d.context(context), d.bucket, query) + objects, err := storageListObjects(d.context(context), d.bucket, query) if err != nil { return nil, err } @@ -482,7 +482,7 @@ func (d *driver) Move(context ctx.Context, sourcePath string, destPath string) e var err error for _, key := range keys { dest := destPrefix + key[len(prefix):] - _, err = storage.CopyObject(gcsContext, d.bucket, key, d.bucket, dest, nil) + _, err = storageCopyObject(gcsContext, d.bucket, key, d.bucket, dest, nil) if err == nil { copies = append(copies, dest) } else { @@ -492,20 +492,20 @@ func (d *driver) Move(context ctx.Context, sourcePath string, destPath string) e // if an error occurred, attempt to cleanup the copies made if err != nil { for i := len(copies) - 1; i >= 0; i-- { - _ = storage.DeleteObject(gcsContext, d.bucket, copies[i]) + _ = storageDeleteObject(gcsContext, d.bucket, copies[i]) } return err } // delete originals for i := len(keys) - 1; i >= 0; i-- { - err2 := storage.DeleteObject(gcsContext, d.bucket, keys[i]) + err2 := storageDeleteObject(gcsContext, d.bucket, keys[i]) if err2 != nil { err = err2 } } return err } - _, err = storage.CopyObject(gcsContext, d.bucket, d.pathToKey(sourcePath), d.bucket, d.pathToKey(destPath), nil) + _, err = storageCopyObject(gcsContext, d.bucket, d.pathToKey(sourcePath), d.bucket, d.pathToKey(destPath), nil) if err != nil { if status := err.(*googleapi.Error); status != nil { if status.Code == http.StatusNotFound { @@ -514,7 +514,7 @@ func (d *driver) Move(context ctx.Context, sourcePath string, destPath string) e } return err } - return storage.DeleteObject(gcsContext, d.bucket, d.pathToKey(sourcePath)) + return storageDeleteObject(gcsContext, d.bucket, d.pathToKey(sourcePath)) } // listAll recursively lists all names of objects stored at "prefix" and its subpaths. @@ -524,7 +524,7 @@ func (d *driver) listAll(context context.Context, prefix string) ([]string, erro query.Prefix = prefix query.Versions = false for { - objects, err := storage.ListObjects(d.context(context), d.bucket, query) + objects, err := storageListObjects(d.context(context), d.bucket, query) if err != nil { return nil, err } @@ -555,8 +555,8 @@ func (d *driver) Delete(context ctx.Context, path string) error { if len(keys) > 0 { sort.Sort(sort.Reverse(sort.StringSlice(keys))) for _, key := range keys { - err := storage.DeleteObject(gcsContext, d.bucket, key) - // GCS only guarantees eventual consistency, solistAll might return + err := storageDeleteObject(gcsContext, d.bucket, key) + // GCS only guarantees eventual consistency, so listAll might return // paths that no longer exist. If this happens, just ignore any not // found error if status, ok := err.(*googleapi.Error); ok { @@ -570,7 +570,7 @@ func (d *driver) Delete(context ctx.Context, path string) error { } return nil } - err = storage.DeleteObject(gcsContext, d.bucket, d.pathToKey(path)) + err = storageDeleteObject(gcsContext, d.bucket, d.pathToKey(path)) if err != nil { if status := err.(*googleapi.Error); status != nil { if status.Code == http.StatusNotFound { @@ -581,6 +581,42 @@ func (d *driver) Delete(context ctx.Context, path string) error { return err } +func storageDeleteObject(context context.Context, bucket string, name string) error { + return retry(5, func() error { + return storage.DeleteObject(context, bucket, name) + }) +} + +func storageStatObject(context context.Context, bucket string, name string) (*storage.Object, error) { + var obj *storage.Object + err := retry(5, func() error { + var err error + obj, err = storage.StatObject(context, bucket, name) + return err + }) + return obj, err +} + +func storageListObjects(context context.Context, bucket string, q *storage.Query) (*storage.Objects, error) { + var objs *storage.Objects + err := retry(5, func() error { + var err error + objs, err = storage.ListObjects(context, bucket, q) + return err + }) + return objs, err +} + +func storageCopyObject(context context.Context, srcBucket, srcName string, destBucket, destName string, attrs *storage.ObjectAttrs) (*storage.Object, error) { + var obj *storage.Object + err := retry(5, func() error { + var err error + obj, err = storage.CopyObject(context, srcBucket, srcName, destBucket, destName, attrs) + return err + }) + return obj, err +} + // URLFor returns a URL which may be used to retrieve the content stored at // the given path, possibly using the given options. // Returns ErrUnsupportedMethod if this driver has no privateKey