152af63ec5
This integrates the new module, which was extracted from this repository
at commit b9b19409cf458dcb9e1253ff44ba75bd0620faa6, using the following steps:
# install filter-repo (https://github.com/newren/git-filter-repo/blob/main/INSTALL.md)
brew install git-filter-repo
# create a temporary clone of distribution
cd ~/Projects
git clone https://github.com/distribution/distribution.git reference
cd reference
# commit the code was taken from
git rev-parse --verify HEAD
b9b19409cf
# remove all code, except for general files, 'reference/', and rename to /
git filter-repo \
--path .github/workflows/codeql-analysis.yml \
--path .github/workflows/fossa.yml \
--path .golangci.yml \
--path distribution-logo.svg \
--path CODE-OF-CONDUCT.md \
--path CONTRIBUTING.md \
--path GOVERNANCE.md \
--path README.md \
--path LICENSE \
--path MAINTAINERS \
--path-glob 'reference/*.*' \
--path-rename reference/:
# initialize go.mod
go mod init github.com/distribution/reference
go mod tidy -go=1.20
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
209 lines
4.9 KiB
Go
209 lines
4.9 KiB
Go
package scheduler
|
|
|
|
import (
|
|
"encoding/json"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/distribution/distribution/v3/context"
|
|
"github.com/distribution/distribution/v3/registry/storage/driver/inmemory"
|
|
"github.com/distribution/reference"
|
|
)
|
|
|
|
func testRefs(t *testing.T) (reference.Reference, reference.Reference, reference.Reference) {
|
|
ref1, err := reference.Parse("testrepo@sha256:aaaaeaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
|
|
if err != nil {
|
|
t.Fatalf("could not parse reference: %v", err)
|
|
}
|
|
|
|
ref2, err := reference.Parse("testrepo@sha256:bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
|
|
if err != nil {
|
|
t.Fatalf("could not parse reference: %v", err)
|
|
}
|
|
|
|
ref3, err := reference.Parse("testrepo@sha256:cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc")
|
|
if err != nil {
|
|
t.Fatalf("could not parse reference: %v", err)
|
|
}
|
|
|
|
return ref1, ref2, ref3
|
|
}
|
|
|
|
func TestSchedule(t *testing.T) {
|
|
ref1, ref2, ref3 := testRefs(t)
|
|
timeUnit := time.Millisecond
|
|
remainingRepos := map[string]bool{
|
|
ref1.String(): true,
|
|
ref2.String(): true,
|
|
ref3.String(): true,
|
|
}
|
|
|
|
var mu sync.Mutex
|
|
s := New(context.Background(), inmemory.New(), "/ttl")
|
|
deleteFunc := func(repoName reference.Reference) error {
|
|
if len(remainingRepos) == 0 {
|
|
t.Fatalf("Incorrect expiry count")
|
|
}
|
|
_, ok := remainingRepos[repoName.String()]
|
|
if !ok {
|
|
t.Fatalf("Trying to remove nonexistent repo: %s", repoName)
|
|
}
|
|
t.Log("removing", repoName)
|
|
mu.Lock()
|
|
delete(remainingRepos, repoName.String())
|
|
mu.Unlock()
|
|
|
|
return nil
|
|
}
|
|
s.onBlobExpire = deleteFunc
|
|
err := s.Start()
|
|
if err != nil {
|
|
t.Fatalf("Error starting ttlExpirationScheduler: %s", err)
|
|
}
|
|
|
|
s.add(ref1, 3*timeUnit, entryTypeBlob)
|
|
s.add(ref2, 1*timeUnit, entryTypeBlob)
|
|
|
|
func() {
|
|
s.Lock()
|
|
s.add(ref3, 1*timeUnit, entryTypeBlob)
|
|
s.Unlock()
|
|
}()
|
|
|
|
// Ensure all repos are deleted
|
|
<-time.After(50 * timeUnit)
|
|
|
|
mu.Lock()
|
|
defer mu.Unlock()
|
|
if len(remainingRepos) != 0 {
|
|
t.Fatalf("Repositories remaining: %#v", remainingRepos)
|
|
}
|
|
}
|
|
|
|
func TestRestoreOld(t *testing.T) {
|
|
ref1, ref2, _ := testRefs(t)
|
|
remainingRepos := map[string]bool{
|
|
ref1.String(): true,
|
|
ref2.String(): true,
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(len(remainingRepos))
|
|
var mu sync.Mutex
|
|
deleteFunc := func(r reference.Reference) error {
|
|
mu.Lock()
|
|
defer mu.Unlock()
|
|
if r.String() == ref1.String() && len(remainingRepos) == 2 {
|
|
t.Errorf("ref1 should not be removed first")
|
|
}
|
|
_, ok := remainingRepos[r.String()]
|
|
if !ok {
|
|
t.Fatalf("Trying to remove nonexistent repo: %s", r)
|
|
}
|
|
delete(remainingRepos, r.String())
|
|
wg.Done()
|
|
return nil
|
|
}
|
|
|
|
timeUnit := time.Millisecond
|
|
serialized, err := json.Marshal(&map[string]schedulerEntry{
|
|
ref1.String(): {
|
|
Expiry: time.Now().Add(10 * timeUnit),
|
|
Key: ref1.String(),
|
|
EntryType: 0,
|
|
},
|
|
ref2.String(): {
|
|
Expiry: time.Now().Add(-3 * timeUnit), // TTL passed, should be removed first
|
|
Key: ref2.String(),
|
|
EntryType: 0,
|
|
},
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("Error serializing test data: %s", err.Error())
|
|
}
|
|
|
|
ctx := context.Background()
|
|
pathToStatFile := "/ttl"
|
|
fs := inmemory.New()
|
|
err = fs.PutContent(ctx, pathToStatFile, serialized)
|
|
if err != nil {
|
|
t.Fatal("Unable to write serialized data to fs")
|
|
}
|
|
s := New(context.Background(), fs, "/ttl")
|
|
s.OnBlobExpire(deleteFunc)
|
|
err = s.Start()
|
|
if err != nil {
|
|
t.Fatalf("Error starting ttlExpirationScheduler: %s", err)
|
|
}
|
|
defer s.Stop()
|
|
|
|
wg.Wait()
|
|
mu.Lock()
|
|
defer mu.Unlock()
|
|
if len(remainingRepos) != 0 {
|
|
t.Fatalf("Repositories remaining: %#v", remainingRepos)
|
|
}
|
|
}
|
|
|
|
func TestStopRestore(t *testing.T) {
|
|
ref1, ref2, _ := testRefs(t)
|
|
|
|
timeUnit := time.Millisecond
|
|
remainingRepos := map[string]bool{
|
|
ref1.String(): true,
|
|
ref2.String(): true,
|
|
}
|
|
|
|
var mu sync.Mutex
|
|
deleteFunc := func(r reference.Reference) error {
|
|
mu.Lock()
|
|
delete(remainingRepos, r.String())
|
|
mu.Unlock()
|
|
return nil
|
|
}
|
|
|
|
fs := inmemory.New()
|
|
pathToStateFile := "/ttl"
|
|
s := New(context.Background(), fs, pathToStateFile)
|
|
s.onBlobExpire = deleteFunc
|
|
|
|
err := s.Start()
|
|
if err != nil {
|
|
t.Fatalf(err.Error())
|
|
}
|
|
s.add(ref1, 300*timeUnit, entryTypeBlob)
|
|
s.add(ref2, 100*timeUnit, entryTypeBlob)
|
|
|
|
// Start and stop before all operations complete
|
|
// state will be written to fs
|
|
s.Stop()
|
|
time.Sleep(10 * time.Millisecond)
|
|
|
|
// v2 will restore state from fs
|
|
s2 := New(context.Background(), fs, pathToStateFile)
|
|
s2.onBlobExpire = deleteFunc
|
|
err = s2.Start()
|
|
if err != nil {
|
|
t.Fatalf("Error starting v2: %s", err.Error())
|
|
}
|
|
|
|
<-time.After(500 * timeUnit)
|
|
mu.Lock()
|
|
defer mu.Unlock()
|
|
if len(remainingRepos) != 0 {
|
|
t.Fatalf("Repositories remaining: %#v", remainingRepos)
|
|
}
|
|
}
|
|
|
|
func TestDoubleStart(t *testing.T) {
|
|
s := New(context.Background(), inmemory.New(), "/ttl")
|
|
err := s.Start()
|
|
if err != nil {
|
|
t.Fatalf("Unable to start scheduler")
|
|
}
|
|
err = s.Start()
|
|
if err == nil {
|
|
t.Fatalf("Scheduler started twice without error")
|
|
}
|
|
}
|