Merge pull request #2838 from greatroar/cache-conflicts
Make cache concurrency-safe
This commit is contained in:
commit
553ea36ca6
6 changed files with 121 additions and 99 deletions
12
changelog/unreleased/issue-2345
Normal file
12
changelog/unreleased/issue-2345
Normal file
|
@ -0,0 +1,12 @@
|
||||||
|
Bugfix: Make cache crash-resistant and usable by multiple concurrent processes
|
||||||
|
|
||||||
|
The restic cache directory ($RESTIC_CACHE_DIR) could end up in a broken state
|
||||||
|
in the event of restic (or the OS) crashing. This is now less likely to occur
|
||||||
|
as files are downloaded to a temporary location before being moved in place.
|
||||||
|
|
||||||
|
This also allows multiple concurrent restic processes to operate on a single
|
||||||
|
repository without conflicts. Previously, concurrent operations could cause
|
||||||
|
segfaults because the processes saw each other's partially downloaded files.
|
||||||
|
|
||||||
|
https://github.com/restic/restic/issues/2345
|
||||||
|
https://github.com/restic/restic/pull/2838
|
14
internal/cache/backend.go
vendored
14
internal/cache/backend.go
vendored
|
@ -43,14 +43,13 @@ func (b *Backend) Remove(ctx context.Context, h restic.Handle) error {
|
||||||
return b.Cache.remove(h)
|
return b.Cache.remove(h)
|
||||||
}
|
}
|
||||||
|
|
||||||
var autoCacheTypes = map[restic.FileType]struct{}{
|
func autoCached(t restic.FileType) bool {
|
||||||
restic.IndexFile: {},
|
return t == restic.IndexFile || t == restic.SnapshotFile
|
||||||
restic.SnapshotFile: {},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Save stores a new file in the backend and the cache.
|
// Save stores a new file in the backend and the cache.
|
||||||
func (b *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
|
func (b *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
|
||||||
if _, ok := autoCacheTypes[h.Type]; !ok {
|
if !autoCached(h.Type) {
|
||||||
return b.Backend.Save(ctx, h, rd)
|
return b.Backend.Save(ctx, h, rd)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -84,11 +83,6 @@ func (b *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRea
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var autoCacheFiles = map[restic.FileType]bool{
|
|
||||||
restic.IndexFile: true,
|
|
||||||
restic.SnapshotFile: true,
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *Backend) cacheFile(ctx context.Context, h restic.Handle) error {
|
func (b *Backend) cacheFile(ctx context.Context, h restic.Handle) error {
|
||||||
finish := make(chan struct{})
|
finish := make(chan struct{})
|
||||||
|
|
||||||
|
@ -192,7 +186,7 @@ func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset
|
||||||
}
|
}
|
||||||
|
|
||||||
// if we don't automatically cache this file type, fall back to the backend
|
// if we don't automatically cache this file type, fall back to the backend
|
||||||
if _, ok := autoCacheFiles[h.Type]; !ok {
|
if !autoCached(h.Type) {
|
||||||
debug.Log("Load(%v, %v, %v): delegating to backend", h, length, offset)
|
debug.Log("Load(%v, %v, %v): delegating to backend", h, length, offset)
|
||||||
return b.Backend.Load(ctx, h, length, offset, consumer)
|
return b.Backend.Load(ctx, h, length, offset, consumer)
|
||||||
}
|
}
|
||||||
|
|
15
internal/cache/cache.go
vendored
15
internal/cache/cache.go
vendored
|
@ -28,7 +28,7 @@ const fileMode = 0644
|
||||||
|
|
||||||
func readVersion(dir string) (v uint, err error) {
|
func readVersion(dir string) (v uint, err error) {
|
||||||
buf, err := ioutil.ReadFile(filepath.Join(dir, "version"))
|
buf, err := ioutil.ReadFile(filepath.Join(dir, "version"))
|
||||||
if os.IsNotExist(err) {
|
if errors.Is(err, os.ErrNotExist) {
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -61,13 +61,13 @@ func writeCachedirTag(dir string) error {
|
||||||
|
|
||||||
tagfile := filepath.Join(dir, "CACHEDIR.TAG")
|
tagfile := filepath.Join(dir, "CACHEDIR.TAG")
|
||||||
_, err := fs.Lstat(tagfile)
|
_, err := fs.Lstat(tagfile)
|
||||||
if err != nil && !os.IsNotExist(err) {
|
if err != nil && !errors.Is(err, os.ErrNotExist) {
|
||||||
return errors.WithStack(err)
|
return errors.WithStack(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
f, err := fs.OpenFile(tagfile, os.O_CREATE|os.O_EXCL|os.O_WRONLY, fileMode)
|
f, err := fs.OpenFile(tagfile, os.O_CREATE|os.O_EXCL|os.O_WRONLY, fileMode)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if os.IsExist(errors.Cause(err)) {
|
if errors.Is(err, os.ErrExist) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -121,7 +121,7 @@ func New(id string, basedir string) (c *Cache, err error) {
|
||||||
// create the repo cache dir if it does not exist yet
|
// create the repo cache dir if it does not exist yet
|
||||||
var created bool
|
var created bool
|
||||||
_, err = fs.Lstat(cachedir)
|
_, err = fs.Lstat(cachedir)
|
||||||
if os.IsNotExist(err) {
|
if errors.Is(err, os.ErrNotExist) {
|
||||||
err = fs.MkdirAll(cachedir, dirMode)
|
err = fs.MkdirAll(cachedir, dirMode)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.WithStack(err)
|
return nil, errors.WithStack(err)
|
||||||
|
@ -179,11 +179,10 @@ func validCacheDirName(s string) bool {
|
||||||
// listCacheDirs returns the list of cache directories.
|
// listCacheDirs returns the list of cache directories.
|
||||||
func listCacheDirs(basedir string) ([]os.FileInfo, error) {
|
func listCacheDirs(basedir string) ([]os.FileInfo, error) {
|
||||||
f, err := fs.Open(basedir)
|
f, err := fs.Open(basedir)
|
||||||
if err != nil && os.IsNotExist(errors.Cause(err)) {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if errors.Is(err, os.ErrNotExist) {
|
||||||
|
err = nil
|
||||||
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
56
internal/cache/file.go
vendored
56
internal/cache/file.go
vendored
|
@ -2,8 +2,10 @@ package cache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"runtime"
|
||||||
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/restic/restic/internal/crypto"
|
"github.com/restic/restic/internal/crypto"
|
||||||
|
@ -84,31 +86,26 @@ func (c *Cache) load(h restic.Handle, length int, offset int64) (io.ReadCloser,
|
||||||
return rd, nil
|
return rd, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SaveWriter returns a writer for the cache object h. It must be closed after writing is finished.
|
|
||||||
func (c *Cache) saveWriter(h restic.Handle) (io.WriteCloser, error) {
|
|
||||||
debug.Log("Save to cache: %v", h)
|
|
||||||
if !c.canBeCached(h.Type) {
|
|
||||||
return nil, errors.New("cannot be cached")
|
|
||||||
}
|
|
||||||
|
|
||||||
p := c.filename(h)
|
|
||||||
err := fs.MkdirAll(filepath.Dir(p), 0700)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
f, err := fs.OpenFile(p, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0400)
|
|
||||||
return f, errors.WithStack(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Save saves a file in the cache.
|
// Save saves a file in the cache.
|
||||||
func (c *Cache) Save(h restic.Handle, rd io.Reader) error {
|
func (c *Cache) Save(h restic.Handle, rd io.Reader) error {
|
||||||
debug.Log("Save to cache: %v", h)
|
debug.Log("Save to cache: %v", h)
|
||||||
if rd == nil {
|
if rd == nil {
|
||||||
return errors.New("Save() called with nil reader")
|
return errors.New("Save() called with nil reader")
|
||||||
}
|
}
|
||||||
|
if !c.canBeCached(h.Type) {
|
||||||
|
return errors.New("cannot be cached")
|
||||||
|
}
|
||||||
|
|
||||||
f, err := c.saveWriter(h)
|
finalname := c.filename(h)
|
||||||
|
dir := filepath.Dir(finalname)
|
||||||
|
err := fs.Mkdir(dir, 0700)
|
||||||
|
if err != nil && !errors.Is(err, os.ErrExist) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// First save to a temporary location. This allows multiple concurrent
|
||||||
|
// restics to use a single cache dir.
|
||||||
|
f, err := ioutil.TempFile(dir, "tmp-")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -116,23 +113,38 @@ func (c *Cache) Save(h restic.Handle, rd io.Reader) error {
|
||||||
n, err := io.Copy(f, rd)
|
n, err := io.Copy(f, rd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = f.Close()
|
_ = f.Close()
|
||||||
_ = c.remove(h)
|
_ = fs.Remove(f.Name())
|
||||||
return errors.Wrap(err, "Copy")
|
return errors.Wrap(err, "Copy")
|
||||||
}
|
}
|
||||||
|
|
||||||
if n <= crypto.Extension {
|
if n <= crypto.Extension {
|
||||||
_ = f.Close()
|
_ = f.Close()
|
||||||
_ = c.remove(h)
|
_ = fs.Remove(f.Name())
|
||||||
debug.Log("trying to cache truncated file %v, removing", h)
|
debug.Log("trying to cache truncated file %v, removing", h)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Close, then rename. Windows doesn't like the reverse order.
|
||||||
if err = f.Close(); err != nil {
|
if err = f.Close(); err != nil {
|
||||||
_ = c.remove(h)
|
_ = fs.Remove(f.Name())
|
||||||
return errors.WithStack(err)
|
return errors.WithStack(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
err = fs.Rename(f.Name(), finalname)
|
||||||
|
if err != nil {
|
||||||
|
_ = fs.Remove(f.Name())
|
||||||
|
}
|
||||||
|
if runtime.GOOS == "windows" && errors.Is(err, os.ErrPermission) {
|
||||||
|
// On Windows, renaming over an existing file is ok
|
||||||
|
// (os.Rename is MoveFileExW with MOVEFILE_REPLACE_EXISTING
|
||||||
|
// since Go 1.5), but not when someone else has the file open.
|
||||||
|
//
|
||||||
|
// When we get Access denied, we assume that's the case
|
||||||
|
// and the other process has written the desired contents to f.
|
||||||
|
err = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return errors.WithStack(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove deletes a file. When the file is not cache, no error is returned.
|
// Remove deletes a file. When the file is not cache, no error is returned.
|
||||||
|
|
115
internal/cache/file_test.go
vendored
115
internal/cache/file_test.go
vendored
|
@ -3,14 +3,17 @@ package cache
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/restic/restic/internal/errors"
|
||||||
"github.com/restic/restic/internal/restic"
|
"github.com/restic/restic/internal/restic"
|
||||||
"github.com/restic/restic/internal/test"
|
"github.com/restic/restic/internal/test"
|
||||||
|
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
func generateRandomFiles(t testing.TB, tpe restic.FileType, c *Cache) restic.IDSet {
|
func generateRandomFiles(t testing.TB, tpe restic.FileType, c *Cache) restic.IDSet {
|
||||||
|
@ -131,64 +134,6 @@ func TestFiles(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFileSaveWriter(t *testing.T) {
|
|
||||||
seed := time.Now().Unix()
|
|
||||||
t.Logf("seed is %v", seed)
|
|
||||||
rand.Seed(seed)
|
|
||||||
|
|
||||||
c, cleanup := TestNewCache(t)
|
|
||||||
defer cleanup()
|
|
||||||
|
|
||||||
// save about 5 MiB of data in the cache
|
|
||||||
data := test.Random(rand.Int(), 5234142)
|
|
||||||
id := restic.ID{}
|
|
||||||
copy(id[:], data)
|
|
||||||
h := restic.Handle{
|
|
||||||
Type: restic.PackFile,
|
|
||||||
Name: id.String(),
|
|
||||||
}
|
|
||||||
|
|
||||||
wr, err := c.saveWriter(h)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
n, err := io.Copy(wr, bytes.NewReader(data))
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if n != int64(len(data)) {
|
|
||||||
t.Fatalf("wrong number of bytes written, want %v, got %v", len(data), n)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = wr.Close(); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
rd, err := c.load(h, 0, 0)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
buf, err := ioutil.ReadAll(rd)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(buf) != len(data) {
|
|
||||||
t.Fatalf("wrong number of bytes read, want %v, got %v", len(data), len(buf))
|
|
||||||
}
|
|
||||||
|
|
||||||
if !bytes.Equal(buf, data) {
|
|
||||||
t.Fatalf("wrong data returned, want:\n %02x\ngot:\n %02x", data[:16], buf[:16])
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = rd.Close(); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestFileLoad(t *testing.T) {
|
func TestFileLoad(t *testing.T) {
|
||||||
seed := time.Now().Unix()
|
seed := time.Now().Unix()
|
||||||
t.Logf("seed is %v", seed)
|
t.Logf("seed is %v", seed)
|
||||||
|
@ -257,3 +202,55 @@ func TestFileLoad(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Simulate multiple processes writing to a cache, using goroutines.
|
||||||
|
func TestFileSaveConcurrent(t *testing.T) {
|
||||||
|
const nproc = 40
|
||||||
|
|
||||||
|
c, cleanup := TestNewCache(t)
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
var (
|
||||||
|
data = test.Random(1, 10000)
|
||||||
|
g errgroup.Group
|
||||||
|
id restic.ID
|
||||||
|
)
|
||||||
|
rand.Read(id[:])
|
||||||
|
|
||||||
|
h := restic.Handle{
|
||||||
|
Type: restic.PackFile,
|
||||||
|
Name: id.String(),
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < nproc/2; i++ {
|
||||||
|
g.Go(func() error { return c.Save(h, bytes.NewReader(data)) })
|
||||||
|
|
||||||
|
// Can't use load because only the main goroutine may call t.Fatal.
|
||||||
|
g.Go(func() error {
|
||||||
|
// The timing is hard to get right, but the main thing we want to
|
||||||
|
// ensure is ENOENT or nil error.
|
||||||
|
time.Sleep(time.Duration(100+rand.Intn(200)) * time.Millisecond)
|
||||||
|
|
||||||
|
f, err := c.load(h, 0, 0)
|
||||||
|
t.Logf("Load error: %v", err)
|
||||||
|
switch {
|
||||||
|
case err == nil:
|
||||||
|
case errors.Is(err, os.ErrNotExist):
|
||||||
|
return nil
|
||||||
|
default:
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer func() { _ = f.Close() }()
|
||||||
|
|
||||||
|
read, err := ioutil.ReadAll(f)
|
||||||
|
if err == nil && !bytes.Equal(read, data) {
|
||||||
|
err = errors.New("mismatch between Save and Load")
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
test.OK(t, g.Wait())
|
||||||
|
saved := load(t, c, h)
|
||||||
|
test.Equals(t, data, saved)
|
||||||
|
}
|
||||||
|
|
|
@ -40,6 +40,14 @@ func RemoveAll(path string) error {
|
||||||
return os.RemoveAll(fixpath(path))
|
return os.RemoveAll(fixpath(path))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Rename renames (moves) oldpath to newpath.
|
||||||
|
// If newpath already exists, Rename replaces it.
|
||||||
|
// OS-specific restrictions may apply when oldpath and newpath are in different directories.
|
||||||
|
// If there is an error, it will be of type *LinkError.
|
||||||
|
func Rename(oldpath, newpath string) error {
|
||||||
|
return os.Rename(fixpath(oldpath), fixpath(newpath))
|
||||||
|
}
|
||||||
|
|
||||||
// Symlink creates newname as a symbolic link to oldname.
|
// Symlink creates newname as a symbolic link to oldname.
|
||||||
// If there is an error, it will be of type *LinkError.
|
// If there is an error, it will be of type *LinkError.
|
||||||
func Symlink(oldname, newname string) error {
|
func Symlink(oldname, newname string) error {
|
||||||
|
|
Loading…
Reference in a new issue