Integrate layer info cache with registry and storage

This changeset integrates the layer info cache with the registry webapp and
storage backend. The main benefit is to cache immutable layer meta data,
reducing backend roundtrips. The cache can be configured to use either redis or
an inmemory cache.

This provides massive performance benefits for HEAD http checks on layer blobs
and manifest verification.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
Stephen J Day 2015-04-01 16:41:33 -07:00
parent b1f616cbff
commit 44b14ceadc
12 changed files with 262 additions and 25 deletions

View file

@ -9,6 +9,7 @@ import (
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
"github.com/docker/distribution/registry/storage"
"github.com/docker/distribution/registry/storage/cache"
"github.com/docker/distribution/registry/storage/driver/inmemory"
"github.com/docker/distribution/testutil"
"github.com/docker/libtrust"
@ -16,7 +17,7 @@ import (
)
func TestListener(t *testing.T) {
registry := storage.NewRegistryWithDriver(inmemory.New())
registry := storage.NewRegistryWithDriver(inmemory.New(), cache.NewInMemoryLayerInfoCache())
tl := &testListener{
ops: make(map[string]int),
}

View file

@ -33,6 +33,10 @@ type Repository interface {
Signatures() SignatureService
}
// TODO(stevvooe): Must add close methods to all these. May want to change the
// way instances are created to better reflect internal dependency
// relationships.
// ManifestService provides operations on image manifests.
type ManifestService interface {
// Exists returns true if the manifest exists.

View file

@ -1,3 +1,3 @@
// Package registry is a placeholder package for registry interface
// destinations and utilities.
// definitions and utilities.
package registry

View file

@ -18,6 +18,7 @@ import (
registrymiddleware "github.com/docker/distribution/registry/middleware/registry"
repositorymiddleware "github.com/docker/distribution/registry/middleware/repository"
"github.com/docker/distribution/registry/storage"
"github.com/docker/distribution/registry/storage/cache"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/factory"
storagemiddleware "github.com/docker/distribution/registry/storage/driver/middleware"
@ -102,7 +103,13 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App
app.configureEvents(&configuration)
app.configureRedis(&configuration)
app.registry = storage.NewRegistryWithDriver(app.driver)
if app.redis != nil {
app.registry = storage.NewRegistryWithDriver(app.driver, cache.NewRedisLayerInfoCache(app.redis))
} else {
// always fall back to inmemory storage
app.registry = storage.NewRegistryWithDriver(app.driver, cache.NewInMemoryLayerInfoCache())
}
app.registry, err = applyRegistryMiddleware(app.registry, configuration.Middleware["registry"])
if err != nil {
panic(err)

View file

@ -13,6 +13,7 @@ import (
"github.com/docker/distribution/registry/auth"
_ "github.com/docker/distribution/registry/auth/silly"
"github.com/docker/distribution/registry/storage"
"github.com/docker/distribution/registry/storage/cache"
"github.com/docker/distribution/registry/storage/driver/inmemory"
"golang.org/x/net/context"
)
@ -28,7 +29,7 @@ func TestAppDispatcher(t *testing.T) {
Context: context.Background(),
router: v2.Router(),
driver: driver,
registry: storage.NewRegistryWithDriver(driver),
registry: storage.NewRegistryWithDriver(driver, cache.NewInMemoryLayerInfoCache()),
}
server := httptest.NewServer(app)
router := v2.Router()

View file

@ -18,8 +18,9 @@ import (
// abstraction, providing utility methods that support creating and traversing
// backend links.
type blobStore struct {
*registry
ctx context.Context
driver storagedriver.StorageDriver
pm *pathMapper
ctx context.Context
}
// exists reports whether or not the path exists. If the driver returns error

View file

@ -27,8 +27,8 @@ type fileReader struct {
// identifying fields
path string
size int64 // size is the total layer size, must be set.
modtime time.Time
size int64 // size is the total size, must be set.
modtime time.Time // TODO(stevvooe): This is not needed anymore.
// mutable fields
rc io.ReadCloser // remote read closer

View file

@ -11,6 +11,7 @@ import (
"github.com/docker/distribution"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/registry/storage/cache"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/inmemory"
"github.com/docker/distribution/testutil"
@ -35,7 +36,7 @@ func TestSimpleLayerUpload(t *testing.T) {
ctx := context.Background()
imageName := "foo/bar"
driver := inmemory.New()
registry := NewRegistryWithDriver(driver)
registry := NewRegistryWithDriver(driver, cache.NewInMemoryLayerInfoCache())
repository, err := registry.Repository(ctx, imageName)
if err != nil {
t.Fatalf("unexpected error getting repo: %v", err)
@ -143,7 +144,7 @@ func TestSimpleLayerRead(t *testing.T) {
ctx := context.Background()
imageName := "foo/bar"
driver := inmemory.New()
registry := NewRegistryWithDriver(driver)
registry := NewRegistryWithDriver(driver, cache.NewInMemoryLayerInfoCache())
repository, err := registry.Repository(ctx, imageName)
if err != nil {
t.Fatalf("unexpected error getting repo: %v", err)
@ -180,7 +181,7 @@ func TestSimpleLayerRead(t *testing.T) {
t.Fatalf("unexpected error fetching non-existent layer: %v", err)
}
randomLayerDigest, err := writeTestLayer(driver, ls.(*layerStore).repository.pm, imageName, dgst, randomLayerReader)
randomLayerDigest, err := writeTestLayer(driver, defaultPathMapper, imageName, dgst, randomLayerReader)
if err != nil {
t.Fatalf("unexpected error writing test layer: %v", err)
}
@ -252,7 +253,7 @@ func TestLayerUploadZeroLength(t *testing.T) {
ctx := context.Background()
imageName := "foo/bar"
driver := inmemory.New()
registry := NewRegistryWithDriver(driver)
registry := NewRegistryWithDriver(driver, cache.NewInMemoryLayerInfoCache())
repository, err := registry.Repository(ctx, imageName)
if err != nil {
t.Fatalf("unexpected error getting repo: %v", err)

View file

@ -0,0 +1,183 @@
package storage
import (
"expvar"
"sync/atomic"
"time"
"github.com/docker/distribution"
ctxu "github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/registry/storage/cache"
"github.com/docker/distribution/registry/storage/driver"
"golang.org/x/net/context"
)
// cachedLayerService implements the layer service with path-aware caching,
// using a LayerInfoCache interface.
type cachedLayerService struct {
distribution.LayerService // upstream layer service
repository distribution.Repository
ctx context.Context
driver driver.StorageDriver
*blobStore // global blob store
cache cache.LayerInfoCache
}
// Exists checks for existence of the digest in the cache, immediately
// returning if it exists for the repository. If not, the upstream is checked.
// When a positive result is found, it is written into the cache.
func (lc *cachedLayerService) Exists(dgst digest.Digest) (bool, error) {
ctxu.GetLogger(lc.ctx).Debugf("(*cachedLayerService).Exists(%q)", dgst)
now := time.Now()
defer func() {
// TODO(stevvooe): Replace this with a decent context-based metrics solution
ctxu.GetLoggerWithField(lc.ctx, "blob.exists.duration", time.Since(now)).
Infof("(*cachedLayerService).Exists(%q)", dgst)
}()
atomic.AddUint64(&layerInfoCacheMetrics.Exists.Requests, 1)
available, err := lc.cache.Contains(lc.ctx, lc.repository.Name(), dgst)
if err != nil {
ctxu.GetLogger(lc.ctx).Errorf("error checking availability of %v@%v: %v", lc.repository.Name(), dgst, err)
goto fallback
}
if available {
atomic.AddUint64(&layerInfoCacheMetrics.Exists.Hits, 1)
return true, nil
}
fallback:
atomic.AddUint64(&layerInfoCacheMetrics.Exists.Misses, 1)
exists, err := lc.LayerService.Exists(dgst)
if err != nil {
return exists, err
}
if exists {
// we can only cache this if the existence is positive.
if err := lc.cache.Add(lc.ctx, lc.repository.Name(), dgst); err != nil {
ctxu.GetLogger(lc.ctx).Errorf("error adding %v@%v to cache: %v", lc.repository.Name(), dgst, err)
}
}
return exists, err
}
// Fetch checks for the availability of the layer in the repository via the
// cache. If present, the metadata is resolved and the layer is returned. If
// any operation fails, the layer is read directly from the upstream. The
// results are cached, if possible.
func (lc *cachedLayerService) Fetch(dgst digest.Digest) (distribution.Layer, error) {
ctxu.GetLogger(lc.ctx).Debugf("(*layerInfoCache).Fetch(%q)", dgst)
now := time.Now()
defer func() {
ctxu.GetLoggerWithField(lc.ctx, "blob.fetch.duration", time.Since(now)).
Infof("(*layerInfoCache).Fetch(%q)", dgst)
}()
atomic.AddUint64(&layerInfoCacheMetrics.Fetch.Requests, 1)
available, err := lc.cache.Contains(lc.ctx, lc.repository.Name(), dgst)
if err != nil {
ctxu.GetLogger(lc.ctx).Errorf("error checking availability of %v@%v: %v", lc.repository.Name(), dgst, err)
goto fallback
}
if available {
// fast path: get the layer info and return
meta, err := lc.cache.Meta(lc.ctx, dgst)
if err != nil {
ctxu.GetLogger(lc.ctx).Errorf("error fetching %v@%v from cache: %v", lc.repository.Name(), dgst, err)
goto fallback
}
atomic.AddUint64(&layerInfoCacheMetrics.Fetch.Hits, 1)
return newLayerReader(lc.driver, dgst, meta.Path, meta.Length)
}
// NOTE(stevvooe): Unfortunately, the cache here only makes checks for
// existing layers faster. We'd have to provide more careful
// synchronization with the backend to make the missing case as fast.
fallback:
atomic.AddUint64(&layerInfoCacheMetrics.Fetch.Misses, 1)
layer, err := lc.LayerService.Fetch(dgst)
if err != nil {
return nil, err
}
// add the layer to the repository
if err := lc.cache.Add(lc.ctx, lc.repository.Name(), dgst); err != nil {
ctxu.GetLogger(lc.ctx).
Errorf("error caching repository relationship for %v@%v: %v", lc.repository.Name(), dgst, err)
}
// lookup layer path and add it to the cache, if it succeds. Note that we
// still return the layer even if we have trouble caching it.
if path, err := lc.resolveLayerPath(layer); err != nil {
ctxu.GetLogger(lc.ctx).
Errorf("error resolving path while caching %v@%v: %v", lc.repository.Name(), dgst, err)
} else {
// add the layer to the cache once we've resolved the path.
if err := lc.cache.SetMeta(lc.ctx, dgst, cache.LayerMeta{Path: path, Length: layer.Length()}); err != nil {
ctxu.GetLogger(lc.ctx).Errorf("error adding meta for %v@%v to cache: %v", lc.repository.Name(), dgst, err)
}
}
return layer, err
}
// extractLayerInfo pulls the layerInfo from the layer, attempting to get the
// path information from either the concrete object or by resolving the
// primary blob store path.
func (lc *cachedLayerService) resolveLayerPath(layer distribution.Layer) (path string, err error) {
// try and resolve the type and driver, so we don't have to traverse links
switch v := layer.(type) {
case *layerReader:
// only set path if we have same driver instance.
if v.driver == lc.driver {
return v.path, nil
}
}
ctxu.GetLogger(lc.ctx).Warnf("resolving layer path during cache lookup (%v@%v)", lc.repository.Name(), layer.Digest())
// we have to do an expensive stat to resolve the layer location but no
// need to check the link, since we already have layer instance for this
// repository.
bp, err := lc.blobStore.path(layer.Digest())
if err != nil {
return "", err
}
return bp, nil
}
// layerInfoCacheMetrics keeps track of cache metrics for layer info cache
// requests. Note this is kept globally and made available via expvar. For
// more detailed metrics, its recommend to instrument a particular cache
// implementation.
var layerInfoCacheMetrics struct {
// Exists tracks calls to the Exists caches.
Exists struct {
Requests uint64
Hits uint64
Misses uint64
}
// Fetch tracks calls to the fetch caches.
Fetch struct {
Requests uint64
Hits uint64
Misses uint64
}
}
func init() {
expvar.Publish("layerinfocache", expvar.Func(func() interface{} {
// no need for synchronous access: the increments are atomic and
// during reading, we don't care if the data is up to date. The
// numbers will always *eventually* be reported correctly.
return layerInfoCacheMetrics
}))
}

View file

@ -17,6 +17,21 @@ type layerReader struct {
digest digest.Digest
}
// newLayerReader returns a new layerReader with the digest, path and length,
// eliding round trips to the storage backend.
func newLayerReader(driver driver.StorageDriver, dgst digest.Digest, path string, length int64) (*layerReader, error) {
fr := &fileReader{
driver: driver,
path: path,
size: length,
}
return &layerReader{
fileReader: *fr,
digest: dgst,
}, nil
}
var _ distribution.Layer = &layerReader{}
func (lr *layerReader) Digest() digest.Digest {

View file

@ -6,6 +6,8 @@ import (
"reflect"
"testing"
"github.com/docker/distribution/registry/storage/cache"
"github.com/docker/distribution"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
@ -28,7 +30,7 @@ type manifestStoreTestEnv struct {
func newManifestStoreTestEnv(t *testing.T, name, tag string) *manifestStoreTestEnv {
ctx := context.Background()
driver := inmemory.New()
registry := NewRegistryWithDriver(driver)
registry := NewRegistryWithDriver(driver, cache.NewInMemoryLayerInfoCache())
repo, err := registry.Repository(ctx, name)
if err != nil {

View file

@ -3,6 +3,7 @@ package storage
import (
"github.com/docker/distribution"
"github.com/docker/distribution/registry/api/v2"
"github.com/docker/distribution/registry/storage/cache"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"golang.org/x/net/context"
)
@ -10,28 +11,29 @@ import (
// registry is the top-level implementation of Registry for use in the storage
// package. All instances should descend from this object.
type registry struct {
driver storagedriver.StorageDriver
pm *pathMapper
blobStore *blobStore
driver storagedriver.StorageDriver
pm *pathMapper
blobStore *blobStore
layerInfoCache cache.LayerInfoCache
}
// NewRegistryWithDriver creates a new registry instance from the provided
// driver. The resulting registry may be shared by multiple goroutines but is
// cheap to allocate.
func NewRegistryWithDriver(driver storagedriver.StorageDriver) distribution.Registry {
bs := &blobStore{}
func NewRegistryWithDriver(driver storagedriver.StorageDriver, layerInfoCache cache.LayerInfoCache) distribution.Registry {
bs := &blobStore{
driver: driver,
pm: defaultPathMapper,
}
reg := &registry{
return &registry{
driver: driver,
blobStore: bs,
// TODO(sday): This should be configurable.
pm: defaultPathMapper,
pm: defaultPathMapper,
layerInfoCache: layerInfoCache,
}
reg.blobStore.registry = reg
return reg
}
// Repository returns an instance of the repository tied to the registry.
@ -83,9 +85,29 @@ func (repo *repository) Manifests() distribution.ManifestService {
// may be context sensitive in the future. The instance should be used similar
// to a request local.
func (repo *repository) Layers() distribution.LayerService {
return &layerStore{
ls := &layerStore{
repository: repo,
}
if repo.registry.layerInfoCache != nil {
// TODO(stevvooe): This is not the best place to setup a cache. We would
// really like to decouple the cache from the backend but also have the
// manifeset service use the layer service cache. For now, we can simply
// integrate the cache directly. The main issue is that we have layer
// access and layer data coupled in a single object. Work is already under
// way to decouple this.
return &cachedLayerService{
LayerService: ls,
repository: repo,
ctx: repo.ctx,
driver: repo.driver,
blobStore: repo.blobStore,
cache: repo.registry.layerInfoCache,
}
}
return ls
}
func (repo *repository) Signatures() distribution.SignatureService {