Merge pull request #4081 from liubin/fix/refactor-redis

refactor redis cache
This commit is contained in:
Milos Gajdos 2023-10-03 16:01:07 +01:00 committed by GitHub
commit ed8423176f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -13,7 +13,7 @@ import (
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
) )
// redisBlobStatService provides an implementation of // redisBlobDescriptorService provides an implementation of
// BlobDescriptorCacheProvider based on redis. Blob descriptors are stored in // BlobDescriptorCacheProvider based on redis. Blob descriptors are stored in
// two parts. The first provide fast access to repository membership through a // two parts. The first provide fast access to repository membership through a
// redis set for each repo. The second is a redis hash keyed by the digest of // redis set for each repo. The second is a redis hash keyed by the digest of
@ -33,6 +33,8 @@ type redisBlobDescriptorService struct {
// request objects, we can change this to a connection. // request objects, we can change this to a connection.
} }
var _ distribution.BlobDescriptorService = &redisBlobDescriptorService{}
// NewRedisBlobDescriptorCacheProvider returns a new redis-based // NewRedisBlobDescriptorCacheProvider returns a new redis-based
// BlobDescriptorCacheProvider using the provided redis connection pool. // BlobDescriptorCacheProvider using the provided redis connection pool.
func NewRedisBlobDescriptorCacheProvider(pool *redis.Client) cache.BlobDescriptorCacheProvider { func NewRedisBlobDescriptorCacheProvider(pool *redis.Client) cache.BlobDescriptorCacheProvider {
@ -63,7 +65,7 @@ func (rbds *redisBlobDescriptorService) Stat(ctx context.Context, dgst digest.Di
return distribution.Descriptor{}, err return distribution.Descriptor{}, err
} }
return rbds.stat(ctx, rbds.pool, dgst) return rbds.stat(ctx, dgst)
} }
func (rbds *redisBlobDescriptorService) Clear(ctx context.Context, dgst digest.Digest) error { func (rbds *redisBlobDescriptorService) Clear(ctx context.Context, dgst digest.Digest) error {
@ -83,10 +85,8 @@ func (rbds *redisBlobDescriptorService) Clear(ctx context.Context, dgst digest.D
return nil return nil
} }
// stat provides an internal stat call that takes a connection parameter. This func (rbds *redisBlobDescriptorService) stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
// allows some internal management of the connection scope. cmd := rbds.pool.HMGet(ctx, rbds.blobDescriptorHashKey(dgst), "digest", "size", "mediatype")
func (rbds *redisBlobDescriptorService) stat(ctx context.Context, conn *redis.Client, dgst digest.Digest) (distribution.Descriptor, error) {
cmd := conn.HMGet(ctx, rbds.blobDescriptorHashKey(dgst), "digest", "size", "mediatype")
reply, err := cmd.Result() reply, err := cmd.Result()
if err != nil { if err != nil {
return distribution.Descriptor{}, err return distribution.Descriptor{}, err
@ -135,10 +135,10 @@ func (rbds *redisBlobDescriptorService) SetDescriptor(ctx context.Context, dgst
return err return err
} }
return rbds.setDescriptor(ctx, rbds.pool, dgst, desc) return rbds.setDescriptor(ctx, dgst, desc)
} }
func (rbds *redisBlobDescriptorService) setDescriptor(ctx context.Context, conn *redis.Client, dgst digest.Digest, desc distribution.Descriptor) error { func (rbds *redisBlobDescriptorService) setDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
cmd := rbds.pool.HMSet(ctx, rbds.blobDescriptorHashKey(dgst), "digest", desc.Digest.String(), "size", desc.Size) cmd := rbds.pool.HMSet(ctx, rbds.blobDescriptorHashKey(dgst), "digest", desc.Digest.String(), "size", desc.Size)
if cmd.Err() != nil { if cmd.Err() != nil {
return cmd.Err() return cmd.Err()
@ -180,7 +180,7 @@ func (rsrbds *repositoryScopedRedisBlobDescriptorService) Stat(ctx context.Conte
return distribution.Descriptor{}, distribution.ErrBlobUnknown return distribution.Descriptor{}, distribution.ErrBlobUnknown
} }
upstream, err := rsrbds.upstream.stat(ctx, pool, dgst) upstream, err := rsrbds.upstream.stat(ctx, dgst)
if err != nil { if err != nil {
return distribution.Descriptor{}, err return distribution.Descriptor{}, err
} }
@ -234,16 +234,17 @@ func (rsrbds *repositoryScopedRedisBlobDescriptorService) SetDescriptor(ctx cont
} }
} }
return rsrbds.setDescriptor(ctx, rsrbds.upstream.pool, dgst, desc) return rsrbds.setDescriptor(ctx, dgst, desc)
} }
func (rsrbds *repositoryScopedRedisBlobDescriptorService) setDescriptor(ctx context.Context, conn *redis.Client, dgst digest.Digest, desc distribution.Descriptor) error { func (rsrbds *repositoryScopedRedisBlobDescriptorService) setDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
conn := rsrbds.upstream.pool
_, err := conn.SAdd(ctx, rsrbds.repositoryBlobSetKey(rsrbds.repo), dgst.String()).Result() _, err := conn.SAdd(ctx, rsrbds.repositoryBlobSetKey(rsrbds.repo), dgst.String()).Result()
if err != nil { if err != nil {
return err return err
} }
if err := rsrbds.upstream.setDescriptor(ctx, conn, dgst, desc); err != nil { if err := rsrbds.upstream.setDescriptor(ctx, dgst, desc); err != nil {
return err return err
} }
@ -256,7 +257,7 @@ func (rsrbds *repositoryScopedRedisBlobDescriptorService) setDescriptor(ctx cont
// Also set the values for the primary descriptor, if they differ by // Also set the values for the primary descriptor, if they differ by
// algorithm (ie sha256 vs sha512). // algorithm (ie sha256 vs sha512).
if desc.Digest != "" && dgst != desc.Digest && dgst.Algorithm() != desc.Digest.Algorithm() { if desc.Digest != "" && dgst != desc.Digest && dgst.Algorithm() != desc.Digest.Algorithm() {
if err := rsrbds.setDescriptor(ctx, conn, desc.Digest, desc); err != nil { if err := rsrbds.setDescriptor(ctx, desc.Digest, desc); err != nil {
return err return err
} }
} }