diff --git a/pkg/artifactcache/handler.go b/pkg/artifactcache/handler.go index 1f0a6f6..395a39e 100644 --- a/pkg/artifactcache/handler.go +++ b/pkg/artifactcache/handler.go @@ -35,7 +35,7 @@ type Handler struct { server *http.Server logger logrus.FieldLogger - gcing int32 // TODO: use atomic.Bool when we can use Go 1.19 + gcing atomic.Bool gcAt time.Time outboundIP string @@ -170,7 +170,7 @@ func (h *Handler) find(w http.ResponseWriter, r *http.Request, _ httprouter.Para } defer db.Close() - cache, err := h.findCache(db, keys, version) + cache, err := findCache(db, keys, version) if err != nil { h.responseJSON(w, r, 500, err) return @@ -206,32 +206,17 @@ func (h *Handler) reserve(w http.ResponseWriter, r *http.Request, _ httprouter.P api.Key = strings.ToLower(api.Key) cache := api.ToCache() - cache.FillKeyVersionHash() db, err := h.openDB() if err != nil { h.responseJSON(w, r, 500, err) return } defer db.Close() - if err := db.FindOne(cache, bolthold.Where("KeyVersionHash").Eq(cache.KeyVersionHash)); err != nil { - if !errors.Is(err, bolthold.ErrNotFound) { - h.responseJSON(w, r, 500, err) - return - } - } else { - h.responseJSON(w, r, 400, fmt.Errorf("already exist")) - return - } now := time.Now().Unix() cache.CreatedAt = now cache.UsedAt = now - if err := db.Insert(bolthold.NextSequence(), cache); err != nil { - h.responseJSON(w, r, 500, err) - return - } - // write back id to db - if err := db.Update(cache.ID, cache); err != nil { + if err := insertCache(db, cache); err != nil { h.responseJSON(w, r, 500, err) return } @@ -364,56 +349,40 @@ func (h *Handler) middleware(handler httprouter.Handle) httprouter.Handle { } // if not found, return (nil, nil) instead of an error. -func (h *Handler) findCache(db *bolthold.Store, keys []string, version string) (*Cache, error) { - if len(keys) == 0 { - return nil, nil - } - key := keys[0] // the first key is for exact match. - - cache := &Cache{ - Key: key, - Version: version, - } - cache.FillKeyVersionHash() - - if err := db.FindOne(cache, bolthold.Where("KeyVersionHash").Eq(cache.KeyVersionHash)); err != nil { - if !errors.Is(err, bolthold.ErrNotFound) { - return nil, err - } - } else if cache.Complete { - return cache, nil - } - stop := fmt.Errorf("stop") - +func findCache(db *bolthold.Store, keys []string, version string) (*Cache, error) { + cache := &Cache{} for _, prefix := range keys { - found := false prefixPattern := fmt.Sprintf("^%s", regexp.QuoteMeta(prefix)) re, err := regexp.Compile(prefixPattern) if err != nil { continue } - if err := db.ForEach(bolthold.Where("Key").RegExp(re).And("Version").Eq(version).SortBy("CreatedAt").Reverse(), func(v *Cache) error { - if !strings.HasPrefix(v.Key, prefix) { - return stop - } - if v.Complete { - cache = v - found = true - return stop - } - return nil - }); err != nil { - if !errors.Is(err, stop) { - return nil, err + if err := db.FindOne(cache, + bolthold.Where("Key").RegExp(re). + And("Version").Eq(version). + And("Complete").Eq(true). + SortBy("CreatedAt").Reverse()); err != nil { + if errors.Is(err, bolthold.ErrNotFound) { + continue } + return nil, fmt.Errorf("find cache: %w", err) } - if found { - return cache, nil - } + return cache, nil } return nil, nil } +func insertCache(db *bolthold.Store, cache *Cache) error { + if err := db.Insert(bolthold.NextSequence(), cache); err != nil { + return fmt.Errorf("insert cache: %w", err) + } + // write back id to db + if err := db.Update(cache.ID, cache); err != nil { + return fmt.Errorf("write back id to db: %w", err) + } + return nil +} + func (h *Handler) useCache(id int64) { db, err := h.openDB() if err != nil { @@ -428,14 +397,21 @@ func (h *Handler) useCache(id int64) { _ = db.Update(cache.ID, cache) } +const ( + keepUsed = 30 * 24 * time.Hour + keepUnused = 7 * 24 * time.Hour + keepTemp = 5 * time.Minute + keepOld = 5 * time.Minute +) + func (h *Handler) gcCache() { - if atomic.LoadInt32(&h.gcing) != 0 { + if h.gcing.Load() { return } - if !atomic.CompareAndSwapInt32(&h.gcing, 0, 1) { + if !h.gcing.CompareAndSwap(false, true) { return } - defer atomic.StoreInt32(&h.gcing, 0) + defer h.gcing.Store(false) if time.Since(h.gcAt) < time.Hour { h.logger.Debugf("skip gc: %v", h.gcAt.String()) @@ -444,37 +420,18 @@ func (h *Handler) gcCache() { h.gcAt = time.Now() h.logger.Debugf("gc: %v", h.gcAt.String()) - const ( - keepUsed = 30 * 24 * time.Hour - keepUnused = 7 * 24 * time.Hour - keepTemp = 5 * time.Minute - ) - db, err := h.openDB() if err != nil { return } defer db.Close() + // Remove the caches which are not completed for a while, they are most likely to be broken. var caches []*Cache - if err := db.Find(&caches, bolthold.Where("UsedAt").Lt(time.Now().Add(-keepTemp).Unix())); err != nil { - h.logger.Warnf("find caches: %v", err) - } else { - for _, cache := range caches { - if cache.Complete { - continue - } - h.storage.Remove(cache.ID) - if err := db.Delete(cache.ID, cache); err != nil { - h.logger.Warnf("delete cache: %v", err) - continue - } - h.logger.Infof("deleted cache: %+v", cache) - } - } - - caches = caches[:0] - if err := db.Find(&caches, bolthold.Where("UsedAt").Lt(time.Now().Add(-keepUnused).Unix())); err != nil { + if err := db.Find(&caches, bolthold. + Where("UsedAt").Lt(time.Now().Add(-keepTemp).Unix()). + And("Complete").Eq(false), + ); err != nil { h.logger.Warnf("find caches: %v", err) } else { for _, cache := range caches { @@ -487,8 +444,11 @@ func (h *Handler) gcCache() { } } + // Remove the old caches which have not been used recently. caches = caches[:0] - if err := db.Find(&caches, bolthold.Where("CreatedAt").Lt(time.Now().Add(-keepUsed).Unix())); err != nil { + if err := db.Find(&caches, bolthold. + Where("UsedAt").Lt(time.Now().Add(-keepUnused).Unix()), + ); err != nil { h.logger.Warnf("find caches: %v", err) } else { for _, cache := range caches { @@ -500,6 +460,55 @@ func (h *Handler) gcCache() { h.logger.Infof("deleted cache: %+v", cache) } } + + // Remove the old caches which are too old. + caches = caches[:0] + if err := db.Find(&caches, bolthold. + Where("CreatedAt").Lt(time.Now().Add(-keepUsed).Unix()), + ); err != nil { + h.logger.Warnf("find caches: %v", err) + } else { + for _, cache := range caches { + h.storage.Remove(cache.ID) + if err := db.Delete(cache.ID, cache); err != nil { + h.logger.Warnf("delete cache: %v", err) + continue + } + h.logger.Infof("deleted cache: %+v", cache) + } + } + + // Remove the old caches with the same key and version, keep the latest one. + // Also keep the olds which have been used recently for a while in case of the cache is still in use. + if results, err := db.FindAggregate( + &Cache{}, + bolthold.Where("Complete").Eq(true), + "Key", "Version", + ); err != nil { + h.logger.Warnf("find aggregate caches: %v", err) + } else { + for _, result := range results { + if result.Count() <= 1 { + continue + } + result.Sort("CreatedAt") + caches = caches[:0] + result.Reduction(&caches) + for _, cache := range caches[:len(caches)-1] { + if time.Since(time.Unix(cache.UsedAt, 0)) < keepOld { + // Keep it since it has been used recently, even if it's old. + // Or it could break downloading in process. + continue + } + h.storage.Remove(cache.ID) + if err := db.Delete(cache.ID, cache); err != nil { + h.logger.Warnf("delete cache: %v", err) + continue + } + h.logger.Infof("deleted cache: %+v", cache) + } + } + } } func (h *Handler) responseJSON(w http.ResponseWriter, r *http.Request, code int, v ...any) { diff --git a/pkg/artifactcache/handler_test.go b/pkg/artifactcache/handler_test.go index 35ec753..b418f00 100644 --- a/pkg/artifactcache/handler_test.go +++ b/pkg/artifactcache/handler_test.go @@ -10,9 +10,11 @@ import ( "path/filepath" "strings" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/timshannon/bolthold" "go.etcd.io/bbolt" ) @@ -78,6 +80,9 @@ func TestHandler(t *testing.T) { t.Run("duplicate reserve", func(t *testing.T) { key := strings.ToLower(t.Name()) version := "c19da02a2bd7e77277f1ac29ab45c09b7d46a4ee758284e26bb3045ad11d9d20" + var first, second struct { + CacheID uint64 `json:"cacheId"` + } { body, err := json.Marshal(&Request{ Key: key, @@ -89,10 +94,8 @@ func TestHandler(t *testing.T) { require.NoError(t, err) assert.Equal(t, 200, resp.StatusCode) - got := struct { - CacheID uint64 `json:"cacheId"` - }{} - require.NoError(t, json.NewDecoder(resp.Body).Decode(&got)) + require.NoError(t, json.NewDecoder(resp.Body).Decode(&first)) + assert.NotZero(t, first.CacheID) } { body, err := json.Marshal(&Request{ @@ -103,8 +106,13 @@ func TestHandler(t *testing.T) { require.NoError(t, err) resp, err := http.Post(fmt.Sprintf("%s/caches", base), "application/json", bytes.NewReader(body)) require.NoError(t, err) - assert.Equal(t, 400, resp.StatusCode) + assert.Equal(t, 200, resp.StatusCode) + + require.NoError(t, json.NewDecoder(resp.Body).Decode(&second)) + assert.NotZero(t, second.CacheID) } + + assert.NotEqual(t, first.CacheID, second.CacheID) }) t.Run("upload with bad id", func(t *testing.T) { @@ -341,9 +349,9 @@ func TestHandler(t *testing.T) { version := "c19da02a2bd7e77277f1ac29ab45c09b7d46a4ee758284e26bb3045ad11d9d20" key := strings.ToLower(t.Name()) keys := [3]string{ - key + "_a", - key + "_a_b", key + "_a_b_c", + key + "_a_b", + key + "_a", } contents := [3][]byte{ make([]byte, 100), @@ -354,6 +362,7 @@ func TestHandler(t *testing.T) { _, err := rand.Read(contents[i]) require.NoError(t, err) uploadCacheNormally(t, base, keys[i], version, contents[i]) + time.Sleep(time.Second) // ensure CreatedAt of caches are different } reqKeys := strings.Join([]string{ @@ -361,29 +370,33 @@ func TestHandler(t *testing.T) { key + "_a_b", key + "_a", }, ",") - var archiveLocation string - { - resp, err := http.Get(fmt.Sprintf("%s/cache?keys=%s&version=%s", base, reqKeys, version)) - require.NoError(t, err) - require.Equal(t, 200, resp.StatusCode) - got := struct { - Result string `json:"result"` - ArchiveLocation string `json:"archiveLocation"` - CacheKey string `json:"cacheKey"` - }{} - require.NoError(t, json.NewDecoder(resp.Body).Decode(&got)) - assert.Equal(t, "hit", got.Result) - assert.Equal(t, keys[1], got.CacheKey) - archiveLocation = got.ArchiveLocation - } - { - resp, err := http.Get(archiveLocation) //nolint:gosec - require.NoError(t, err) - require.Equal(t, 200, resp.StatusCode) - got, err := io.ReadAll(resp.Body) - require.NoError(t, err) - assert.Equal(t, contents[1], got) - } + + resp, err := http.Get(fmt.Sprintf("%s/cache?keys=%s&version=%s", base, reqKeys, version)) + require.NoError(t, err) + require.Equal(t, 200, resp.StatusCode) + + /* + Expect `key_a_b` because: + - `key_a_b_x" doesn't match any caches. + - `key_a_b" matches `key_a_b` and `key_a_b_c`, but `key_a_b` is newer. + */ + except := 1 + + got := struct { + Result string `json:"result"` + ArchiveLocation string `json:"archiveLocation"` + CacheKey string `json:"cacheKey"` + }{} + require.NoError(t, json.NewDecoder(resp.Body).Decode(&got)) + assert.Equal(t, "hit", got.Result) + assert.Equal(t, keys[except], got.CacheKey) + + contentResp, err := http.Get(got.ArchiveLocation) + require.NoError(t, err) + require.Equal(t, 200, contentResp.StatusCode) + content, err := io.ReadAll(contentResp.Body) + require.NoError(t, err) + assert.Equal(t, contents[except], content) }) t.Run("case insensitive", func(t *testing.T) { @@ -469,3 +482,112 @@ func uploadCacheNormally(t *testing.T, base, key, version string, content []byte assert.Equal(t, content, got) } } + +func TestHandler_gcCache(t *testing.T) { + dir := filepath.Join(t.TempDir(), "artifactcache") + handler, err := StartHandler(dir, "", 0, nil) + require.NoError(t, err) + + defer func() { + require.NoError(t, handler.Close()) + }() + + now := time.Now() + + cases := []struct { + Cache *Cache + Kept bool + }{ + { + // should be kept, since it's used recently and not too old. + Cache: &Cache{ + Key: "test_key_1", + Version: "test_version", + Complete: true, + UsedAt: now.Unix(), + CreatedAt: now.Add(-time.Hour).Unix(), + }, + Kept: true, + }, + { + // should be removed, since it's not complete and not used for a while. + Cache: &Cache{ + Key: "test_key_2", + Version: "test_version", + Complete: false, + UsedAt: now.Add(-(keepTemp + time.Second)).Unix(), + CreatedAt: now.Add(-(keepTemp + time.Hour)).Unix(), + }, + Kept: false, + }, + { + // should be removed, since it's not used for a while. + Cache: &Cache{ + Key: "test_key_3", + Version: "test_version", + Complete: true, + UsedAt: now.Add(-(keepUnused + time.Second)).Unix(), + CreatedAt: now.Add(-(keepUnused + time.Hour)).Unix(), + }, + Kept: false, + }, + { + // should be removed, since it's used but too old. + Cache: &Cache{ + Key: "test_key_3", + Version: "test_version", + Complete: true, + UsedAt: now.Unix(), + CreatedAt: now.Add(-(keepUsed + time.Second)).Unix(), + }, + Kept: false, + }, + { + // should be kept, since it has a newer edition but be used recently. + Cache: &Cache{ + Key: "test_key_1", + Version: "test_version", + Complete: true, + UsedAt: now.Add(-(keepOld - time.Minute)).Unix(), + CreatedAt: now.Add(-(time.Hour + time.Second)).Unix(), + }, + Kept: true, + }, + { + // should be removed, since it has a newer edition and not be used recently. + Cache: &Cache{ + Key: "test_key_1", + Version: "test_version", + Complete: true, + UsedAt: now.Add(-(keepOld + time.Second)).Unix(), + CreatedAt: now.Add(-(time.Hour + time.Second)).Unix(), + }, + Kept: false, + }, + } + + db, err := handler.openDB() + require.NoError(t, err) + for _, c := range cases { + require.NoError(t, insertCache(db, c.Cache)) + } + require.NoError(t, db.Close()) + + handler.gcAt = time.Time{} // ensure gcCache will not skip + handler.gcCache() + + db, err = handler.openDB() + require.NoError(t, err) + for i, v := range cases { + t.Run(fmt.Sprintf("%d_%s", i, v.Cache.Key), func(t *testing.T) { + cache := &Cache{} + err = db.Get(v.Cache.ID, cache) + if v.Kept { + assert.NoError(t, err) + } else { + assert.ErrorIs(t, err, bolthold.ErrNotFound) + } + }) + } + require.NoError(t, db.Close()) +} diff --git a/pkg/artifactcache/model.go b/pkg/artifactcache/model.go index 32b8ce5..57812b3 100644 --- a/pkg/artifactcache/model.go +++ b/pkg/artifactcache/model.go @@ -1,10 +1,5 @@ package artifactcache -import ( - "crypto/sha256" - "fmt" -) - type Request struct { Key string `json:"key" ` Version string `json:"version"` @@ -29,16 +24,11 @@ func (c *Request) ToCache() *Cache { } type Cache struct { - ID uint64 `json:"id" boltholdKey:"ID"` - Key string `json:"key" boltholdIndex:"Key"` - Version string `json:"version" boltholdIndex:"Version"` - KeyVersionHash string `json:"keyVersionHash" boltholdUnique:"KeyVersionHash"` - Size int64 `json:"cacheSize"` - Complete bool `json:"complete"` - UsedAt int64 `json:"usedAt" boltholdIndex:"UsedAt"` - CreatedAt int64 `json:"createdAt" boltholdIndex:"CreatedAt"` -} - -func (c *Cache) FillKeyVersionHash() { - c.KeyVersionHash = fmt.Sprintf("%x", sha256.Sum256([]byte(fmt.Sprintf("%s:%s", c.Key, c.Version)))) + ID uint64 `json:"id" boltholdKey:"ID"` + Key string `json:"key" boltholdIndex:"Key"` + Version string `json:"version" boltholdIndex:"Version"` + Size int64 `json:"cacheSize"` + Complete bool `json:"complete" boltholdIndex:"Complete"` + UsedAt int64 `json:"usedAt" boltholdIndex:"UsedAt"` + CreatedAt int64 `json:"createdAt" boltholdIndex:"CreatedAt"` }