distribution/registry/storage/cache/redis/redis.go
Tibor Vass ddfd2335c7 Add a new reference package abstracting repositories, tags and digests
There seems to be a need for a type that represents a way of pointing
to an image, irrespective of the implementation.

This patch defines a Reference interface and provides 3 implementations:
- TagReference: when only a tag is provided
- DigestReference: when a digest (according to the digest package) is
  provided, can include optional tag as well

Validation of references are purely syntactic.

There is also a strong type for tags, analogous to digests, as well
as a strong type for Repository from which clients can access the
hostname alone, or the repository name without the hostname, or both
together via the String() method.

For Repository, the files names.go and names_test.go were moved from
the v2 package.

Update regexp to support repeated dash and double underscore

Add field type for serialization

Since reference itself may be represented by multiple types which implement the reference inteface, serialization can lead to ambiguous type which cannot be deserialized.
Field wraps the reference object to ensure that the correct type is always deserialized, requiring an extra unwrap of the reference after deserialization.

Signed-off-by: Tibor Vass <tibor@docker.com>
Signed-off-by: Derek McGowan <derek@mcgstyle.net> (github: dmcgowan)
2015-10-09 16:33:56 -07:00

268 lines
8.4 KiB
Go

package redis
import (
"fmt"
"github.com/docker/distribution"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/reference"
"github.com/docker/distribution/registry/storage/cache"
"github.com/garyburd/redigo/redis"
)
// redisBlobStatService provides an implementation of
// BlobDescriptorCacheProvider based on redis. Blob descriptors are stored in
// two parts. The first provide fast access to repository membership through a
// redis set for each repo. The second is a redis hash keyed by the digest of
// the layer, providing path, length and mediatype information. There is also
// a per-repository redis hash of the blob descriptor, allowing override of
// data. This is currently used to override the mediatype on a per-repository
// basis.
//
// Note that there is no implied relationship between these two caches. The
// layer may exist in one, both or none and the code must be written this way.
type redisBlobDescriptorService struct {
pool *redis.Pool
// TODO(stevvooe): We use a pool because we don't have great control over
// the cache lifecycle to manage connections. A new connection if fetched
// for each operation. Once we have better lifecycle management of the
// request objects, we can change this to a connection.
}
// NewRedisBlobDescriptorCacheProvider returns a new redis-based
// BlobDescriptorCacheProvider using the provided redis connection pool.
func NewRedisBlobDescriptorCacheProvider(pool *redis.Pool) cache.BlobDescriptorCacheProvider {
return &redisBlobDescriptorService{
pool: pool,
}
}
// RepositoryScoped returns the scoped cache.
func (rbds *redisBlobDescriptorService) RepositoryScoped(repo string) (distribution.BlobDescriptorService, error) {
if _, err := reference.ParseNamed(repo); err != nil {
return nil, err
}
return &repositoryScopedRedisBlobDescriptorService{
repo: repo,
upstream: rbds,
}, nil
}
// Stat retrieves the descriptor data from the redis hash entry.
func (rbds *redisBlobDescriptorService) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
if err := dgst.Validate(); err != nil {
return distribution.Descriptor{}, err
}
conn := rbds.pool.Get()
defer conn.Close()
return rbds.stat(ctx, conn, dgst)
}
func (rbds *redisBlobDescriptorService) Clear(ctx context.Context, dgst digest.Digest) error {
if err := dgst.Validate(); err != nil {
return err
}
conn := rbds.pool.Get()
defer conn.Close()
// Not atomic in redis <= 2.3
reply, err := conn.Do("HDEL", rbds.blobDescriptorHashKey(dgst), "digest", "length", "mediatype")
if err != nil {
return err
}
if reply == 0 {
return distribution.ErrBlobUnknown
}
return nil
}
// stat provides an internal stat call that takes a connection parameter. This
// allows some internal management of the connection scope.
func (rbds *redisBlobDescriptorService) stat(ctx context.Context, conn redis.Conn, dgst digest.Digest) (distribution.Descriptor, error) {
reply, err := redis.Values(conn.Do("HMGET", rbds.blobDescriptorHashKey(dgst), "digest", "size", "mediatype"))
if err != nil {
return distribution.Descriptor{}, err
}
// NOTE(stevvooe): The "size" field used to be "length". We treat a
// missing "size" field here as an unknown blob, which causes a cache
// miss, effectively migrating the field.
if len(reply) < 3 || reply[0] == nil || reply[1] == nil { // don't care if mediatype is nil
return distribution.Descriptor{}, distribution.ErrBlobUnknown
}
var desc distribution.Descriptor
if _, err := redis.Scan(reply, &desc.Digest, &desc.Size, &desc.MediaType); err != nil {
return distribution.Descriptor{}, err
}
return desc, nil
}
// SetDescriptor sets the descriptor data for the given digest using a redis
// hash. A hash is used here since we may store unrelated fields about a layer
// in the future.
func (rbds *redisBlobDescriptorService) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
if err := dgst.Validate(); err != nil {
return err
}
if err := cache.ValidateDescriptor(desc); err != nil {
return err
}
conn := rbds.pool.Get()
defer conn.Close()
return rbds.setDescriptor(ctx, conn, dgst, desc)
}
func (rbds *redisBlobDescriptorService) setDescriptor(ctx context.Context, conn redis.Conn, dgst digest.Digest, desc distribution.Descriptor) error {
if _, err := conn.Do("HMSET", rbds.blobDescriptorHashKey(dgst),
"digest", desc.Digest,
"size", desc.Size); err != nil {
return err
}
// Only set mediatype if not already set.
if _, err := conn.Do("HSETNX", rbds.blobDescriptorHashKey(dgst),
"mediatype", desc.MediaType); err != nil {
return err
}
return nil
}
func (rbds *redisBlobDescriptorService) blobDescriptorHashKey(dgst digest.Digest) string {
return "blobs::" + dgst.String()
}
type repositoryScopedRedisBlobDescriptorService struct {
repo string
upstream *redisBlobDescriptorService
}
var _ distribution.BlobDescriptorService = &repositoryScopedRedisBlobDescriptorService{}
// Stat ensures that the digest is a member of the specified repository and
// forwards the descriptor request to the global blob store. If the media type
// differs for the repository, we override it.
func (rsrbds *repositoryScopedRedisBlobDescriptorService) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
if err := dgst.Validate(); err != nil {
return distribution.Descriptor{}, err
}
conn := rsrbds.upstream.pool.Get()
defer conn.Close()
// Check membership to repository first
member, err := redis.Bool(conn.Do("SISMEMBER", rsrbds.repositoryBlobSetKey(rsrbds.repo), dgst))
if err != nil {
return distribution.Descriptor{}, err
}
if !member {
return distribution.Descriptor{}, distribution.ErrBlobUnknown
}
upstream, err := rsrbds.upstream.stat(ctx, conn, dgst)
if err != nil {
return distribution.Descriptor{}, err
}
// We allow a per repository mediatype, let's look it up here.
mediatype, err := redis.String(conn.Do("HGET", rsrbds.blobDescriptorHashKey(dgst), "mediatype"))
if err != nil {
return distribution.Descriptor{}, err
}
if mediatype != "" {
upstream.MediaType = mediatype
}
return upstream, nil
}
// Clear removes the descriptor from the cache and forwards to the upstream descriptor store
func (rsrbds *repositoryScopedRedisBlobDescriptorService) Clear(ctx context.Context, dgst digest.Digest) error {
if err := dgst.Validate(); err != nil {
return err
}
conn := rsrbds.upstream.pool.Get()
defer conn.Close()
// Check membership to repository first
member, err := redis.Bool(conn.Do("SISMEMBER", rsrbds.repositoryBlobSetKey(rsrbds.repo), dgst))
if err != nil {
return err
}
if !member {
return distribution.ErrBlobUnknown
}
return rsrbds.upstream.Clear(ctx, dgst)
}
func (rsrbds *repositoryScopedRedisBlobDescriptorService) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
if err := dgst.Validate(); err != nil {
return err
}
if err := cache.ValidateDescriptor(desc); err != nil {
return err
}
if dgst != desc.Digest {
if dgst.Algorithm() == desc.Digest.Algorithm() {
return fmt.Errorf("redis cache: digest for descriptors differ but algorthim does not: %q != %q", dgst, desc.Digest)
}
}
conn := rsrbds.upstream.pool.Get()
defer conn.Close()
return rsrbds.setDescriptor(ctx, conn, dgst, desc)
}
func (rsrbds *repositoryScopedRedisBlobDescriptorService) setDescriptor(ctx context.Context, conn redis.Conn, dgst digest.Digest, desc distribution.Descriptor) error {
if _, err := conn.Do("SADD", rsrbds.repositoryBlobSetKey(rsrbds.repo), dgst); err != nil {
return err
}
if err := rsrbds.upstream.setDescriptor(ctx, conn, dgst, desc); err != nil {
return err
}
// Override repository mediatype.
if _, err := conn.Do("HSET", rsrbds.blobDescriptorHashKey(dgst), "mediatype", desc.MediaType); err != nil {
return err
}
// Also set the values for the primary descriptor, if they differ by
// algorithm (ie sha256 vs tarsum).
if desc.Digest != "" && dgst != desc.Digest && dgst.Algorithm() != desc.Digest.Algorithm() {
if err := rsrbds.setDescriptor(ctx, conn, desc.Digest, desc); err != nil {
return err
}
}
return nil
}
func (rsrbds *repositoryScopedRedisBlobDescriptorService) blobDescriptorHashKey(dgst digest.Digest) string {
return "repository::" + rsrbds.repo + "::blobs::" + dgst.String()
}
func (rsrbds *repositoryScopedRedisBlobDescriptorService) repositoryBlobSetKey(repo string) string {
return "repository::" + rsrbds.repo + "::blobs"
}