From 44b14ceadc463be6edf9dec42527febcbbe3743a Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Wed, 1 Apr 2015 16:41:33 -0700 Subject: [PATCH] Integrate layer info cache with registry and storage This changeset integrates the layer info cache with the registry webapp and storage backend. The main benefit is to cache immutable layer meta data, reducing backend roundtrips. The cache can be configured to use either redis or an inmemory cache. This provides massive performance benefits for HEAD http checks on layer blobs and manifest verification. Signed-off-by: Stephen J Day --- notifications/listener_test.go | 3 +- registry.go | 4 + registry/doc.go | 2 +- registry/handlers/app.go | 9 +- registry/handlers/app_test.go | 3 +- registry/storage/blobstore.go | 5 +- registry/storage/filereader.go | 4 +- registry/storage/layer_test.go | 9 +- registry/storage/layercache.go | 183 +++++++++++++++++++++++++ registry/storage/layerreader.go | 15 ++ registry/storage/manifeststore_test.go | 4 +- registry/storage/registry.go | 46 +++++-- 12 files changed, 262 insertions(+), 25 deletions(-) create mode 100644 registry/storage/layercache.go diff --git a/notifications/listener_test.go b/notifications/listener_test.go index 18633bc67..956279a23 100644 --- a/notifications/listener_test.go +++ b/notifications/listener_test.go @@ -9,6 +9,7 @@ import ( "github.com/docker/distribution/digest" "github.com/docker/distribution/manifest" "github.com/docker/distribution/registry/storage" + "github.com/docker/distribution/registry/storage/cache" "github.com/docker/distribution/registry/storage/driver/inmemory" "github.com/docker/distribution/testutil" "github.com/docker/libtrust" @@ -16,7 +17,7 @@ import ( ) func TestListener(t *testing.T) { - registry := storage.NewRegistryWithDriver(inmemory.New()) + registry := storage.NewRegistryWithDriver(inmemory.New(), cache.NewInMemoryLayerInfoCache()) tl := &testListener{ ops: make(map[string]int), } diff --git a/registry.go b/registry.go index c12a6ad6c..52b4f8d35 100644 --- a/registry.go +++ b/registry.go @@ -33,6 +33,10 @@ type Repository interface { Signatures() SignatureService } +// TODO(stevvooe): Must add close methods to all these. May want to change the +// way instances are created to better reflect internal dependency +// relationships. + // ManifestService provides operations on image manifests. type ManifestService interface { // Exists returns true if the manifest exists. diff --git a/registry/doc.go b/registry/doc.go index 5049dae35..1c01e42ea 100644 --- a/registry/doc.go +++ b/registry/doc.go @@ -1,3 +1,3 @@ // Package registry is a placeholder package for registry interface -// destinations and utilities. +// definitions and utilities. package registry diff --git a/registry/handlers/app.go b/registry/handlers/app.go index f837e8618..e333d6d9a 100644 --- a/registry/handlers/app.go +++ b/registry/handlers/app.go @@ -18,6 +18,7 @@ import ( registrymiddleware "github.com/docker/distribution/registry/middleware/registry" repositorymiddleware "github.com/docker/distribution/registry/middleware/repository" "github.com/docker/distribution/registry/storage" + "github.com/docker/distribution/registry/storage/cache" storagedriver "github.com/docker/distribution/registry/storage/driver" "github.com/docker/distribution/registry/storage/driver/factory" storagemiddleware "github.com/docker/distribution/registry/storage/driver/middleware" @@ -102,7 +103,13 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App app.configureEvents(&configuration) app.configureRedis(&configuration) - app.registry = storage.NewRegistryWithDriver(app.driver) + if app.redis != nil { + app.registry = storage.NewRegistryWithDriver(app.driver, cache.NewRedisLayerInfoCache(app.redis)) + } else { + // always fall back to inmemory storage + app.registry = storage.NewRegistryWithDriver(app.driver, cache.NewInMemoryLayerInfoCache()) + } + app.registry, err = applyRegistryMiddleware(app.registry, configuration.Middleware["registry"]) if err != nil { panic(err) diff --git a/registry/handlers/app_test.go b/registry/handlers/app_test.go index cd515dd0c..d0b9174d4 100644 --- a/registry/handlers/app_test.go +++ b/registry/handlers/app_test.go @@ -13,6 +13,7 @@ import ( "github.com/docker/distribution/registry/auth" _ "github.com/docker/distribution/registry/auth/silly" "github.com/docker/distribution/registry/storage" + "github.com/docker/distribution/registry/storage/cache" "github.com/docker/distribution/registry/storage/driver/inmemory" "golang.org/x/net/context" ) @@ -28,7 +29,7 @@ func TestAppDispatcher(t *testing.T) { Context: context.Background(), router: v2.Router(), driver: driver, - registry: storage.NewRegistryWithDriver(driver), + registry: storage.NewRegistryWithDriver(driver, cache.NewInMemoryLayerInfoCache()), } server := httptest.NewServer(app) router := v2.Router() diff --git a/registry/storage/blobstore.go b/registry/storage/blobstore.go index 975df19f9..8bab2f5e1 100644 --- a/registry/storage/blobstore.go +++ b/registry/storage/blobstore.go @@ -18,8 +18,9 @@ import ( // abstraction, providing utility methods that support creating and traversing // backend links. type blobStore struct { - *registry - ctx context.Context + driver storagedriver.StorageDriver + pm *pathMapper + ctx context.Context } // exists reports whether or not the path exists. If the driver returns error diff --git a/registry/storage/filereader.go b/registry/storage/filereader.go index b70b1fb20..65d4347fa 100644 --- a/registry/storage/filereader.go +++ b/registry/storage/filereader.go @@ -27,8 +27,8 @@ type fileReader struct { // identifying fields path string - size int64 // size is the total layer size, must be set. - modtime time.Time + size int64 // size is the total size, must be set. + modtime time.Time // TODO(stevvooe): This is not needed anymore. // mutable fields rc io.ReadCloser // remote read closer diff --git a/registry/storage/layer_test.go b/registry/storage/layer_test.go index 43e028d56..e225d0685 100644 --- a/registry/storage/layer_test.go +++ b/registry/storage/layer_test.go @@ -11,6 +11,7 @@ import ( "github.com/docker/distribution" "github.com/docker/distribution/digest" + "github.com/docker/distribution/registry/storage/cache" storagedriver "github.com/docker/distribution/registry/storage/driver" "github.com/docker/distribution/registry/storage/driver/inmemory" "github.com/docker/distribution/testutil" @@ -35,7 +36,7 @@ func TestSimpleLayerUpload(t *testing.T) { ctx := context.Background() imageName := "foo/bar" driver := inmemory.New() - registry := NewRegistryWithDriver(driver) + registry := NewRegistryWithDriver(driver, cache.NewInMemoryLayerInfoCache()) repository, err := registry.Repository(ctx, imageName) if err != nil { t.Fatalf("unexpected error getting repo: %v", err) @@ -143,7 +144,7 @@ func TestSimpleLayerRead(t *testing.T) { ctx := context.Background() imageName := "foo/bar" driver := inmemory.New() - registry := NewRegistryWithDriver(driver) + registry := NewRegistryWithDriver(driver, cache.NewInMemoryLayerInfoCache()) repository, err := registry.Repository(ctx, imageName) if err != nil { t.Fatalf("unexpected error getting repo: %v", err) @@ -180,7 +181,7 @@ func TestSimpleLayerRead(t *testing.T) { t.Fatalf("unexpected error fetching non-existent layer: %v", err) } - randomLayerDigest, err := writeTestLayer(driver, ls.(*layerStore).repository.pm, imageName, dgst, randomLayerReader) + randomLayerDigest, err := writeTestLayer(driver, defaultPathMapper, imageName, dgst, randomLayerReader) if err != nil { t.Fatalf("unexpected error writing test layer: %v", err) } @@ -252,7 +253,7 @@ func TestLayerUploadZeroLength(t *testing.T) { ctx := context.Background() imageName := "foo/bar" driver := inmemory.New() - registry := NewRegistryWithDriver(driver) + registry := NewRegistryWithDriver(driver, cache.NewInMemoryLayerInfoCache()) repository, err := registry.Repository(ctx, imageName) if err != nil { t.Fatalf("unexpected error getting repo: %v", err) diff --git a/registry/storage/layercache.go b/registry/storage/layercache.go new file mode 100644 index 000000000..c7ee9b27a --- /dev/null +++ b/registry/storage/layercache.go @@ -0,0 +1,183 @@ +package storage + +import ( + "expvar" + "sync/atomic" + "time" + + "github.com/docker/distribution" + ctxu "github.com/docker/distribution/context" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/registry/storage/cache" + "github.com/docker/distribution/registry/storage/driver" + "golang.org/x/net/context" +) + +// cachedLayerService implements the layer service with path-aware caching, +// using a LayerInfoCache interface. +type cachedLayerService struct { + distribution.LayerService // upstream layer service + repository distribution.Repository + ctx context.Context + driver driver.StorageDriver + *blobStore // global blob store + cache cache.LayerInfoCache +} + +// Exists checks for existence of the digest in the cache, immediately +// returning if it exists for the repository. If not, the upstream is checked. +// When a positive result is found, it is written into the cache. +func (lc *cachedLayerService) Exists(dgst digest.Digest) (bool, error) { + ctxu.GetLogger(lc.ctx).Debugf("(*cachedLayerService).Exists(%q)", dgst) + now := time.Now() + defer func() { + // TODO(stevvooe): Replace this with a decent context-based metrics solution + ctxu.GetLoggerWithField(lc.ctx, "blob.exists.duration", time.Since(now)). + Infof("(*cachedLayerService).Exists(%q)", dgst) + }() + + atomic.AddUint64(&layerInfoCacheMetrics.Exists.Requests, 1) + available, err := lc.cache.Contains(lc.ctx, lc.repository.Name(), dgst) + if err != nil { + ctxu.GetLogger(lc.ctx).Errorf("error checking availability of %v@%v: %v", lc.repository.Name(), dgst, err) + goto fallback + } + + if available { + atomic.AddUint64(&layerInfoCacheMetrics.Exists.Hits, 1) + return true, nil + } + +fallback: + atomic.AddUint64(&layerInfoCacheMetrics.Exists.Misses, 1) + exists, err := lc.LayerService.Exists(dgst) + if err != nil { + return exists, err + } + + if exists { + // we can only cache this if the existence is positive. + if err := lc.cache.Add(lc.ctx, lc.repository.Name(), dgst); err != nil { + ctxu.GetLogger(lc.ctx).Errorf("error adding %v@%v to cache: %v", lc.repository.Name(), dgst, err) + } + } + + return exists, err +} + +// Fetch checks for the availability of the layer in the repository via the +// cache. If present, the metadata is resolved and the layer is returned. If +// any operation fails, the layer is read directly from the upstream. The +// results are cached, if possible. +func (lc *cachedLayerService) Fetch(dgst digest.Digest) (distribution.Layer, error) { + ctxu.GetLogger(lc.ctx).Debugf("(*layerInfoCache).Fetch(%q)", dgst) + now := time.Now() + defer func() { + ctxu.GetLoggerWithField(lc.ctx, "blob.fetch.duration", time.Since(now)). + Infof("(*layerInfoCache).Fetch(%q)", dgst) + }() + + atomic.AddUint64(&layerInfoCacheMetrics.Fetch.Requests, 1) + available, err := lc.cache.Contains(lc.ctx, lc.repository.Name(), dgst) + if err != nil { + ctxu.GetLogger(lc.ctx).Errorf("error checking availability of %v@%v: %v", lc.repository.Name(), dgst, err) + goto fallback + } + + if available { + // fast path: get the layer info and return + meta, err := lc.cache.Meta(lc.ctx, dgst) + if err != nil { + ctxu.GetLogger(lc.ctx).Errorf("error fetching %v@%v from cache: %v", lc.repository.Name(), dgst, err) + goto fallback + } + + atomic.AddUint64(&layerInfoCacheMetrics.Fetch.Hits, 1) + return newLayerReader(lc.driver, dgst, meta.Path, meta.Length) + } + + // NOTE(stevvooe): Unfortunately, the cache here only makes checks for + // existing layers faster. We'd have to provide more careful + // synchronization with the backend to make the missing case as fast. + +fallback: + atomic.AddUint64(&layerInfoCacheMetrics.Fetch.Misses, 1) + layer, err := lc.LayerService.Fetch(dgst) + if err != nil { + return nil, err + } + + // add the layer to the repository + if err := lc.cache.Add(lc.ctx, lc.repository.Name(), dgst); err != nil { + ctxu.GetLogger(lc.ctx). + Errorf("error caching repository relationship for %v@%v: %v", lc.repository.Name(), dgst, err) + } + + // lookup layer path and add it to the cache, if it succeds. Note that we + // still return the layer even if we have trouble caching it. + if path, err := lc.resolveLayerPath(layer); err != nil { + ctxu.GetLogger(lc.ctx). + Errorf("error resolving path while caching %v@%v: %v", lc.repository.Name(), dgst, err) + } else { + // add the layer to the cache once we've resolved the path. + if err := lc.cache.SetMeta(lc.ctx, dgst, cache.LayerMeta{Path: path, Length: layer.Length()}); err != nil { + ctxu.GetLogger(lc.ctx).Errorf("error adding meta for %v@%v to cache: %v", lc.repository.Name(), dgst, err) + } + } + + return layer, err +} + +// extractLayerInfo pulls the layerInfo from the layer, attempting to get the +// path information from either the concrete object or by resolving the +// primary blob store path. +func (lc *cachedLayerService) resolveLayerPath(layer distribution.Layer) (path string, err error) { + // try and resolve the type and driver, so we don't have to traverse links + switch v := layer.(type) { + case *layerReader: + // only set path if we have same driver instance. + if v.driver == lc.driver { + return v.path, nil + } + } + + ctxu.GetLogger(lc.ctx).Warnf("resolving layer path during cache lookup (%v@%v)", lc.repository.Name(), layer.Digest()) + // we have to do an expensive stat to resolve the layer location but no + // need to check the link, since we already have layer instance for this + // repository. + bp, err := lc.blobStore.path(layer.Digest()) + if err != nil { + return "", err + } + + return bp, nil +} + +// layerInfoCacheMetrics keeps track of cache metrics for layer info cache +// requests. Note this is kept globally and made available via expvar. For +// more detailed metrics, its recommend to instrument a particular cache +// implementation. +var layerInfoCacheMetrics struct { + // Exists tracks calls to the Exists caches. + Exists struct { + Requests uint64 + Hits uint64 + Misses uint64 + } + + // Fetch tracks calls to the fetch caches. + Fetch struct { + Requests uint64 + Hits uint64 + Misses uint64 + } +} + +func init() { + expvar.Publish("layerinfocache", expvar.Func(func() interface{} { + // no need for synchronous access: the increments are atomic and + // during reading, we don't care if the data is up to date. The + // numbers will always *eventually* be reported correctly. + return layerInfoCacheMetrics + })) +} diff --git a/registry/storage/layerreader.go b/registry/storage/layerreader.go index 414951d9a..40deba6a7 100644 --- a/registry/storage/layerreader.go +++ b/registry/storage/layerreader.go @@ -17,6 +17,21 @@ type layerReader struct { digest digest.Digest } +// newLayerReader returns a new layerReader with the digest, path and length, +// eliding round trips to the storage backend. +func newLayerReader(driver driver.StorageDriver, dgst digest.Digest, path string, length int64) (*layerReader, error) { + fr := &fileReader{ + driver: driver, + path: path, + size: length, + } + + return &layerReader{ + fileReader: *fr, + digest: dgst, + }, nil +} + var _ distribution.Layer = &layerReader{} func (lr *layerReader) Digest() digest.Digest { diff --git a/registry/storage/manifeststore_test.go b/registry/storage/manifeststore_test.go index dc03dcedd..fe75868b7 100644 --- a/registry/storage/manifeststore_test.go +++ b/registry/storage/manifeststore_test.go @@ -6,6 +6,8 @@ import ( "reflect" "testing" + "github.com/docker/distribution/registry/storage/cache" + "github.com/docker/distribution" "github.com/docker/distribution/digest" "github.com/docker/distribution/manifest" @@ -28,7 +30,7 @@ type manifestStoreTestEnv struct { func newManifestStoreTestEnv(t *testing.T, name, tag string) *manifestStoreTestEnv { ctx := context.Background() driver := inmemory.New() - registry := NewRegistryWithDriver(driver) + registry := NewRegistryWithDriver(driver, cache.NewInMemoryLayerInfoCache()) repo, err := registry.Repository(ctx, name) if err != nil { diff --git a/registry/storage/registry.go b/registry/storage/registry.go index 8d7ea16ec..9ad43acb7 100644 --- a/registry/storage/registry.go +++ b/registry/storage/registry.go @@ -3,6 +3,7 @@ package storage import ( "github.com/docker/distribution" "github.com/docker/distribution/registry/api/v2" + "github.com/docker/distribution/registry/storage/cache" storagedriver "github.com/docker/distribution/registry/storage/driver" "golang.org/x/net/context" ) @@ -10,28 +11,29 @@ import ( // registry is the top-level implementation of Registry for use in the storage // package. All instances should descend from this object. type registry struct { - driver storagedriver.StorageDriver - pm *pathMapper - blobStore *blobStore + driver storagedriver.StorageDriver + pm *pathMapper + blobStore *blobStore + layerInfoCache cache.LayerInfoCache } // NewRegistryWithDriver creates a new registry instance from the provided // driver. The resulting registry may be shared by multiple goroutines but is // cheap to allocate. -func NewRegistryWithDriver(driver storagedriver.StorageDriver) distribution.Registry { - bs := &blobStore{} +func NewRegistryWithDriver(driver storagedriver.StorageDriver, layerInfoCache cache.LayerInfoCache) distribution.Registry { + bs := &blobStore{ + driver: driver, + pm: defaultPathMapper, + } - reg := ®istry{ + return ®istry{ driver: driver, blobStore: bs, // TODO(sday): This should be configurable. - pm: defaultPathMapper, + pm: defaultPathMapper, + layerInfoCache: layerInfoCache, } - - reg.blobStore.registry = reg - - return reg } // Repository returns an instance of the repository tied to the registry. @@ -83,9 +85,29 @@ func (repo *repository) Manifests() distribution.ManifestService { // may be context sensitive in the future. The instance should be used similar // to a request local. func (repo *repository) Layers() distribution.LayerService { - return &layerStore{ + ls := &layerStore{ repository: repo, } + + if repo.registry.layerInfoCache != nil { + // TODO(stevvooe): This is not the best place to setup a cache. We would + // really like to decouple the cache from the backend but also have the + // manifeset service use the layer service cache. For now, we can simply + // integrate the cache directly. The main issue is that we have layer + // access and layer data coupled in a single object. Work is already under + // way to decouple this. + + return &cachedLayerService{ + LayerService: ls, + repository: repo, + ctx: repo.ctx, + driver: repo.driver, + blobStore: repo.blobStore, + cache: repo.registry.layerInfoCache, + } + } + + return ls } func (repo *repository) Signatures() distribution.SignatureService {