forked from TrueCloudLab/distribution
4c850e7165
tarsum is not actually used by the registry. Remove support for it. Convert numerous uses in unit tests to SHA256. Update docs to remove mentions of tarsums (which were often inaccurate). Remove tarsum dependency. Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
268 lines
8.4 KiB
Go
268 lines
8.4 KiB
Go
package redis
|
|
|
|
import (
|
|
"fmt"
|
|
|
|
"github.com/docker/distribution"
|
|
"github.com/docker/distribution/context"
|
|
"github.com/docker/distribution/digest"
|
|
"github.com/docker/distribution/reference"
|
|
"github.com/docker/distribution/registry/storage/cache"
|
|
"github.com/garyburd/redigo/redis"
|
|
)
|
|
|
|
// redisBlobStatService provides an implementation of
|
|
// BlobDescriptorCacheProvider based on redis. Blob descriptors are stored in
|
|
// two parts. The first provide fast access to repository membership through a
|
|
// redis set for each repo. The second is a redis hash keyed by the digest of
|
|
// the layer, providing path, length and mediatype information. There is also
|
|
// a per-repository redis hash of the blob descriptor, allowing override of
|
|
// data. This is currently used to override the mediatype on a per-repository
|
|
// basis.
|
|
//
|
|
// Note that there is no implied relationship between these two caches. The
|
|
// layer may exist in one, both or none and the code must be written this way.
|
|
type redisBlobDescriptorService struct {
|
|
pool *redis.Pool
|
|
|
|
// TODO(stevvooe): We use a pool because we don't have great control over
|
|
// the cache lifecycle to manage connections. A new connection if fetched
|
|
// for each operation. Once we have better lifecycle management of the
|
|
// request objects, we can change this to a connection.
|
|
}
|
|
|
|
// NewRedisBlobDescriptorCacheProvider returns a new redis-based
|
|
// BlobDescriptorCacheProvider using the provided redis connection pool.
|
|
func NewRedisBlobDescriptorCacheProvider(pool *redis.Pool) cache.BlobDescriptorCacheProvider {
|
|
return &redisBlobDescriptorService{
|
|
pool: pool,
|
|
}
|
|
}
|
|
|
|
// RepositoryScoped returns the scoped cache.
|
|
func (rbds *redisBlobDescriptorService) RepositoryScoped(repo string) (distribution.BlobDescriptorService, error) {
|
|
if _, err := reference.ParseNamed(repo); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &repositoryScopedRedisBlobDescriptorService{
|
|
repo: repo,
|
|
upstream: rbds,
|
|
}, nil
|
|
}
|
|
|
|
// Stat retrieves the descriptor data from the redis hash entry.
|
|
func (rbds *redisBlobDescriptorService) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
|
|
if err := dgst.Validate(); err != nil {
|
|
return distribution.Descriptor{}, err
|
|
}
|
|
|
|
conn := rbds.pool.Get()
|
|
defer conn.Close()
|
|
|
|
return rbds.stat(ctx, conn, dgst)
|
|
}
|
|
|
|
func (rbds *redisBlobDescriptorService) Clear(ctx context.Context, dgst digest.Digest) error {
|
|
if err := dgst.Validate(); err != nil {
|
|
return err
|
|
}
|
|
|
|
conn := rbds.pool.Get()
|
|
defer conn.Close()
|
|
|
|
// Not atomic in redis <= 2.3
|
|
reply, err := conn.Do("HDEL", rbds.blobDescriptorHashKey(dgst), "digest", "length", "mediatype")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if reply == 0 {
|
|
return distribution.ErrBlobUnknown
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// stat provides an internal stat call that takes a connection parameter. This
|
|
// allows some internal management of the connection scope.
|
|
func (rbds *redisBlobDescriptorService) stat(ctx context.Context, conn redis.Conn, dgst digest.Digest) (distribution.Descriptor, error) {
|
|
reply, err := redis.Values(conn.Do("HMGET", rbds.blobDescriptorHashKey(dgst), "digest", "size", "mediatype"))
|
|
if err != nil {
|
|
return distribution.Descriptor{}, err
|
|
}
|
|
|
|
// NOTE(stevvooe): The "size" field used to be "length". We treat a
|
|
// missing "size" field here as an unknown blob, which causes a cache
|
|
// miss, effectively migrating the field.
|
|
if len(reply) < 3 || reply[0] == nil || reply[1] == nil { // don't care if mediatype is nil
|
|
return distribution.Descriptor{}, distribution.ErrBlobUnknown
|
|
}
|
|
|
|
var desc distribution.Descriptor
|
|
if _, err := redis.Scan(reply, &desc.Digest, &desc.Size, &desc.MediaType); err != nil {
|
|
return distribution.Descriptor{}, err
|
|
}
|
|
|
|
return desc, nil
|
|
}
|
|
|
|
// SetDescriptor sets the descriptor data for the given digest using a redis
|
|
// hash. A hash is used here since we may store unrelated fields about a layer
|
|
// in the future.
|
|
func (rbds *redisBlobDescriptorService) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
|
|
if err := dgst.Validate(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := cache.ValidateDescriptor(desc); err != nil {
|
|
return err
|
|
}
|
|
|
|
conn := rbds.pool.Get()
|
|
defer conn.Close()
|
|
|
|
return rbds.setDescriptor(ctx, conn, dgst, desc)
|
|
}
|
|
|
|
func (rbds *redisBlobDescriptorService) setDescriptor(ctx context.Context, conn redis.Conn, dgst digest.Digest, desc distribution.Descriptor) error {
|
|
if _, err := conn.Do("HMSET", rbds.blobDescriptorHashKey(dgst),
|
|
"digest", desc.Digest,
|
|
"size", desc.Size); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Only set mediatype if not already set.
|
|
if _, err := conn.Do("HSETNX", rbds.blobDescriptorHashKey(dgst),
|
|
"mediatype", desc.MediaType); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (rbds *redisBlobDescriptorService) blobDescriptorHashKey(dgst digest.Digest) string {
|
|
return "blobs::" + dgst.String()
|
|
}
|
|
|
|
type repositoryScopedRedisBlobDescriptorService struct {
|
|
repo string
|
|
upstream *redisBlobDescriptorService
|
|
}
|
|
|
|
var _ distribution.BlobDescriptorService = &repositoryScopedRedisBlobDescriptorService{}
|
|
|
|
// Stat ensures that the digest is a member of the specified repository and
|
|
// forwards the descriptor request to the global blob store. If the media type
|
|
// differs for the repository, we override it.
|
|
func (rsrbds *repositoryScopedRedisBlobDescriptorService) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
|
|
if err := dgst.Validate(); err != nil {
|
|
return distribution.Descriptor{}, err
|
|
}
|
|
|
|
conn := rsrbds.upstream.pool.Get()
|
|
defer conn.Close()
|
|
|
|
// Check membership to repository first
|
|
member, err := redis.Bool(conn.Do("SISMEMBER", rsrbds.repositoryBlobSetKey(rsrbds.repo), dgst))
|
|
if err != nil {
|
|
return distribution.Descriptor{}, err
|
|
}
|
|
|
|
if !member {
|
|
return distribution.Descriptor{}, distribution.ErrBlobUnknown
|
|
}
|
|
|
|
upstream, err := rsrbds.upstream.stat(ctx, conn, dgst)
|
|
if err != nil {
|
|
return distribution.Descriptor{}, err
|
|
}
|
|
|
|
// We allow a per repository mediatype, let's look it up here.
|
|
mediatype, err := redis.String(conn.Do("HGET", rsrbds.blobDescriptorHashKey(dgst), "mediatype"))
|
|
if err != nil {
|
|
return distribution.Descriptor{}, err
|
|
}
|
|
|
|
if mediatype != "" {
|
|
upstream.MediaType = mediatype
|
|
}
|
|
|
|
return upstream, nil
|
|
}
|
|
|
|
// Clear removes the descriptor from the cache and forwards to the upstream descriptor store
|
|
func (rsrbds *repositoryScopedRedisBlobDescriptorService) Clear(ctx context.Context, dgst digest.Digest) error {
|
|
if err := dgst.Validate(); err != nil {
|
|
return err
|
|
}
|
|
|
|
conn := rsrbds.upstream.pool.Get()
|
|
defer conn.Close()
|
|
|
|
// Check membership to repository first
|
|
member, err := redis.Bool(conn.Do("SISMEMBER", rsrbds.repositoryBlobSetKey(rsrbds.repo), dgst))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if !member {
|
|
return distribution.ErrBlobUnknown
|
|
}
|
|
|
|
return rsrbds.upstream.Clear(ctx, dgst)
|
|
}
|
|
|
|
func (rsrbds *repositoryScopedRedisBlobDescriptorService) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
|
|
if err := dgst.Validate(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := cache.ValidateDescriptor(desc); err != nil {
|
|
return err
|
|
}
|
|
|
|
if dgst != desc.Digest {
|
|
if dgst.Algorithm() == desc.Digest.Algorithm() {
|
|
return fmt.Errorf("redis cache: digest for descriptors differ but algorthim does not: %q != %q", dgst, desc.Digest)
|
|
}
|
|
}
|
|
|
|
conn := rsrbds.upstream.pool.Get()
|
|
defer conn.Close()
|
|
|
|
return rsrbds.setDescriptor(ctx, conn, dgst, desc)
|
|
}
|
|
|
|
func (rsrbds *repositoryScopedRedisBlobDescriptorService) setDescriptor(ctx context.Context, conn redis.Conn, dgst digest.Digest, desc distribution.Descriptor) error {
|
|
if _, err := conn.Do("SADD", rsrbds.repositoryBlobSetKey(rsrbds.repo), dgst); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := rsrbds.upstream.setDescriptor(ctx, conn, dgst, desc); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Override repository mediatype.
|
|
if _, err := conn.Do("HSET", rsrbds.blobDescriptorHashKey(dgst), "mediatype", desc.MediaType); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Also set the values for the primary descriptor, if they differ by
|
|
// algorithm (ie sha256 vs sha512).
|
|
if desc.Digest != "" && dgst != desc.Digest && dgst.Algorithm() != desc.Digest.Algorithm() {
|
|
if err := rsrbds.setDescriptor(ctx, conn, desc.Digest, desc); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (rsrbds *repositoryScopedRedisBlobDescriptorService) blobDescriptorHashKey(dgst digest.Digest) string {
|
|
return "repository::" + rsrbds.repo + "::blobs::" + dgst.String()
|
|
}
|
|
|
|
func (rsrbds *repositoryScopedRedisBlobDescriptorService) repositoryBlobSetKey(repo string) string {
|
|
return "repository::" + rsrbds.repo + "::blobs"
|
|
}
|