forked from TrueCloudLab/distribution
09d48c7192
For consistency with other systems, the redis and caching monitoring data has been moved under the "registry" section in expvar. This ensures the entire registry state is kept to a single section. Signed-off-by: Stephen J Day <stephen.day@docker.com>
202 lines
6.7 KiB
Go
202 lines
6.7 KiB
Go
package storage
|
|
|
|
import (
|
|
"expvar"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/docker/distribution"
|
|
ctxu "github.com/docker/distribution/context"
|
|
"github.com/docker/distribution/digest"
|
|
"github.com/docker/distribution/registry/storage/cache"
|
|
"github.com/docker/distribution/registry/storage/driver"
|
|
"golang.org/x/net/context"
|
|
)
|
|
|
|
// cachedLayerService implements the layer service with path-aware caching,
|
|
// using a LayerInfoCache interface.
|
|
type cachedLayerService struct {
|
|
distribution.LayerService // upstream layer service
|
|
repository distribution.Repository
|
|
ctx context.Context
|
|
driver driver.StorageDriver
|
|
*blobStore // global blob store
|
|
cache cache.LayerInfoCache
|
|
}
|
|
|
|
// Exists checks for existence of the digest in the cache, immediately
|
|
// returning if it exists for the repository. If not, the upstream is checked.
|
|
// When a positive result is found, it is written into the cache.
|
|
func (lc *cachedLayerService) Exists(dgst digest.Digest) (bool, error) {
|
|
ctxu.GetLogger(lc.ctx).Debugf("(*cachedLayerService).Exists(%q)", dgst)
|
|
now := time.Now()
|
|
defer func() {
|
|
// TODO(stevvooe): Replace this with a decent context-based metrics solution
|
|
ctxu.GetLoggerWithField(lc.ctx, "blob.exists.duration", time.Since(now)).
|
|
Infof("(*cachedLayerService).Exists(%q)", dgst)
|
|
}()
|
|
|
|
atomic.AddUint64(&layerInfoCacheMetrics.Exists.Requests, 1)
|
|
available, err := lc.cache.Contains(lc.ctx, lc.repository.Name(), dgst)
|
|
if err != nil {
|
|
ctxu.GetLogger(lc.ctx).Errorf("error checking availability of %v@%v: %v", lc.repository.Name(), dgst, err)
|
|
goto fallback
|
|
}
|
|
|
|
if available {
|
|
atomic.AddUint64(&layerInfoCacheMetrics.Exists.Hits, 1)
|
|
return true, nil
|
|
}
|
|
|
|
fallback:
|
|
atomic.AddUint64(&layerInfoCacheMetrics.Exists.Misses, 1)
|
|
exists, err := lc.LayerService.Exists(dgst)
|
|
if err != nil {
|
|
return exists, err
|
|
}
|
|
|
|
if exists {
|
|
// we can only cache this if the existence is positive.
|
|
if err := lc.cache.Add(lc.ctx, lc.repository.Name(), dgst); err != nil {
|
|
ctxu.GetLogger(lc.ctx).Errorf("error adding %v@%v to cache: %v", lc.repository.Name(), dgst, err)
|
|
}
|
|
}
|
|
|
|
return exists, err
|
|
}
|
|
|
|
// Fetch checks for the availability of the layer in the repository via the
|
|
// cache. If present, the metadata is resolved and the layer is returned. If
|
|
// any operation fails, the layer is read directly from the upstream. The
|
|
// results are cached, if possible.
|
|
func (lc *cachedLayerService) Fetch(dgst digest.Digest) (distribution.Layer, error) {
|
|
ctxu.GetLogger(lc.ctx).Debugf("(*layerInfoCache).Fetch(%q)", dgst)
|
|
now := time.Now()
|
|
defer func() {
|
|
ctxu.GetLoggerWithField(lc.ctx, "blob.fetch.duration", time.Since(now)).
|
|
Infof("(*layerInfoCache).Fetch(%q)", dgst)
|
|
}()
|
|
|
|
atomic.AddUint64(&layerInfoCacheMetrics.Fetch.Requests, 1)
|
|
available, err := lc.cache.Contains(lc.ctx, lc.repository.Name(), dgst)
|
|
if err != nil {
|
|
ctxu.GetLogger(lc.ctx).Errorf("error checking availability of %v@%v: %v", lc.repository.Name(), dgst, err)
|
|
goto fallback
|
|
}
|
|
|
|
if available {
|
|
// fast path: get the layer info and return
|
|
meta, err := lc.cache.Meta(lc.ctx, dgst)
|
|
if err != nil {
|
|
ctxu.GetLogger(lc.ctx).Errorf("error fetching %v@%v from cache: %v", lc.repository.Name(), dgst, err)
|
|
goto fallback
|
|
}
|
|
|
|
atomic.AddUint64(&layerInfoCacheMetrics.Fetch.Hits, 1)
|
|
return newLayerReader(lc.driver, dgst, meta.Path, meta.Length)
|
|
}
|
|
|
|
// NOTE(stevvooe): Unfortunately, the cache here only makes checks for
|
|
// existing layers faster. We'd have to provide more careful
|
|
// synchronization with the backend to make the missing case as fast.
|
|
|
|
fallback:
|
|
atomic.AddUint64(&layerInfoCacheMetrics.Fetch.Misses, 1)
|
|
layer, err := lc.LayerService.Fetch(dgst)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// add the layer to the repository
|
|
if err := lc.cache.Add(lc.ctx, lc.repository.Name(), dgst); err != nil {
|
|
ctxu.GetLogger(lc.ctx).
|
|
Errorf("error caching repository relationship for %v@%v: %v", lc.repository.Name(), dgst, err)
|
|
}
|
|
|
|
// lookup layer path and add it to the cache, if it succeds. Note that we
|
|
// still return the layer even if we have trouble caching it.
|
|
if path, err := lc.resolveLayerPath(layer); err != nil {
|
|
ctxu.GetLogger(lc.ctx).
|
|
Errorf("error resolving path while caching %v@%v: %v", lc.repository.Name(), dgst, err)
|
|
} else {
|
|
// add the layer to the cache once we've resolved the path.
|
|
if err := lc.cache.SetMeta(lc.ctx, dgst, cache.LayerMeta{Path: path, Length: layer.Length()}); err != nil {
|
|
ctxu.GetLogger(lc.ctx).Errorf("error adding meta for %v@%v to cache: %v", lc.repository.Name(), dgst, err)
|
|
}
|
|
}
|
|
|
|
return layer, err
|
|
}
|
|
|
|
// extractLayerInfo pulls the layerInfo from the layer, attempting to get the
|
|
// path information from either the concrete object or by resolving the
|
|
// primary blob store path.
|
|
func (lc *cachedLayerService) resolveLayerPath(layer distribution.Layer) (path string, err error) {
|
|
// try and resolve the type and driver, so we don't have to traverse links
|
|
switch v := layer.(type) {
|
|
case *layerReader:
|
|
// only set path if we have same driver instance.
|
|
if v.driver == lc.driver {
|
|
return v.path, nil
|
|
}
|
|
}
|
|
|
|
ctxu.GetLogger(lc.ctx).Warnf("resolving layer path during cache lookup (%v@%v)", lc.repository.Name(), layer.Digest())
|
|
// we have to do an expensive stat to resolve the layer location but no
|
|
// need to check the link, since we already have layer instance for this
|
|
// repository.
|
|
bp, err := lc.blobStore.path(layer.Digest())
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
return bp, nil
|
|
}
|
|
|
|
// layerInfoCacheMetrics keeps track of cache metrics for layer info cache
|
|
// requests. Note this is kept globally and made available via expvar. For
|
|
// more detailed metrics, its recommend to instrument a particular cache
|
|
// implementation.
|
|
var layerInfoCacheMetrics struct {
|
|
// Exists tracks calls to the Exists caches.
|
|
Exists struct {
|
|
Requests uint64
|
|
Hits uint64
|
|
Misses uint64
|
|
}
|
|
|
|
// Fetch tracks calls to the fetch caches.
|
|
Fetch struct {
|
|
Requests uint64
|
|
Hits uint64
|
|
Misses uint64
|
|
}
|
|
}
|
|
|
|
func init() {
|
|
registry := expvar.Get("registry")
|
|
if registry == nil {
|
|
registry = expvar.NewMap("registry")
|
|
}
|
|
|
|
cache := registry.(*expvar.Map).Get("cache")
|
|
if cache == nil {
|
|
cache = &expvar.Map{}
|
|
cache.(*expvar.Map).Init()
|
|
registry.(*expvar.Map).Set("cache", cache)
|
|
}
|
|
|
|
storage := cache.(*expvar.Map).Get("storage")
|
|
if storage == nil {
|
|
storage = &expvar.Map{}
|
|
storage.(*expvar.Map).Init()
|
|
cache.(*expvar.Map).Set("storage", storage)
|
|
}
|
|
|
|
storage.(*expvar.Map).Set("layerinfo", expvar.Func(func() interface{} {
|
|
// no need for synchronous access: the increments are atomic and
|
|
// during reading, we don't care if the data is up to date. The
|
|
// numbers will always *eventually* be reported correctly.
|
|
return layerInfoCacheMetrics
|
|
}))
|
|
}
|