package writecache import ( "sort" "time" "go.etcd.io/bbolt" "go.uber.org/zap" ) const defaultPersistInterval = time.Second // persistLoop persists object accumulated in memory to the database. func (c *cache) persistLoop() { tick := time.NewTicker(defaultPersistInterval) defer tick.Stop() for { select { case <-tick.C: c.mtx.RLock() m := c.mem c.mtx.RUnlock() sort.Slice(m, func(i, j int) bool { return m[i].addr < m[j].addr }) start := time.Now() c.persistObjects(m) c.log.Debug("persisted items to disk", zap.Duration("took", time.Since(start)), zap.Int("total", len(m))) c.mtx.Lock() c.curMemSize = 0 n := copy(c.mem, c.mem[len(m):]) c.mem = c.mem[:n] for i := range c.mem { c.curMemSize += uint64(len(c.mem[i].data)) } c.mtx.Unlock() case <-c.closeCh: return } } } func (c *cache) persistToCache(objs []objectInfo) []int { var ( failMem []int doneMem []int ) var sz uint64 err := c.db.Update(func(tx *bbolt.Tx) error { b := tx.Bucket(defaultBucket) for i := range objs { if uint64(len(objs[i].data)) >= c.smallObjectSize { failMem = append(failMem, i) continue } err := b.Put([]byte(objs[i].addr), objs[i].data) if err != nil { return err } sz += uint64(len(objs[i].data)) doneMem = append(doneMem, i) } return nil }) if err == nil { c.dbSize.Add(sz) } if len(doneMem) > 0 { c.evictObjects(len(doneMem)) for _, i := range doneMem { c.flushed.Add(objs[i].addr, true) } } var failDisk []int for _, i := range failMem { if uint64(len(objs[i].data)) > c.maxObjectSize { failDisk = append(failDisk, i) continue } err := c.fsTree.Put(objs[i].obj.Address(), objs[i].data) if err != nil { failDisk = append(failDisk, i) } } return failDisk } // persistObjects tries to write objects from memory to the persistent storage. // If tryCache is false, writing skips cache and is done directly to the main storage. func (c *cache) persistObjects(objs []objectInfo) { toDisk := c.persistToCache(objs) j := 0 for i := range objs { ch := c.metaCh if j < len(toDisk) { if i == toDisk[j] { ch = c.directCh } else { for ; j < len(toDisk) && i > toDisk[j]; j++ { } } } select { case ch <- objs[j].obj: case <-c.closeCh: return } } }