forked from TrueCloudLab/distribution
Open cache interface
Signed-off-by: Derek McGowan <derek@mcgstyle.net> (github: dmcgowan)
parent 70074b2286
commit fdf7c8ff15
4 changed files with 142 additions and 92 deletions
registry/storage/blobcachemetrics.go (new file, 60 lines)
@@ -0,0 +1,60 @@
package storage

import (
	"expvar"
	"sync/atomic"

	"github.com/docker/distribution/registry/storage/cache"
)

type blobStatCollector struct {
	metrics cache.Metrics
}

func (bsc *blobStatCollector) Hit() {
	atomic.AddUint64(&bsc.metrics.Requests, 1)
	atomic.AddUint64(&bsc.metrics.Hits, 1)
}

func (bsc *blobStatCollector) Miss() {
	atomic.AddUint64(&bsc.metrics.Requests, 1)
	atomic.AddUint64(&bsc.metrics.Misses, 1)
}

func (bsc *blobStatCollector) Metrics() cache.Metrics {
	return bsc.metrics
}

// blobStatterCacheMetrics keeps track of cache metrics for blob descriptor
// cache requests. Note this is kept globally and made available via expvar.
// For more detailed metrics, it's recommended to instrument a particular
// cache implementation.
var blobStatterCacheMetrics cache.MetricsTracker = &blobStatCollector{}

func init() {
	registry := expvar.Get("registry")
	if registry == nil {
		registry = expvar.NewMap("registry")
	}

	cache := registry.(*expvar.Map).Get("cache")
	if cache == nil {
		cache = &expvar.Map{}
		cache.(*expvar.Map).Init()
		registry.(*expvar.Map).Set("cache", cache)
	}

	storage := cache.(*expvar.Map).Get("storage")
	if storage == nil {
		storage = &expvar.Map{}
		storage.(*expvar.Map).Init()
		cache.(*expvar.Map).Set("storage", storage)
	}

	storage.(*expvar.Map).Set("blobdescriptor", expvar.Func(func() interface{} {
		// no need for synchronous access: the increments are atomic and
		// during reading, we don't care if the data is up to date. The
		// numbers will always *eventually* be reported correctly.
		return blobStatterCacheMetrics
	}))
}
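The file above only registers the counters; nothing in it serves or prints them. A minimal sketch of reading the expvar tree that init() builds, under the assumption that a consumer blank-imports the storage package so init() runs (the example package and main function below are illustrative, not part of this commit):

package main

import (
	"expvar"
	"fmt"

	// Blank import so the storage package's init() above runs and
	// registers the "registry" expvar map.
	_ "github.com/docker/distribution/registry/storage"
)

func main() {
	// If the process serves http.DefaultServeMux, the expvar package also
	// exposes this tree at /debug/vars; here it is read in-process instead.
	if v := expvar.Get("registry"); v != nil {
		fmt.Println(v.String()) // JSON: {"cache": {"storage": {"blobdescriptor": ...}}}
	}
}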
registry/storage/cache/cachedblobdescriptorstore.go (vendored, new file, 80 lines)
@@ -0,0 +1,80 @@
package cache

import (
	"github.com/docker/distribution/context"
	"github.com/docker/distribution/digest"

	"github.com/docker/distribution"
)

// Metrics is used to hold metric counters
// related to the number of times a cache was
// hit or missed.
type Metrics struct {
	Requests uint64
	Hits     uint64
	Misses   uint64
}

// MetricsTracker represents a metric tracker
// which simply counts the number of hits and misses.
type MetricsTracker interface {
	Hit()
	Miss()
	Metrics() Metrics
}

type cachedBlobStatter struct {
	cache   distribution.BlobDescriptorService
	backend distribution.BlobStatter
	tracker MetricsTracker
}

// NewCachedBlobStatter creates a new statter which prefers a cache and
// falls back to a backend.
func NewCachedBlobStatter(cache distribution.BlobDescriptorService, backend distribution.BlobStatter) distribution.BlobStatter {
	return &cachedBlobStatter{
		cache:   cache,
		backend: backend,
	}
}

// NewCachedBlobStatterWithMetrics creates a new statter which prefers a cache
// and falls back to a backend. Hits and misses will be sent to the tracker.
func NewCachedBlobStatterWithMetrics(cache distribution.BlobDescriptorService, backend distribution.BlobStatter, tracker MetricsTracker) distribution.BlobStatter {
	return &cachedBlobStatter{
		cache:   cache,
		backend: backend,
		tracker: tracker,
	}
}

func (cbds *cachedBlobStatter) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
	desc, err := cbds.cache.Stat(ctx, dgst)
	if err != nil {
		if err != distribution.ErrBlobUnknown {
			context.GetLogger(ctx).Errorf("error retrieving descriptor from cache: %v", err)
		}

		goto fallback
	}

	if cbds.tracker != nil {
		cbds.tracker.Hit()
	}
	return desc, nil
fallback:
	if cbds.tracker != nil {
		cbds.tracker.Miss()
	}
	desc, err = cbds.backend.Stat(ctx, dgst)
	if err != nil {
		return desc, err
	}

	if err := cbds.cache.SetDescriptor(ctx, dgst, desc); err != nil {
		context.GetLogger(ctx).Errorf("error adding descriptor %v to cache: %v", desc.Digest, err)
	}

	return desc, err
}
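The exported Metrics and MetricsTracker types plus the two constructors are the "open" interface of the commit title: callers outside registry/storage can now build a cache-backed statter and plug in their own counters. A minimal sketch under that assumption; countingTracker and newInstrumentedStatter are hypothetical names, and the provider/backend values are assumed to come from elsewhere (for example an in-memory descriptor cache and a driver-backed blob store):

package example

import (
	"sync/atomic"

	"github.com/docker/distribution"
	"github.com/docker/distribution/registry/storage/cache"
)

// countingTracker is an illustrative cache.MetricsTracker, analogous to the
// unexported blobStatCollector added to registry/storage in this commit.
type countingTracker struct {
	m cache.Metrics
}

func (t *countingTracker) Hit() {
	atomic.AddUint64(&t.m.Requests, 1)
	atomic.AddUint64(&t.m.Hits, 1)
}

func (t *countingTracker) Miss() {
	atomic.AddUint64(&t.m.Requests, 1)
	atomic.AddUint64(&t.m.Misses, 1)
}

func (t *countingTracker) Metrics() cache.Metrics {
	return t.m
}

// newInstrumentedStatter wraps backend with the descriptor cache in provider
// and routes hit/miss counts to a fresh tracker.
func newInstrumentedStatter(provider distribution.BlobDescriptorService, backend distribution.BlobStatter) (distribution.BlobStatter, *countingTracker) {
	tracker := &countingTracker{}
	return cache.NewCachedBlobStatterWithMetrics(provider, backend, tracker), tracker
}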
(deleted file, 84 lines)
@@ -1,84 +0,0 @@
package storage

import (
	"expvar"
	"sync/atomic"

	"github.com/docker/distribution/context"
	"github.com/docker/distribution/digest"

	"github.com/docker/distribution"
)

type cachedBlobStatter struct {
	cache   distribution.BlobDescriptorService
	backend distribution.BlobStatter
}

func (cbds *cachedBlobStatter) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
	atomic.AddUint64(&blobStatterCacheMetrics.Stat.Requests, 1)
	desc, err := cbds.cache.Stat(ctx, dgst)
	if err != nil {
		if err != distribution.ErrBlobUnknown {
			context.GetLogger(ctx).Errorf("error retrieving descriptor from cache: %v", err)
		}

		goto fallback
	}

	atomic.AddUint64(&blobStatterCacheMetrics.Stat.Hits, 1)
	return desc, nil
fallback:
	atomic.AddUint64(&blobStatterCacheMetrics.Stat.Misses, 1)
	desc, err = cbds.backend.Stat(ctx, dgst)
	if err != nil {
		return desc, err
	}

	if err := cbds.cache.SetDescriptor(ctx, dgst, desc); err != nil {
		context.GetLogger(ctx).Errorf("error adding descriptor %v to cache: %v", desc.Digest, err)
	}

	return desc, err
}

// blobStatterCacheMetrics keeps track of cache metrics for blob descriptor
// cache requests. Note this is kept globally and made available via expvar.
// For more detailed metrics, it's recommended to instrument a particular
// cache implementation.
var blobStatterCacheMetrics struct {
	// Stat tracks calls to the caches.
	Stat struct {
		Requests uint64
		Hits     uint64
		Misses   uint64
	}
}

func init() {
	registry := expvar.Get("registry")
	if registry == nil {
		registry = expvar.NewMap("registry")
	}

	cache := registry.(*expvar.Map).Get("cache")
	if cache == nil {
		cache = &expvar.Map{}
		cache.(*expvar.Map).Init()
		registry.(*expvar.Map).Set("cache", cache)
	}

	storage := cache.(*expvar.Map).Get("storage")
	if storage == nil {
		storage = &expvar.Map{}
		storage.(*expvar.Map).Init()
		cache.(*expvar.Map).Set("storage", storage)
	}

	storage.(*expvar.Map).Set("blobdescriptor", expvar.Func(func() interface{} {
		// no need for synchronous access: the increments are atomic and
		// during reading, we don't care if the data is up to date. The
		// numbers will always *eventually* be reported correctly.
		return blobStatterCacheMetrics
	}))
}
(modified file)
@@ -29,10 +29,7 @@ func NewRegistryWithDriver(ctx context.Context, driver storagedriver.StorageDriv
 	}
 
 	if blobDescriptorCacheProvider != nil {
-		statter = &cachedBlobStatter{
-			cache:   blobDescriptorCacheProvider,
-			backend: statter,
-		}
+		statter = cache.NewCachedBlobStatter(blobDescriptorCacheProvider, statter)
 	}
 
 	bs := &blobStore{
@@ -143,10 +140,7 @@ func (repo *repository) Blobs(ctx context.Context) distribution.BlobStore {
 	}
 
 	if repo.descriptorCache != nil {
-		statter = &cachedBlobStatter{
-			cache:   repo.descriptorCache,
-			backend: statter,
-		}
+		statter = cache.NewCachedBlobStatter(repo.descriptorCache, statter)
 	}
 
 	return &linkedBlobStore{
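Both call sites now go through the exported constructor rather than a struct literal, so cachedBlobStatter itself can stay unexported in the cache package. Because the plain NewCachedBlobStatter sets no tracker, the nil checks in Stat skip the hit/miss accounting at these call sites; wiring them to the expvar-backed counters from blobcachemetrics.go would be a one-line variant along these lines (a hypothetical sketch, not part of this commit):

// Hypothetical variant of the call site above, routing hits and misses from
// this statter into the package-level blobStatterCacheMetrics tracker.
if blobDescriptorCacheProvider != nil {
	statter = cache.NewCachedBlobStatterWithMetrics(blobDescriptorCacheProvider, statter, blobStatterCacheMetrics)
}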