diff --git a/circle.yml b/circle.yml index 164407dc..52348a4b 100644 --- a/circle.yml +++ b/circle.yml @@ -77,13 +77,16 @@ test: timeout: 1000 pwd: $BASE_STABLE + # Test stable with race + - gvm use stable; export ROOT_PACKAGE=$(go list .); go list -tags "$DOCKER_BUILDTAGS" ./... | grep -v "/vendor/" | grep -v "registry/handlers" | grep -v "registry/storage/driver" | xargs -L 1 -I{} bash -c 'export PACKAGE={}; godep go test -race -tags "$DOCKER_BUILDTAGS" -test.short $PACKAGE': + timeout: 1000 + pwd: $BASE_STABLE post: # Report to codecov - bash <(curl -s https://codecov.io/bash): pwd: $BASE_STABLE ## Notes - # Disabled the -race detector due to massive memory usage. # Do we want these as well? # - go get code.google.com/p/go.tools/cmd/goimports # - test -z "$(goimports -l -w ./... | tee /dev/stderr)" diff --git a/registry/handlers/api_test.go b/registry/handlers/api_test.go index 3bc18c76..9d64fbbf 100644 --- a/registry/handlers/api_test.go +++ b/registry/handlers/api_test.go @@ -43,6 +43,7 @@ var headerConfig = http.Header{ // 200 OK response. func TestCheckAPI(t *testing.T) { env := newTestEnv(t, false) + defer env.Shutdown() baseURL, err := env.builder.BuildBaseURL() if err != nil { t.Fatalf("unexpected error building base url: %v", err) @@ -74,6 +75,7 @@ func TestCheckAPI(t *testing.T) { func TestCatalogAPI(t *testing.T) { chunkLen := 2 env := newTestEnv(t, false) + defer env.Shutdown() values := url.Values{ "last": []string{""}, @@ -220,12 +222,16 @@ func TestURLPrefix(t *testing.T) { config := configuration.Configuration{ Storage: configuration.Storage{ "testdriver": configuration.Parameters{}, + "maintenance": configuration.Parameters{"uploadpurging": map[interface{}]interface{}{ + "enabled": false, + }}, }, } config.HTTP.Prefix = "/test/" config.HTTP.Headers = headerConfig env := newTestEnvWithConfig(t, &config) + defer env.Shutdown() baseURL, err := env.builder.BuildBaseURL() if err != nil { @@ -273,20 +279,23 @@ func makeBlobArgs(t *testing.T) blobArgs { // TestBlobAPI conducts a full test of the of the blob api. func TestBlobAPI(t *testing.T) { deleteEnabled := false - env := newTestEnv(t, deleteEnabled) + env1 := newTestEnv(t, deleteEnabled) + defer env1.Shutdown() args := makeBlobArgs(t) - testBlobAPI(t, env, args) + testBlobAPI(t, env1, args) deleteEnabled = true - env = newTestEnv(t, deleteEnabled) + env2 := newTestEnv(t, deleteEnabled) + defer env2.Shutdown() args = makeBlobArgs(t) - testBlobAPI(t, env, args) + testBlobAPI(t, env2, args) } func TestBlobDelete(t *testing.T) { deleteEnabled := true env := newTestEnv(t, deleteEnabled) + defer env.Shutdown() args := makeBlobArgs(t) env = testBlobAPI(t, env, args) @@ -297,11 +306,15 @@ func TestRelativeURL(t *testing.T) { config := configuration.Configuration{ Storage: configuration.Storage{ "testdriver": configuration.Parameters{}, + "maintenance": configuration.Parameters{"uploadpurging": map[interface{}]interface{}{ + "enabled": false, + }}, }, } config.HTTP.Headers = headerConfig config.HTTP.RelativeURLs = false env := newTestEnvWithConfig(t, &config) + defer env.Shutdown() ref, _ := reference.WithName("foo/bar") uploadURLBaseAbs, _ := startPushLayer(t, env, ref) @@ -369,6 +382,7 @@ func TestRelativeURL(t *testing.T) { func TestBlobDeleteDisabled(t *testing.T) { deleteEnabled := false env := newTestEnv(t, deleteEnabled) + defer env.Shutdown() args := makeBlobArgs(t) imageName := args.imageName @@ -684,6 +698,7 @@ func testBlobDelete(t *testing.T, env *testEnv, args blobArgs) { func TestDeleteDisabled(t *testing.T) { env := newTestEnv(t, false) + defer env.Shutdown() imageName, _ := reference.ParseNamed("foo/bar") // "build" our layer file @@ -710,6 +725,7 @@ func TestDeleteDisabled(t *testing.T) { func TestDeleteReadOnly(t *testing.T) { env := newTestEnv(t, true) + defer env.Shutdown() imageName, _ := reference.ParseNamed("foo/bar") // "build" our layer file @@ -738,6 +754,7 @@ func TestDeleteReadOnly(t *testing.T) { func TestStartPushReadOnly(t *testing.T) { env := newTestEnv(t, true) + defer env.Shutdown() env.app.readOnly = true imageName, _ := reference.ParseNamed("foo/bar") @@ -782,16 +799,18 @@ func TestManifestAPI(t *testing.T) { schema2Repo, _ := reference.ParseNamed("foo/schema2") deleteEnabled := false - env := newTestEnv(t, deleteEnabled) - testManifestAPISchema1(t, env, schema1Repo) - schema2Args := testManifestAPISchema2(t, env, schema2Repo) - testManifestAPIManifestList(t, env, schema2Args) + env1 := newTestEnv(t, deleteEnabled) + defer env1.Shutdown() + testManifestAPISchema1(t, env1, schema1Repo) + schema2Args := testManifestAPISchema2(t, env1, schema2Repo) + testManifestAPIManifestList(t, env1, schema2Args) deleteEnabled = true - env = newTestEnv(t, deleteEnabled) - testManifestAPISchema1(t, env, schema1Repo) - schema2Args = testManifestAPISchema2(t, env, schema2Repo) - testManifestAPIManifestList(t, env, schema2Args) + env2 := newTestEnv(t, deleteEnabled) + defer env2.Shutdown() + testManifestAPISchema1(t, env2, schema1Repo) + schema2Args = testManifestAPISchema2(t, env2, schema2Repo) + testManifestAPIManifestList(t, env2, schema2Args) } func TestManifestDelete(t *testing.T) { @@ -800,6 +819,7 @@ func TestManifestDelete(t *testing.T) { deleteEnabled := true env := newTestEnv(t, deleteEnabled) + defer env.Shutdown() schema1Args := testManifestAPISchema1(t, env, schema1Repo) testManifestDelete(t, env, schema1Args) schema2Args := testManifestAPISchema2(t, env, schema2Repo) @@ -810,6 +830,7 @@ func TestManifestDeleteDisabled(t *testing.T) { schema1Repo, _ := reference.ParseNamed("foo/schema1") deleteEnabled := false env := newTestEnv(t, deleteEnabled) + defer env.Shutdown() testManifestDeleteDisabled(t, env, schema1Repo) } @@ -1886,6 +1907,9 @@ func newTestEnvMirror(t *testing.T, deleteEnabled bool) *testEnv { Storage: configuration.Storage{ "testdriver": configuration.Parameters{}, "delete": configuration.Parameters{"enabled": deleteEnabled}, + "maintenance": configuration.Parameters{"uploadpurging": map[interface{}]interface{}{ + "enabled": false, + }}, }, Proxy: configuration.Proxy{ RemoteURL: "http://example.com", @@ -1901,6 +1925,9 @@ func newTestEnv(t *testing.T, deleteEnabled bool) *testEnv { Storage: configuration.Storage{ "testdriver": configuration.Parameters{}, "delete": configuration.Parameters{"enabled": deleteEnabled}, + "maintenance": configuration.Parameters{"uploadpurging": map[interface{}]interface{}{ + "enabled": false, + }}, }, } @@ -1935,6 +1962,11 @@ func newTestEnvWithConfig(t *testing.T, config *configuration.Configuration) *te } } +func (t *testEnv) Shutdown() { + t.server.CloseClientConnections() + t.server.Close() +} + func putManifest(t *testing.T, msg, url, contentType string, v interface{}) *http.Response { var body []byte @@ -2328,6 +2360,7 @@ func createRepository(env *testEnv, t *testing.T, imageName string, tag string) func TestRegistryAsCacheMutationAPIs(t *testing.T) { deleteEnabled := true env := newTestEnvMirror(t, deleteEnabled) + defer env.Shutdown() imageName, _ := reference.ParseNamed("foo/bar") tag := "latest" @@ -2386,6 +2419,7 @@ func TestRegistryAsCacheMutationAPIs(t *testing.T) { // that implements http.ContextNotifier. func TestCheckContextNotifier(t *testing.T) { env := newTestEnv(t, false) + defer env.Shutdown() // Register a new endpoint for testing env.app.router.Handle("/unittest/{name}/", env.app.dispatcher(func(ctx *Context, r *http.Request) http.Handler { @@ -2414,6 +2448,9 @@ func TestProxyManifestGetByTag(t *testing.T) { truthConfig := configuration.Configuration{ Storage: configuration.Storage{ "testdriver": configuration.Parameters{}, + "maintenance": configuration.Parameters{"uploadpurging": map[interface{}]interface{}{ + "enabled": false, + }}, }, } truthConfig.HTTP.Headers = headerConfig @@ -2422,6 +2459,7 @@ func TestProxyManifestGetByTag(t *testing.T) { tag := "latest" truthEnv := newTestEnvWithConfig(t, &truthConfig) + defer truthEnv.Shutdown() // create a repository in the truth registry dgst := createRepository(truthEnv, t, imageName.Name(), tag) @@ -2436,6 +2474,7 @@ func TestProxyManifestGetByTag(t *testing.T) { proxyConfig.HTTP.Headers = headerConfig proxyEnv := newTestEnvWithConfig(t, &proxyConfig) + defer proxyEnv.Shutdown() digestRef, _ := reference.WithDigest(imageName, dgst) manifestDigestURL, err := proxyEnv.builder.BuildManifestURL(digestRef) diff --git a/registry/handlers/app_test.go b/registry/handlers/app_test.go index 3a8e4e1e..385fa4c6 100644 --- a/registry/handlers/app_test.go +++ b/registry/handlers/app_test.go @@ -38,6 +38,7 @@ func TestAppDispatcher(t *testing.T) { registry: registry, } server := httptest.NewServer(app) + defer server.Close() router := v2.Router() serverURL, err := url.Parse(server.URL) @@ -143,6 +144,9 @@ func TestNewApp(t *testing.T) { config := configuration.Configuration{ Storage: configuration.Storage{ "testdriver": nil, + "maintenance": configuration.Parameters{"uploadpurging": map[interface{}]interface{}{ + "enabled": false, + }}, }, Auth: configuration.Auth{ // For now, we simply test that new auth results in a viable @@ -160,6 +164,7 @@ func TestNewApp(t *testing.T) { app := NewApp(ctx, &config) server := httptest.NewServer(app) + defer server.Close() builder, err := v2.NewURLBuilderFromString(server.URL, false) if err != nil { t.Fatalf("error creating urlbuilder: %v", err) diff --git a/registry/handlers/health_test.go b/registry/handlers/health_test.go index 5fe65ede..0f38bd1c 100644 --- a/registry/handlers/health_test.go +++ b/registry/handlers/health_test.go @@ -26,6 +26,9 @@ func TestFileHealthCheck(t *testing.T) { config := &configuration.Configuration{ Storage: configuration.Storage{ "inmemory": configuration.Parameters{}, + "maintenance": configuration.Parameters{"uploadpurging": map[interface{}]interface{}{ + "enabled": false, + }}, }, Health: configuration.Health{ FileCheckers: []configuration.FileChecker{ @@ -86,6 +89,9 @@ func TestTCPHealthCheck(t *testing.T) { config := &configuration.Configuration{ Storage: configuration.Storage{ "inmemory": configuration.Parameters{}, + "maintenance": configuration.Parameters{"uploadpurging": map[interface{}]interface{}{ + "enabled": false, + }}, }, Health: configuration.Health{ TCPCheckers: []configuration.TCPChecker{ @@ -145,6 +151,9 @@ func TestHTTPHealthCheck(t *testing.T) { config := &configuration.Configuration{ Storage: configuration.Storage{ "inmemory": configuration.Parameters{}, + "maintenance": configuration.Parameters{"uploadpurging": map[interface{}]interface{}{ + "enabled": false, + }}, }, Health: configuration.Health{ HTTPCheckers: []configuration.HTTPChecker{ diff --git a/registry/proxy/proxyblobstore_test.go b/registry/proxy/proxyblobstore_test.go index 967dcd3d..8e3a0692 100644 --- a/registry/proxy/proxyblobstore_test.go +++ b/registry/proxy/proxyblobstore_test.go @@ -370,15 +370,20 @@ func testProxyStoreServe(t *testing.T, te *testEnv, numClients int) { wg.Wait() remoteBlobCount := len(te.inRemote) + sbsMu.Lock() if (*localStats)["stat"] != remoteBlobCount*numClients && (*localStats)["create"] != te.numUnique { + sbsMu.Unlock() t.Fatal("Expected: stat:", remoteBlobCount*numClients, "create:", remoteBlobCount) } + sbsMu.Unlock() // Wait for any async storage goroutines to finish time.Sleep(3 * time.Second) + sbsMu.Lock() remoteStatCount := (*remoteStats)["stat"] remoteOpenCount := (*remoteStats)["open"] + sbsMu.Unlock() // Serveblob - blobs come from local for _, dr := range te.inRemote { @@ -403,6 +408,8 @@ func testProxyStoreServe(t *testing.T, te *testEnv, numClients int) { remoteStats = te.RemoteStats() // Ensure remote unchanged + sbsMu.Lock() + defer sbsMu.Unlock() if (*remoteStats)["stat"] != remoteStatCount && (*remoteStats)["open"] != remoteOpenCount { t.Fatalf("unexpected remote stats: %#v", remoteStats) } diff --git a/registry/proxy/scheduler/scheduler.go b/registry/proxy/scheduler/scheduler.go index 0c8a8534..bde94657 100644 --- a/registry/proxy/scheduler/scheduler.go +++ b/registry/proxy/scheduler/scheduler.go @@ -134,11 +134,12 @@ func (ttles *TTLExpirationScheduler) Start() error { for { select { case <-ttles.saveTimer.C: + ttles.Lock() if !ttles.indexDirty { + ttles.Unlock() continue } - ttles.Lock() err := ttles.writeState() if err != nil { context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err) diff --git a/registry/proxy/scheduler/scheduler_test.go b/registry/proxy/scheduler/scheduler_test.go index 556f5204..4d69d5b5 100644 --- a/registry/proxy/scheduler/scheduler_test.go +++ b/registry/proxy/scheduler/scheduler_test.go @@ -2,6 +2,7 @@ package scheduler import ( "encoding/json" + "sync" "testing" "time" @@ -38,6 +39,7 @@ func TestSchedule(t *testing.T) { ref3.String(): true, } + var mu sync.Mutex s := New(context.Background(), inmemory.New(), "/ttl") deleteFunc := func(repoName reference.Reference) error { if len(remainingRepos) == 0 { @@ -48,7 +50,9 @@ func TestSchedule(t *testing.T) { t.Fatalf("Trying to remove nonexistent repo: %s", repoName) } t.Log("removing", repoName) + mu.Lock() delete(remainingRepos, repoName.String()) + mu.Unlock() return nil } @@ -62,12 +66,17 @@ func TestSchedule(t *testing.T) { s.add(ref2, 1*timeUnit, entryTypeBlob) func() { + s.Lock() s.add(ref3, 1*timeUnit, entryTypeBlob) + s.Unlock() }() // Ensure all repos are deleted <-time.After(50 * timeUnit) + + mu.Lock() + defer mu.Unlock() if len(remainingRepos) != 0 { t.Fatalf("Repositories remaining: %#v", remainingRepos) } @@ -80,22 +89,28 @@ func TestRestoreOld(t *testing.T) { ref2.String(): true, } + var wg sync.WaitGroup + wg.Add(len(remainingRepos)) + var mu sync.Mutex deleteFunc := func(r reference.Reference) error { + mu.Lock() + defer mu.Unlock() if r.String() == ref1.String() && len(remainingRepos) == 2 { - t.Errorf("ref1 should be removed first") + t.Errorf("ref1 should not be removed first") } _, ok := remainingRepos[r.String()] if !ok { t.Fatalf("Trying to remove nonexistent repo: %s", r) } delete(remainingRepos, r.String()) + wg.Done() return nil } timeUnit := time.Millisecond serialized, err := json.Marshal(&map[string]schedulerEntry{ ref1.String(): { - Expiry: time.Now().Add(1 * timeUnit), + Expiry: time.Now().Add(10 * timeUnit), Key: ref1.String(), EntryType: 0, }, @@ -117,13 +132,16 @@ func TestRestoreOld(t *testing.T) { t.Fatal("Unable to write serialized data to fs") } s := New(context.Background(), fs, "/ttl") - s.onBlobExpire = deleteFunc + s.OnBlobExpire(deleteFunc) err = s.Start() if err != nil { t.Fatalf("Error starting ttlExpirationScheduler: %s", err) } + defer s.Stop() - <-time.After(50 * timeUnit) + wg.Wait() + mu.Lock() + defer mu.Unlock() if len(remainingRepos) != 0 { t.Fatalf("Repositories remaining: %#v", remainingRepos) } @@ -138,8 +156,11 @@ func TestStopRestore(t *testing.T) { ref2.String(): true, } + var mu sync.Mutex deleteFunc := func(r reference.Reference) error { + mu.Lock() delete(remainingRepos, r.String()) + mu.Unlock() return nil } @@ -169,6 +190,8 @@ func TestStopRestore(t *testing.T) { } <-time.After(500 * timeUnit) + mu.Lock() + defer mu.Unlock() if len(remainingRepos) != 0 { t.Fatalf("Repositories remaining: %#v", remainingRepos) } diff --git a/registry/storage/cache/memory/memory.go b/registry/storage/cache/memory/memory.go index 68a68f08..cf125e18 100644 --- a/registry/storage/cache/memory/memory.go +++ b/registry/storage/cache/memory/memory.go @@ -77,37 +77,46 @@ type repositoryScopedInMemoryBlobDescriptorCache struct { } func (rsimbdcp *repositoryScopedInMemoryBlobDescriptorCache) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { - if rsimbdcp.repository == nil { + rsimbdcp.parent.mu.Lock() + repo := rsimbdcp.repository + rsimbdcp.parent.mu.Unlock() + + if repo == nil { return distribution.Descriptor{}, distribution.ErrBlobUnknown } - return rsimbdcp.repository.Stat(ctx, dgst) + return repo.Stat(ctx, dgst) } func (rsimbdcp *repositoryScopedInMemoryBlobDescriptorCache) Clear(ctx context.Context, dgst digest.Digest) error { - if rsimbdcp.repository == nil { + rsimbdcp.parent.mu.Lock() + repo := rsimbdcp.repository + rsimbdcp.parent.mu.Unlock() + + if repo == nil { return distribution.ErrBlobUnknown } - return rsimbdcp.repository.Clear(ctx, dgst) + return repo.Clear(ctx, dgst) } func (rsimbdcp *repositoryScopedInMemoryBlobDescriptorCache) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error { - if rsimbdcp.repository == nil { + rsimbdcp.parent.mu.Lock() + repo := rsimbdcp.repository + if repo == nil { // allocate map since we are setting it now. - rsimbdcp.parent.mu.Lock() var ok bool // have to read back value since we may have allocated elsewhere. - rsimbdcp.repository, ok = rsimbdcp.parent.repositories[rsimbdcp.repo] + repo, ok = rsimbdcp.parent.repositories[rsimbdcp.repo] if !ok { - rsimbdcp.repository = newMapBlobDescriptorCache() - rsimbdcp.parent.repositories[rsimbdcp.repo] = rsimbdcp.repository + repo = newMapBlobDescriptorCache() + rsimbdcp.parent.repositories[rsimbdcp.repo] = repo } - - rsimbdcp.parent.mu.Unlock() + rsimbdcp.repository = repo } + rsimbdcp.parent.mu.Unlock() - if err := rsimbdcp.repository.SetDescriptor(ctx, dgst, desc); err != nil { + if err := repo.SetDescriptor(ctx, dgst, desc); err != nil { return err }