From ae216e365a98d90e6e9d3f510f1b6410b674a994 Mon Sep 17 00:00:00 2001 From: Richard Date: Mon, 27 Apr 2015 15:58:58 -0700 Subject: [PATCH] Make Storage Driver API calls context aware. - Change driver interface to take a context as its first argument - Make newFileReader take a context as its first argument - Make newFileWriter take a context as its first argument - Make blobstore exists and delete take a context as a first argument - Pass the layerreader's context to the storage layer - Pass the app's context to purgeuploads - Store the app's context into the blobstore (was previously null) - Pass the trace'd context to the storage drivers Signed-off-by: Richard Scothern --- notifications/listener_test.go | 5 +- registry/handlers/app.go | 14 +- registry/handlers/app_test.go | 5 +- registry/handlers/layer.go | 6 +- registry/storage/blobstore.go | 21 +- registry/storage/driver/azure/azure.go | 19 +- registry/storage/driver/base/base.go | 54 ++-- registry/storage/driver/filesystem/driver.go | 23 +- registry/storage/driver/inmemory/driver.go | 21 +- .../middleware/cloudfront/middleware.go | 6 +- registry/storage/driver/s3/s3.go | 24 +- registry/storage/driver/s3/s3_test.go | 10 +- registry/storage/driver/storagedriver.go | 20 +- .../storage/driver/testsuites/testsuites.go | 243 +++++++++--------- registry/storage/filereader.go | 10 +- registry/storage/filereader_test.go | 16 +- registry/storage/filewriter.go | 10 +- registry/storage/filewriter_test.go | 24 +- registry/storage/layer_test.go | 13 +- registry/storage/layerreader.go | 2 +- registry/storage/layerstore.go | 22 +- registry/storage/layerwriter.go | 41 +-- registry/storage/manifeststore_test.go | 2 +- registry/storage/purgeuploads.go | 14 +- registry/storage/purgeuploads_test.go | 49 ++-- registry/storage/registry.go | 3 +- registry/storage/revisionstore.go | 6 +- registry/storage/signaturestore.go | 2 +- registry/storage/tagstore.go | 11 +- registry/storage/walk.go | 9 +- registry/storage/walk_test.go | 26 +- 31 files changed, 386 insertions(+), 345 deletions(-) diff --git a/notifications/listener_test.go b/notifications/listener_test.go index 956279a2..e046c397 100644 --- a/notifications/listener_test.go +++ b/notifications/listener_test.go @@ -17,11 +17,12 @@ import ( ) func TestListener(t *testing.T) { - registry := storage.NewRegistryWithDriver(inmemory.New(), cache.NewInMemoryLayerInfoCache()) + ctx := context.Background() + registry := storage.NewRegistryWithDriver(ctx, inmemory.New(), cache.NewInMemoryLayerInfoCache()) tl := &testListener{ ops: make(map[string]int), } - ctx := context.Background() + repository, err := registry.Repository(ctx, "foo/bar") if err != nil { t.Fatalf("unexpected error getting repo: %v", err) diff --git a/registry/handlers/app.go b/registry/handlers/app.go index 3cc360c6..40181afa 100644 --- a/registry/handlers/app.go +++ b/registry/handlers/app.go @@ -73,7 +73,6 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App var err error app.driver, err = factory.Create(configuration.Storage.Type(), configuration.Storage.Parameters()) - if err != nil { // TODO(stevvooe): Move the creation of a service into a protected // method, where this is created lazily. Its status can be queried via @@ -92,7 +91,7 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App } - startUploadPurger(app.driver, ctxu.GetLogger(app), purgeConfig) + startUploadPurger(app, app.driver, ctxu.GetLogger(app), purgeConfig) app.driver, err = applyStorageMiddleware(app.driver, configuration.Middleware["storage"]) if err != nil { @@ -109,10 +108,10 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App if app.redis == nil { panic("redis configuration required to use for layerinfo cache") } - app.registry = storage.NewRegistryWithDriver(app.driver, cache.NewRedisLayerInfoCache(app.redis)) + app.registry = storage.NewRegistryWithDriver(app, app.driver, cache.NewRedisLayerInfoCache(app.redis)) ctxu.GetLogger(app).Infof("using redis layerinfo cache") case "inmemory": - app.registry = storage.NewRegistryWithDriver(app.driver, cache.NewInMemoryLayerInfoCache()) + app.registry = storage.NewRegistryWithDriver(app, app.driver, cache.NewInMemoryLayerInfoCache()) ctxu.GetLogger(app).Infof("using inmemory layerinfo cache") default: if cc["layerinfo"] != "" { @@ -123,7 +122,7 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App if app.registry == nil { // configure the registry if no cache section is available. - app.registry = storage.NewRegistryWithDriver(app.driver, nil) + app.registry = storage.NewRegistryWithDriver(app.Context, app.driver, nil) } app.registry, err = applyRegistryMiddleware(app.registry, configuration.Middleware["registry"]) @@ -365,7 +364,6 @@ func (app *App) dispatcher(dispatch dispatchFunc) http.Handler { } dispatch(context, r).ServeHTTP(w, r) - // Automated error response handling here. Handlers may return their // own errors if they need different behavior (such as range errors // for layer upload). @@ -597,7 +595,7 @@ func badPurgeUploadConfig(reason string) { // startUploadPurger schedules a goroutine which will periodically // check upload directories for old files and delete them -func startUploadPurger(storageDriver storagedriver.StorageDriver, log ctxu.Logger, config map[interface{}]interface{}) { +func startUploadPurger(ctx context.Context, storageDriver storagedriver.StorageDriver, log ctxu.Logger, config map[interface{}]interface{}) { if config["enabled"] == false { return } @@ -652,7 +650,7 @@ func startUploadPurger(storageDriver storagedriver.StorageDriver, log ctxu.Logge time.Sleep(jitter) for { - storage.PurgeUploads(storageDriver, time.Now().Add(-purgeAgeDuration), !dryRunBool) + storage.PurgeUploads(ctx, storageDriver, time.Now().Add(-purgeAgeDuration), !dryRunBool) log.Infof("Starting upload purge in %s", intervalDuration) time.Sleep(intervalDuration) } diff --git a/registry/handlers/app_test.go b/registry/handlers/app_test.go index d0b9174d..8ea5b1e5 100644 --- a/registry/handlers/app_test.go +++ b/registry/handlers/app_test.go @@ -24,12 +24,13 @@ import ( // tested individually. func TestAppDispatcher(t *testing.T) { driver := inmemory.New() + ctx := context.Background() app := &App{ Config: configuration.Configuration{}, - Context: context.Background(), + Context: ctx, router: v2.Router(), driver: driver, - registry: storage.NewRegistryWithDriver(driver, cache.NewInMemoryLayerInfoCache()), + registry: storage.NewRegistryWithDriver(ctx, driver, cache.NewInMemoryLayerInfoCache()), } server := httptest.NewServer(app) router := v2.Router() diff --git a/registry/handlers/layer.go b/registry/handlers/layer.go index b8230135..13ee8560 100644 --- a/registry/handlers/layer.go +++ b/registry/handlers/layer.go @@ -4,7 +4,7 @@ import ( "net/http" "github.com/docker/distribution" - ctxu "github.com/docker/distribution/context" + "github.com/docker/distribution/context" "github.com/docker/distribution/digest" "github.com/docker/distribution/registry/api/v2" "github.com/gorilla/handlers" @@ -48,7 +48,7 @@ type layerHandler struct { // GetLayer fetches the binary data from backend storage returns it in the // response. func (lh *layerHandler) GetLayer(w http.ResponseWriter, r *http.Request) { - ctxu.GetLogger(lh).Debug("GetImageLayer") + context.GetLogger(lh).Debug("GetImageLayer") layers := lh.Repository.Layers() layer, err := layers.Fetch(lh.Digest) @@ -65,7 +65,7 @@ func (lh *layerHandler) GetLayer(w http.ResponseWriter, r *http.Request) { handler, err := layer.Handler(r) if err != nil { - ctxu.GetLogger(lh).Debugf("unexpected error getting layer HTTP handler: %s", err) + context.GetLogger(lh).Debugf("unexpected error getting layer HTTP handler: %s", err) lh.Errors.Push(v2.ErrorCodeUnknown, err) return } diff --git a/registry/storage/blobstore.go b/registry/storage/blobstore.go index 8bab2f5e..c0c86929 100644 --- a/registry/storage/blobstore.go +++ b/registry/storage/blobstore.go @@ -3,10 +3,9 @@ package storage import ( "fmt" - ctxu "github.com/docker/distribution/context" + "github.com/docker/distribution/context" "github.com/docker/distribution/digest" storagedriver "github.com/docker/distribution/registry/storage/driver" - "golang.org/x/net/context" ) // TODO(stevvooe): Currently, the blobStore implementation used by the @@ -32,7 +31,7 @@ func (bs *blobStore) exists(dgst digest.Digest) (bool, error) { return false, err } - ok, err := exists(bs.driver, path) + ok, err := exists(bs.ctx, bs.driver, path) if err != nil { return false, err } @@ -48,7 +47,7 @@ func (bs *blobStore) get(dgst digest.Digest) ([]byte, error) { return nil, err } - return bs.driver.GetContent(bp) + return bs.driver.GetContent(bs.ctx, bp) } // link links the path to the provided digest by writing the digest into the @@ -62,7 +61,7 @@ func (bs *blobStore) link(path string, dgst digest.Digest) error { // The contents of the "link" file are the exact string contents of the // digest, which is specified in that package. - return bs.driver.PutContent(path, []byte(dgst)) + return bs.driver.PutContent(bs.ctx, path, []byte(dgst)) } // linked reads the link at path and returns the content. @@ -77,7 +76,7 @@ func (bs *blobStore) linked(path string) ([]byte, error) { // readlink returns the linked digest at path. func (bs *blobStore) readlink(path string) (digest.Digest, error) { - content, err := bs.driver.GetContent(path) + content, err := bs.driver.GetContent(bs.ctx, path) if err != nil { return "", err } @@ -112,7 +111,7 @@ func (bs *blobStore) resolve(path string) (string, error) { func (bs *blobStore) put(p []byte) (digest.Digest, error) { dgst, err := digest.FromBytes(p) if err != nil { - ctxu.GetLogger(bs.ctx).Errorf("error digesting content: %v, %s", err, string(p)) + context.GetLogger(bs.ctx).Errorf("error digesting content: %v, %s", err, string(p)) return "", err } @@ -128,7 +127,7 @@ func (bs *blobStore) put(p []byte) (digest.Digest, error) { return dgst, nil } - return dgst, bs.driver.PutContent(bp, p) + return dgst, bs.driver.PutContent(bs.ctx, bp, p) } // path returns the canonical path for the blob identified by digest. The blob @@ -145,9 +144,9 @@ func (bs *blobStore) path(dgst digest.Digest) (string, error) { return bp, nil } -// exists provides a utility method to test whether or not -func exists(driver storagedriver.StorageDriver, path string) (bool, error) { - if _, err := driver.Stat(path); err != nil { +// exists provides a utility method to test whether or not a path exists +func exists(ctx context.Context, driver storagedriver.StorageDriver, path string) (bool, error) { + if _, err := driver.Stat(ctx, path); err != nil { switch err := err.(type) { case storagedriver.PathNotFoundError: return false, nil diff --git a/registry/storage/driver/azure/azure.go b/registry/storage/driver/azure/azure.go index b985b7a9..d21a8259 100644 --- a/registry/storage/driver/azure/azure.go +++ b/registry/storage/driver/azure/azure.go @@ -11,6 +11,7 @@ import ( "strings" "time" + "github.com/docker/distribution/context" storagedriver "github.com/docker/distribution/registry/storage/driver" "github.com/docker/distribution/registry/storage/driver/base" "github.com/docker/distribution/registry/storage/driver/factory" @@ -99,7 +100,7 @@ func (d *driver) Name() string { } // GetContent retrieves the content stored at "path" as a []byte. -func (d *driver) GetContent(path string) ([]byte, error) { +func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) { blob, err := d.client.GetBlob(d.container, path) if err != nil { if is404(err) { @@ -112,13 +113,13 @@ func (d *driver) GetContent(path string) ([]byte, error) { } // PutContent stores the []byte content at a location designated by "path". -func (d *driver) PutContent(path string, contents []byte) error { +func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error { return d.client.PutBlockBlob(d.container, path, ioutil.NopCloser(bytes.NewReader(contents))) } // ReadStream retrieves an io.ReadCloser for the content stored at "path" with a // given byte offset. -func (d *driver) ReadStream(path string, offset int64) (io.ReadCloser, error) { +func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { if ok, err := d.client.BlobExists(d.container, path); err != nil { return nil, err } else if !ok { @@ -145,7 +146,7 @@ func (d *driver) ReadStream(path string, offset int64) (io.ReadCloser, error) { // WriteStream stores the contents of the provided io.ReadCloser at a location // designated by the given path. -func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (int64, error) { +func (d *driver) WriteStream(ctx context.Context, path string, offset int64, reader io.Reader) (int64, error) { if blobExists, err := d.client.BlobExists(d.container, path); err != nil { return 0, err } else if !blobExists { @@ -166,7 +167,7 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (int64 // Stat retrieves the FileInfo for the given path, including the current size // in bytes and the creation time. -func (d *driver) Stat(path string) (storagedriver.FileInfo, error) { +func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) { // Check if the path is a blob if ok, err := d.client.BlobExists(d.container, path); err != nil { return nil, err @@ -215,7 +216,7 @@ func (d *driver) Stat(path string) (storagedriver.FileInfo, error) { // List returns a list of the objects that are direct descendants of the given // path. -func (d *driver) List(path string) ([]string, error) { +func (d *driver) List(ctx context.Context, path string) ([]string, error) { if path == "/" { path = "" } @@ -231,7 +232,7 @@ func (d *driver) List(path string) ([]string, error) { // Move moves an object stored at sourcePath to destPath, removing the original // object. -func (d *driver) Move(sourcePath string, destPath string) error { +func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error { sourceBlobURL := d.client.GetBlobUrl(d.container, sourcePath) err := d.client.CopyBlob(d.container, destPath, sourceBlobURL) if err != nil { @@ -245,7 +246,7 @@ func (d *driver) Move(sourcePath string, destPath string) error { } // Delete recursively deletes all objects stored at "path" and its subpaths. -func (d *driver) Delete(path string) error { +func (d *driver) Delete(ctx context.Context, path string) error { ok, err := d.client.DeleteBlobIfExists(d.container, path) if err != nil { return err @@ -275,7 +276,7 @@ func (d *driver) Delete(path string) error { // URLFor returns a publicly accessible URL for the blob stored at given path // for specified duration by making use of Azure Storage Shared Access Signatures (SAS). // See https://msdn.microsoft.com/en-us/library/azure/ee395415.aspx for more info. -func (d *driver) URLFor(path string, options map[string]interface{}) (string, error) { +func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) { expiresTime := time.Now().UTC().Add(20 * time.Minute) // default expiration expires, ok := options["expiry"] if ok { diff --git a/registry/storage/driver/base/base.go b/registry/storage/driver/base/base.go index 8fa747dd..ae28b187 100644 --- a/registry/storage/driver/base/base.go +++ b/registry/storage/driver/base/base.go @@ -51,32 +51,32 @@ type Base struct { } // GetContent wraps GetContent of underlying storage driver. -func (base *Base) GetContent(path string) ([]byte, error) { - _, done := context.WithTrace(context.Background()) +func (base *Base) GetContent(ctx context.Context, path string) ([]byte, error) { + ctx, done := context.WithTrace(ctx) defer done("%s.GetContent(%q)", base.Name(), path) if !storagedriver.PathRegexp.MatchString(path) { return nil, storagedriver.InvalidPathError{Path: path} } - return base.StorageDriver.GetContent(path) + return base.StorageDriver.GetContent(ctx, path) } // PutContent wraps PutContent of underlying storage driver. -func (base *Base) PutContent(path string, content []byte) error { - _, done := context.WithTrace(context.Background()) +func (base *Base) PutContent(ctx context.Context, path string, content []byte) error { + ctx, done := context.WithTrace(context.Background()) defer done("%s.PutContent(%q)", base.Name(), path) if !storagedriver.PathRegexp.MatchString(path) { return storagedriver.InvalidPathError{Path: path} } - return base.StorageDriver.PutContent(path, content) + return base.StorageDriver.PutContent(ctx, path, content) } // ReadStream wraps ReadStream of underlying storage driver. -func (base *Base) ReadStream(path string, offset int64) (io.ReadCloser, error) { - _, done := context.WithTrace(context.Background()) +func (base *Base) ReadStream(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { + ctx, done := context.WithTrace(context.Background()) defer done("%s.ReadStream(%q, %d)", base.Name(), path, offset) if offset < 0 { @@ -87,12 +87,12 @@ func (base *Base) ReadStream(path string, offset int64) (io.ReadCloser, error) { return nil, storagedriver.InvalidPathError{Path: path} } - return base.StorageDriver.ReadStream(path, offset) + return base.StorageDriver.ReadStream(ctx, path, offset) } // WriteStream wraps WriteStream of underlying storage driver. -func (base *Base) WriteStream(path string, offset int64, reader io.Reader) (nn int64, err error) { - _, done := context.WithTrace(context.Background()) +func (base *Base) WriteStream(ctx context.Context, path string, offset int64, reader io.Reader) (nn int64, err error) { + ctx, done := context.WithTrace(ctx) defer done("%s.WriteStream(%q, %d)", base.Name(), path, offset) if offset < 0 { @@ -103,36 +103,36 @@ func (base *Base) WriteStream(path string, offset int64, reader io.Reader) (nn i return 0, storagedriver.InvalidPathError{Path: path} } - return base.StorageDriver.WriteStream(path, offset, reader) + return base.StorageDriver.WriteStream(ctx, path, offset, reader) } // Stat wraps Stat of underlying storage driver. -func (base *Base) Stat(path string) (storagedriver.FileInfo, error) { - _, done := context.WithTrace(context.Background()) +func (base *Base) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) { + ctx, done := context.WithTrace(ctx) defer done("%s.Stat(%q)", base.Name(), path) if !storagedriver.PathRegexp.MatchString(path) { return nil, storagedriver.InvalidPathError{Path: path} } - return base.StorageDriver.Stat(path) + return base.StorageDriver.Stat(ctx, path) } // List wraps List of underlying storage driver. -func (base *Base) List(path string) ([]string, error) { - _, done := context.WithTrace(context.Background()) +func (base *Base) List(ctx context.Context, path string) ([]string, error) { + ctx, done := context.WithTrace(ctx) defer done("%s.List(%q)", base.Name(), path) if !storagedriver.PathRegexp.MatchString(path) && path != "/" { return nil, storagedriver.InvalidPathError{Path: path} } - return base.StorageDriver.List(path) + return base.StorageDriver.List(ctx, path) } // Move wraps Move of underlying storage driver. -func (base *Base) Move(sourcePath string, destPath string) error { - _, done := context.WithTrace(context.Background()) +func (base *Base) Move(ctx context.Context, sourcePath string, destPath string) error { + ctx, done := context.WithTrace(ctx) defer done("%s.Move(%q, %q", base.Name(), sourcePath, destPath) if !storagedriver.PathRegexp.MatchString(sourcePath) { @@ -141,29 +141,29 @@ func (base *Base) Move(sourcePath string, destPath string) error { return storagedriver.InvalidPathError{Path: destPath} } - return base.StorageDriver.Move(sourcePath, destPath) + return base.StorageDriver.Move(ctx, sourcePath, destPath) } // Delete wraps Delete of underlying storage driver. -func (base *Base) Delete(path string) error { - _, done := context.WithTrace(context.Background()) +func (base *Base) Delete(ctx context.Context, path string) error { + ctx, done := context.WithTrace(ctx) defer done("%s.Delete(%q)", base.Name(), path) if !storagedriver.PathRegexp.MatchString(path) { return storagedriver.InvalidPathError{Path: path} } - return base.StorageDriver.Delete(path) + return base.StorageDriver.Delete(ctx, path) } // URLFor wraps URLFor of underlying storage driver. -func (base *Base) URLFor(path string, options map[string]interface{}) (string, error) { - _, done := context.WithTrace(context.Background()) +func (base *Base) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) { + ctx, done := context.WithTrace(ctx) defer done("%s.URLFor(%q)", base.Name(), path) if !storagedriver.PathRegexp.MatchString(path) { return "", storagedriver.InvalidPathError{Path: path} } - return base.StorageDriver.URLFor(path, options) + return base.StorageDriver.URLFor(ctx, path, options) } diff --git a/registry/storage/driver/filesystem/driver.go b/registry/storage/driver/filesystem/driver.go index 9ffe0888..82960314 100644 --- a/registry/storage/driver/filesystem/driver.go +++ b/registry/storage/driver/filesystem/driver.go @@ -9,6 +9,7 @@ import ( "path" "time" + "github.com/docker/distribution/context" storagedriver "github.com/docker/distribution/registry/storage/driver" "github.com/docker/distribution/registry/storage/driver/base" "github.com/docker/distribution/registry/storage/driver/factory" @@ -76,8 +77,8 @@ func (d *driver) Name() string { } // GetContent retrieves the content stored at "path" as a []byte. -func (d *driver) GetContent(path string) ([]byte, error) { - rc, err := d.ReadStream(path, 0) +func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) { + rc, err := d.ReadStream(ctx, path, 0) if err != nil { return nil, err } @@ -92,8 +93,8 @@ func (d *driver) GetContent(path string) ([]byte, error) { } // PutContent stores the []byte content at a location designated by "path". -func (d *driver) PutContent(subPath string, contents []byte) error { - if _, err := d.WriteStream(subPath, 0, bytes.NewReader(contents)); err != nil { +func (d *driver) PutContent(ctx context.Context, subPath string, contents []byte) error { + if _, err := d.WriteStream(ctx, subPath, 0, bytes.NewReader(contents)); err != nil { return err } @@ -102,7 +103,7 @@ func (d *driver) PutContent(subPath string, contents []byte) error { // ReadStream retrieves an io.ReadCloser for the content stored at "path" with a // given byte offset. -func (d *driver) ReadStream(path string, offset int64) (io.ReadCloser, error) { +func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { file, err := os.OpenFile(d.fullPath(path), os.O_RDONLY, 0644) if err != nil { if os.IsNotExist(err) { @@ -126,7 +127,7 @@ func (d *driver) ReadStream(path string, offset int64) (io.ReadCloser, error) { // WriteStream stores the contents of the provided io.Reader at a location // designated by the given path. -func (d *driver) WriteStream(subPath string, offset int64, reader io.Reader) (nn int64, err error) { +func (d *driver) WriteStream(ctx context.Context, subPath string, offset int64, reader io.Reader) (nn int64, err error) { // TODO(stevvooe): This needs to be a requirement. // if !path.IsAbs(subPath) { // return fmt.Errorf("absolute path required: %q", subPath) @@ -162,7 +163,7 @@ func (d *driver) WriteStream(subPath string, offset int64, reader io.Reader) (nn // Stat retrieves the FileInfo for the given path, including the current size // in bytes and the creation time. -func (d *driver) Stat(subPath string) (storagedriver.FileInfo, error) { +func (d *driver) Stat(ctx context.Context, subPath string) (storagedriver.FileInfo, error) { fullPath := d.fullPath(subPath) fi, err := os.Stat(fullPath) @@ -182,7 +183,7 @@ func (d *driver) Stat(subPath string) (storagedriver.FileInfo, error) { // List returns a list of the objects that are direct descendants of the given // path. -func (d *driver) List(subPath string) ([]string, error) { +func (d *driver) List(ctx context.Context, subPath string) ([]string, error) { if subPath[len(subPath)-1] != '/' { subPath += "/" } @@ -213,7 +214,7 @@ func (d *driver) List(subPath string) ([]string, error) { // Move moves an object stored at sourcePath to destPath, removing the original // object. -func (d *driver) Move(sourcePath string, destPath string) error { +func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error { source := d.fullPath(sourcePath) dest := d.fullPath(destPath) @@ -230,7 +231,7 @@ func (d *driver) Move(sourcePath string, destPath string) error { } // Delete recursively deletes all objects stored at "path" and its subpaths. -func (d *driver) Delete(subPath string) error { +func (d *driver) Delete(ctx context.Context, subPath string) error { fullPath := d.fullPath(subPath) _, err := os.Stat(fullPath) @@ -246,7 +247,7 @@ func (d *driver) Delete(subPath string) error { // URLFor returns a URL which may be used to retrieve the content stored at the given path. // May return an UnsupportedMethodErr in certain StorageDriver implementations. -func (d *driver) URLFor(path string, options map[string]interface{}) (string, error) { +func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) { return "", storagedriver.ErrUnsupportedMethod } diff --git a/registry/storage/driver/inmemory/driver.go b/registry/storage/driver/inmemory/driver.go index e0694de2..2d121e1c 100644 --- a/registry/storage/driver/inmemory/driver.go +++ b/registry/storage/driver/inmemory/driver.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/docker/distribution/context" storagedriver "github.com/docker/distribution/registry/storage/driver" "github.com/docker/distribution/registry/storage/driver/base" "github.com/docker/distribution/registry/storage/driver/factory" @@ -69,11 +70,11 @@ func (d *driver) Name() string { } // GetContent retrieves the content stored at "path" as a []byte. -func (d *driver) GetContent(path string) ([]byte, error) { +func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) { d.mutex.RLock() defer d.mutex.RUnlock() - rc, err := d.ReadStream(path, 0) + rc, err := d.ReadStream(ctx, path, 0) if err != nil { return nil, err } @@ -83,7 +84,7 @@ func (d *driver) GetContent(path string) ([]byte, error) { } // PutContent stores the []byte content at a location designated by "path". -func (d *driver) PutContent(p string, contents []byte) error { +func (d *driver) PutContent(ctx context.Context, p string, contents []byte) error { d.mutex.Lock() defer d.mutex.Unlock() @@ -102,7 +103,7 @@ func (d *driver) PutContent(p string, contents []byte) error { // ReadStream retrieves an io.ReadCloser for the content stored at "path" with a // given byte offset. -func (d *driver) ReadStream(path string, offset int64) (io.ReadCloser, error) { +func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { d.mutex.RLock() defer d.mutex.RUnlock() @@ -126,7 +127,7 @@ func (d *driver) ReadStream(path string, offset int64) (io.ReadCloser, error) { // WriteStream stores the contents of the provided io.ReadCloser at a location // designated by the given path. -func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (nn int64, err error) { +func (d *driver) WriteStream(ctx context.Context, path string, offset int64, reader io.Reader) (nn int64, err error) { d.mutex.Lock() defer d.mutex.Unlock() @@ -167,7 +168,7 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (nn in } // Stat returns info about the provided path. -func (d *driver) Stat(path string) (storagedriver.FileInfo, error) { +func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) { d.mutex.RLock() defer d.mutex.RUnlock() @@ -193,7 +194,7 @@ func (d *driver) Stat(path string) (storagedriver.FileInfo, error) { // List returns a list of the objects that are direct descendants of the given // path. -func (d *driver) List(path string) ([]string, error) { +func (d *driver) List(ctx context.Context, path string) ([]string, error) { d.mutex.RLock() defer d.mutex.RUnlock() @@ -223,7 +224,7 @@ func (d *driver) List(path string) ([]string, error) { // Move moves an object stored at sourcePath to destPath, removing the original // object. -func (d *driver) Move(sourcePath string, destPath string) error { +func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error { d.mutex.Lock() defer d.mutex.Unlock() @@ -239,7 +240,7 @@ func (d *driver) Move(sourcePath string, destPath string) error { } // Delete recursively deletes all objects stored at "path" and its subpaths. -func (d *driver) Delete(path string) error { +func (d *driver) Delete(ctx context.Context, path string) error { d.mutex.Lock() defer d.mutex.Unlock() @@ -256,6 +257,6 @@ func (d *driver) Delete(path string) error { // URLFor returns a URL which may be used to retrieve the content stored at the given path. // May return an UnsupportedMethodErr in certain StorageDriver implementations. -func (d *driver) URLFor(path string, options map[string]interface{}) (string, error) { +func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) { return "", storagedriver.ErrUnsupportedMethod } diff --git a/registry/storage/driver/middleware/cloudfront/middleware.go b/registry/storage/driver/middleware/cloudfront/middleware.go index aee068a5..31c00afc 100644 --- a/registry/storage/driver/middleware/cloudfront/middleware.go +++ b/registry/storage/driver/middleware/cloudfront/middleware.go @@ -98,12 +98,12 @@ type S3BucketKeyer interface { // Resolve returns an http.Handler which can serve the contents of the given // Layer, or an error if not supported by the storagedriver. -func (lh *cloudFrontStorageMiddleware) URLFor(path string, options map[string]interface{}) (string, error) { +func (lh *cloudFrontStorageMiddleware) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) { // TODO(endophage): currently only supports S3 keyer, ok := lh.StorageDriver.(S3BucketKeyer) if !ok { - context.GetLogger(context.Background()).Warn("the CloudFront middleware does not support this backend storage driver") - return lh.StorageDriver.URLFor(path, options) + context.GetLogger(ctx).Warn("the CloudFront middleware does not support this backend storage driver") + return lh.StorageDriver.URLFor(ctx, path, options) } cfURL, err := lh.cloudfront.CannedSignedURL(keyer.S3BucketKey(path), "", time.Now().Add(lh.duration)) diff --git a/registry/storage/driver/s3/s3.go b/registry/storage/driver/s3/s3.go index fe23262e..f6e7900e 100644 --- a/registry/storage/driver/s3/s3.go +++ b/registry/storage/driver/s3/s3.go @@ -29,6 +29,8 @@ import ( "github.com/AdRoll/goamz/aws" "github.com/AdRoll/goamz/s3" "github.com/Sirupsen/logrus" + + "github.com/docker/distribution/context" storagedriver "github.com/docker/distribution/registry/storage/driver" "github.com/docker/distribution/registry/storage/driver/base" "github.com/docker/distribution/registry/storage/driver/factory" @@ -267,7 +269,7 @@ func (d *driver) Name() string { } // GetContent retrieves the content stored at "path" as a []byte. -func (d *driver) GetContent(path string) ([]byte, error) { +func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) { content, err := d.Bucket.Get(d.s3Path(path)) if err != nil { return nil, parseError(path, err) @@ -276,13 +278,13 @@ func (d *driver) GetContent(path string) ([]byte, error) { } // PutContent stores the []byte content at a location designated by "path". -func (d *driver) PutContent(path string, contents []byte) error { +func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error { return parseError(path, d.Bucket.Put(d.s3Path(path), contents, d.getContentType(), getPermissions(), d.getOptions())) } // ReadStream retrieves an io.ReadCloser for the content stored at "path" with a // given byte offset. -func (d *driver) ReadStream(path string, offset int64) (io.ReadCloser, error) { +func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { headers := make(http.Header) headers.Add("Range", "bytes="+strconv.FormatInt(offset, 10)+"-") @@ -304,7 +306,7 @@ func (d *driver) ReadStream(path string, offset int64) (io.ReadCloser, error) { // returned. May be used to resume writing a stream by providing a nonzero // offset. Offsets past the current size will write from the position // beyond the end of the file. -func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (totalRead int64, err error) { +func (d *driver) WriteStream(ctx context.Context, path string, offset int64, reader io.Reader) (totalRead int64, err error) { partNumber := 1 bytesRead := 0 var putErrChan chan error @@ -348,7 +350,7 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total // Fills from 0 to total from current fromSmallCurrent := func(total int64) error { - current, err := d.ReadStream(path, 0) + current, err := d.ReadStream(ctx, path, 0) if err != nil { return err } @@ -628,7 +630,7 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total // Stat retrieves the FileInfo for the given path, including the current size // in bytes and the creation time. -func (d *driver) Stat(path string) (storagedriver.FileInfo, error) { +func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) { listResponse, err := d.Bucket.List(d.s3Path(path), "", "", 1) if err != nil { return nil, err @@ -661,7 +663,7 @@ func (d *driver) Stat(path string) (storagedriver.FileInfo, error) { } // List returns a list of the objects that are direct descendants of the given path. -func (d *driver) List(path string) ([]string, error) { +func (d *driver) List(ctx context.Context, path string) ([]string, error) { if path != "/" && path[len(path)-1] != '/' { path = path + "/" } @@ -706,7 +708,7 @@ func (d *driver) List(path string) ([]string, error) { // Move moves an object stored at sourcePath to destPath, removing the original // object. -func (d *driver) Move(sourcePath string, destPath string) error { +func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error { /* This is terrible, but aws doesn't have an actual move. */ _, err := d.Bucket.PutCopy(d.s3Path(destPath), getPermissions(), s3.CopyOptions{Options: d.getOptions(), ContentType: d.getContentType()}, d.Bucket.Name+"/"+d.s3Path(sourcePath)) @@ -714,11 +716,11 @@ func (d *driver) Move(sourcePath string, destPath string) error { return parseError(sourcePath, err) } - return d.Delete(sourcePath) + return d.Delete(ctx, sourcePath) } // Delete recursively deletes all objects stored at "path" and its subpaths. -func (d *driver) Delete(path string) error { +func (d *driver) Delete(ctx context.Context, path string) error { listResponse, err := d.Bucket.List(d.s3Path(path), "", "", listMax) if err != nil || len(listResponse.Contents) == 0 { return storagedriver.PathNotFoundError{Path: path} @@ -747,7 +749,7 @@ func (d *driver) Delete(path string) error { // URLFor returns a URL which may be used to retrieve the content stored at the given path. // May return an UnsupportedMethodErr in certain StorageDriver implementations. -func (d *driver) URLFor(path string, options map[string]interface{}) (string, error) { +func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) { methodString := "GET" method, ok := options["method"] if ok { diff --git a/registry/storage/driver/s3/s3_test.go b/registry/storage/driver/s3/s3_test.go index 69543bcb..c608e454 100644 --- a/registry/storage/driver/s3/s3_test.go +++ b/registry/storage/driver/s3/s3_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/AdRoll/goamz/aws" + "github.com/docker/distribution/context" storagedriver "github.com/docker/distribution/registry/storage/driver" "github.com/docker/distribution/registry/storage/driver/testsuites" @@ -134,16 +135,17 @@ func (suite *S3DriverSuite) TestEmptyRootList(c *check.C) { filename := "/test" contents := []byte("contents") - err = rootedDriver.PutContent(filename, contents) + ctx := context.Background() + err = rootedDriver.PutContent(ctx, filename, contents) c.Assert(err, check.IsNil) - defer rootedDriver.Delete(filename) + defer rootedDriver.Delete(ctx, filename) - keys, err := emptyRootDriver.List("/") + keys, err := emptyRootDriver.List(ctx, "/") for _, path := range keys { c.Assert(storagedriver.PathRegexp.MatchString(path), check.Equals, true) } - keys, err = slashRootDriver.List("/") + keys, err = slashRootDriver.List(ctx, "/") for _, path := range keys { c.Assert(storagedriver.PathRegexp.MatchString(path), check.Equals, true) } diff --git a/registry/storage/driver/storagedriver.go b/registry/storage/driver/storagedriver.go index cda1c37d..bade099f 100644 --- a/registry/storage/driver/storagedriver.go +++ b/registry/storage/driver/storagedriver.go @@ -7,6 +7,8 @@ import ( "regexp" "strconv" "strings" + + "github.com/docker/distribution/context" ) // Version is a string representing the storage driver version, of the form @@ -42,45 +44,45 @@ type StorageDriver interface { // GetContent retrieves the content stored at "path" as a []byte. // This should primarily be used for small objects. - GetContent(path string) ([]byte, error) + GetContent(ctx context.Context, path string) ([]byte, error) // PutContent stores the []byte content at a location designated by "path". // This should primarily be used for small objects. - PutContent(path string, content []byte) error + PutContent(ctx context.Context, path string, content []byte) error // ReadStream retrieves an io.ReadCloser for the content stored at "path" // with a given byte offset. // May be used to resume reading a stream by providing a nonzero offset. - ReadStream(path string, offset int64) (io.ReadCloser, error) + ReadStream(ctx context.Context, path string, offset int64) (io.ReadCloser, error) // WriteStream stores the contents of the provided io.ReadCloser at a // location designated by the given path. // May be used to resume writing a stream by providing a nonzero offset. // The offset must be no larger than the CurrentSize for this path. - WriteStream(path string, offset int64, reader io.Reader) (nn int64, err error) + WriteStream(ctx context.Context, path string, offset int64, reader io.Reader) (nn int64, err error) // Stat retrieves the FileInfo for the given path, including the current // size in bytes and the creation time. - Stat(path string) (FileInfo, error) + Stat(ctx context.Context, path string) (FileInfo, error) // List returns a list of the objects that are direct descendants of the //given path. - List(path string) ([]string, error) + List(ctx context.Context, path string) ([]string, error) // Move moves an object stored at sourcePath to destPath, removing the // original object. // Note: This may be no more efficient than a copy followed by a delete for // many implementations. - Move(sourcePath string, destPath string) error + Move(ctx context.Context, sourcePath string, destPath string) error // Delete recursively deletes all objects stored at "path" and its subpaths. - Delete(path string) error + Delete(ctx context.Context, path string) error // URLFor returns a URL which may be used to retrieve the content stored at // the given path, possibly using the given options. // May return an ErrUnsupportedMethod in certain StorageDriver // implementations. - URLFor(path string, options map[string]interface{}) (string, error) + URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) } // PathRegexp is the regular expression which each file path must match. A diff --git a/registry/storage/driver/testsuites/testsuites.go b/registry/storage/driver/testsuites/testsuites.go index 9f387a62..9185ebbc 100644 --- a/registry/storage/driver/testsuites/testsuites.go +++ b/registry/storage/driver/testsuites/testsuites.go @@ -14,6 +14,7 @@ import ( "testing" "time" + "github.com/docker/distribution/context" storagedriver "github.com/docker/distribution/registry/storage/driver" "gopkg.in/check.v1" ) @@ -27,6 +28,7 @@ func RegisterInProcessSuite(driverConstructor DriverConstructor, skipCheck SkipC check.Suite(&DriverSuite{ Constructor: driverConstructor, SkipCheck: skipCheck, + ctx: context.Background(), }) } @@ -88,6 +90,7 @@ type DriverSuite struct { Teardown DriverTeardown SkipCheck storagedriver.StorageDriver + ctx context.Context } // SetUpSuite sets up the gocheck test suite. @@ -112,7 +115,7 @@ func (suite *DriverSuite) TearDownSuite(c *check.C) { // This causes the suite to abort if any files are left around in the storage // driver. func (suite *DriverSuite) TearDownTest(c *check.C) { - files, _ := suite.StorageDriver.List("/") + files, _ := suite.StorageDriver.List(suite.ctx, "/") if len(files) > 0 { c.Fatalf("Storage driver did not clean up properly. Offending files: %#v", files) } @@ -141,11 +144,11 @@ func (suite *DriverSuite) TestValidPaths(c *check.C) { "/Abc/Cba"} for _, filename := range validFiles { - err := suite.StorageDriver.PutContent(filename, contents) - defer suite.StorageDriver.Delete(firstPart(filename)) + err := suite.StorageDriver.PutContent(suite.ctx, filename, contents) + defer suite.StorageDriver.Delete(suite.ctx, firstPart(filename)) c.Assert(err, check.IsNil) - received, err := suite.StorageDriver.GetContent(filename) + received, err := suite.StorageDriver.GetContent(suite.ctx, filename) c.Assert(err, check.IsNil) c.Assert(received, check.DeepEquals, contents) } @@ -164,12 +167,12 @@ func (suite *DriverSuite) TestInvalidPaths(c *check.C) { "/abc_123/"} for _, filename := range invalidFiles { - err := suite.StorageDriver.PutContent(filename, contents) - defer suite.StorageDriver.Delete(firstPart(filename)) + err := suite.StorageDriver.PutContent(suite.ctx, filename, contents) + defer suite.StorageDriver.Delete(suite.ctx, firstPart(filename)) c.Assert(err, check.NotNil) c.Assert(err, check.FitsTypeOf, storagedriver.InvalidPathError{}) - _, err = suite.StorageDriver.GetContent(filename) + _, err = suite.StorageDriver.GetContent(suite.ctx, filename) c.Assert(err, check.NotNil) c.Assert(err, check.FitsTypeOf, storagedriver.InvalidPathError{}) } @@ -225,7 +228,7 @@ func (suite *DriverSuite) TestTruncate(c *check.C) { // TestReadNonexistent tests reading content from an empty path. func (suite *DriverSuite) TestReadNonexistent(c *check.C) { filename := randomPath(32) - _, err := suite.StorageDriver.GetContent(filename) + _, err := suite.StorageDriver.GetContent(suite.ctx, filename) c.Assert(err, check.NotNil) c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{}) } @@ -277,17 +280,17 @@ func (suite *DriverSuite) TestWriteReadLargeStreams(c *check.C) { } filename := randomPath(32) - defer suite.StorageDriver.Delete(firstPart(filename)) + defer suite.StorageDriver.Delete(suite.ctx, firstPart(filename)) checksum := sha1.New() var fileSize int64 = 5 * 1024 * 1024 * 1024 contents := newRandReader(fileSize) - written, err := suite.StorageDriver.WriteStream(filename, 0, io.TeeReader(contents, checksum)) + written, err := suite.StorageDriver.WriteStream(suite.ctx, filename, 0, io.TeeReader(contents, checksum)) c.Assert(err, check.IsNil) c.Assert(written, check.Equals, fileSize) - reader, err := suite.StorageDriver.ReadStream(filename, 0) + reader, err := suite.StorageDriver.ReadStream(suite.ctx, filename, 0) c.Assert(err, check.IsNil) writtenChecksum := sha1.New() @@ -300,7 +303,7 @@ func (suite *DriverSuite) TestWriteReadLargeStreams(c *check.C) { // reading with a given offset. func (suite *DriverSuite) TestReadStreamWithOffset(c *check.C) { filename := randomPath(32) - defer suite.StorageDriver.Delete(firstPart(filename)) + defer suite.StorageDriver.Delete(suite.ctx, firstPart(filename)) chunkSize := int64(32) @@ -308,10 +311,10 @@ func (suite *DriverSuite) TestReadStreamWithOffset(c *check.C) { contentsChunk2 := randomContents(chunkSize) contentsChunk3 := randomContents(chunkSize) - err := suite.StorageDriver.PutContent(filename, append(append(contentsChunk1, contentsChunk2...), contentsChunk3...)) + err := suite.StorageDriver.PutContent(suite.ctx, filename, append(append(contentsChunk1, contentsChunk2...), contentsChunk3...)) c.Assert(err, check.IsNil) - reader, err := suite.StorageDriver.ReadStream(filename, 0) + reader, err := suite.StorageDriver.ReadStream(suite.ctx, filename, 0) c.Assert(err, check.IsNil) defer reader.Close() @@ -320,7 +323,7 @@ func (suite *DriverSuite) TestReadStreamWithOffset(c *check.C) { c.Assert(readContents, check.DeepEquals, append(append(contentsChunk1, contentsChunk2...), contentsChunk3...)) - reader, err = suite.StorageDriver.ReadStream(filename, chunkSize) + reader, err = suite.StorageDriver.ReadStream(suite.ctx, filename, chunkSize) c.Assert(err, check.IsNil) defer reader.Close() @@ -329,7 +332,7 @@ func (suite *DriverSuite) TestReadStreamWithOffset(c *check.C) { c.Assert(readContents, check.DeepEquals, append(contentsChunk2, contentsChunk3...)) - reader, err = suite.StorageDriver.ReadStream(filename, chunkSize*2) + reader, err = suite.StorageDriver.ReadStream(suite.ctx, filename, chunkSize*2) c.Assert(err, check.IsNil) defer reader.Close() @@ -338,7 +341,7 @@ func (suite *DriverSuite) TestReadStreamWithOffset(c *check.C) { c.Assert(readContents, check.DeepEquals, contentsChunk3) // Ensure we get invalid offest for negative offsets. - reader, err = suite.StorageDriver.ReadStream(filename, -1) + reader, err = suite.StorageDriver.ReadStream(suite.ctx, filename, -1) c.Assert(err, check.FitsTypeOf, storagedriver.InvalidOffsetError{}) c.Assert(err.(storagedriver.InvalidOffsetError).Offset, check.Equals, int64(-1)) c.Assert(err.(storagedriver.InvalidOffsetError).Path, check.Equals, filename) @@ -346,7 +349,7 @@ func (suite *DriverSuite) TestReadStreamWithOffset(c *check.C) { // Read past the end of the content and make sure we get a reader that // returns 0 bytes and io.EOF - reader, err = suite.StorageDriver.ReadStream(filename, chunkSize*3) + reader, err = suite.StorageDriver.ReadStream(suite.ctx, filename, chunkSize*3) c.Assert(err, check.IsNil) defer reader.Close() @@ -356,7 +359,7 @@ func (suite *DriverSuite) TestReadStreamWithOffset(c *check.C) { c.Assert(n, check.Equals, 0) // Check the N-1 boundary condition, ensuring we get 1 byte then io.EOF. - reader, err = suite.StorageDriver.ReadStream(filename, chunkSize*3-1) + reader, err = suite.StorageDriver.ReadStream(suite.ctx, filename, chunkSize*3-1) c.Assert(err, check.IsNil) defer reader.Close() @@ -389,7 +392,7 @@ func (suite *DriverSuite) TestContinueStreamAppendSmall(c *check.C) { func (suite *DriverSuite) testContinueStreamAppend(c *check.C, chunkSize int64) { filename := randomPath(32) - defer suite.StorageDriver.Delete(firstPart(filename)) + defer suite.StorageDriver.Delete(suite.ctx, firstPart(filename)) contentsChunk1 := randomContents(chunkSize) contentsChunk2 := randomContents(chunkSize) @@ -399,39 +402,39 @@ func (suite *DriverSuite) testContinueStreamAppend(c *check.C, chunkSize int64) fullContents := append(append(contentsChunk1, contentsChunk2...), contentsChunk3...) - nn, err := suite.StorageDriver.WriteStream(filename, 0, bytes.NewReader(contentsChunk1)) + nn, err := suite.StorageDriver.WriteStream(suite.ctx, filename, 0, bytes.NewReader(contentsChunk1)) c.Assert(err, check.IsNil) c.Assert(nn, check.Equals, int64(len(contentsChunk1))) - fi, err := suite.StorageDriver.Stat(filename) + fi, err := suite.StorageDriver.Stat(suite.ctx, filename) c.Assert(err, check.IsNil) c.Assert(fi, check.NotNil) c.Assert(fi.Size(), check.Equals, int64(len(contentsChunk1))) - nn, err = suite.StorageDriver.WriteStream(filename, fi.Size(), bytes.NewReader(contentsChunk2)) + nn, err = suite.StorageDriver.WriteStream(suite.ctx, filename, fi.Size(), bytes.NewReader(contentsChunk2)) c.Assert(err, check.IsNil) c.Assert(nn, check.Equals, int64(len(contentsChunk2))) - fi, err = suite.StorageDriver.Stat(filename) + fi, err = suite.StorageDriver.Stat(suite.ctx, filename) c.Assert(err, check.IsNil) c.Assert(fi, check.NotNil) c.Assert(fi.Size(), check.Equals, 2*chunkSize) // Test re-writing the last chunk - nn, err = suite.StorageDriver.WriteStream(filename, fi.Size()-chunkSize, bytes.NewReader(contentsChunk2)) + nn, err = suite.StorageDriver.WriteStream(suite.ctx, filename, fi.Size()-chunkSize, bytes.NewReader(contentsChunk2)) c.Assert(err, check.IsNil) c.Assert(nn, check.Equals, int64(len(contentsChunk2))) - fi, err = suite.StorageDriver.Stat(filename) + fi, err = suite.StorageDriver.Stat(suite.ctx, filename) c.Assert(err, check.IsNil) c.Assert(fi, check.NotNil) c.Assert(fi.Size(), check.Equals, 2*chunkSize) - nn, err = suite.StorageDriver.WriteStream(filename, fi.Size(), bytes.NewReader(fullContents[fi.Size():])) + nn, err = suite.StorageDriver.WriteStream(suite.ctx, filename, fi.Size(), bytes.NewReader(fullContents[fi.Size():])) c.Assert(err, check.IsNil) c.Assert(nn, check.Equals, int64(len(fullContents[fi.Size():]))) - received, err := suite.StorageDriver.GetContent(filename) + received, err := suite.StorageDriver.GetContent(suite.ctx, filename) c.Assert(err, check.IsNil) c.Assert(received, check.DeepEquals, fullContents) @@ -443,16 +446,16 @@ func (suite *DriverSuite) testContinueStreamAppend(c *check.C, chunkSize int64) fullContents = append(fullContents, zeroChunk...) fullContents = append(fullContents, contentsChunk4...) - nn, err = suite.StorageDriver.WriteStream(filename, int64(len(fullContents))-chunkSize, bytes.NewReader(contentsChunk4)) + nn, err = suite.StorageDriver.WriteStream(suite.ctx, filename, int64(len(fullContents))-chunkSize, bytes.NewReader(contentsChunk4)) c.Assert(err, check.IsNil) c.Assert(nn, check.Equals, chunkSize) - fi, err = suite.StorageDriver.Stat(filename) + fi, err = suite.StorageDriver.Stat(suite.ctx, filename) c.Assert(err, check.IsNil) c.Assert(fi, check.NotNil) c.Assert(fi.Size(), check.Equals, int64(len(fullContents))) - received, err = suite.StorageDriver.GetContent(filename) + received, err = suite.StorageDriver.GetContent(suite.ctx, filename) c.Assert(err, check.IsNil) c.Assert(len(received), check.Equals, len(fullContents)) c.Assert(received[chunkSize*3:chunkSize*4], check.DeepEquals, zeroChunk) @@ -460,7 +463,7 @@ func (suite *DriverSuite) testContinueStreamAppend(c *check.C, chunkSize int64) c.Assert(received, check.DeepEquals, fullContents) // Ensure that negative offsets return correct error. - nn, err = suite.StorageDriver.WriteStream(filename, -1, bytes.NewReader(zeroChunk)) + nn, err = suite.StorageDriver.WriteStream(suite.ctx, filename, -1, bytes.NewReader(zeroChunk)) c.Assert(err, check.NotNil) c.Assert(err, check.FitsTypeOf, storagedriver.InvalidOffsetError{}) c.Assert(err.(storagedriver.InvalidOffsetError).Path, check.Equals, filename) @@ -472,11 +475,11 @@ func (suite *DriverSuite) testContinueStreamAppend(c *check.C, chunkSize int64) func (suite *DriverSuite) TestReadNonexistentStream(c *check.C) { filename := randomPath(32) - _, err := suite.StorageDriver.ReadStream(filename, 0) + _, err := suite.StorageDriver.ReadStream(suite.ctx, filename, 0) c.Assert(err, check.NotNil) c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{}) - _, err = suite.StorageDriver.ReadStream(filename, 64) + _, err = suite.StorageDriver.ReadStream(suite.ctx, filename, 64) c.Assert(err, check.NotNil) c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{}) } @@ -484,27 +487,27 @@ func (suite *DriverSuite) TestReadNonexistentStream(c *check.C) { // TestList checks the returned list of keys after populating a directory tree. func (suite *DriverSuite) TestList(c *check.C) { rootDirectory := "/" + randomFilename(int64(8+rand.Intn(8))) - defer suite.StorageDriver.Delete(rootDirectory) + defer suite.StorageDriver.Delete(suite.ctx, rootDirectory) parentDirectory := rootDirectory + "/" + randomFilename(int64(8+rand.Intn(8))) childFiles := make([]string, 50) for i := 0; i < len(childFiles); i++ { childFile := parentDirectory + "/" + randomFilename(int64(8+rand.Intn(8))) childFiles[i] = childFile - err := suite.StorageDriver.PutContent(childFile, randomContents(32)) + err := suite.StorageDriver.PutContent(suite.ctx, childFile, randomContents(32)) c.Assert(err, check.IsNil) } sort.Strings(childFiles) - keys, err := suite.StorageDriver.List("/") + keys, err := suite.StorageDriver.List(suite.ctx, "/") c.Assert(err, check.IsNil) c.Assert(keys, check.DeepEquals, []string{rootDirectory}) - keys, err = suite.StorageDriver.List(rootDirectory) + keys, err = suite.StorageDriver.List(suite.ctx, rootDirectory) c.Assert(err, check.IsNil) c.Assert(keys, check.DeepEquals, []string{parentDirectory}) - keys, err = suite.StorageDriver.List(parentDirectory) + keys, err = suite.StorageDriver.List(suite.ctx, parentDirectory) c.Assert(err, check.IsNil) sort.Strings(keys) @@ -523,20 +526,20 @@ func (suite *DriverSuite) TestMove(c *check.C) { sourcePath := randomPath(32) destPath := randomPath(32) - defer suite.StorageDriver.Delete(firstPart(sourcePath)) - defer suite.StorageDriver.Delete(firstPart(destPath)) + defer suite.StorageDriver.Delete(suite.ctx, firstPart(sourcePath)) + defer suite.StorageDriver.Delete(suite.ctx, firstPart(destPath)) - err := suite.StorageDriver.PutContent(sourcePath, contents) + err := suite.StorageDriver.PutContent(suite.ctx, sourcePath, contents) c.Assert(err, check.IsNil) - err = suite.StorageDriver.Move(sourcePath, destPath) + err = suite.StorageDriver.Move(suite.ctx, sourcePath, destPath) c.Assert(err, check.IsNil) - received, err := suite.StorageDriver.GetContent(destPath) + received, err := suite.StorageDriver.GetContent(suite.ctx, destPath) c.Assert(err, check.IsNil) c.Assert(received, check.DeepEquals, contents) - _, err = suite.StorageDriver.GetContent(sourcePath) + _, err = suite.StorageDriver.GetContent(suite.ctx, sourcePath) c.Assert(err, check.NotNil) c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{}) } @@ -549,23 +552,23 @@ func (suite *DriverSuite) TestMoveOverwrite(c *check.C) { sourceContents := randomContents(32) destContents := randomContents(64) - defer suite.StorageDriver.Delete(firstPart(sourcePath)) - defer suite.StorageDriver.Delete(firstPart(destPath)) + defer suite.StorageDriver.Delete(suite.ctx, firstPart(sourcePath)) + defer suite.StorageDriver.Delete(suite.ctx, firstPart(destPath)) - err := suite.StorageDriver.PutContent(sourcePath, sourceContents) + err := suite.StorageDriver.PutContent(suite.ctx, sourcePath, sourceContents) c.Assert(err, check.IsNil) - err = suite.StorageDriver.PutContent(destPath, destContents) + err = suite.StorageDriver.PutContent(suite.ctx, destPath, destContents) c.Assert(err, check.IsNil) - err = suite.StorageDriver.Move(sourcePath, destPath) + err = suite.StorageDriver.Move(suite.ctx, sourcePath, destPath) c.Assert(err, check.IsNil) - received, err := suite.StorageDriver.GetContent(destPath) + received, err := suite.StorageDriver.GetContent(suite.ctx, destPath) c.Assert(err, check.IsNil) c.Assert(received, check.DeepEquals, sourceContents) - _, err = suite.StorageDriver.GetContent(sourcePath) + _, err = suite.StorageDriver.GetContent(suite.ctx, sourcePath) c.Assert(err, check.NotNil) c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{}) } @@ -577,16 +580,16 @@ func (suite *DriverSuite) TestMoveNonexistent(c *check.C) { sourcePath := randomPath(32) destPath := randomPath(32) - defer suite.StorageDriver.Delete(firstPart(destPath)) + defer suite.StorageDriver.Delete(suite.ctx, firstPart(destPath)) - err := suite.StorageDriver.PutContent(destPath, contents) + err := suite.StorageDriver.PutContent(suite.ctx, destPath, contents) c.Assert(err, check.IsNil) - err = suite.StorageDriver.Move(sourcePath, destPath) + err = suite.StorageDriver.Move(suite.ctx, sourcePath, destPath) c.Assert(err, check.NotNil) c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{}) - received, err := suite.StorageDriver.GetContent(destPath) + received, err := suite.StorageDriver.GetContent(suite.ctx, destPath) c.Assert(err, check.IsNil) c.Assert(received, check.DeepEquals, contents) } @@ -596,12 +599,12 @@ func (suite *DriverSuite) TestMoveInvalid(c *check.C) { contents := randomContents(32) // Create a regular file. - err := suite.StorageDriver.PutContent("/notadir", contents) + err := suite.StorageDriver.PutContent(suite.ctx, "/notadir", contents) c.Assert(err, check.IsNil) - defer suite.StorageDriver.Delete("/notadir") + defer suite.StorageDriver.Delete(suite.ctx, "/notadir") // Now try to move a non-existent file under it. - err = suite.StorageDriver.Move("/notadir/foo", "/notadir/bar") + err = suite.StorageDriver.Move(suite.ctx, "/notadir/foo", "/notadir/bar") c.Assert(err, check.NotNil) // non-nil error } @@ -611,15 +614,15 @@ func (suite *DriverSuite) TestDelete(c *check.C) { filename := randomPath(32) contents := randomContents(32) - defer suite.StorageDriver.Delete(firstPart(filename)) + defer suite.StorageDriver.Delete(suite.ctx, firstPart(filename)) - err := suite.StorageDriver.PutContent(filename, contents) + err := suite.StorageDriver.PutContent(suite.ctx, filename, contents) c.Assert(err, check.IsNil) - err = suite.StorageDriver.Delete(filename) + err = suite.StorageDriver.Delete(suite.ctx, filename) c.Assert(err, check.IsNil) - _, err = suite.StorageDriver.GetContent(filename) + _, err = suite.StorageDriver.GetContent(suite.ctx, filename) c.Assert(err, check.NotNil) c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{}) } @@ -630,12 +633,12 @@ func (suite *DriverSuite) TestURLFor(c *check.C) { filename := randomPath(32) contents := randomContents(32) - defer suite.StorageDriver.Delete(firstPart(filename)) + defer suite.StorageDriver.Delete(suite.ctx, firstPart(filename)) - err := suite.StorageDriver.PutContent(filename, contents) + err := suite.StorageDriver.PutContent(suite.ctx, filename, contents) c.Assert(err, check.IsNil) - url, err := suite.StorageDriver.URLFor(filename, nil) + url, err := suite.StorageDriver.URLFor(suite.ctx, filename, nil) if err == storagedriver.ErrUnsupportedMethod { return } @@ -649,7 +652,7 @@ func (suite *DriverSuite) TestURLFor(c *check.C) { c.Assert(err, check.IsNil) c.Assert(read, check.DeepEquals, contents) - url, err = suite.StorageDriver.URLFor(filename, map[string]interface{}{"method": "HEAD"}) + url, err = suite.StorageDriver.URLFor(suite.ctx, filename, map[string]interface{}{"method": "HEAD"}) if err == storagedriver.ErrUnsupportedMethod { return } @@ -663,7 +666,7 @@ func (suite *DriverSuite) TestURLFor(c *check.C) { // TestDeleteNonexistent checks that removing a nonexistent key fails. func (suite *DriverSuite) TestDeleteNonexistent(c *check.C) { filename := randomPath(32) - err := suite.StorageDriver.Delete(filename) + err := suite.StorageDriver.Delete(suite.ctx, filename) c.Assert(err, check.NotNil) c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{}) } @@ -676,42 +679,42 @@ func (suite *DriverSuite) TestDeleteFolder(c *check.C) { filename3 := randomPath(32) contents := randomContents(32) - defer suite.StorageDriver.Delete(firstPart(dirname)) + defer suite.StorageDriver.Delete(suite.ctx, firstPart(dirname)) - err := suite.StorageDriver.PutContent(path.Join(dirname, filename1), contents) + err := suite.StorageDriver.PutContent(suite.ctx, path.Join(dirname, filename1), contents) c.Assert(err, check.IsNil) - err = suite.StorageDriver.PutContent(path.Join(dirname, filename2), contents) + err = suite.StorageDriver.PutContent(suite.ctx, path.Join(dirname, filename2), contents) c.Assert(err, check.IsNil) - err = suite.StorageDriver.PutContent(path.Join(dirname, filename3), contents) + err = suite.StorageDriver.PutContent(suite.ctx, path.Join(dirname, filename3), contents) c.Assert(err, check.IsNil) - err = suite.StorageDriver.Delete(path.Join(dirname, filename1)) + err = suite.StorageDriver.Delete(suite.ctx, path.Join(dirname, filename1)) c.Assert(err, check.IsNil) - _, err = suite.StorageDriver.GetContent(path.Join(dirname, filename1)) + _, err = suite.StorageDriver.GetContent(suite.ctx, path.Join(dirname, filename1)) c.Assert(err, check.NotNil) c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{}) - _, err = suite.StorageDriver.GetContent(path.Join(dirname, filename2)) + _, err = suite.StorageDriver.GetContent(suite.ctx, path.Join(dirname, filename2)) c.Assert(err, check.IsNil) - _, err = suite.StorageDriver.GetContent(path.Join(dirname, filename3)) + _, err = suite.StorageDriver.GetContent(suite.ctx, path.Join(dirname, filename3)) c.Assert(err, check.IsNil) - err = suite.StorageDriver.Delete(dirname) + err = suite.StorageDriver.Delete(suite.ctx, dirname) c.Assert(err, check.IsNil) - _, err = suite.StorageDriver.GetContent(path.Join(dirname, filename1)) + _, err = suite.StorageDriver.GetContent(suite.ctx, path.Join(dirname, filename1)) c.Assert(err, check.NotNil) c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{}) - _, err = suite.StorageDriver.GetContent(path.Join(dirname, filename2)) + _, err = suite.StorageDriver.GetContent(suite.ctx, path.Join(dirname, filename2)) c.Assert(err, check.NotNil) c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{}) - _, err = suite.StorageDriver.GetContent(path.Join(dirname, filename3)) + _, err = suite.StorageDriver.GetContent(suite.ctx, path.Join(dirname, filename3)) c.Assert(err, check.NotNil) c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{}) } @@ -723,24 +726,24 @@ func (suite *DriverSuite) TestStatCall(c *check.C) { fileName := randomFilename(32) filePath := path.Join(dirPath, fileName) - defer suite.StorageDriver.Delete(firstPart(dirPath)) + defer suite.StorageDriver.Delete(suite.ctx, firstPart(dirPath)) // Call on non-existent file/dir, check error. - fi, err := suite.StorageDriver.Stat(dirPath) + fi, err := suite.StorageDriver.Stat(suite.ctx, dirPath) c.Assert(err, check.NotNil) c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{}) c.Assert(fi, check.IsNil) - fi, err = suite.StorageDriver.Stat(filePath) + fi, err = suite.StorageDriver.Stat(suite.ctx, filePath) c.Assert(err, check.NotNil) c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{}) c.Assert(fi, check.IsNil) - err = suite.StorageDriver.PutContent(filePath, content) + err = suite.StorageDriver.PutContent(suite.ctx, filePath, content) c.Assert(err, check.IsNil) // Call on regular file, check results - fi, err = suite.StorageDriver.Stat(filePath) + fi, err = suite.StorageDriver.Stat(suite.ctx, filePath) c.Assert(err, check.IsNil) c.Assert(fi, check.NotNil) c.Assert(fi.Path(), check.Equals, filePath) @@ -751,9 +754,9 @@ func (suite *DriverSuite) TestStatCall(c *check.C) { // Sleep and modify the file time.Sleep(time.Second * 10) content = randomContents(4096) - err = suite.StorageDriver.PutContent(filePath, content) + err = suite.StorageDriver.PutContent(suite.ctx, filePath, content) c.Assert(err, check.IsNil) - fi, err = suite.StorageDriver.Stat(filePath) + fi, err = suite.StorageDriver.Stat(suite.ctx, filePath) c.Assert(err, check.IsNil) c.Assert(fi, check.NotNil) time.Sleep(time.Second * 5) // allow changes to propagate (eventual consistency) @@ -768,7 +771,7 @@ func (suite *DriverSuite) TestStatCall(c *check.C) { } // Call on directory (do not check ModTime as dirs don't need to support it) - fi, err = suite.StorageDriver.Stat(dirPath) + fi, err = suite.StorageDriver.Stat(suite.ctx, dirPath) c.Assert(err, check.IsNil) c.Assert(fi, check.NotNil) c.Assert(fi.Path(), check.Equals, dirPath) @@ -784,15 +787,15 @@ func (suite *DriverSuite) TestPutContentMultipleTimes(c *check.C) { filename := randomPath(32) contents := randomContents(4096) - defer suite.StorageDriver.Delete(firstPart(filename)) - err := suite.StorageDriver.PutContent(filename, contents) + defer suite.StorageDriver.Delete(suite.ctx, firstPart(filename)) + err := suite.StorageDriver.PutContent(suite.ctx, filename, contents) c.Assert(err, check.IsNil) contents = randomContents(2048) // upload a different, smaller file - err = suite.StorageDriver.PutContent(filename, contents) + err = suite.StorageDriver.PutContent(suite.ctx, filename, contents) c.Assert(err, check.IsNil) - readContents, err := suite.StorageDriver.GetContent(filename) + readContents, err := suite.StorageDriver.GetContent(suite.ctx, filename) c.Assert(err, check.IsNil) c.Assert(readContents, check.DeepEquals, contents) } @@ -810,9 +813,9 @@ func (suite *DriverSuite) TestConcurrentStreamReads(c *check.C) { filename := randomPath(32) contents := randomContents(filesize) - defer suite.StorageDriver.Delete(firstPart(filename)) + defer suite.StorageDriver.Delete(suite.ctx, firstPart(filename)) - err := suite.StorageDriver.PutContent(filename, contents) + err := suite.StorageDriver.PutContent(suite.ctx, filename, contents) c.Assert(err, check.IsNil) var wg sync.WaitGroup @@ -820,7 +823,7 @@ func (suite *DriverSuite) TestConcurrentStreamReads(c *check.C) { readContents := func() { defer wg.Done() offset := rand.Int63n(int64(len(contents))) - reader, err := suite.StorageDriver.ReadStream(filename, offset) + reader, err := suite.StorageDriver.ReadStream(suite.ctx, filename, offset) c.Assert(err, check.IsNil) readContents, err := ioutil.ReadAll(reader) @@ -872,7 +875,7 @@ func (suite *DriverSuite) TestEventualConsistency(c *check.C) { } filename := randomPath(32) - defer suite.StorageDriver.Delete(firstPart(filename)) + defer suite.StorageDriver.Delete(suite.ctx, firstPart(filename)) var offset int64 var misswrites int @@ -880,17 +883,17 @@ func (suite *DriverSuite) TestEventualConsistency(c *check.C) { for i := 0; i < 1024; i++ { contents := randomContents(chunkSize) - read, err := suite.StorageDriver.WriteStream(filename, offset, bytes.NewReader(contents)) + read, err := suite.StorageDriver.WriteStream(suite.ctx, filename, offset, bytes.NewReader(contents)) c.Assert(err, check.IsNil) - fi, err := suite.StorageDriver.Stat(filename) + fi, err := suite.StorageDriver.Stat(suite.ctx, filename) c.Assert(err, check.IsNil) // We are most concerned with being able to read data as soon as Stat declares // it is uploaded. This is the strongest guarantee that some drivers (that guarantee // at best eventual consistency) absolutely need to provide. if fi.Size() == offset+chunkSize { - reader, err := suite.StorageDriver.ReadStream(filename, offset) + reader, err := suite.StorageDriver.ReadStream(suite.ctx, filename, offset) c.Assert(err, check.IsNil) readContents, err := ioutil.ReadAll(reader) @@ -937,15 +940,15 @@ func (suite *DriverSuite) benchmarkPutGetFiles(c *check.C, size int64) { parentDir := randomPath(8) defer func() { c.StopTimer() - suite.StorageDriver.Delete(firstPart(parentDir)) + suite.StorageDriver.Delete(suite.ctx, firstPart(parentDir)) }() for i := 0; i < c.N; i++ { filename := path.Join(parentDir, randomPath(32)) - err := suite.StorageDriver.PutContent(filename, randomContents(size)) + err := suite.StorageDriver.PutContent(suite.ctx, filename, randomContents(size)) c.Assert(err, check.IsNil) - _, err = suite.StorageDriver.GetContent(filename) + _, err = suite.StorageDriver.GetContent(suite.ctx, filename) c.Assert(err, check.IsNil) } } @@ -975,16 +978,16 @@ func (suite *DriverSuite) benchmarkStreamFiles(c *check.C, size int64) { parentDir := randomPath(8) defer func() { c.StopTimer() - suite.StorageDriver.Delete(firstPart(parentDir)) + suite.StorageDriver.Delete(suite.ctx, firstPart(parentDir)) }() for i := 0; i < c.N; i++ { filename := path.Join(parentDir, randomPath(32)) - written, err := suite.StorageDriver.WriteStream(filename, 0, bytes.NewReader(randomContents(size))) + written, err := suite.StorageDriver.WriteStream(suite.ctx, filename, 0, bytes.NewReader(randomContents(size))) c.Assert(err, check.IsNil) c.Assert(written, check.Equals, size) - rc, err := suite.StorageDriver.ReadStream(filename, 0) + rc, err := suite.StorageDriver.ReadStream(suite.ctx, filename, 0) c.Assert(err, check.IsNil) rc.Close() } @@ -1004,17 +1007,17 @@ func (suite *DriverSuite) benchmarkListFiles(c *check.C, numFiles int64) { parentDir := randomPath(8) defer func() { c.StopTimer() - suite.StorageDriver.Delete(firstPart(parentDir)) + suite.StorageDriver.Delete(suite.ctx, firstPart(parentDir)) }() for i := int64(0); i < numFiles; i++ { - err := suite.StorageDriver.PutContent(path.Join(parentDir, randomPath(32)), nil) + err := suite.StorageDriver.PutContent(suite.ctx, path.Join(parentDir, randomPath(32)), nil) c.Assert(err, check.IsNil) } c.ResetTimer() for i := 0; i < c.N; i++ { - files, err := suite.StorageDriver.List(parentDir) + files, err := suite.StorageDriver.List(suite.ctx, parentDir) c.Assert(err, check.IsNil) c.Assert(int64(len(files)), check.Equals, numFiles) } @@ -1033,17 +1036,17 @@ func (suite *DriverSuite) BenchmarkDelete50Files(c *check.C) { func (suite *DriverSuite) benchmarkDeleteFiles(c *check.C, numFiles int64) { for i := 0; i < c.N; i++ { parentDir := randomPath(8) - defer suite.StorageDriver.Delete(firstPart(parentDir)) + defer suite.StorageDriver.Delete(suite.ctx, firstPart(parentDir)) c.StopTimer() for j := int64(0); j < numFiles; j++ { - err := suite.StorageDriver.PutContent(path.Join(parentDir, randomPath(32)), nil) + err := suite.StorageDriver.PutContent(suite.ctx, path.Join(parentDir, randomPath(32)), nil) c.Assert(err, check.IsNil) } c.StartTimer() // This is the operation we're benchmarking - err := suite.StorageDriver.Delete(firstPart(parentDir)) + err := suite.StorageDriver.Delete(suite.ctx, firstPart(parentDir)) c.Assert(err, check.IsNil) } } @@ -1055,7 +1058,7 @@ func (suite *DriverSuite) testFileStreams(c *check.C, size int64) { defer tf.Close() filename := randomPath(32) - defer suite.StorageDriver.Delete(firstPart(filename)) + defer suite.StorageDriver.Delete(suite.ctx, firstPart(filename)) contents := randomContents(size) @@ -1065,11 +1068,11 @@ func (suite *DriverSuite) testFileStreams(c *check.C, size int64) { tf.Sync() tf.Seek(0, os.SEEK_SET) - nn, err := suite.StorageDriver.WriteStream(filename, 0, tf) + nn, err := suite.StorageDriver.WriteStream(suite.ctx, filename, 0, tf) c.Assert(err, check.IsNil) c.Assert(nn, check.Equals, size) - reader, err := suite.StorageDriver.ReadStream(filename, 0) + reader, err := suite.StorageDriver.ReadStream(suite.ctx, filename, 0) c.Assert(err, check.IsNil) defer reader.Close() @@ -1080,25 +1083,25 @@ func (suite *DriverSuite) testFileStreams(c *check.C, size int64) { } func (suite *DriverSuite) writeReadCompare(c *check.C, filename string, contents []byte) { - defer suite.StorageDriver.Delete(firstPart(filename)) + defer suite.StorageDriver.Delete(suite.ctx, firstPart(filename)) - err := suite.StorageDriver.PutContent(filename, contents) + err := suite.StorageDriver.PutContent(suite.ctx, filename, contents) c.Assert(err, check.IsNil) - readContents, err := suite.StorageDriver.GetContent(filename) + readContents, err := suite.StorageDriver.GetContent(suite.ctx, filename) c.Assert(err, check.IsNil) c.Assert(readContents, check.DeepEquals, contents) } func (suite *DriverSuite) writeReadCompareStreams(c *check.C, filename string, contents []byte) { - defer suite.StorageDriver.Delete(firstPart(filename)) + defer suite.StorageDriver.Delete(suite.ctx, firstPart(filename)) - nn, err := suite.StorageDriver.WriteStream(filename, 0, bytes.NewReader(contents)) + nn, err := suite.StorageDriver.WriteStream(suite.ctx, filename, 0, bytes.NewReader(contents)) c.Assert(err, check.IsNil) c.Assert(nn, check.Equals, int64(len(contents))) - reader, err := suite.StorageDriver.ReadStream(filename, 0) + reader, err := suite.StorageDriver.ReadStream(suite.ctx, filename, 0) c.Assert(err, check.IsNil) defer reader.Close() diff --git a/registry/storage/filereader.go b/registry/storage/filereader.go index 65d4347f..72d58f8a 100644 --- a/registry/storage/filereader.go +++ b/registry/storage/filereader.go @@ -9,6 +9,7 @@ import ( "os" "time" + "github.com/docker/distribution/context" storagedriver "github.com/docker/distribution/registry/storage/driver" ) @@ -25,6 +26,8 @@ const fileReaderBufferSize = 4 << 20 type fileReader struct { driver storagedriver.StorageDriver + ctx context.Context + // identifying fields path string size int64 // size is the total size, must be set. @@ -40,14 +43,15 @@ type fileReader struct { // newFileReader initializes a file reader for the remote file. The read takes // on the offset and size at the time the reader is created. If the underlying // file changes, one must create a new fileReader. -func newFileReader(driver storagedriver.StorageDriver, path string) (*fileReader, error) { +func newFileReader(ctx context.Context, driver storagedriver.StorageDriver, path string) (*fileReader, error) { rd := &fileReader{ driver: driver, path: path, + ctx: ctx, } // Grab the size of the layer file, ensuring existence. - if fi, err := driver.Stat(path); err != nil { + if fi, err := driver.Stat(ctx, path); err != nil { switch err := err.(type) { case storagedriver.PathNotFoundError: // NOTE(stevvooe): We really don't care if the file is not @@ -141,7 +145,7 @@ func (fr *fileReader) reader() (io.Reader, error) { } // If we don't have a reader, open one up. - rc, err := fr.driver.ReadStream(fr.path, fr.offset) + rc, err := fr.driver.ReadStream(fr.ctx, fr.path, fr.offset) if err != nil { switch err := err.(type) { case storagedriver.PathNotFoundError: diff --git a/registry/storage/filereader_test.go b/registry/storage/filereader_test.go index 8a077603..c48bf16d 100644 --- a/registry/storage/filereader_test.go +++ b/registry/storage/filereader_test.go @@ -8,12 +8,13 @@ import ( "os" "testing" + "github.com/docker/distribution/context" "github.com/docker/distribution/digest" - "github.com/docker/distribution/registry/storage/driver/inmemory" ) func TestSimpleRead(t *testing.T) { + ctx := context.Background() content := make([]byte, 1<<20) n, err := rand.Read(content) if err != nil { @@ -21,7 +22,7 @@ func TestSimpleRead(t *testing.T) { } if n != len(content) { - t.Fatalf("random read did't fill buffer") + t.Fatalf("random read didn't fill buffer") } dgst, err := digest.FromReader(bytes.NewReader(content)) @@ -32,11 +33,11 @@ func TestSimpleRead(t *testing.T) { driver := inmemory.New() path := "/random" - if err := driver.PutContent(path, content); err != nil { + if err := driver.PutContent(ctx, path, content); err != nil { t.Fatalf("error putting patterned content: %v", err) } - fr, err := newFileReader(driver, path) + fr, err := newFileReader(ctx, driver, path) if err != nil { t.Fatalf("error allocating file reader: %v", err) } @@ -59,12 +60,13 @@ func TestFileReaderSeek(t *testing.T) { repititions := 1024 path := "/patterned" content := bytes.Repeat([]byte(pattern), repititions) + ctx := context.Background() - if err := driver.PutContent(path, content); err != nil { + if err := driver.PutContent(ctx, path, content); err != nil { t.Fatalf("error putting patterned content: %v", err) } - fr, err := newFileReader(driver, path) + fr, err := newFileReader(ctx, driver, path) if err != nil { t.Fatalf("unexpected error creating file reader: %v", err) @@ -160,7 +162,7 @@ func TestFileReaderSeek(t *testing.T) { // read method, with an io.EOF error. func TestFileReaderNonExistentFile(t *testing.T) { driver := inmemory.New() - fr, err := newFileReader(driver, "/doesnotexist") + fr, err := newFileReader(context.Background(), driver, "/doesnotexist") if err != nil { t.Fatalf("unexpected error initializing reader: %v", err) } diff --git a/registry/storage/filewriter.go b/registry/storage/filewriter.go index 5f22142e..95930f1d 100644 --- a/registry/storage/filewriter.go +++ b/registry/storage/filewriter.go @@ -7,6 +7,7 @@ import ( "io" "os" + "github.com/docker/distribution/context" storagedriver "github.com/docker/distribution/registry/storage/driver" ) @@ -18,6 +19,8 @@ const ( type fileWriter struct { driver storagedriver.StorageDriver + ctx context.Context + // identifying fields path string @@ -45,13 +48,14 @@ var _ fileWriterInterface = &fileWriter{} // newFileWriter returns a prepared fileWriter for the driver and path. This // could be considered similar to an "open" call on a regular filesystem. -func newFileWriter(driver storagedriver.StorageDriver, path string) (*bufferedFileWriter, error) { +func newFileWriter(ctx context.Context, driver storagedriver.StorageDriver, path string) (*bufferedFileWriter, error) { fw := fileWriter{ driver: driver, path: path, + ctx: ctx, } - if fi, err := driver.Stat(path); err != nil { + if fi, err := driver.Stat(ctx, path); err != nil { switch err := err.(type) { case storagedriver.PathNotFoundError: // ignore, offset is zero @@ -179,7 +183,7 @@ func (fw *fileWriter) readFromAt(r io.Reader, offset int64) (n int64, err error) updateOffset = true } - nn, err := fw.driver.WriteStream(fw.path, offset, r) + nn, err := fw.driver.WriteStream(fw.ctx, fw.path, offset, r) if updateOffset { // We should forward the offset, whether or not there was an error. diff --git a/registry/storage/filewriter_test.go b/registry/storage/filewriter_test.go index a8ea6241..720e9385 100644 --- a/registry/storage/filewriter_test.go +++ b/registry/storage/filewriter_test.go @@ -7,6 +7,7 @@ import ( "os" "testing" + "github.com/docker/distribution/context" "github.com/docker/distribution/digest" storagedriver "github.com/docker/distribution/registry/storage/driver" "github.com/docker/distribution/registry/storage/driver/inmemory" @@ -32,8 +33,9 @@ func TestSimpleWrite(t *testing.T) { driver := inmemory.New() path := "/random" + ctx := context.Background() - fw, err := newFileWriter(driver, path) + fw, err := newFileWriter(ctx, driver, path) if err != nil { t.Fatalf("unexpected error creating fileWriter: %v", err) } @@ -49,7 +51,7 @@ func TestSimpleWrite(t *testing.T) { t.Fatalf("unexpected write length: %d != %d", n, len(content)) } - fr, err := newFileReader(driver, path) + fr, err := newFileReader(ctx, driver, path) if err != nil { t.Fatalf("unexpected error creating fileReader: %v", err) } @@ -92,7 +94,7 @@ func TestSimpleWrite(t *testing.T) { t.Fatalf("writeat was short: %d != %d", n, len(content)) } - fr, err = newFileReader(driver, path) + fr, err = newFileReader(ctx, driver, path) if err != nil { t.Fatalf("unexpected error creating fileReader: %v", err) } @@ -122,13 +124,13 @@ func TestSimpleWrite(t *testing.T) { // Now, we copy from one path to another, running the data through the // fileReader to fileWriter, rather than the driver.Move command to ensure // everything is working correctly. - fr, err = newFileReader(driver, path) + fr, err = newFileReader(ctx, driver, path) if err != nil { t.Fatalf("unexpected error creating fileReader: %v", err) } defer fr.Close() - fw, err = newFileWriter(driver, "/copied") + fw, err = newFileWriter(ctx, driver, "/copied") if err != nil { t.Fatalf("unexpected error creating fileWriter: %v", err) } @@ -143,7 +145,7 @@ func TestSimpleWrite(t *testing.T) { t.Fatalf("unexpected copy length: %d != %d", nn, len(doubled)) } - fr, err = newFileReader(driver, "/copied") + fr, err = newFileReader(ctx, driver, "/copied") if err != nil { t.Fatalf("unexpected error creating fileReader: %v", err) } @@ -162,7 +164,8 @@ func TestSimpleWrite(t *testing.T) { } func TestBufferedFileWriter(t *testing.T) { - writer, err := newFileWriter(inmemory.New(), "/random") + ctx := context.Background() + writer, err := newFileWriter(ctx, inmemory.New(), "/random") if err != nil { t.Fatalf("Failed to initialize bufferedFileWriter: %v", err.Error()) @@ -203,8 +206,8 @@ func BenchmarkFileWriter(b *testing.B) { driver: inmemory.New(), path: "/random", } - - if fi, err := fw.driver.Stat(fw.path); err != nil { + ctx := context.Background() + if fi, err := fw.driver.Stat(ctx, fw.path); err != nil { switch err := err.(type) { case storagedriver.PathNotFoundError: // ignore, offset is zero @@ -236,8 +239,9 @@ func BenchmarkFileWriter(b *testing.B) { func BenchmarkBufferedFileWriter(b *testing.B) { b.StopTimer() // not sure how long setup above will take + ctx := context.Background() for i := 0; i < b.N; i++ { - bfw, err := newFileWriter(inmemory.New(), "/random") + bfw, err := newFileWriter(ctx, inmemory.New(), "/random") if err != nil { b.Fatalf("Failed to initialize bufferedFileWriter: %v", err.Error()) diff --git a/registry/storage/layer_test.go b/registry/storage/layer_test.go index f25018da..2ea99813 100644 --- a/registry/storage/layer_test.go +++ b/registry/storage/layer_test.go @@ -10,12 +10,12 @@ import ( "testing" "github.com/docker/distribution" + "github.com/docker/distribution/context" "github.com/docker/distribution/digest" "github.com/docker/distribution/registry/storage/cache" storagedriver "github.com/docker/distribution/registry/storage/driver" "github.com/docker/distribution/registry/storage/driver/inmemory" "github.com/docker/distribution/testutil" - "golang.org/x/net/context" ) // TestSimpleLayerUpload covers the layer upload process, exercising common @@ -36,7 +36,7 @@ func TestSimpleLayerUpload(t *testing.T) { ctx := context.Background() imageName := "foo/bar" driver := inmemory.New() - registry := NewRegistryWithDriver(driver, cache.NewInMemoryLayerInfoCache()) + registry := NewRegistryWithDriver(ctx, driver, cache.NewInMemoryLayerInfoCache()) repository, err := registry.Repository(ctx, imageName) if err != nil { t.Fatalf("unexpected error getting repo: %v", err) @@ -144,7 +144,7 @@ func TestSimpleLayerRead(t *testing.T) { ctx := context.Background() imageName := "foo/bar" driver := inmemory.New() - registry := NewRegistryWithDriver(driver, cache.NewInMemoryLayerInfoCache()) + registry := NewRegistryWithDriver(ctx, driver, cache.NewInMemoryLayerInfoCache()) repository, err := registry.Repository(ctx, imageName) if err != nil { t.Fatalf("unexpected error getting repo: %v", err) @@ -253,7 +253,7 @@ func TestLayerUploadZeroLength(t *testing.T) { ctx := context.Background() imageName := "foo/bar" driver := inmemory.New() - registry := NewRegistryWithDriver(driver, cache.NewInMemoryLayerInfoCache()) + registry := NewRegistryWithDriver(ctx, driver, cache.NewInMemoryLayerInfoCache()) repository, err := registry.Repository(ctx, imageName) if err != nil { t.Fatalf("unexpected error getting repo: %v", err) @@ -353,7 +353,8 @@ func writeTestLayer(driver storagedriver.StorageDriver, pathMapper *pathMapper, digest: dgst, }) - if err := driver.PutContent(blobPath, p); err != nil { + ctx := context.Background() + if err := driver.PutContent(ctx, blobPath, p); err != nil { return "", err } @@ -370,7 +371,7 @@ func writeTestLayer(driver storagedriver.StorageDriver, pathMapper *pathMapper, return "", err } - if err := driver.PutContent(layerLinkPath, []byte(dgst)); err != nil { + if err := driver.PutContent(ctx, layerLinkPath, []byte(dgst)); err != nil { return "", nil } diff --git a/registry/storage/layerreader.go b/registry/storage/layerreader.go index 40deba6a..ddca9741 100644 --- a/registry/storage/layerreader.go +++ b/registry/storage/layerreader.go @@ -54,7 +54,7 @@ func (lr *layerReader) Close() error { func (lr *layerReader) Handler(r *http.Request) (h http.Handler, err error) { var handlerFunc http.HandlerFunc - redirectURL, err := lr.fileReader.driver.URLFor(lr.path, map[string]interface{}{"method": r.Method}) + redirectURL, err := lr.fileReader.driver.URLFor(lr.ctx, lr.path, map[string]interface{}{"method": r.Method}) switch err { case nil: diff --git a/registry/storage/layerstore.go b/registry/storage/layerstore.go index a86b668f..8da14ac7 100644 --- a/registry/storage/layerstore.go +++ b/registry/storage/layerstore.go @@ -5,7 +5,7 @@ import ( "code.google.com/p/go-uuid/uuid" "github.com/docker/distribution" - ctxu "github.com/docker/distribution/context" + "github.com/docker/distribution/context" "github.com/docker/distribution/digest" "github.com/docker/distribution/manifest" storagedriver "github.com/docker/distribution/registry/storage/driver" @@ -16,7 +16,7 @@ type layerStore struct { } func (ls *layerStore) Exists(digest digest.Digest) (bool, error) { - ctxu.GetLogger(ls.repository.ctx).Debug("(*layerStore).Exists") + context.GetLogger(ls.repository.ctx).Debug("(*layerStore).Exists") // Because this implementation just follows blob links, an existence check // is pretty cheap by starting and closing a fetch. @@ -35,13 +35,14 @@ func (ls *layerStore) Exists(digest digest.Digest) (bool, error) { } func (ls *layerStore) Fetch(dgst digest.Digest) (distribution.Layer, error) { - ctxu.GetLogger(ls.repository.ctx).Debug("(*layerStore).Fetch") + ctx := ls.repository.ctx + context.GetLogger(ctx).Debug("(*layerStore).Fetch") bp, err := ls.path(dgst) if err != nil { return nil, err } - fr, err := newFileReader(ls.repository.driver, bp) + fr, err := newFileReader(ctx, ls.repository.driver, bp) if err != nil { return nil, err } @@ -56,7 +57,8 @@ func (ls *layerStore) Fetch(dgst digest.Digest) (distribution.Layer, error) { // is already in progress or the layer has already been uploaded, this // will return an error. func (ls *layerStore) Upload() (distribution.LayerUpload, error) { - ctxu.GetLogger(ls.repository.ctx).Debug("(*layerStore).Upload") + ctx := ls.repository.ctx + context.GetLogger(ctx).Debug("(*layerStore).Upload") // NOTE(stevvooe): Consider the issues with allowing concurrent upload of // the same two layers. Should it be disallowed? For now, we allow both @@ -84,7 +86,7 @@ func (ls *layerStore) Upload() (distribution.LayerUpload, error) { } // Write a startedat file for this upload - if err := ls.repository.driver.PutContent(startedAtPath, []byte(startedAt.Format(time.RFC3339))); err != nil { + if err := ls.repository.driver.PutContent(ctx, startedAtPath, []byte(startedAt.Format(time.RFC3339))); err != nil { return nil, err } @@ -94,7 +96,9 @@ func (ls *layerStore) Upload() (distribution.LayerUpload, error) { // Resume continues an in progress layer upload, returning the current // state of the upload. func (ls *layerStore) Resume(uuid string) (distribution.LayerUpload, error) { - ctxu.GetLogger(ls.repository.ctx).Debug("(*layerStore).Resume") + ctx := ls.repository.ctx + context.GetLogger(ctx).Debug("(*layerStore).Resume") + startedAtPath, err := ls.repository.pm.path(uploadStartedAtPathSpec{ name: ls.repository.Name(), uuid: uuid, @@ -104,7 +108,7 @@ func (ls *layerStore) Resume(uuid string) (distribution.LayerUpload, error) { return nil, err } - startedAtBytes, err := ls.repository.driver.GetContent(startedAtPath) + startedAtBytes, err := ls.repository.driver.GetContent(ctx, startedAtPath) if err != nil { switch err := err.(type) { case storagedriver.PathNotFoundError: @@ -133,7 +137,7 @@ func (ls *layerStore) Resume(uuid string) (distribution.LayerUpload, error) { // newLayerUpload allocates a new upload controller with the given state. func (ls *layerStore) newLayerUpload(uuid, path string, startedAt time.Time) (distribution.LayerUpload, error) { - fw, err := newFileWriter(ls.repository.driver, path) + fw, err := newFileWriter(ls.repository.ctx, ls.repository.driver, path) if err != nil { return nil, err } diff --git a/registry/storage/layerwriter.go b/registry/storage/layerwriter.go index adf68ca9..a2672fe6 100644 --- a/registry/storage/layerwriter.go +++ b/registry/storage/layerwriter.go @@ -10,7 +10,7 @@ import ( "github.com/Sirupsen/logrus" "github.com/docker/distribution" - ctxu "github.com/docker/distribution/context" + "github.com/docker/distribution/context" "github.com/docker/distribution/digest" storagedriver "github.com/docker/distribution/registry/storage/driver" ) @@ -47,7 +47,7 @@ func (lw *layerWriter) StartedAt() time.Time { // contents of the uploaded layer. The checksum should be provided in the // format :. func (lw *layerWriter) Finish(dgst digest.Digest) (distribution.Layer, error) { - ctxu.GetLogger(lw.layerStore.repository.ctx).Debug("(*layerWriter).Finish") + context.GetLogger(lw.layerStore.repository.ctx).Debug("(*layerWriter).Finish") if err := lw.bufferedFileWriter.Close(); err != nil { return nil, err @@ -67,7 +67,7 @@ func (lw *layerWriter) Finish(dgst digest.Digest) (distribution.Layer, error) { break } - ctxu.GetLoggerWithField(lw.layerStore.repository.ctx, "retries", retries). + context.GetLoggerWithField(lw.layerStore.repository.ctx, "retries", retries). Errorf("error validating layer: %v", err) if retries < 3 { @@ -98,7 +98,7 @@ func (lw *layerWriter) Finish(dgst digest.Digest) (distribution.Layer, error) { // Cancel the layer upload process. func (lw *layerWriter) Cancel() error { - ctxu.GetLogger(lw.layerStore.repository.ctx).Debug("(*layerWriter).Cancel") + context.GetLogger(lw.layerStore.repository.ctx).Debug("(*layerWriter).Cancel") if err := lw.removeResources(); err != nil { return err } @@ -168,7 +168,7 @@ func (lw *layerWriter) getStoredHashStates() ([]hashStateEntry, error) { return nil, err } - paths, err := lw.driver.List(uploadHashStatePathPrefix) + paths, err := lw.driver.List(lw.layerStore.repository.ctx, uploadHashStatePathPrefix) if err != nil { if _, ok := err.(storagedriver.PathNotFoundError); !ok { return nil, err @@ -214,6 +214,7 @@ func (lw *layerWriter) resumeHashAt(offset int64) error { return fmt.Errorf("unable to get stored hash states with offset %d: %s", offset, err) } + ctx := lw.layerStore.repository.ctx // Find the highest stored hashState with offset less than or equal to // the requested offset. for _, hashState := range hashStates { @@ -229,7 +230,7 @@ func (lw *layerWriter) resumeHashAt(offset int64) error { // is probably okay to skip for now since we don't expect anyone to // use the API in this way. For that reason, we don't treat an // an error here as a fatal error, but only log it. - if err := lw.driver.Delete(hashState.path); err != nil { + if err := lw.driver.Delete(ctx, hashState.path); err != nil { logrus.Errorf("unable to delete stale hash state %q: %s", hashState.path, err) } } @@ -239,7 +240,7 @@ func (lw *layerWriter) resumeHashAt(offset int64) error { // No need to load any state, just reset the hasher. lw.resumableDigester.Reset() } else { - storedState, err := lw.driver.GetContent(hashStateMatch.path) + storedState, err := lw.driver.GetContent(ctx, hashStateMatch.path) if err != nil { return err } @@ -251,9 +252,8 @@ func (lw *layerWriter) resumeHashAt(offset int64) error { // Mind the gap. if gapLen := offset - int64(lw.resumableDigester.Len()); gapLen > 0 { - // Need to read content from the upload to catch up to the desired - // offset. - fr, err := newFileReader(lw.driver, lw.path) + // Need to read content from the upload to catch up to the desired offset. + fr, err := newFileReader(ctx, lw.driver, lw.path) if err != nil { return err } @@ -286,7 +286,7 @@ func (lw *layerWriter) storeHashState() error { return err } - return lw.driver.PutContent(uploadHashStatePath, hashState) + return lw.driver.PutContent(lw.layerStore.repository.ctx, uploadHashStatePath, hashState) } // validateLayer checks the layer data against the digest, returning an error @@ -329,7 +329,7 @@ func (lw *layerWriter) validateLayer(dgst digest.Digest) (digest.Digest, error) } // Read the file from the backend driver and validate it. - fr, err := newFileReader(lw.bufferedFileWriter.driver, lw.path) + fr, err := newFileReader(lw.layerStore.repository.ctx, lw.bufferedFileWriter.driver, lw.path) if err != nil { return "", err } @@ -345,7 +345,7 @@ func (lw *layerWriter) validateLayer(dgst digest.Digest) (digest.Digest, error) } if !verified { - ctxu.GetLoggerWithField(lw.layerStore.repository.ctx, "canonical", dgst). + context.GetLoggerWithField(lw.layerStore.repository.ctx, "canonical", dgst). Errorf("canonical digest does match provided digest") return "", distribution.ErrLayerInvalidDigest{ Digest: dgst, @@ -368,8 +368,9 @@ func (lw *layerWriter) moveLayer(dgst digest.Digest) error { return err } + ctx := lw.layerStore.repository.ctx // Check for existence - if _, err := lw.driver.Stat(blobPath); err != nil { + if _, err := lw.driver.Stat(ctx, blobPath); err != nil { switch err := err.(type) { case storagedriver.PathNotFoundError: break // ensure that it doesn't exist. @@ -388,7 +389,7 @@ func (lw *layerWriter) moveLayer(dgst digest.Digest) error { // the size here and write a zero-length file to blobPath if this is the // case. For the most part, this should only ever happen with zero-length // tars. - if _, err := lw.driver.Stat(lw.path); err != nil { + if _, err := lw.driver.Stat(ctx, lw.path); err != nil { switch err := err.(type) { case storagedriver.PathNotFoundError: // HACK(stevvooe): This is slightly dangerous: if we verify above, @@ -397,7 +398,7 @@ func (lw *layerWriter) moveLayer(dgst digest.Digest) error { // prevent this horrid thing, we employ the hack of only allowing // to this happen for the zero tarsum. if dgst == digest.DigestSha256EmptyTar { - return lw.driver.PutContent(blobPath, []byte{}) + return lw.driver.PutContent(ctx, blobPath, []byte{}) } // We let this fail during the move below. @@ -409,7 +410,7 @@ func (lw *layerWriter) moveLayer(dgst digest.Digest) error { } } - return lw.driver.Move(lw.path, blobPath) + return lw.driver.Move(ctx, lw.path, blobPath) } // linkLayer links a valid, written layer blob into the registry under the @@ -435,7 +436,8 @@ func (lw *layerWriter) linkLayer(canonical digest.Digest, aliases ...digest.Dige return err } - if err := lw.layerStore.repository.driver.PutContent(layerLinkPath, []byte(canonical)); err != nil { + ctx := lw.layerStore.repository.ctx + if err := lw.layerStore.repository.driver.PutContent(ctx, layerLinkPath, []byte(canonical)); err != nil { return err } } @@ -459,8 +461,7 @@ func (lw *layerWriter) removeResources() error { // Resolve and delete the containing directory, which should include any // upload related files. dirPath := path.Dir(dataPath) - - if err := lw.driver.Delete(dirPath); err != nil { + if err := lw.driver.Delete(lw.layerStore.repository.ctx, dirPath); err != nil { switch err := err.(type) { case storagedriver.PathNotFoundError: break // already gone! diff --git a/registry/storage/manifeststore_test.go b/registry/storage/manifeststore_test.go index a70789d3..3bafb997 100644 --- a/registry/storage/manifeststore_test.go +++ b/registry/storage/manifeststore_test.go @@ -30,7 +30,7 @@ type manifestStoreTestEnv struct { func newManifestStoreTestEnv(t *testing.T, name, tag string) *manifestStoreTestEnv { ctx := context.Background() driver := inmemory.New() - registry := NewRegistryWithDriver(driver, cache.NewInMemoryLayerInfoCache()) + registry := NewRegistryWithDriver(ctx, driver, cache.NewInMemoryLayerInfoCache()) repo, err := registry.Repository(ctx, name) if err != nil { diff --git a/registry/storage/purgeuploads.go b/registry/storage/purgeuploads.go index 13c468de..cf723070 100644 --- a/registry/storage/purgeuploads.go +++ b/registry/storage/purgeuploads.go @@ -7,6 +7,7 @@ import ( "code.google.com/p/go-uuid/uuid" log "github.com/Sirupsen/logrus" + "github.com/docker/distribution/context" storageDriver "github.com/docker/distribution/registry/storage/driver" ) @@ -28,9 +29,9 @@ func newUploadData() uploadData { // PurgeUploads deletes files from the upload directory // created before olderThan. The list of files deleted and errors // encountered are returned -func PurgeUploads(driver storageDriver.StorageDriver, olderThan time.Time, actuallyDelete bool) ([]string, []error) { +func PurgeUploads(ctx context.Context, driver storageDriver.StorageDriver, olderThan time.Time, actuallyDelete bool) ([]string, []error) { log.Infof("PurgeUploads starting: olderThan=%s, actuallyDelete=%t", olderThan, actuallyDelete) - uploadData, errors := getOutstandingUploads(driver) + uploadData, errors := getOutstandingUploads(ctx, driver) var deleted []string for _, uploadData := range uploadData { if uploadData.startedAt.Before(olderThan) { @@ -38,7 +39,7 @@ func PurgeUploads(driver storageDriver.StorageDriver, olderThan time.Time, actua log.Infof("Upload files in %s have older date (%s) than purge date (%s). Removing upload directory.", uploadData.containingDir, uploadData.startedAt, olderThan) if actuallyDelete { - err = driver.Delete(uploadData.containingDir) + err = driver.Delete(ctx, uploadData.containingDir) } if err == nil { deleted = append(deleted, uploadData.containingDir) @@ -56,7 +57,7 @@ func PurgeUploads(driver storageDriver.StorageDriver, olderThan time.Time, actua // which could be eligible for deletion. The only reliable way to // classify the age of a file is with the date stored in the startedAt // file, so gather files by UUID with a date from startedAt. -func getOutstandingUploads(driver storageDriver.StorageDriver) (map[string]uploadData, []error) { +func getOutstandingUploads(ctx context.Context, driver storageDriver.StorageDriver) (map[string]uploadData, []error) { var errors []error uploads := make(map[string]uploadData, 0) @@ -65,7 +66,7 @@ func getOutstandingUploads(driver storageDriver.StorageDriver) (map[string]uploa if err != nil { return uploads, append(errors, err) } - err = Walk(driver, root, func(fileInfo storageDriver.FileInfo) error { + err = Walk(ctx, driver, root, func(fileInfo storageDriver.FileInfo) error { filePath := fileInfo.Path() _, file := path.Split(filePath) if file[0] == '_' { @@ -124,7 +125,8 @@ func uUIDFromPath(path string) (string, bool) { // readStartedAtFile reads the date from an upload's startedAtFile func readStartedAtFile(driver storageDriver.StorageDriver, path string) (time.Time, error) { - startedAtBytes, err := driver.GetContent(path) + // todo:(richardscothern) - pass in a context + startedAtBytes, err := driver.GetContent(context.Background(), path) if err != nil { return time.Now(), err } diff --git a/registry/storage/purgeuploads_test.go b/registry/storage/purgeuploads_test.go index 368e7c86..7c0f8813 100644 --- a/registry/storage/purgeuploads_test.go +++ b/registry/storage/purgeuploads_test.go @@ -7,26 +7,28 @@ import ( "time" "code.google.com/p/go-uuid/uuid" + "github.com/docker/distribution/context" "github.com/docker/distribution/registry/storage/driver" "github.com/docker/distribution/registry/storage/driver/inmemory" ) var pm = defaultPathMapper -func testUploadFS(t *testing.T, numUploads int, repoName string, startedAt time.Time) driver.StorageDriver { +func testUploadFS(t *testing.T, numUploads int, repoName string, startedAt time.Time) (driver.StorageDriver, context.Context) { d := inmemory.New() + ctx := context.Background() for i := 0; i < numUploads; i++ { - addUploads(t, d, uuid.New(), repoName, startedAt) + addUploads(ctx, t, d, uuid.New(), repoName, startedAt) } - return d + return d, ctx } -func addUploads(t *testing.T, d driver.StorageDriver, uploadID, repo string, startedAt time.Time) { +func addUploads(ctx context.Context, t *testing.T, d driver.StorageDriver, uploadID, repo string, startedAt time.Time) { dataPath, err := pm.path(uploadDataPathSpec{name: repo, uuid: uploadID}) if err != nil { t.Fatalf("Unable to resolve path") } - if err := d.PutContent(dataPath, []byte("")); err != nil { + if err := d.PutContent(ctx, dataPath, []byte("")); err != nil { t.Fatalf("Unable to write data file") } @@ -35,7 +37,7 @@ func addUploads(t *testing.T, d driver.StorageDriver, uploadID, repo string, sta t.Fatalf("Unable to resolve path") } - if d.PutContent(startedAtPath, []byte(startedAt.Format(time.RFC3339))); err != nil { + if d.PutContent(ctx, startedAtPath, []byte(startedAt.Format(time.RFC3339))); err != nil { t.Fatalf("Unable to write startedAt file") } @@ -43,8 +45,8 @@ func addUploads(t *testing.T, d driver.StorageDriver, uploadID, repo string, sta func TestPurgeGather(t *testing.T) { uploadCount := 5 - fs := testUploadFS(t, uploadCount, "test-repo", time.Now()) - uploadData, errs := getOutstandingUploads(fs) + fs, ctx := testUploadFS(t, uploadCount, "test-repo", time.Now()) + uploadData, errs := getOutstandingUploads(ctx, fs) if len(errs) != 0 { t.Errorf("Unexepected errors: %q", errs) } @@ -54,9 +56,9 @@ func TestPurgeGather(t *testing.T) { } func TestPurgeNone(t *testing.T) { - fs := testUploadFS(t, 10, "test-repo", time.Now()) + fs, ctx := testUploadFS(t, 10, "test-repo", time.Now()) oneHourAgo := time.Now().Add(-1 * time.Hour) - deleted, errs := PurgeUploads(fs, oneHourAgo, true) + deleted, errs := PurgeUploads(ctx, fs, oneHourAgo, true) if len(errs) != 0 { t.Error("Unexpected errors", errs) } @@ -68,13 +70,13 @@ func TestPurgeNone(t *testing.T) { func TestPurgeAll(t *testing.T) { uploadCount := 10 oneHourAgo := time.Now().Add(-1 * time.Hour) - fs := testUploadFS(t, uploadCount, "test-repo", oneHourAgo) + fs, ctx := testUploadFS(t, uploadCount, "test-repo", oneHourAgo) // Ensure > 1 repos are purged - addUploads(t, fs, uuid.New(), "test-repo2", oneHourAgo) + addUploads(ctx, t, fs, uuid.New(), "test-repo2", oneHourAgo) uploadCount++ - deleted, errs := PurgeUploads(fs, time.Now(), true) + deleted, errs := PurgeUploads(ctx, fs, time.Now(), true) if len(errs) != 0 { t.Error("Unexpected errors:", errs) } @@ -88,15 +90,15 @@ func TestPurgeAll(t *testing.T) { func TestPurgeSome(t *testing.T) { oldUploadCount := 5 oneHourAgo := time.Now().Add(-1 * time.Hour) - fs := testUploadFS(t, oldUploadCount, "library/test-repo", oneHourAgo) + fs, ctx := testUploadFS(t, oldUploadCount, "library/test-repo", oneHourAgo) newUploadCount := 4 for i := 0; i < newUploadCount; i++ { - addUploads(t, fs, uuid.New(), "test-repo", time.Now().Add(1*time.Hour)) + addUploads(ctx, t, fs, uuid.New(), "test-repo", time.Now().Add(1*time.Hour)) } - deleted, errs := PurgeUploads(fs, time.Now(), true) + deleted, errs := PurgeUploads(ctx, fs, time.Now(), true) if len(errs) != 0 { t.Error("Unexpected errors:", errs) } @@ -109,7 +111,7 @@ func TestPurgeSome(t *testing.T) { func TestPurgeOnlyUploads(t *testing.T) { oldUploadCount := 5 oneHourAgo := time.Now().Add(-1 * time.Hour) - fs := testUploadFS(t, oldUploadCount, "test-repo", oneHourAgo) + fs, ctx := testUploadFS(t, oldUploadCount, "test-repo", oneHourAgo) // Create a directory tree outside _uploads and ensure // these files aren't deleted. @@ -123,11 +125,11 @@ func TestPurgeOnlyUploads(t *testing.T) { } nonUploadFile := path.Join(nonUploadPath, "file") - if err = fs.PutContent(nonUploadFile, []byte("")); err != nil { + if err = fs.PutContent(ctx, nonUploadFile, []byte("")); err != nil { t.Fatalf("Unable to write data file") } - deleted, errs := PurgeUploads(fs, time.Now(), true) + deleted, errs := PurgeUploads(ctx, fs, time.Now(), true) if len(errs) != 0 { t.Error("Unexpected errors", errs) } @@ -140,13 +142,14 @@ func TestPurgeOnlyUploads(t *testing.T) { func TestPurgeMissingStartedAt(t *testing.T) { oneHourAgo := time.Now().Add(-1 * time.Hour) - fs := testUploadFS(t, 1, "test-repo", oneHourAgo) - err := Walk(fs, "/", func(fileInfo driver.FileInfo) error { + fs, ctx := testUploadFS(t, 1, "test-repo", oneHourAgo) + + err := Walk(ctx, fs, "/", func(fileInfo driver.FileInfo) error { filePath := fileInfo.Path() _, file := path.Split(filePath) if file == "startedat" { - if err := fs.Delete(filePath); err != nil { + if err := fs.Delete(ctx, filePath); err != nil { t.Fatalf("Unable to delete startedat file: %s", filePath) } } @@ -155,7 +158,7 @@ func TestPurgeMissingStartedAt(t *testing.T) { if err != nil { t.Fatalf("Unexpected error during Walk: %s ", err.Error()) } - deleted, errs := PurgeUploads(fs, time.Now(), true) + deleted, errs := PurgeUploads(ctx, fs, time.Now(), true) if len(errs) > 0 { t.Errorf("Unexpected errors") } diff --git a/registry/storage/registry.go b/registry/storage/registry.go index 1126db45..2834e5eb 100644 --- a/registry/storage/registry.go +++ b/registry/storage/registry.go @@ -20,10 +20,11 @@ type registry struct { // NewRegistryWithDriver creates a new registry instance from the provided // driver. The resulting registry may be shared by multiple goroutines but is // cheap to allocate. -func NewRegistryWithDriver(driver storagedriver.StorageDriver, layerInfoCache cache.LayerInfoCache) distribution.Namespace { +func NewRegistryWithDriver(ctx context.Context, driver storagedriver.StorageDriver, layerInfoCache cache.LayerInfoCache) distribution.Namespace { bs := &blobStore{ driver: driver, pm: defaultPathMapper, + ctx: ctx, } return ®istry{ diff --git a/registry/storage/revisionstore.go b/registry/storage/revisionstore.go index ac605360..066ce972 100644 --- a/registry/storage/revisionstore.go +++ b/registry/storage/revisionstore.go @@ -26,7 +26,7 @@ func (rs *revisionStore) exists(revision digest.Digest) (bool, error) { return false, err } - exists, err := exists(rs.driver, revpath) + exists, err := exists(rs.repository.ctx, rs.driver, revpath) if err != nil { return false, err } @@ -121,7 +121,7 @@ func (rs *revisionStore) link(revision digest.Digest) error { return err } - if exists, err := exists(rs.driver, revisionPath); err != nil { + if exists, err := exists(rs.repository.ctx, rs.driver, revisionPath); err != nil { return err } else if exists { // Revision has already been linked! @@ -142,5 +142,5 @@ func (rs *revisionStore) delete(revision digest.Digest) error { return err } - return rs.driver.Delete(revisionPath) + return rs.driver.Delete(rs.repository.ctx, revisionPath) } diff --git a/registry/storage/signaturestore.go b/registry/storage/signaturestore.go index 7094b69e..fcf6224f 100644 --- a/registry/storage/signaturestore.go +++ b/registry/storage/signaturestore.go @@ -30,7 +30,7 @@ func (s *signatureStore) Get(dgst digest.Digest) ([][]byte, error) { // can be eliminated by implementing listAll on drivers. signaturesPath = path.Join(signaturesPath, "sha256") - signaturePaths, err := s.driver.List(signaturesPath) + signaturePaths, err := s.driver.List(s.repository.ctx, signaturesPath) if err != nil { return nil, err } diff --git a/registry/storage/tagstore.go b/registry/storage/tagstore.go index 616df952..882e6c35 100644 --- a/registry/storage/tagstore.go +++ b/registry/storage/tagstore.go @@ -4,6 +4,7 @@ import ( "path" "github.com/docker/distribution" + // "github.com/docker/distribution/context" "github.com/docker/distribution/digest" storagedriver "github.com/docker/distribution/registry/storage/driver" ) @@ -23,7 +24,7 @@ func (ts *tagStore) tags() ([]string, error) { } var tags []string - entries, err := ts.driver.List(p) + entries, err := ts.driver.List(ts.repository.ctx, p) if err != nil { switch err := err.(type) { case storagedriver.PathNotFoundError: @@ -52,7 +53,7 @@ func (ts *tagStore) exists(tag string) (bool, error) { return false, err } - exists, err := exists(ts.driver, tagPath) + exists, err := exists(ts.repository.ctx, ts.driver, tagPath) if err != nil { return false, err } @@ -102,7 +103,7 @@ func (ts *tagStore) resolve(tag string) (digest.Digest, error) { return "", err } - if exists, err := exists(ts.driver, currentPath); err != nil { + if exists, err := exists(ts.repository.ctx, ts.driver, currentPath); err != nil { return "", err } else if !exists { return "", distribution.ErrManifestUnknown{Name: ts.Name(), Tag: tag} @@ -130,7 +131,7 @@ func (ts *tagStore) revisions(tag string) ([]digest.Digest, error) { // TODO(stevvooe): Need to append digest alg to get listing of revisions. manifestTagIndexPath = path.Join(manifestTagIndexPath, "sha256") - entries, err := ts.driver.List(manifestTagIndexPath) + entries, err := ts.driver.List(ts.repository.ctx, manifestTagIndexPath) if err != nil { return nil, err } @@ -154,5 +155,5 @@ func (ts *tagStore) delete(tag string) error { return err } - return ts.driver.Delete(tagPath) + return ts.driver.Delete(ts.repository.ctx, tagPath) } diff --git a/registry/storage/walk.go b/registry/storage/walk.go index 7b958d87..8290f167 100644 --- a/registry/storage/walk.go +++ b/registry/storage/walk.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" + "github.com/docker/distribution/context" storageDriver "github.com/docker/distribution/registry/storage/driver" ) @@ -20,13 +21,13 @@ type WalkFn func(fileInfo storageDriver.FileInfo) error // Walk traverses a filesystem defined within driver, starting // from the given path, calling f on each file -func Walk(driver storageDriver.StorageDriver, from string, f WalkFn) error { - children, err := driver.List(from) +func Walk(ctx context.Context, driver storageDriver.StorageDriver, from string, f WalkFn) error { + children, err := driver.List(ctx, from) if err != nil { return err } for _, child := range children { - fileInfo, err := driver.Stat(child) + fileInfo, err := driver.Stat(ctx, child) if err != nil { return err } @@ -37,7 +38,7 @@ func Walk(driver storageDriver.StorageDriver, from string, f WalkFn) error { } if fileInfo.IsDir() && !skipDir { - Walk(driver, child, f) + Walk(ctx, driver, child, f) } } return nil diff --git a/registry/storage/walk_test.go b/registry/storage/walk_test.go index 22b91b35..40b8547c 100644 --- a/registry/storage/walk_test.go +++ b/registry/storage/walk_test.go @@ -4,17 +4,19 @@ import ( "fmt" "testing" + "github.com/docker/distribution/context" "github.com/docker/distribution/registry/storage/driver" "github.com/docker/distribution/registry/storage/driver/inmemory" ) -func testFS(t *testing.T) (driver.StorageDriver, map[string]string) { +func testFS(t *testing.T) (driver.StorageDriver, map[string]string, context.Context) { d := inmemory.New() c := []byte("") - if err := d.PutContent("/a/b/c/d", c); err != nil { + ctx := context.Background() + if err := d.PutContent(ctx, "/a/b/c/d", c); err != nil { t.Fatalf("Unable to put to inmemory fs") } - if err := d.PutContent("/a/b/c/e", c); err != nil { + if err := d.PutContent(ctx, "/a/b/c/e", c); err != nil { t.Fatalf("Unable to put to inmemory fs") } @@ -26,20 +28,20 @@ func testFS(t *testing.T) (driver.StorageDriver, map[string]string) { "/a/b/c/e": "file", } - return d, expected + return d, expected, ctx } func TestWalkErrors(t *testing.T) { - d, expected := testFS(t) + d, expected, ctx := testFS(t) fileCount := len(expected) - err := Walk(d, "", func(fileInfo driver.FileInfo) error { + err := Walk(ctx, d, "", func(fileInfo driver.FileInfo) error { return nil }) if err == nil { t.Error("Expected invalid root err") } - err = Walk(d, "/", func(fileInfo driver.FileInfo) error { + err = Walk(ctx, d, "/", func(fileInfo driver.FileInfo) error { // error on the 2nd file if fileInfo.Path() == "/a/b" { return fmt.Errorf("Early termination") @@ -54,7 +56,7 @@ func TestWalkErrors(t *testing.T) { t.Error(err.Error()) } - err = Walk(d, "/nonexistant", func(fileInfo driver.FileInfo) error { + err = Walk(ctx, d, "/nonexistant", func(fileInfo driver.FileInfo) error { return nil }) if err == nil { @@ -64,8 +66,8 @@ func TestWalkErrors(t *testing.T) { } func TestWalk(t *testing.T) { - d, expected := testFS(t) - err := Walk(d, "/", func(fileInfo driver.FileInfo) error { + d, expected, ctx := testFS(t) + err := Walk(ctx, d, "/", func(fileInfo driver.FileInfo) error { filePath := fileInfo.Path() filetype, ok := expected[filePath] if !ok { @@ -93,8 +95,8 @@ func TestWalk(t *testing.T) { } func TestWalkSkipDir(t *testing.T) { - d, expected := testFS(t) - err := Walk(d, "/", func(fileInfo driver.FileInfo) error { + d, expected, ctx := testFS(t) + err := Walk(ctx, d, "/", func(fileInfo driver.FileInfo) error { filePath := fileInfo.Path() if filePath == "/a/b" { // skip processing /a/b/c and /a/b/c/d