diff --git a/registry/storage/blobcachemetrics.go b/registry/storage/blobcachemetrics.go deleted file mode 100644 index 238b5806c..000000000 --- a/registry/storage/blobcachemetrics.go +++ /dev/null @@ -1,66 +0,0 @@ -package storage - -import ( - "context" - "expvar" - "sync/atomic" - - dcontext "github.com/docker/distribution/context" - "github.com/docker/distribution/registry/storage/cache" -) - -type blobStatCollector struct { - metrics cache.Metrics -} - -func (bsc *blobStatCollector) Hit() { - atomic.AddUint64(&bsc.metrics.Requests, 1) - atomic.AddUint64(&bsc.metrics.Hits, 1) -} - -func (bsc *blobStatCollector) Miss() { - atomic.AddUint64(&bsc.metrics.Requests, 1) - atomic.AddUint64(&bsc.metrics.Misses, 1) -} - -func (bsc *blobStatCollector) Metrics() cache.Metrics { - return bsc.metrics -} - -func (bsc *blobStatCollector) Logger(ctx context.Context) cache.Logger { - return dcontext.GetLogger(ctx) -} - -// blobStatterCacheMetrics keeps track of cache metrics for blob descriptor -// cache requests. Note this is kept globally and made available via expvar. -// For more detailed metrics, its recommend to instrument a particular cache -// implementation. -var blobStatterCacheMetrics cache.MetricsTracker = &blobStatCollector{} - -func init() { - registry := expvar.Get("registry") - if registry == nil { - registry = expvar.NewMap("registry") - } - - cache := registry.(*expvar.Map).Get("cache") - if cache == nil { - cache = &expvar.Map{} - cache.(*expvar.Map).Init() - registry.(*expvar.Map).Set("cache", cache) - } - - storage := cache.(*expvar.Map).Get("storage") - if storage == nil { - storage = &expvar.Map{} - storage.(*expvar.Map).Init() - cache.(*expvar.Map).Set("storage", storage) - } - - storage.(*expvar.Map).Set("blobdescriptor", expvar.Func(func() interface{} { - // no need for synchronous access: the increments are atomic and - // during reading, we don't care if the data is up to date. The - // numbers will always *eventually* be reported correctly. - return blobStatterCacheMetrics - })) -} diff --git a/registry/storage/cache/cache_test.go b/registry/storage/cache/cache_test.go new file mode 100644 index 000000000..2e7f05566 --- /dev/null +++ b/registry/storage/cache/cache_test.go @@ -0,0 +1,131 @@ +package cache + +import ( + "context" + "errors" + "testing" + + "github.com/docker/distribution" + digest "github.com/opencontainers/go-digest" +) + +func TestCacheSet(t *testing.T) { + cache := newTestStatter() + backend := newTestStatter() + st := NewCachedBlobStatter(cache, backend) + ctx := context.Background() + + dgst := digest.Digest("dontvalidate") + _, err := st.Stat(ctx, dgst) + if err != distribution.ErrBlobUnknown { + t.Fatalf("Unexpected error %v, expected %v", err, distribution.ErrBlobUnknown) + } + + desc := distribution.Descriptor{ + Digest: dgst, + } + if err := backend.SetDescriptor(ctx, dgst, desc); err != nil { + t.Fatal(err) + } + + actual, err := st.Stat(ctx, dgst) + if err != nil { + t.Fatal(err) + } + if actual.Digest != desc.Digest { + t.Fatalf("Unexpected descriptor %v, expected %v", actual, desc) + } + + if len(cache.sets) != 1 || len(cache.sets[dgst]) == 0 { + t.Fatalf("Expected cache set") + } + if cache.sets[dgst][0].Digest != desc.Digest { + t.Fatalf("Unexpected descriptor %v, expected %v", cache.sets[dgst][0], desc) + } + + desc2 := distribution.Descriptor{ + Digest: digest.Digest("dontvalidate 2"), + } + cache.sets[dgst] = append(cache.sets[dgst], desc2) + + actual, err = st.Stat(ctx, dgst) + if err != nil { + t.Fatal(err) + } + if actual.Digest != desc2.Digest { + t.Fatalf("Unexpected descriptor %v, expected %v", actual, desc) + } +} + +func TestCacheError(t *testing.T) { + cache := newErrTestStatter(errors.New("cache error")) + backend := newTestStatter() + st := NewCachedBlobStatter(cache, backend) + ctx := context.Background() + + dgst := digest.Digest("dontvalidate") + _, err := st.Stat(ctx, dgst) + if err != distribution.ErrBlobUnknown { + t.Fatalf("Unexpected error %v, expected %v", err, distribution.ErrBlobUnknown) + } + + desc := distribution.Descriptor{ + Digest: dgst, + } + if err := backend.SetDescriptor(ctx, dgst, desc); err != nil { + t.Fatal(err) + } + + actual, err := st.Stat(ctx, dgst) + if err != nil { + t.Fatal(err) + } + if actual.Digest != desc.Digest { + t.Fatalf("Unexpected descriptor %v, expected %v", actual, desc) + } + + if len(cache.sets) > 0 { + t.Fatalf("Set should not be called after stat error") + } +} + +func newTestStatter() *testStatter { + return &testStatter{ + stats: []digest.Digest{}, + sets: map[digest.Digest][]distribution.Descriptor{}, + } +} + +func newErrTestStatter(err error) *testStatter { + return &testStatter{ + sets: map[digest.Digest][]distribution.Descriptor{}, + err: err, + } +} + +type testStatter struct { + stats []digest.Digest + sets map[digest.Digest][]distribution.Descriptor + err error +} + +func (s *testStatter) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { + if s.err != nil { + return distribution.Descriptor{}, s.err + } + + if set := s.sets[dgst]; len(set) > 0 { + return set[len(set)-1], nil + } + + return distribution.Descriptor{}, distribution.ErrBlobUnknown +} + +func (s *testStatter) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error { + s.sets[dgst] = append(s.sets[dgst], desc) + return s.err +} + +func (s *testStatter) Clear(ctx context.Context, dgst digest.Digest) error { + return s.err +} diff --git a/registry/storage/cache/cachecheck/suite.go b/registry/storage/cache/cachecheck/suite.go index d241bd04c..12d6e45de 100644 --- a/registry/storage/cache/cachecheck/suite.go +++ b/registry/storage/cache/cachecheck/suite.go @@ -54,6 +54,10 @@ func checkBlobDescriptorCacheEmptyRepository(ctx context.Context, t *testing.T, t.Fatalf("expected error checking for cache item with empty digest: %v", err) } + if _, err := cache.Stat(ctx, "sha384:cba111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111"); err != distribution.ErrBlobUnknown { + t.Fatalf("expected unknown blob error with uncached repo: %v", err) + } + if _, err := cache.Stat(ctx, "sha384:abc111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111"); err != distribution.ErrBlobUnknown { t.Fatalf("expected unknown blob error with empty repo: %v", err) } diff --git a/registry/storage/cache/cachedblobdescriptorstore.go b/registry/storage/cache/cachedblobdescriptorstore.go index ac4c45211..f25d68d9f 100644 --- a/registry/storage/cache/cachedblobdescriptorstore.go +++ b/registry/storage/cache/cachedblobdescriptorstore.go @@ -4,39 +4,14 @@ import ( "context" "github.com/docker/distribution" + dcontext "github.com/docker/distribution/context" prometheus "github.com/docker/distribution/metrics" "github.com/opencontainers/go-digest" ) -// Metrics is used to hold metric counters -// related to the number of times a cache was -// hit or missed. -type Metrics struct { - Requests uint64 - Hits uint64 - Misses uint64 -} - -// Logger can be provided on the MetricsTracker to log errors. -// -// Usually, this is just a proxy to dcontext.GetLogger. -type Logger interface { - Errorf(format string, args ...interface{}) -} - -// MetricsTracker represents a metric tracker -// which simply counts the number of hits and misses. -type MetricsTracker interface { - Hit() - Miss() - Metrics() Metrics - Logger(context.Context) Logger -} - type cachedBlobStatter struct { cache distribution.BlobDescriptorService backend distribution.BlobDescriptorService - tracker MetricsTracker } var ( @@ -53,47 +28,36 @@ func NewCachedBlobStatter(cache distribution.BlobDescriptorService, backend dist } } -// NewCachedBlobStatterWithMetrics creates a new statter which prefers a cache and -// falls back to a backend. Hits and misses will send to the tracker. -func NewCachedBlobStatterWithMetrics(cache distribution.BlobDescriptorService, backend distribution.BlobDescriptorService, tracker MetricsTracker) distribution.BlobStatter { - return &cachedBlobStatter{ - cache: cache, - backend: backend, - tracker: tracker, - } -} - func (cbds *cachedBlobStatter) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { cacheCount.WithValues("Request").Inc(1) - desc, err := cbds.cache.Stat(ctx, dgst) - if err != nil { - if err != distribution.ErrBlobUnknown { - logErrorf(ctx, cbds.tracker, "error retrieving descriptor from cache: %v", err) - } - goto fallback + // try getting from cache + desc, cacheErr := cbds.cache.Stat(ctx, dgst) + if cacheErr == nil { + cacheCount.WithValues("Hit").Inc(1) + return desc, nil } - cacheCount.WithValues("Hit").Inc(1) - if cbds.tracker != nil { - cbds.tracker.Hit() - } - return desc, nil -fallback: - cacheCount.WithValues("Miss").Inc(1) - if cbds.tracker != nil { - cbds.tracker.Miss() - } - desc, err = cbds.backend.Stat(ctx, dgst) + + // couldn't get from cache; get from backend + desc, err := cbds.backend.Stat(ctx, dgst) if err != nil { return desc, err } - if err := cbds.cache.SetDescriptor(ctx, dgst, desc); err != nil { - logErrorf(ctx, cbds.tracker, "error adding descriptor %v to cache: %v", desc.Digest, err) + if cacheErr == distribution.ErrBlobUnknown { + // cache doesn't have info. update it with info got from backend + cacheCount.WithValues("Miss").Inc(1) + if err := cbds.cache.SetDescriptor(ctx, dgst, desc); err != nil { + dcontext.GetLoggerWithField(ctx, "blob", dgst).WithError(err).Error("error from cache setting desc") + } + // we don't need to return cache error upstream if any. continue returning value from backend + } else { + // unknown error from cache. just log and error. do not store cache as it may be trigger many set calls + dcontext.GetLoggerWithField(ctx, "blob", dgst).WithError(cacheErr).Error("error from cache stat(ing) blob") + cacheCount.WithValues("Error").Inc(1) } - return desc, err - + return desc, nil } func (cbds *cachedBlobStatter) Clear(ctx context.Context, dgst digest.Digest) error { @@ -111,19 +75,7 @@ func (cbds *cachedBlobStatter) Clear(ctx context.Context, dgst digest.Digest) er func (cbds *cachedBlobStatter) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error { if err := cbds.cache.SetDescriptor(ctx, dgst, desc); err != nil { - logErrorf(ctx, cbds.tracker, "error adding descriptor %v to cache: %v", desc.Digest, err) + dcontext.GetLoggerWithField(ctx, "blob", dgst).WithError(err).Error("error from cache setting desc") } return nil } - -func logErrorf(ctx context.Context, tracker MetricsTracker, format string, args ...interface{}) { - if tracker == nil { - return - } - - logger := tracker.Logger(ctx) - if logger == nil { - return - } - logger.Errorf(format, args...) -} diff --git a/registry/storage/cache/metrics/prom.go b/registry/storage/cache/metrics/prom.go new file mode 100644 index 000000000..7b50e93fa --- /dev/null +++ b/registry/storage/cache/metrics/prom.go @@ -0,0 +1,69 @@ +package metrics + +import ( + "context" + "time" + + "github.com/docker/distribution" + prometheus "github.com/docker/distribution/metrics" + "github.com/docker/distribution/registry/storage/cache" + "github.com/docker/go-metrics" + "github.com/opencontainers/go-digest" +) + +type prometheusCacheProvider struct { + cache.BlobDescriptorCacheProvider + latencyTimer metrics.LabeledTimer +} + +func NewPrometheusCacheProvider(wrap cache.BlobDescriptorCacheProvider, name, help string) cache.BlobDescriptorCacheProvider { + return &prometheusCacheProvider{ + wrap, + // TODO: May want to have fine grained buckets since redis calls are generally <1ms and the default minimum bucket is 5ms. + prometheus.StorageNamespace.NewLabeledTimer(name, help, "operation"), + } +} + +func (p *prometheusCacheProvider) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { + start := time.Now() + d, e := p.BlobDescriptorCacheProvider.Stat(ctx, dgst) + p.latencyTimer.WithValues("Stat").UpdateSince(start) + return d, e +} + +func (p *prometheusCacheProvider) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error { + start := time.Now() + e := p.BlobDescriptorCacheProvider.SetDescriptor(ctx, dgst, desc) + p.latencyTimer.WithValues("SetDescriptor").UpdateSince(start) + return e +} + +type prometheusRepoCacheProvider struct { + distribution.BlobDescriptorService + latencyTimer metrics.LabeledTimer +} + +func (p *prometheusRepoCacheProvider) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { + start := time.Now() + d, e := p.BlobDescriptorService.Stat(ctx, dgst) + p.latencyTimer.WithValues("RepoStat").UpdateSince(start) + return d, e +} + +func (p *prometheusRepoCacheProvider) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error { + start := time.Now() + e := p.BlobDescriptorService.SetDescriptor(ctx, dgst, desc) + p.latencyTimer.WithValues("RepoSetDescriptor").UpdateSince(start) + return e +} + +func (p *prometheusCacheProvider) RepositoryScoped(repo string) (distribution.BlobDescriptorService, error) { + s, err := p.BlobDescriptorCacheProvider.RepositoryScoped(repo) + if err != nil { + return nil, err + } + return &prometheusRepoCacheProvider{ + s, + p.latencyTimer, + }, nil +} diff --git a/registry/storage/cache/redis/redis.go b/registry/storage/cache/redis/redis.go index 550d703b6..ed03f9f0b 100644 --- a/registry/storage/cache/redis/redis.go +++ b/registry/storage/cache/redis/redis.go @@ -7,6 +7,7 @@ import ( "github.com/docker/distribution" "github.com/docker/distribution/reference" "github.com/docker/distribution/registry/storage/cache" + "github.com/docker/distribution/registry/storage/cache/metrics" "github.com/garyburd/redigo/redis" "github.com/opencontainers/go-digest" ) @@ -34,9 +35,13 @@ type redisBlobDescriptorService struct { // NewRedisBlobDescriptorCacheProvider returns a new redis-based // BlobDescriptorCacheProvider using the provided redis connection pool. func NewRedisBlobDescriptorCacheProvider(pool *redis.Pool) cache.BlobDescriptorCacheProvider { - return &redisBlobDescriptorService{ - pool: pool, - } + return metrics.NewPrometheusCacheProvider( + &redisBlobDescriptorService{ + pool: pool, + }, + "cache_redis", + "Number of seconds taken by redis", + ) } // RepositoryScoped returns the scoped cache. @@ -181,6 +186,10 @@ func (rsrbds *repositoryScopedRedisBlobDescriptorService) Stat(ctx context.Conte // We allow a per repository mediatype, let's look it up here. mediatype, err := redis.String(conn.Do("HGET", rsrbds.blobDescriptorHashKey(dgst), "mediatype")) if err != nil { + if err == redis.ErrNil { + return distribution.Descriptor{}, distribution.ErrBlobUnknown + } + return distribution.Descriptor{}, err }