Merge pull request #1279 from restic/fix-eof-error
cache: Synchronize downloading
This commit is contained in:
commit
7e8bc8d362
3 changed files with 64 additions and 6 deletions
63
internal/cache/backend.go
vendored
63
internal/cache/backend.go
vendored
|
@ -3,6 +3,7 @@ package cache
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"io"
|
"io"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/restic/restic/internal/debug"
|
"github.com/restic/restic/internal/debug"
|
||||||
"github.com/restic/restic/internal/restic"
|
"github.com/restic/restic/internal/restic"
|
||||||
|
@ -12,11 +13,25 @@ import (
|
||||||
type Backend struct {
|
type Backend struct {
|
||||||
restic.Backend
|
restic.Backend
|
||||||
*Cache
|
*Cache
|
||||||
|
|
||||||
|
// inProgress contains the handle for all files that are currently
|
||||||
|
// downloaded. The channel in the value is closed as soon as the download
|
||||||
|
// is finished.
|
||||||
|
inProgressMutex sync.Mutex
|
||||||
|
inProgress map[restic.Handle]chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ensure cachedBackend implements restic.Backend
|
// ensure cachedBackend implements restic.Backend
|
||||||
var _ restic.Backend = &Backend{}
|
var _ restic.Backend = &Backend{}
|
||||||
|
|
||||||
|
func newBackend(be restic.Backend, c *Cache) *Backend {
|
||||||
|
return &Backend{
|
||||||
|
Backend: be,
|
||||||
|
Cache: c,
|
||||||
|
inProgress: make(map[restic.Handle]chan struct{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Remove deletes a file from the backend and the cache if it has been cached.
|
// Remove deletes a file from the backend and the cache if it has been cached.
|
||||||
func (b *Backend) Remove(ctx context.Context, h restic.Handle) error {
|
func (b *Backend) Remove(ctx context.Context, h restic.Handle) error {
|
||||||
debug.Log("cache Remove(%v)", h)
|
debug.Log("cache Remove(%v)", h)
|
||||||
|
@ -83,6 +98,30 @@ var autoCacheFiles = map[restic.FileType]bool{
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Backend) cacheFile(ctx context.Context, h restic.Handle) error {
|
func (b *Backend) cacheFile(ctx context.Context, h restic.Handle) error {
|
||||||
|
finish := make(chan struct{})
|
||||||
|
defer func() {
|
||||||
|
close(finish)
|
||||||
|
|
||||||
|
// remove the finish channel from the map
|
||||||
|
b.inProgressMutex.Lock()
|
||||||
|
delete(b.inProgress, h)
|
||||||
|
b.inProgressMutex.Unlock()
|
||||||
|
}()
|
||||||
|
|
||||||
|
b.inProgressMutex.Lock()
|
||||||
|
other, alreadyDownloading := b.inProgress[h]
|
||||||
|
if !alreadyDownloading {
|
||||||
|
b.inProgress[h] = finish
|
||||||
|
}
|
||||||
|
b.inProgressMutex.Unlock()
|
||||||
|
|
||||||
|
if alreadyDownloading {
|
||||||
|
debug.Log("readahead %v is already performed by somebody else, delegating...", h)
|
||||||
|
<-other
|
||||||
|
debug.Log("download %v finished", h)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
rd, err := b.Backend.Load(ctx, h, 0, 0)
|
rd, err := b.Backend.Load(ctx, h, 0, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -101,8 +140,29 @@ func (b *Backend) cacheFile(ctx context.Context, h restic.Handle) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// loadFromCacheOrDelegate will try to load the file from the cache, and fall
|
||||||
|
// back to the backend if that fails.
|
||||||
|
func (b *Backend) loadFromCacheOrDelegate(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
|
||||||
|
rd, err := b.Cache.Load(h, length, offset)
|
||||||
|
if err == nil {
|
||||||
|
return rd, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return b.Backend.Load(ctx, h, length, offset)
|
||||||
|
}
|
||||||
|
|
||||||
// Load loads a file from the cache or the backend.
|
// Load loads a file from the cache or the backend.
|
||||||
func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
|
func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
|
||||||
|
b.inProgressMutex.Lock()
|
||||||
|
waitForFinish, inProgress := b.inProgress[h]
|
||||||
|
b.inProgressMutex.Unlock()
|
||||||
|
|
||||||
|
if inProgress {
|
||||||
|
debug.Log("downloading %v is already in progress, waiting for finish", h)
|
||||||
|
<-waitForFinish
|
||||||
|
debug.Log("downloading %v finished", h)
|
||||||
|
}
|
||||||
|
|
||||||
if b.Cache.Has(h) {
|
if b.Cache.Has(h) {
|
||||||
debug.Log("Load(%v, %v, %v) from cache", h, length, offset)
|
debug.Log("Load(%v, %v, %v) from cache", h, length, offset)
|
||||||
rd, err := b.Cache.Load(h, length, offset)
|
rd, err := b.Cache.Load(h, length, offset)
|
||||||
|
@ -116,9 +176,10 @@ func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset
|
||||||
if offset != 0 || length != 0 {
|
if offset != 0 || length != 0 {
|
||||||
if b.Cache.PerformReadahead(h) {
|
if b.Cache.PerformReadahead(h) {
|
||||||
debug.Log("performing readahead for %v", h)
|
debug.Log("performing readahead for %v", h)
|
||||||
|
|
||||||
err := b.cacheFile(ctx, h)
|
err := b.cacheFile(ctx, h)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return b.Cache.Load(h, length, offset)
|
return b.loadFromCacheOrDelegate(ctx, h, length, offset)
|
||||||
}
|
}
|
||||||
|
|
||||||
debug.Log("error caching %v: %v", h, err)
|
debug.Log("error caching %v: %v", h, err)
|
||||||
|
|
5
internal/cache/cache.go
vendored
5
internal/cache/cache.go
vendored
|
@ -155,10 +155,7 @@ func (c *Cache) IsNotExist(err error) bool {
|
||||||
|
|
||||||
// Wrap returns a backend with a cache.
|
// Wrap returns a backend with a cache.
|
||||||
func (c *Cache) Wrap(be restic.Backend) restic.Backend {
|
func (c *Cache) Wrap(be restic.Backend) restic.Backend {
|
||||||
return &Backend{
|
return newBackend(be, c)
|
||||||
Backend: be,
|
|
||||||
Cache: c,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// BaseDir returns the base directory.
|
// BaseDir returns the base directory.
|
||||||
|
|
|
@ -163,7 +163,7 @@ func (sn *Snapshot) HasTagList(l []TagList) bool {
|
||||||
|
|
||||||
for _, tags := range l {
|
for _, tags := range l {
|
||||||
if sn.HasTags(tags) {
|
if sn.HasTags(tags) {
|
||||||
debug.Log(" snapshot satisfies %v", tags, l)
|
debug.Log(" snapshot satisfies %v %v", tags, l)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue