forked from TrueCloudLab/restic
dump: add GetOrCompute to bloblru cache
This commit is contained in:
parent
45509eafc8
commit
bd03af2feb
2 changed files with 51 additions and 11 deletions
|
@ -20,6 +20,7 @@ type Cache struct {
|
||||||
c *simplelru.LRU[restic.ID, []byte]
|
c *simplelru.LRU[restic.ID, []byte]
|
||||||
|
|
||||||
free, size int // Current and max capacity, in bytes.
|
free, size int // Current and max capacity, in bytes.
|
||||||
|
inProgress map[restic.ID]chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// New constructs a blob cache that stores at most size bytes worth of blobs.
|
// New constructs a blob cache that stores at most size bytes worth of blobs.
|
||||||
|
@ -27,6 +28,7 @@ func New(size int) *Cache {
|
||||||
c := &Cache{
|
c := &Cache{
|
||||||
free: size,
|
free: size,
|
||||||
size: size,
|
size: size,
|
||||||
|
inProgress: make(map[restic.ID]chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewLRU wants us to specify some max. number of entries, else it errors.
|
// NewLRU wants us to specify some max. number of entries, else it errors.
|
||||||
|
@ -85,6 +87,48 @@ func (c *Cache) Get(id restic.ID) ([]byte, bool) {
|
||||||
return blob, ok
|
return blob, ok
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Cache) GetOrCompute(id restic.ID, compute func() ([]byte, error)) ([]byte, error) {
|
||||||
|
// check if already cached
|
||||||
|
blob, ok := c.Get(id)
|
||||||
|
if ok {
|
||||||
|
return blob, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// check for parallel download or start our own
|
||||||
|
finish := make(chan struct{})
|
||||||
|
c.mu.Lock()
|
||||||
|
waitForResult, isDownloading := c.inProgress[id]
|
||||||
|
if !isDownloading {
|
||||||
|
c.inProgress[id] = finish
|
||||||
|
|
||||||
|
// remove progress channel once finished here
|
||||||
|
defer func() {
|
||||||
|
c.mu.Lock()
|
||||||
|
delete(c.inProgress, id)
|
||||||
|
c.mu.Unlock()
|
||||||
|
close(finish)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
c.mu.Unlock()
|
||||||
|
|
||||||
|
if isDownloading {
|
||||||
|
// wait for result of parallel download
|
||||||
|
<-waitForResult
|
||||||
|
blob, ok := c.Get(id)
|
||||||
|
if ok {
|
||||||
|
return blob, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// download it
|
||||||
|
blob, err := compute()
|
||||||
|
if err == nil {
|
||||||
|
c.Add(id, blob)
|
||||||
|
}
|
||||||
|
|
||||||
|
return blob, err
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Cache) evict(key restic.ID, blob []byte) {
|
func (c *Cache) evict(key restic.ID, blob []byte) {
|
||||||
debug.Log("bloblru.Cache: evict %v, %d bytes", key, cap(blob))
|
debug.Log("bloblru.Cache: evict %v, %d bytes", key, cap(blob))
|
||||||
c.free += cap(blob) + overhead
|
c.free += cap(blob) + overhead
|
||||||
|
|
|
@ -143,17 +143,13 @@ func (d *Dumper) writeNode(ctx context.Context, w io.Writer, node *restic.Node)
|
||||||
for i := uint(0); i < d.repo.Connections(); i++ {
|
for i := uint(0); i < d.repo.Connections(); i++ {
|
||||||
wg.Go(func() error {
|
wg.Go(func() error {
|
||||||
for task := range loaderCh {
|
for task := range loaderCh {
|
||||||
var err error
|
blob, err := d.cache.GetOrCompute(task.id, func() ([]byte, error) {
|
||||||
blob, ok := d.cache.Get(task.id)
|
return d.repo.LoadBlob(ctx, restic.DataBlob, task.id, nil)
|
||||||
if !ok {
|
})
|
||||||
blob, err = d.repo.LoadBlob(ctx, restic.DataBlob, task.id, nil)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
d.cache.Add(task.id, blob)
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case task.out <- blob:
|
case task.out <- blob:
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
|
Loading…
Reference in a new issue