Fix a race condition in pull-through cache population by removing the ability of readers to join in-progress downloads. Concurrent requests for the same blob will no longer block one another, but only the first request will be committed locally.

Signed-off-by: Richard Scothern <richard.scothern@gmail.com>
parent 4946d0d035
commit 515b9596e7

3 changed files with 260 additions and 175 deletions
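For context on the behavior described above: after this change, the first request for a given blob pulls it through and commits it to the local store, while concurrent requests for the same blob are served straight from the remote rather than blocking on the in-progress download. A minimal sketch of that idea, using hypothetical names (tracker, claim, release) rather than the actual proxyBlobStore code:

    package main

    import (
    	"fmt"
    	"sync"
    )

    // tracker records which digests currently have a cache population in
    // flight. The first caller to claim a digest commits the blob locally;
    // later callers stream from the remote without blocking on that write.
    type tracker struct {
    	mu       sync.Mutex
    	inflight map[string]struct{}
    }

    // claim reports whether the caller is the first in-flight request for dgst
    // and should therefore commit the blob to the local cache.
    func (t *tracker) claim(dgst string) bool {
    	t.mu.Lock()
    	defer t.mu.Unlock()
    	if _, ok := t.inflight[dgst]; ok {
    		return false // someone else is already populating the cache
    	}
    	t.inflight[dgst] = struct{}{}
    	return true
    }

    // release clears the in-flight marker once the local commit finishes.
    func (t *tracker) release(dgst string) {
    	t.mu.Lock()
    	defer t.mu.Unlock()
    	delete(t.inflight, dgst)
    }

    func main() {
    	tr := &tracker{inflight: make(map[string]struct{})}
    	var wg sync.WaitGroup
    	for i := 0; i < 4; i++ {
    		wg.Add(1)
    		go func(id int) {
    			defer wg.Done()
    			if tr.claim("sha256:abc") {
    				defer tr.release("sha256:abc")
    				fmt.Printf("client %d: pull through and commit locally\n", id)
    			} else {
    				fmt.Printf("client %d: pull through only, no local commit\n", id)
    			}
    		}(i)
    	}
    	wg.Wait()
    }

In the change itself the local commit happens on an async storage goroutine, which is why the test below sleeps before asserting that the remote stats stop moving.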
registry/proxy/proxyblobstore_test.go

@@ -1,10 +1,13 @@
 package proxy
 
 import (
 	"fmt"
 	"io/ioutil"
+	"math/rand"
 	"net/http"
 	"net/http/httptest"
+	"sync"
 	"testing"
+	"time"
 
 	"github.com/docker/distribution"
 	"github.com/docker/distribution/context"
@@ -12,75 +15,119 @@ import (
 	"github.com/docker/distribution/registry/proxy/scheduler"
 	"github.com/docker/distribution/registry/storage"
 	"github.com/docker/distribution/registry/storage/cache/memory"
+	"github.com/docker/distribution/registry/storage/driver/filesystem"
 	"github.com/docker/distribution/registry/storage/driver/inmemory"
 )
 
+var sbsMu sync.Mutex
+
 type statsBlobStore struct {
 	stats map[string]int
 	blobs distribution.BlobStore
 }
 
 func (sbs statsBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
+	sbsMu.Lock()
 	sbs.stats["put"]++
+	sbsMu.Unlock()
+
 	return sbs.blobs.Put(ctx, mediaType, p)
 }
 
 func (sbs statsBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
+	sbsMu.Lock()
 	sbs.stats["get"]++
+	sbsMu.Unlock()
+
 	return sbs.blobs.Get(ctx, dgst)
 }
 
 func (sbs statsBlobStore) Create(ctx context.Context) (distribution.BlobWriter, error) {
+	sbsMu.Lock()
 	sbs.stats["create"]++
+	sbsMu.Unlock()
+
 	return sbs.blobs.Create(ctx)
 }
 
 func (sbs statsBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
+	sbsMu.Lock()
 	sbs.stats["resume"]++
+	sbsMu.Unlock()
+
 	return sbs.blobs.Resume(ctx, id)
 }
 
 func (sbs statsBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
+	sbsMu.Lock()
 	sbs.stats["open"]++
+	sbsMu.Unlock()
+
 	return sbs.blobs.Open(ctx, dgst)
 }
 
 func (sbs statsBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
+	sbsMu.Lock()
 	sbs.stats["serveblob"]++
+	sbsMu.Unlock()
+
 	return sbs.blobs.ServeBlob(ctx, w, r, dgst)
 }
 
 func (sbs statsBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
+	sbsMu.Lock()
 	sbs.stats["stat"]++
+	sbsMu.Unlock()
+
 	return sbs.blobs.Stat(ctx, dgst)
 }
 
 func (sbs statsBlobStore) Delete(ctx context.Context, dgst digest.Digest) error {
+	sbsMu.Lock()
 	sbs.stats["delete"]++
+	sbsMu.Unlock()
+
 	return sbs.blobs.Delete(ctx, dgst)
 }
 
 type testEnv struct {
-	inRemote []distribution.Descriptor
-	store    proxyBlobStore
-	ctx      context.Context
+	numUnique int
+	inRemote  []distribution.Descriptor
+	store     proxyBlobStore
+	ctx       context.Context
 }
 
-func (te testEnv) LocalStats() *map[string]int {
+func (te *testEnv) LocalStats() *map[string]int {
+	sbsMu.Lock()
 	ls := te.store.localStore.(statsBlobStore).stats
+	sbsMu.Unlock()
 	return &ls
 }
 
-func (te testEnv) RemoteStats() *map[string]int {
+func (te *testEnv) RemoteStats() *map[string]int {
+	sbsMu.Lock()
 	rs := te.store.remoteStore.(statsBlobStore).stats
+	sbsMu.Unlock()
 	return &rs
 }
 
 // Populate remote store and record the digests
-func makeTestEnv(t *testing.T, name string) testEnv {
+func makeTestEnv(t *testing.T, name string) *testEnv {
 	ctx := context.Background()
 
-	localRegistry, err := storage.NewRegistry(ctx, inmemory.New(), storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), storage.EnableRedirect, storage.DisableDigestResumption)
+	truthDir, err := ioutil.TempDir("", "truth")
+	if err != nil {
+		t.Fatalf("unable to create tempdir: %s", err)
+	}
+
+	cacheDir, err := ioutil.TempDir("", "cache")
+	if err != nil {
+		t.Fatalf("unable to create tempdir: %s", err)
+	}
+
+	// todo: create a tempfile area here
+	localRegistry, err := storage.NewRegistry(ctx, filesystem.New(truthDir), storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), storage.EnableRedirect, storage.DisableDigestResumption)
 	if err != nil {
 		t.Fatalf("error creating registry: %v", err)
 	}
@@ -89,7 +136,7 @@ func makeTestEnv(t *testing.T, name string) testEnv {
 		t.Fatalf("unexpected error getting repo: %v", err)
 	}
 
-	truthRegistry, err := storage.NewRegistry(ctx, inmemory.New(), storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()))
+	truthRegistry, err := storage.NewRegistry(ctx, filesystem.New(cacheDir), storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()))
 	if err != nil {
 		t.Fatalf("error creating registry: %v", err)
 	}
@@ -116,33 +163,59 @@ func makeTestEnv(t *testing.T, name string) testEnv {
 		scheduler: s,
 	}
 
-	te := testEnv{
+	te := &testEnv{
 		store: proxyBlobStore,
 		ctx:   ctx,
 	}
 	return te
 }
 
-func populate(t *testing.T, te *testEnv, blobCount int) {
-	var inRemote []distribution.Descriptor
-	for i := 0; i < blobCount; i++ {
-		bytes := []byte(fmt.Sprintf("blob%d", i))
-
-		desc, err := te.store.remoteStore.Put(te.ctx, "", bytes)
-		if err != nil {
-			t.Errorf("Put in store")
-		}
-		inRemote = append(inRemote, desc)
+func makeBlob(size int) []byte {
+	blob := make([]byte, size, size)
+	for i := 0; i < size; i++ {
+		blob[i] = byte('A' + rand.Int()%48)
+	}
+	return blob
+}
+
+func init() {
+	rand.Seed(42)
+}
+
+func perm(m []distribution.Descriptor) []distribution.Descriptor {
+	for i := 0; i < len(m); i++ {
+		j := rand.Intn(i + 1)
+		tmp := m[i]
+		m[i] = m[j]
+		m[j] = tmp
+	}
+	return m
+}
+
+func populate(t *testing.T, te *testEnv, blobCount, size, numUnique int) {
+	var inRemote []distribution.Descriptor
+
+	for i := 0; i < numUnique; i++ {
+		bytes := makeBlob(size)
+		for j := 0; j < blobCount/numUnique; j++ {
+			desc, err := te.store.remoteStore.Put(te.ctx, "", bytes)
+			if err != nil {
+				t.Fatalf("Put in store")
+			}
+
+			inRemote = append(inRemote, desc)
+		}
 	}
 
 	te.inRemote = inRemote
+	te.numUnique = numUnique
 }
 
 func TestProxyStoreStat(t *testing.T) {
 	te := makeTestEnv(t, "foo/bar")
 
 	remoteBlobCount := 1
-	populate(t, &te, remoteBlobCount)
+	populate(t, te, remoteBlobCount, 10, 1)
 
 	localStats := te.LocalStats()
 	remoteStats := te.RemoteStats()
@@ -164,43 +237,91 @@ func TestProxyStoreStat(t *testing.T) {
 	}
 }
 
-func TestProxyStoreServe(t *testing.T) {
+func TestProxyStoreServeHighConcurrency(t *testing.T) {
 	te := makeTestEnv(t, "foo/bar")
-	remoteBlobCount := 1
-	populate(t, &te, remoteBlobCount)
+	blobSize := 200
+	blobCount := 10
+	numUnique := 1
+	populate(t, te, blobCount, blobSize, numUnique)
+
+	numClients := 16
+	testProxyStoreServe(t, te, numClients)
+}
+
+func TestProxyStoreServeMany(t *testing.T) {
+	te := makeTestEnv(t, "foo/bar")
+	blobSize := 200
+	blobCount := 10
+	numUnique := 4
+	populate(t, te, blobCount, blobSize, numUnique)
+
+	numClients := 4
+	testProxyStoreServe(t, te, numClients)
+}
+
+// todo(richardscothern): blobCount must be smaller than num clients
+func TestProxyStoreServeBig(t *testing.T) {
+	te := makeTestEnv(t, "foo/bar")
+
+	blobSize := 2 << 20
+	blobCount := 4
+	numUnique := 2
+	populate(t, te, blobCount, blobSize, numUnique)
+
+	numClients := 4
+	testProxyStoreServe(t, te, numClients)
+}
 
+// testProxyStoreServe will create clients to consume all blobs
+// populated in the truth store
+func testProxyStoreServe(t *testing.T, te *testEnv, numClients int) {
 	localStats := te.LocalStats()
 	remoteStats := te.RemoteStats()
 
-	// Serveblob - pulls through blobs
-	for _, dr := range te.inRemote {
-		w := httptest.NewRecorder()
-		r, err := http.NewRequest("GET", "", nil)
-		if err != nil {
-			t.Fatal(err)
-		}
+	var wg sync.WaitGroup
 
-		err = te.store.ServeBlob(te.ctx, w, r, dr.Digest)
-		if err != nil {
-			t.Fatalf(err.Error())
-		}
+	for i := 0; i < numClients; i++ {
+		// Serveblob - pulls through blobs
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for _, remoteBlob := range te.inRemote {
+				w := httptest.NewRecorder()
+				r, err := http.NewRequest("GET", "", nil)
+				if err != nil {
+					t.Fatal(err)
+				}
 
-		dl, err := digest.FromBytes(w.Body.Bytes())
-		if err != nil {
-			t.Fatalf("Error making digest from blob")
-		}
-		if dl != dr.Digest {
-			t.Errorf("Mismatching blob fetch from proxy")
-		}
+				err = te.store.ServeBlob(te.ctx, w, r, remoteBlob.Digest)
+				if err != nil {
+					t.Fatalf(err.Error())
+				}
+
+				bodyBytes := w.Body.Bytes()
+				localDigest, err := digest.FromBytes(bodyBytes)
+				if err != nil {
+					t.Fatalf("Error making digest from blob")
+				}
+				if localDigest != remoteBlob.Digest {
+					t.Fatalf("Mismatching blob fetch from proxy")
+				}
+			}
+		}()
 	}
 
-	if (*localStats)["stat"] != remoteBlobCount && (*localStats)["create"] != remoteBlobCount {
-		t.Fatalf("unexpected local stats")
-	}
-	if (*remoteStats)["stat"] != remoteBlobCount && (*remoteStats)["open"] != remoteBlobCount {
-		t.Fatalf("unexpected local stats")
+	wg.Wait()
+
+	remoteBlobCount := len(te.inRemote)
+	if (*localStats)["stat"] != remoteBlobCount*numClients && (*localStats)["create"] != te.numUnique {
+		t.Fatal("Expected: stat:", remoteBlobCount*numClients, "create:", remoteBlobCount)
 	}
 
+	// Wait for any async storage goroutines to finish
+	time.Sleep(3 * time.Second)
+
+	remoteStatCount := (*remoteStats)["stat"]
+	remoteOpenCount := (*remoteStats)["open"]
+
 	// Serveblob - blobs come from local
 	for _, dr := range te.inRemote {
 		w := httptest.NewRecorder()
@@ -223,15 +344,11 @@ func TestProxyStoreServe(t *testing.T) {
 		}
 	}
 
-	// Stat to find local, but no new blobs were created
-	if (*localStats)["stat"] != remoteBlobCount*2 && (*localStats)["create"] != remoteBlobCount*2 {
-		t.Fatalf("unexpected local stats")
-	}
+	localStats = te.LocalStats()
+	remoteStats = te.RemoteStats()
 
-	// Remote unchanged
-	if (*remoteStats)["stat"] != remoteBlobCount && (*remoteStats)["open"] != remoteBlobCount {
-		fmt.Printf("\tlocal=%#v, \n\tremote=%#v\n", localStats, remoteStats)
-		t.Fatalf("unexpected local stats")
+	// Ensure remote unchanged
+	if (*remoteStats)["stat"] != remoteStatCount && (*remoteStats)["open"] != remoteOpenCount {
+		t.Fatalf("unexpected remote stats: %#v", remoteStats)
 	}
-
 }
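To run the new concurrency tests on their own, a standard go test invocation along these lines should work (the package path is inferred from the imports above):

    go test -v -run TestProxyStoreServe ./registry/proxy/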