forked from TrueCloudLab/distribution
Integrate layer info cache with registry and storage
This changeset integrates the layer info cache with the registry webapp and storage backend. The main benefit is to cache immutable layer meta data, reducing backend roundtrips. The cache can be configured to use either redis or an inmemory cache. This provides massive performance benefits for HEAD http checks on layer blobs and manifest verification. Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
parent
a7c2dceea5
commit
6ab228f798
10 changed files with 256 additions and 24 deletions
|
@ -1,3 +1,3 @@
|
|||
// Package registry is a placeholder package for registry interface
|
||||
// destinations and utilities.
|
||||
// definitions and utilities.
|
||||
package registry
|
||||
|
|
|
@ -18,6 +18,7 @@ import (
|
|||
registrymiddleware "github.com/docker/distribution/registry/middleware/registry"
|
||||
repositorymiddleware "github.com/docker/distribution/registry/middleware/repository"
|
||||
"github.com/docker/distribution/registry/storage"
|
||||
"github.com/docker/distribution/registry/storage/cache"
|
||||
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
||||
"github.com/docker/distribution/registry/storage/driver/factory"
|
||||
storagemiddleware "github.com/docker/distribution/registry/storage/driver/middleware"
|
||||
|
@ -102,7 +103,13 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App
|
|||
app.configureEvents(&configuration)
|
||||
app.configureRedis(&configuration)
|
||||
|
||||
app.registry = storage.NewRegistryWithDriver(app.driver)
|
||||
if app.redis != nil {
|
||||
app.registry = storage.NewRegistryWithDriver(app.driver, cache.NewRedisLayerInfoCache(app.redis))
|
||||
} else {
|
||||
// always fall back to inmemory storage
|
||||
app.registry = storage.NewRegistryWithDriver(app.driver, cache.NewInMemoryLayerInfoCache())
|
||||
}
|
||||
|
||||
app.registry, err = applyRegistryMiddleware(app.registry, configuration.Middleware["registry"])
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
|
|
@ -13,6 +13,7 @@ import (
|
|||
"github.com/docker/distribution/registry/auth"
|
||||
_ "github.com/docker/distribution/registry/auth/silly"
|
||||
"github.com/docker/distribution/registry/storage"
|
||||
"github.com/docker/distribution/registry/storage/cache"
|
||||
"github.com/docker/distribution/registry/storage/driver/inmemory"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
@ -28,7 +29,7 @@ func TestAppDispatcher(t *testing.T) {
|
|||
Context: context.Background(),
|
||||
router: v2.Router(),
|
||||
driver: driver,
|
||||
registry: storage.NewRegistryWithDriver(driver),
|
||||
registry: storage.NewRegistryWithDriver(driver, cache.NewInMemoryLayerInfoCache()),
|
||||
}
|
||||
server := httptest.NewServer(app)
|
||||
router := v2.Router()
|
||||
|
|
|
@ -18,8 +18,9 @@ import (
|
|||
// abstraction, providing utility methods that support creating and traversing
|
||||
// backend links.
|
||||
type blobStore struct {
|
||||
*registry
|
||||
ctx context.Context
|
||||
driver storagedriver.StorageDriver
|
||||
pm *pathMapper
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
// exists reports whether or not the path exists. If the driver returns error
|
||||
|
|
|
@ -27,8 +27,8 @@ type fileReader struct {
|
|||
|
||||
// identifying fields
|
||||
path string
|
||||
size int64 // size is the total layer size, must be set.
|
||||
modtime time.Time
|
||||
size int64 // size is the total size, must be set.
|
||||
modtime time.Time // TODO(stevvooe): This is not needed anymore.
|
||||
|
||||
// mutable fields
|
||||
rc io.ReadCloser // remote read closer
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
|
||||
"github.com/docker/distribution"
|
||||
"github.com/docker/distribution/digest"
|
||||
"github.com/docker/distribution/registry/storage/cache"
|
||||
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
||||
"github.com/docker/distribution/registry/storage/driver/inmemory"
|
||||
"github.com/docker/distribution/testutil"
|
||||
|
@ -35,7 +36,7 @@ func TestSimpleLayerUpload(t *testing.T) {
|
|||
ctx := context.Background()
|
||||
imageName := "foo/bar"
|
||||
driver := inmemory.New()
|
||||
registry := NewRegistryWithDriver(driver)
|
||||
registry := NewRegistryWithDriver(driver, cache.NewInMemoryLayerInfoCache())
|
||||
repository, err := registry.Repository(ctx, imageName)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error getting repo: %v", err)
|
||||
|
@ -143,7 +144,7 @@ func TestSimpleLayerRead(t *testing.T) {
|
|||
ctx := context.Background()
|
||||
imageName := "foo/bar"
|
||||
driver := inmemory.New()
|
||||
registry := NewRegistryWithDriver(driver)
|
||||
registry := NewRegistryWithDriver(driver, cache.NewInMemoryLayerInfoCache())
|
||||
repository, err := registry.Repository(ctx, imageName)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error getting repo: %v", err)
|
||||
|
@ -180,7 +181,7 @@ func TestSimpleLayerRead(t *testing.T) {
|
|||
t.Fatalf("unexpected error fetching non-existent layer: %v", err)
|
||||
}
|
||||
|
||||
randomLayerDigest, err := writeTestLayer(driver, ls.(*layerStore).repository.pm, imageName, dgst, randomLayerReader)
|
||||
randomLayerDigest, err := writeTestLayer(driver, defaultPathMapper, imageName, dgst, randomLayerReader)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error writing test layer: %v", err)
|
||||
}
|
||||
|
@ -252,7 +253,7 @@ func TestLayerUploadZeroLength(t *testing.T) {
|
|||
ctx := context.Background()
|
||||
imageName := "foo/bar"
|
||||
driver := inmemory.New()
|
||||
registry := NewRegistryWithDriver(driver)
|
||||
registry := NewRegistryWithDriver(driver, cache.NewInMemoryLayerInfoCache())
|
||||
repository, err := registry.Repository(ctx, imageName)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error getting repo: %v", err)
|
||||
|
|
183
docs/storage/layercache.go
Normal file
183
docs/storage/layercache.go
Normal file
|
@ -0,0 +1,183 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"expvar"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/docker/distribution"
|
||||
ctxu "github.com/docker/distribution/context"
|
||||
"github.com/docker/distribution/digest"
|
||||
"github.com/docker/distribution/registry/storage/cache"
|
||||
"github.com/docker/distribution/registry/storage/driver"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// cachedLayerService implements the layer service with path-aware caching,
|
||||
// using a LayerInfoCache interface.
|
||||
type cachedLayerService struct {
|
||||
distribution.LayerService // upstream layer service
|
||||
repository distribution.Repository
|
||||
ctx context.Context
|
||||
driver driver.StorageDriver
|
||||
*blobStore // global blob store
|
||||
cache cache.LayerInfoCache
|
||||
}
|
||||
|
||||
// Exists checks for existence of the digest in the cache, immediately
|
||||
// returning if it exists for the repository. If not, the upstream is checked.
|
||||
// When a positive result is found, it is written into the cache.
|
||||
func (lc *cachedLayerService) Exists(dgst digest.Digest) (bool, error) {
|
||||
ctxu.GetLogger(lc.ctx).Debugf("(*cachedLayerService).Exists(%q)", dgst)
|
||||
now := time.Now()
|
||||
defer func() {
|
||||
// TODO(stevvooe): Replace this with a decent context-based metrics solution
|
||||
ctxu.GetLoggerWithField(lc.ctx, "blob.exists.duration", time.Since(now)).
|
||||
Infof("(*cachedLayerService).Exists(%q)", dgst)
|
||||
}()
|
||||
|
||||
atomic.AddUint64(&layerInfoCacheMetrics.Exists.Requests, 1)
|
||||
available, err := lc.cache.Contains(lc.ctx, lc.repository.Name(), dgst)
|
||||
if err != nil {
|
||||
ctxu.GetLogger(lc.ctx).Errorf("error checking availability of %v@%v: %v", lc.repository.Name(), dgst, err)
|
||||
goto fallback
|
||||
}
|
||||
|
||||
if available {
|
||||
atomic.AddUint64(&layerInfoCacheMetrics.Exists.Hits, 1)
|
||||
return true, nil
|
||||
}
|
||||
|
||||
fallback:
|
||||
atomic.AddUint64(&layerInfoCacheMetrics.Exists.Misses, 1)
|
||||
exists, err := lc.LayerService.Exists(dgst)
|
||||
if err != nil {
|
||||
return exists, err
|
||||
}
|
||||
|
||||
if exists {
|
||||
// we can only cache this if the existence is positive.
|
||||
if err := lc.cache.Add(lc.ctx, lc.repository.Name(), dgst); err != nil {
|
||||
ctxu.GetLogger(lc.ctx).Errorf("error adding %v@%v to cache: %v", lc.repository.Name(), dgst, err)
|
||||
}
|
||||
}
|
||||
|
||||
return exists, err
|
||||
}
|
||||
|
||||
// Fetch checks for the availability of the layer in the repository via the
|
||||
// cache. If present, the metadata is resolved and the layer is returned. If
|
||||
// any operation fails, the layer is read directly from the upstream. The
|
||||
// results are cached, if possible.
|
||||
func (lc *cachedLayerService) Fetch(dgst digest.Digest) (distribution.Layer, error) {
|
||||
ctxu.GetLogger(lc.ctx).Debugf("(*layerInfoCache).Fetch(%q)", dgst)
|
||||
now := time.Now()
|
||||
defer func() {
|
||||
ctxu.GetLoggerWithField(lc.ctx, "blob.fetch.duration", time.Since(now)).
|
||||
Infof("(*layerInfoCache).Fetch(%q)", dgst)
|
||||
}()
|
||||
|
||||
atomic.AddUint64(&layerInfoCacheMetrics.Fetch.Requests, 1)
|
||||
available, err := lc.cache.Contains(lc.ctx, lc.repository.Name(), dgst)
|
||||
if err != nil {
|
||||
ctxu.GetLogger(lc.ctx).Errorf("error checking availability of %v@%v: %v", lc.repository.Name(), dgst, err)
|
||||
goto fallback
|
||||
}
|
||||
|
||||
if available {
|
||||
// fast path: get the layer info and return
|
||||
meta, err := lc.cache.Meta(lc.ctx, dgst)
|
||||
if err != nil {
|
||||
ctxu.GetLogger(lc.ctx).Errorf("error fetching %v@%v from cache: %v", lc.repository.Name(), dgst, err)
|
||||
goto fallback
|
||||
}
|
||||
|
||||
atomic.AddUint64(&layerInfoCacheMetrics.Fetch.Hits, 1)
|
||||
return newLayerReader(lc.driver, dgst, meta.Path, meta.Length)
|
||||
}
|
||||
|
||||
// NOTE(stevvooe): Unfortunately, the cache here only makes checks for
|
||||
// existing layers faster. We'd have to provide more careful
|
||||
// synchronization with the backend to make the missing case as fast.
|
||||
|
||||
fallback:
|
||||
atomic.AddUint64(&layerInfoCacheMetrics.Fetch.Misses, 1)
|
||||
layer, err := lc.LayerService.Fetch(dgst)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// add the layer to the repository
|
||||
if err := lc.cache.Add(lc.ctx, lc.repository.Name(), dgst); err != nil {
|
||||
ctxu.GetLogger(lc.ctx).
|
||||
Errorf("error caching repository relationship for %v@%v: %v", lc.repository.Name(), dgst, err)
|
||||
}
|
||||
|
||||
// lookup layer path and add it to the cache, if it succeds. Note that we
|
||||
// still return the layer even if we have trouble caching it.
|
||||
if path, err := lc.resolveLayerPath(layer); err != nil {
|
||||
ctxu.GetLogger(lc.ctx).
|
||||
Errorf("error resolving path while caching %v@%v: %v", lc.repository.Name(), dgst, err)
|
||||
} else {
|
||||
// add the layer to the cache once we've resolved the path.
|
||||
if err := lc.cache.SetMeta(lc.ctx, dgst, cache.LayerMeta{Path: path, Length: layer.Length()}); err != nil {
|
||||
ctxu.GetLogger(lc.ctx).Errorf("error adding meta for %v@%v to cache: %v", lc.repository.Name(), dgst, err)
|
||||
}
|
||||
}
|
||||
|
||||
return layer, err
|
||||
}
|
||||
|
||||
// extractLayerInfo pulls the layerInfo from the layer, attempting to get the
|
||||
// path information from either the concrete object or by resolving the
|
||||
// primary blob store path.
|
||||
func (lc *cachedLayerService) resolveLayerPath(layer distribution.Layer) (path string, err error) {
|
||||
// try and resolve the type and driver, so we don't have to traverse links
|
||||
switch v := layer.(type) {
|
||||
case *layerReader:
|
||||
// only set path if we have same driver instance.
|
||||
if v.driver == lc.driver {
|
||||
return v.path, nil
|
||||
}
|
||||
}
|
||||
|
||||
ctxu.GetLogger(lc.ctx).Warnf("resolving layer path during cache lookup (%v@%v)", lc.repository.Name(), layer.Digest())
|
||||
// we have to do an expensive stat to resolve the layer location but no
|
||||
// need to check the link, since we already have layer instance for this
|
||||
// repository.
|
||||
bp, err := lc.blobStore.path(layer.Digest())
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return bp, nil
|
||||
}
|
||||
|
||||
// layerInfoCacheMetrics keeps track of cache metrics for layer info cache
|
||||
// requests. Note this is kept globally and made available via expvar. For
|
||||
// more detailed metrics, its recommend to instrument a particular cache
|
||||
// implementation.
|
||||
var layerInfoCacheMetrics struct {
|
||||
// Exists tracks calls to the Exists caches.
|
||||
Exists struct {
|
||||
Requests uint64
|
||||
Hits uint64
|
||||
Misses uint64
|
||||
}
|
||||
|
||||
// Fetch tracks calls to the fetch caches.
|
||||
Fetch struct {
|
||||
Requests uint64
|
||||
Hits uint64
|
||||
Misses uint64
|
||||
}
|
||||
}
|
||||
|
||||
func init() {
|
||||
expvar.Publish("layerinfocache", expvar.Func(func() interface{} {
|
||||
// no need for synchronous access: the increments are atomic and
|
||||
// during reading, we don't care if the data is up to date. The
|
||||
// numbers will always *eventually* be reported correctly.
|
||||
return layerInfoCacheMetrics
|
||||
}))
|
||||
}
|
|
@ -17,6 +17,21 @@ type layerReader struct {
|
|||
digest digest.Digest
|
||||
}
|
||||
|
||||
// newLayerReader returns a new layerReader with the digest, path and length,
|
||||
// eliding round trips to the storage backend.
|
||||
func newLayerReader(driver driver.StorageDriver, dgst digest.Digest, path string, length int64) (*layerReader, error) {
|
||||
fr := &fileReader{
|
||||
driver: driver,
|
||||
path: path,
|
||||
size: length,
|
||||
}
|
||||
|
||||
return &layerReader{
|
||||
fileReader: *fr,
|
||||
digest: dgst,
|
||||
}, nil
|
||||
}
|
||||
|
||||
var _ distribution.Layer = &layerReader{}
|
||||
|
||||
func (lr *layerReader) Digest() digest.Digest {
|
||||
|
|
|
@ -6,6 +6,8 @@ import (
|
|||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/docker/distribution/registry/storage/cache"
|
||||
|
||||
"github.com/docker/distribution"
|
||||
"github.com/docker/distribution/digest"
|
||||
"github.com/docker/distribution/manifest"
|
||||
|
@ -28,7 +30,7 @@ type manifestStoreTestEnv struct {
|
|||
func newManifestStoreTestEnv(t *testing.T, name, tag string) *manifestStoreTestEnv {
|
||||
ctx := context.Background()
|
||||
driver := inmemory.New()
|
||||
registry := NewRegistryWithDriver(driver)
|
||||
registry := NewRegistryWithDriver(driver, cache.NewInMemoryLayerInfoCache())
|
||||
|
||||
repo, err := registry.Repository(ctx, name)
|
||||
if err != nil {
|
||||
|
|
|
@ -3,6 +3,7 @@ package storage
|
|||
import (
|
||||
"github.com/docker/distribution"
|
||||
"github.com/docker/distribution/registry/api/v2"
|
||||
"github.com/docker/distribution/registry/storage/cache"
|
||||
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
@ -10,28 +11,29 @@ import (
|
|||
// registry is the top-level implementation of Registry for use in the storage
|
||||
// package. All instances should descend from this object.
|
||||
type registry struct {
|
||||
driver storagedriver.StorageDriver
|
||||
pm *pathMapper
|
||||
blobStore *blobStore
|
||||
driver storagedriver.StorageDriver
|
||||
pm *pathMapper
|
||||
blobStore *blobStore
|
||||
layerInfoCache cache.LayerInfoCache
|
||||
}
|
||||
|
||||
// NewRegistryWithDriver creates a new registry instance from the provided
|
||||
// driver. The resulting registry may be shared by multiple goroutines but is
|
||||
// cheap to allocate.
|
||||
func NewRegistryWithDriver(driver storagedriver.StorageDriver) distribution.Registry {
|
||||
bs := &blobStore{}
|
||||
func NewRegistryWithDriver(driver storagedriver.StorageDriver, layerInfoCache cache.LayerInfoCache) distribution.Registry {
|
||||
bs := &blobStore{
|
||||
driver: driver,
|
||||
pm: defaultPathMapper,
|
||||
}
|
||||
|
||||
reg := ®istry{
|
||||
return ®istry{
|
||||
driver: driver,
|
||||
blobStore: bs,
|
||||
|
||||
// TODO(sday): This should be configurable.
|
||||
pm: defaultPathMapper,
|
||||
pm: defaultPathMapper,
|
||||
layerInfoCache: layerInfoCache,
|
||||
}
|
||||
|
||||
reg.blobStore.registry = reg
|
||||
|
||||
return reg
|
||||
}
|
||||
|
||||
// Repository returns an instance of the repository tied to the registry.
|
||||
|
@ -83,9 +85,29 @@ func (repo *repository) Manifests() distribution.ManifestService {
|
|||
// may be context sensitive in the future. The instance should be used similar
|
||||
// to a request local.
|
||||
func (repo *repository) Layers() distribution.LayerService {
|
||||
return &layerStore{
|
||||
ls := &layerStore{
|
||||
repository: repo,
|
||||
}
|
||||
|
||||
if repo.registry.layerInfoCache != nil {
|
||||
// TODO(stevvooe): This is not the best place to setup a cache. We would
|
||||
// really like to decouple the cache from the backend but also have the
|
||||
// manifeset service use the layer service cache. For now, we can simply
|
||||
// integrate the cache directly. The main issue is that we have layer
|
||||
// access and layer data coupled in a single object. Work is already under
|
||||
// way to decouple this.
|
||||
|
||||
return &cachedLayerService{
|
||||
LayerService: ls,
|
||||
repository: repo,
|
||||
ctx: repo.ctx,
|
||||
driver: repo.driver,
|
||||
blobStore: repo.blobStore,
|
||||
cache: repo.registry.layerInfoCache,
|
||||
}
|
||||
}
|
||||
|
||||
return ls
|
||||
}
|
||||
|
||||
func (repo *repository) Signatures() distribution.SignatureService {
|
||||
|
|
Loading…
Reference in a new issue