Merge pull request #4709 from MichaelEischer/refactor-locking

Refactor locking into repository package
This commit is contained in:
Michael Eischer 2024-03-28 23:53:09 +01:00 committed by GitHub
commit 510f6f06b0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
42 changed files with 584 additions and 786 deletions

View file

@ -0,0 +1,10 @@
Bugfix: Correct `--no-lock` handling of `ls` and `tag` command
The `ls` command never locked the repository. This has been fixed. The old
behavior is still supported using `ls --no-lock`. The latter invocation also
works with older restic versions.
The `tag` command erroneously accepted the `--no-lock` command. The command
now always requires an exclusive lock.
https://github.com/restic/restic/pull/4709

View file

@ -463,10 +463,11 @@ func runBackup(ctx context.Context, opts BackupOptions, gopts GlobalOptions, ter
Verbosef("open repository\n") Verbosef("open repository\n")
} }
repo, err := OpenRepository(ctx, gopts) ctx, repo, unlock, err := openWithAppendLock(ctx, gopts, opts.DryRun)
if err != nil { if err != nil {
return err return err
} }
defer unlock()
var progressPrinter backup.ProgressPrinter var progressPrinter backup.ProgressPrinter
if gopts.JSON { if gopts.JSON {
@ -478,22 +479,6 @@ func runBackup(ctx context.Context, opts BackupOptions, gopts GlobalOptions, ter
calculateProgressInterval(!gopts.Quiet, gopts.JSON)) calculateProgressInterval(!gopts.Quiet, gopts.JSON))
defer progressReporter.Done() defer progressReporter.Done()
if opts.DryRun {
repo.SetDryRun()
}
if !gopts.JSON {
progressPrinter.V("lock repository")
}
if !opts.DryRun {
var lock *restic.Lock
lock, ctx, err = lockRepo(ctx, repo, gopts.RetryLock, gopts.JSON)
defer unlockRepo(lock)
if err != nil {
return err
}
}
// rejectByNameFuncs collect functions that can reject items from the backup based on path only // rejectByNameFuncs collect functions that can reject items from the backup based on path only
rejectByNameFuncs, err := collectRejectByNameFuncs(opts, repo) rejectByNameFuncs, err := collectRejectByNameFuncs(opts, repo)
if err != nil { if err != nil {

View file

@ -9,7 +9,6 @@ import (
"runtime" "runtime"
"testing" "testing"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/fs" "github.com/restic/restic/internal/fs"
"github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test" rtest "github.com/restic/restic/internal/test"
@ -250,29 +249,18 @@ func TestBackupTreeLoadError(t *testing.T) {
opts := BackupOptions{} opts := BackupOptions{}
// Backup a subdirectory first, such that we can remove the tree pack for the subdirectory // Backup a subdirectory first, such that we can remove the tree pack for the subdirectory
testRunBackup(t, env.testdata, []string{"test"}, opts, env.gopts) testRunBackup(t, env.testdata, []string{"test"}, opts, env.gopts)
treePacks := listTreePacks(env.gopts, t)
r, err := OpenRepository(context.TODO(), env.gopts)
rtest.OK(t, err)
rtest.OK(t, r.LoadIndex(context.TODO(), nil))
treePacks := restic.NewIDSet()
r.Index().Each(context.TODO(), func(pb restic.PackedBlob) {
if pb.Type == restic.TreeBlob {
treePacks.Insert(pb.PackID)
}
})
testRunBackup(t, filepath.Dir(env.testdata), []string{filepath.Base(env.testdata)}, opts, env.gopts) testRunBackup(t, filepath.Dir(env.testdata), []string{filepath.Base(env.testdata)}, opts, env.gopts)
testRunCheck(t, env.gopts) testRunCheck(t, env.gopts)
// delete the subdirectory pack first // delete the subdirectory pack first
for id := range treePacks { removePacks(env.gopts, t, treePacks)
rtest.OK(t, r.Backend().Remove(context.TODO(), backend.Handle{Type: restic.PackFile, Name: id.String()}))
}
testRunRebuildIndex(t, env.gopts) testRunRebuildIndex(t, env.gopts)
// now the repo is missing the tree blob in the index; check should report this // now the repo is missing the tree blob in the index; check should report this
testRunCheckMustFail(t, env.gopts) testRunCheckMustFail(t, env.gopts)
// second backup should report an error but "heal" this situation // second backup should report an error but "heal" this situation
err = testRunBackupAssumeFailure(t, filepath.Dir(env.testdata), []string{filepath.Base(env.testdata)}, opts, env.gopts) err := testRunBackupAssumeFailure(t, filepath.Dir(env.testdata), []string{filepath.Base(env.testdata)}, opts, env.gopts)
rtest.Assert(t, err != nil, "backup should have reported an error for the subdirectory") rtest.Assert(t, err != nil, "backup should have reported an error for the subdirectory")
testRunCheck(t, env.gopts) testRunCheck(t, env.gopts)

View file

@ -64,19 +64,11 @@ func runCat(ctx context.Context, gopts GlobalOptions, args []string) error {
return err return err
} }
repo, err := OpenRepository(ctx, gopts) ctx, repo, unlock, err := openWithReadLock(ctx, gopts, gopts.NoLock)
if err != nil { if err != nil {
return err return err
} }
defer unlock()
if !gopts.NoLock {
var lock *restic.Lock
lock, ctx, err = lockRepo(ctx, repo, gopts.RetryLock, gopts.JSON)
defer unlockRepo(lock)
if err != nil {
return err
}
}
tpe := args[0] tpe := args[0]

View file

@ -204,20 +204,14 @@ func runCheck(ctx context.Context, opts CheckOptions, gopts GlobalOptions, args
return code, nil return code, nil
}) })
repo, err := OpenRepository(ctx, gopts)
if err != nil {
return err
}
if !gopts.NoLock { if !gopts.NoLock {
Verbosef("create exclusive lock for repository\n") Verbosef("create exclusive lock for repository\n")
var lock *restic.Lock }
lock, ctx, err = lockRepoExclusive(ctx, repo, gopts.RetryLock, gopts.JSON) ctx, repo, unlock, err := openWithExclusiveLock(ctx, gopts, gopts.NoLock)
defer unlockRepo(lock)
if err != nil { if err != nil {
return err return err
} }
} defer unlock()
chkr := checker.New(repo, opts.CheckUnused) chkr := checker.New(repo, opts.CheckUnused)
err = chkr.LoadSnapshots(ctx) err = chkr.LoadSnapshots(ctx)

View file

@ -62,30 +62,17 @@ func runCopy(ctx context.Context, opts CopyOptions, gopts GlobalOptions, args []
gopts, secondaryGopts = secondaryGopts, gopts gopts, secondaryGopts = secondaryGopts, gopts
} }
srcRepo, err := OpenRepository(ctx, gopts) ctx, srcRepo, unlock, err := openWithReadLock(ctx, gopts, gopts.NoLock)
if err != nil { if err != nil {
return err return err
} }
defer unlock()
dstRepo, err := OpenRepository(ctx, secondaryGopts) ctx, dstRepo, unlock, err := openWithAppendLock(ctx, secondaryGopts, false)
if err != nil {
return err
}
if !gopts.NoLock {
var srcLock *restic.Lock
srcLock, ctx, err = lockRepo(ctx, srcRepo, gopts.RetryLock, gopts.JSON)
defer unlockRepo(srcLock)
if err != nil {
return err
}
}
dstLock, ctx, err := lockRepo(ctx, dstRepo, gopts.RetryLock, gopts.JSON)
defer unlockRepo(dstLock)
if err != nil { if err != nil {
return err return err
} }
defer unlock()
srcSnapshotLister, err := restic.MemorizeList(ctx, srcRepo, restic.SnapshotFile) srcSnapshotLister, err := restic.MemorizeList(ctx, srcRepo, restic.SnapshotFile)
if err != nil { if err != nil {

View file

@ -153,19 +153,11 @@ func runDebugDump(ctx context.Context, gopts GlobalOptions, args []string) error
return errors.Fatal("type not specified") return errors.Fatal("type not specified")
} }
repo, err := OpenRepository(ctx, gopts) ctx, repo, unlock, err := openWithReadLock(ctx, gopts, gopts.NoLock)
if err != nil { if err != nil {
return err return err
} }
defer unlock()
if !gopts.NoLock {
var lock *restic.Lock
lock, ctx, err = lockRepo(ctx, repo, gopts.RetryLock, gopts.JSON)
defer unlockRepo(lock)
if err != nil {
return err
}
}
tpe := args[0] tpe := args[0]
@ -442,10 +434,15 @@ func storePlainBlob(id restic.ID, prefix string, plain []byte) error {
} }
func runDebugExamine(ctx context.Context, gopts GlobalOptions, opts DebugExamineOptions, args []string) error { func runDebugExamine(ctx context.Context, gopts GlobalOptions, opts DebugExamineOptions, args []string) error {
repo, err := OpenRepository(ctx, gopts) if opts.ExtractPack && gopts.NoLock {
return fmt.Errorf("--extract-pack and --no-lock are mutually exclusive")
}
ctx, repo, unlock, err := openWithAppendLock(ctx, gopts, gopts.NoLock)
if err != nil { if err != nil {
return err return err
} }
defer unlock()
ids := make([]restic.ID, 0) ids := make([]restic.ID, 0)
for _, name := range args { for _, name := range args {
@ -464,15 +461,6 @@ func runDebugExamine(ctx context.Context, gopts GlobalOptions, opts DebugExamine
return errors.Fatal("no pack files to examine") return errors.Fatal("no pack files to examine")
} }
if !gopts.NoLock {
var lock *restic.Lock
lock, ctx, err = lockRepo(ctx, repo, gopts.RetryLock, gopts.JSON)
defer unlockRepo(lock)
if err != nil {
return err
}
}
bar := newIndexProgress(gopts.Quiet, gopts.JSON) bar := newIndexProgress(gopts.Quiet, gopts.JSON)
err = repo.LoadIndex(ctx, bar) err = repo.LoadIndex(ctx, bar)
if err != nil { if err != nil {

View file

@ -344,19 +344,11 @@ func runDiff(ctx context.Context, opts DiffOptions, gopts GlobalOptions, args []
return errors.Fatalf("specify two snapshot IDs") return errors.Fatalf("specify two snapshot IDs")
} }
repo, err := OpenRepository(ctx, gopts) ctx, repo, unlock, err := openWithReadLock(ctx, gopts, gopts.NoLock)
if err != nil { if err != nil {
return err return err
} }
defer unlock()
if !gopts.NoLock {
var lock *restic.Lock
lock, ctx, err = lockRepo(ctx, repo, gopts.RetryLock, gopts.JSON)
defer unlockRepo(lock)
if err != nil {
return err
}
}
// cache snapshots listing // cache snapshots listing
be, err := restic.MemorizeList(ctx, repo, restic.SnapshotFile) be, err := restic.MemorizeList(ctx, repo, restic.SnapshotFile)

View file

@ -131,19 +131,11 @@ func runDump(ctx context.Context, opts DumpOptions, gopts GlobalOptions, args []
splittedPath := splitPath(path.Clean(pathToPrint)) splittedPath := splitPath(path.Clean(pathToPrint))
repo, err := OpenRepository(ctx, gopts) ctx, repo, unlock, err := openWithReadLock(ctx, gopts, gopts.NoLock)
if err != nil { if err != nil {
return err return err
} }
defer unlock()
if !gopts.NoLock {
var lock *restic.Lock
lock, ctx, err = lockRepo(ctx, repo, gopts.RetryLock, gopts.JSON)
defer unlockRepo(lock)
if err != nil {
return err
}
}
sn, subfolder, err := (&restic.SnapshotFilter{ sn, subfolder, err := (&restic.SnapshotFilter{
Hosts: opts.Hosts, Hosts: opts.Hosts,

View file

@ -563,19 +563,11 @@ func runFind(ctx context.Context, opts FindOptions, gopts GlobalOptions, args []
return errors.Fatal("cannot have several ID types") return errors.Fatal("cannot have several ID types")
} }
repo, err := OpenRepository(ctx, gopts) ctx, repo, unlock, err := openWithReadLock(ctx, gopts, gopts.NoLock)
if err != nil { if err != nil {
return err return err
} }
defer unlock()
if !gopts.NoLock {
var lock *restic.Lock
lock, ctx, err = lockRepo(ctx, repo, gopts.RetryLock, gopts.JSON)
defer unlockRepo(lock)
if err != nil {
return err
}
}
snapshotLister, err := restic.MemorizeList(ctx, repo, restic.SnapshotFile) snapshotLister, err := restic.MemorizeList(ctx, repo, restic.SnapshotFile)
if err != nil { if err != nil {

View file

@ -163,23 +163,15 @@ func runForget(ctx context.Context, opts ForgetOptions, pruneOptions PruneOption
return err return err
} }
repo, err := OpenRepository(ctx, gopts)
if err != nil {
return err
}
if gopts.NoLock && !opts.DryRun { if gopts.NoLock && !opts.DryRun {
return errors.Fatal("--no-lock is only applicable in combination with --dry-run for forget command") return errors.Fatal("--no-lock is only applicable in combination with --dry-run for forget command")
} }
if !opts.DryRun || !gopts.NoLock { ctx, repo, unlock, err := openWithExclusiveLock(ctx, gopts, opts.DryRun && gopts.NoLock)
var lock *restic.Lock
lock, ctx, err = lockRepoExclusive(ctx, repo, gopts.RetryLock, gopts.JSON)
defer unlockRepo(lock)
if err != nil { if err != nil {
return err return err
} }
} defer unlock()
var snapshots restic.Snapshots var snapshots restic.Snapshots
removeSnIDs := restic.NewIDSet() removeSnIDs := restic.NewIDSet()

View file

@ -50,16 +50,11 @@ func runKeyAdd(ctx context.Context, gopts GlobalOptions, opts KeyAddOptions, arg
return fmt.Errorf("the key add command expects no arguments, only options - please see `restic help key add` for usage and flags") return fmt.Errorf("the key add command expects no arguments, only options - please see `restic help key add` for usage and flags")
} }
repo, err := OpenRepository(ctx, gopts) ctx, repo, unlock, err := openWithAppendLock(ctx, gopts, false)
if err != nil {
return err
}
lock, ctx, err := lockRepo(ctx, repo, gopts.RetryLock, gopts.JSON)
defer unlockRepo(lock)
if err != nil { if err != nil {
return err return err
} }
defer unlock()
return addKey(ctx, repo, gopts, opts) return addKey(ctx, repo, gopts, opts)
} }

View file

@ -40,19 +40,11 @@ func runKeyList(ctx context.Context, gopts GlobalOptions, args []string) error {
return fmt.Errorf("the key list command expects no arguments, only options - please see `restic help key list` for usage and flags") return fmt.Errorf("the key list command expects no arguments, only options - please see `restic help key list` for usage and flags")
} }
repo, err := OpenRepository(ctx, gopts) ctx, repo, unlock, err := openWithReadLock(ctx, gopts, gopts.NoLock)
if err != nil { if err != nil {
return err return err
} }
defer unlock()
if !gopts.NoLock {
var lock *restic.Lock
lock, ctx, err = lockRepo(ctx, repo, gopts.RetryLock, gopts.JSON)
defer unlockRepo(lock)
if err != nil {
return err
}
}
return listKeys(ctx, repo, gopts) return listKeys(ctx, repo, gopts)
} }

View file

@ -47,16 +47,11 @@ func runKeyPasswd(ctx context.Context, gopts GlobalOptions, opts KeyPasswdOption
return fmt.Errorf("the key passwd command expects no arguments, only options - please see `restic help key passwd` for usage and flags") return fmt.Errorf("the key passwd command expects no arguments, only options - please see `restic help key passwd` for usage and flags")
} }
repo, err := OpenRepository(ctx, gopts) ctx, repo, unlock, err := openWithExclusiveLock(ctx, gopts, false)
if err != nil {
return err
}
lock, ctx, err := lockRepoExclusive(ctx, repo, gopts.RetryLock, gopts.JSON)
defer unlockRepo(lock)
if err != nil { if err != nil {
return err return err
} }
defer unlock()
return changePassword(ctx, repo, gopts, opts) return changePassword(ctx, repo, gopts, opts)
} }

View file

@ -37,20 +37,13 @@ func runKeyRemove(ctx context.Context, gopts GlobalOptions, args []string) error
return fmt.Errorf("key remove expects one argument as the key id") return fmt.Errorf("key remove expects one argument as the key id")
} }
repo, err := OpenRepository(ctx, gopts) ctx, repo, unlock, err := openWithExclusiveLock(ctx, gopts, false)
if err != nil { if err != nil {
return err return err
} }
defer unlock()
lock, ctx, err := lockRepoExclusive(ctx, repo, gopts.RetryLock, gopts.JSON) return deleteKey(ctx, repo, args[0])
defer unlockRepo(lock)
if err != nil {
return err
}
idPrefix := args[0]
return deleteKey(ctx, repo, idPrefix)
} }
func deleteKey(ctx context.Context, repo *repository.Repository, idPrefix string) error { func deleteKey(ctx context.Context, repo *repository.Repository, idPrefix string) error {

View file

@ -36,19 +36,11 @@ func runList(ctx context.Context, gopts GlobalOptions, args []string) error {
return errors.Fatal("type not specified") return errors.Fatal("type not specified")
} }
repo, err := OpenRepository(ctx, gopts) ctx, repo, unlock, err := openWithReadLock(ctx, gopts, gopts.NoLock || args[0] == "locks")
if err != nil { if err != nil {
return err return err
} }
defer unlock()
if !gopts.NoLock && args[0] != "locks" {
var lock *restic.Lock
lock, ctx, err = lockRepo(ctx, repo, gopts.RetryLock, gopts.JSON)
defer unlockRepo(lock)
if err != nil {
return err
}
}
var t restic.FileType var t restic.FileType
switch args[0] { switch args[0] {

View file

@ -309,10 +309,11 @@ func runLs(ctx context.Context, opts LsOptions, gopts GlobalOptions, args []stri
return false return false
} }
repo, err := OpenRepository(ctx, gopts) ctx, repo, unlock, err := openWithReadLock(ctx, gopts, gopts.NoLock)
if err != nil { if err != nil {
return err return err
} }
defer unlock()
snapshotLister, err := restic.MemorizeList(ctx, repo, restic.SnapshotFile) snapshotLister, err := restic.MemorizeList(ctx, repo, restic.SnapshotFile)
if err != nil { if err != nil {

View file

@ -117,16 +117,11 @@ func applyMigrations(ctx context.Context, opts MigrateOptions, gopts GlobalOptio
} }
func runMigrate(ctx context.Context, opts MigrateOptions, gopts GlobalOptions, args []string) error { func runMigrate(ctx context.Context, opts MigrateOptions, gopts GlobalOptions, args []string) error {
repo, err := OpenRepository(ctx, gopts) ctx, repo, unlock, err := openWithExclusiveLock(ctx, gopts, false)
if err != nil {
return err
}
lock, ctx, err := lockRepoExclusive(ctx, repo, gopts.RetryLock, gopts.JSON)
defer unlockRepo(lock)
if err != nil { if err != nil {
return err return err
} }
defer unlock()
if len(args) == 0 { if len(args) == 0 {
return checkMigrations(ctx, repo) return checkMigrations(ctx, repo)

View file

@ -125,19 +125,11 @@ func runMount(ctx context.Context, opts MountOptions, gopts GlobalOptions, args
debug.Log("start mount") debug.Log("start mount")
defer debug.Log("finish mount") defer debug.Log("finish mount")
repo, err := OpenRepository(ctx, gopts) ctx, repo, unlock, err := openWithReadLock(ctx, gopts, gopts.NoLock)
if err != nil { if err != nil {
return err return err
} }
defer unlock()
if !gopts.NoLock {
var lock *restic.Lock
lock, ctx, err = lockRepo(ctx, repo, gopts.RetryLock, gopts.JSON)
defer unlockRepo(lock)
if err != nil {
return err
}
}
bar := newIndexProgress(gopts.Quiet, gopts.JSON) bar := newIndexProgress(gopts.Quiet, gopts.JSON)
err = repo.LoadIndex(ctx, bar) err = repo.LoadIndex(ctx, bar)

View file

@ -12,7 +12,6 @@ import (
"testing" "testing"
"time" "time"
"github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test" rtest "github.com/restic/restic/internal/test"
) )
@ -86,12 +85,12 @@ func listSnapshots(t testing.TB, dir string) []string {
return names return names
} }
func checkSnapshots(t testing.TB, global GlobalOptions, repo *repository.Repository, mountpoint, repodir string, snapshotIDs restic.IDs, expectedSnapshotsInFuseDir int) { func checkSnapshots(t testing.TB, gopts GlobalOptions, mountpoint string, snapshotIDs restic.IDs, expectedSnapshotsInFuseDir int) {
t.Logf("checking for %d snapshots: %v", len(snapshotIDs), snapshotIDs) t.Logf("checking for %d snapshots: %v", len(snapshotIDs), snapshotIDs)
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) wg.Add(1)
go testRunMount(t, global, mountpoint, &wg) go testRunMount(t, gopts, mountpoint, &wg)
waitForMount(t, mountpoint) waitForMount(t, mountpoint)
defer wg.Wait() defer wg.Wait()
defer testRunUmount(t, mountpoint) defer testRunUmount(t, mountpoint)
@ -100,7 +99,7 @@ func checkSnapshots(t testing.TB, global GlobalOptions, repo *repository.Reposit
t.Fatal(`virtual directory "snapshots" doesn't exist`) t.Fatal(`virtual directory "snapshots" doesn't exist`)
} }
ids := listSnapshots(t, repodir) ids := listSnapshots(t, gopts.Repo)
t.Logf("found %v snapshots in repo: %v", len(ids), ids) t.Logf("found %v snapshots in repo: %v", len(ids), ids)
namesInSnapshots := listSnapshots(t, mountpoint) namesInSnapshots := listSnapshots(t, mountpoint)
@ -124,6 +123,10 @@ func checkSnapshots(t testing.TB, global GlobalOptions, repo *repository.Reposit
} }
} }
_, repo, unlock, err := openWithReadLock(context.TODO(), gopts, false)
rtest.OK(t, err)
defer unlock()
for _, id := range snapshotIDs { for _, id := range snapshotIDs {
snapshot, err := restic.LoadSnapshot(context.TODO(), repo, id) snapshot, err := restic.LoadSnapshot(context.TODO(), repo, id)
rtest.OK(t, err) rtest.OK(t, err)
@ -166,10 +169,7 @@ func TestMount(t *testing.T) {
testRunInit(t, env.gopts) testRunInit(t, env.gopts)
repo, err := OpenRepository(context.TODO(), env.gopts) checkSnapshots(t, env.gopts, env.mountpoint, []restic.ID{}, 0)
rtest.OK(t, err)
checkSnapshots(t, env.gopts, repo, env.mountpoint, env.repo, []restic.ID{}, 0)
rtest.SetupTarTestFixture(t, env.testdata, filepath.Join("testdata", "backup-data.tar.gz")) rtest.SetupTarTestFixture(t, env.testdata, filepath.Join("testdata", "backup-data.tar.gz"))
@ -179,7 +179,7 @@ func TestMount(t *testing.T) {
rtest.Assert(t, len(snapshotIDs) == 1, rtest.Assert(t, len(snapshotIDs) == 1,
"expected one snapshot, got %v", snapshotIDs) "expected one snapshot, got %v", snapshotIDs)
checkSnapshots(t, env.gopts, repo, env.mountpoint, env.repo, snapshotIDs, 2) checkSnapshots(t, env.gopts, env.mountpoint, snapshotIDs, 2)
// second backup, implicit incremental // second backup, implicit incremental
testRunBackup(t, "", []string{env.testdata}, BackupOptions{}, env.gopts) testRunBackup(t, "", []string{env.testdata}, BackupOptions{}, env.gopts)
@ -187,7 +187,7 @@ func TestMount(t *testing.T) {
rtest.Assert(t, len(snapshotIDs) == 2, rtest.Assert(t, len(snapshotIDs) == 2,
"expected two snapshots, got %v", snapshotIDs) "expected two snapshots, got %v", snapshotIDs)
checkSnapshots(t, env.gopts, repo, env.mountpoint, env.repo, snapshotIDs, 3) checkSnapshots(t, env.gopts, env.mountpoint, snapshotIDs, 3)
// third backup, explicit incremental // third backup, explicit incremental
bopts := BackupOptions{Parent: snapshotIDs[0].String()} bopts := BackupOptions{Parent: snapshotIDs[0].String()}
@ -196,7 +196,7 @@ func TestMount(t *testing.T) {
rtest.Assert(t, len(snapshotIDs) == 3, rtest.Assert(t, len(snapshotIDs) == 3,
"expected three snapshots, got %v", snapshotIDs) "expected three snapshots, got %v", snapshotIDs)
checkSnapshots(t, env.gopts, repo, env.mountpoint, env.repo, snapshotIDs, 4) checkSnapshots(t, env.gopts, env.mountpoint, snapshotIDs, 4)
} }
func TestMountSameTimestamps(t *testing.T) { func TestMountSameTimestamps(t *testing.T) {
@ -211,14 +211,11 @@ func TestMountSameTimestamps(t *testing.T) {
rtest.SetupTarTestFixture(t, env.base, filepath.Join("testdata", "repo-same-timestamps.tar.gz")) rtest.SetupTarTestFixture(t, env.base, filepath.Join("testdata", "repo-same-timestamps.tar.gz"))
repo, err := OpenRepository(context.TODO(), env.gopts)
rtest.OK(t, err)
ids := []restic.ID{ ids := []restic.ID{
restic.TestParseID("280303689e5027328889a06d718b729e96a1ce6ae9ef8290bff550459ae611ee"), restic.TestParseID("280303689e5027328889a06d718b729e96a1ce6ae9ef8290bff550459ae611ee"),
restic.TestParseID("75ad6cdc0868e082f2596d5ab8705e9f7d87316f5bf5690385eeff8dbe49d9f5"), restic.TestParseID("75ad6cdc0868e082f2596d5ab8705e9f7d87316f5bf5690385eeff8dbe49d9f5"),
restic.TestParseID("5fd0d8b2ef0fa5d23e58f1e460188abb0f525c0f0c4af8365a1280c807a80a1b"), restic.TestParseID("5fd0d8b2ef0fa5d23e58f1e460188abb0f525c0f0c4af8365a1280c807a80a1b"),
} }
checkSnapshots(t, env.gopts, repo, env.mountpoint, env.repo, ids, 4) checkSnapshots(t, env.gopts, env.mountpoint, ids, 4)
} }

View file

@ -148,10 +148,11 @@ func runPrune(ctx context.Context, opts PruneOptions, gopts GlobalOptions) error
return errors.Fatal("disabled compression and `--repack-uncompressed` are mutually exclusive") return errors.Fatal("disabled compression and `--repack-uncompressed` are mutually exclusive")
} }
repo, err := OpenRepository(ctx, gopts) ctx, repo, unlock, err := openWithExclusiveLock(ctx, gopts, false)
if err != nil { if err != nil {
return err return err
} }
defer unlock()
if repo.Connections() < 2 { if repo.Connections() < 2 {
return errors.Fatal("prune requires a backend connection limit of at least two") return errors.Fatal("prune requires a backend connection limit of at least two")
@ -169,12 +170,6 @@ func runPrune(ctx context.Context, opts PruneOptions, gopts GlobalOptions) error
opts.unsafeRecovery = true opts.unsafeRecovery = true
} }
lock, ctx, err := lockRepoExclusive(ctx, repo, gopts.RetryLock, gopts.JSON)
defer unlockRepo(lock)
if err != nil {
return err
}
return runPruneWithRepo(ctx, opts, gopts, repo, restic.NewIDSet()) return runPruneWithRepo(ctx, opts, gopts, repo, restic.NewIDSet())
} }

View file

@ -40,16 +40,11 @@ func runRecover(ctx context.Context, gopts GlobalOptions) error {
return err return err
} }
repo, err := OpenRepository(ctx, gopts) ctx, repo, unlock, err := openWithAppendLock(ctx, gopts, false)
if err != nil {
return err
}
lock, ctx, err := lockRepo(ctx, repo, gopts.RetryLock, gopts.JSON)
defer unlockRepo(lock)
if err != nil { if err != nil {
return err return err
} }
defer unlock()
snapshotLister, err := restic.MemorizeList(ctx, repo, restic.SnapshotFile) snapshotLister, err := restic.MemorizeList(ctx, repo, restic.SnapshotFile)
if err != nil { if err != nil {

View file

@ -56,16 +56,11 @@ func init() {
} }
func runRebuildIndex(ctx context.Context, opts RepairIndexOptions, gopts GlobalOptions) error { func runRebuildIndex(ctx context.Context, opts RepairIndexOptions, gopts GlobalOptions) error {
repo, err := OpenRepository(ctx, gopts) ctx, repo, unlock, err := openWithExclusiveLock(ctx, gopts, false)
if err != nil {
return err
}
lock, ctx, err := lockRepoExclusive(ctx, repo, gopts.RetryLock, gopts.JSON)
defer unlockRepo(lock)
if err != nil { if err != nil {
return err return err
} }
defer unlock()
return rebuildIndex(ctx, opts, gopts, repo) return rebuildIndex(ctx, opts, gopts, repo)
} }

View file

@ -52,16 +52,11 @@ func runRepairPacks(ctx context.Context, gopts GlobalOptions, term *termstatus.T
return errors.Fatal("no ids specified") return errors.Fatal("no ids specified")
} }
repo, err := OpenRepository(ctx, gopts) ctx, repo, unlock, err := openWithExclusiveLock(ctx, gopts, false)
if err != nil {
return err
}
lock, ctx, err := lockRepoExclusive(ctx, repo, gopts.RetryLock, gopts.JSON)
defer unlockRepo(lock)
if err != nil { if err != nil {
return err return err
} }
defer unlock()
bar := newIndexProgress(gopts.Quiet, gopts.JSON) bar := newIndexProgress(gopts.Quiet, gopts.JSON)
err = repo.LoadIndex(ctx, bar) err = repo.LoadIndex(ctx, bar)

View file

@ -66,22 +66,11 @@ func init() {
} }
func runRepairSnapshots(ctx context.Context, gopts GlobalOptions, opts RepairOptions, args []string) error { func runRepairSnapshots(ctx context.Context, gopts GlobalOptions, opts RepairOptions, args []string) error {
repo, err := OpenRepository(ctx, gopts) ctx, repo, unlock, err := openWithExclusiveLock(ctx, gopts, opts.DryRun)
if err != nil { if err != nil {
return err return err
} }
defer unlock()
if !opts.DryRun {
var lock *restic.Lock
var err error
lock, ctx, err = lockRepoExclusive(ctx, repo, gopts.RetryLock, gopts.JSON)
defer unlockRepo(lock)
if err != nil {
return err
}
} else {
repo.SetDryRun()
}
snapshotLister, err := restic.MemorizeList(ctx, repo, restic.SnapshotFile) snapshotLister, err := restic.MemorizeList(ctx, repo, restic.SnapshotFile)
if err != nil { if err != nil {

View file

@ -127,19 +127,11 @@ func runRestore(ctx context.Context, opts RestoreOptions, gopts GlobalOptions,
debug.Log("restore %v to %v", snapshotIDString, opts.Target) debug.Log("restore %v to %v", snapshotIDString, opts.Target)
repo, err := OpenRepository(ctx, gopts) ctx, repo, unlock, err := openWithReadLock(ctx, gopts, gopts.NoLock)
if err != nil { if err != nil {
return err return err
} }
defer unlock()
if !gopts.NoLock {
var lock *restic.Lock
lock, ctx, err = lockRepo(ctx, repo, gopts.RetryLock, gopts.JSON)
defer unlockRepo(lock)
if err != nil {
return err
}
}
sn, subfolder, err := (&restic.SnapshotFilter{ sn, subfolder, err := (&restic.SnapshotFilter{
Hosts: opts.Hosts, Hosts: opts.Hosts,

View file

@ -256,27 +256,22 @@ func runRewrite(ctx context.Context, opts RewriteOptions, gopts GlobalOptions, a
return errors.Fatal("Nothing to do: no excludes provided and no new metadata provided") return errors.Fatal("Nothing to do: no excludes provided and no new metadata provided")
} }
repo, err := OpenRepository(ctx, gopts) var (
if err != nil { repo *repository.Repository
return err unlock func()
} err error
)
if !opts.DryRun {
var lock *restic.Lock
var err error
if opts.Forget { if opts.Forget {
Verbosef("create exclusive lock for repository\n") Verbosef("create exclusive lock for repository\n")
lock, ctx, err = lockRepoExclusive(ctx, repo, gopts.RetryLock, gopts.JSON) ctx, repo, unlock, err = openWithExclusiveLock(ctx, gopts, opts.DryRun)
} else { } else {
lock, ctx, err = lockRepo(ctx, repo, gopts.RetryLock, gopts.JSON) ctx, repo, unlock, err = openWithAppendLock(ctx, gopts, opts.DryRun)
} }
defer unlockRepo(lock)
if err != nil { if err != nil {
return err return err
} }
} else { defer unlock()
repo.SetDryRun()
}
snapshotLister, err := restic.MemorizeList(ctx, repo, restic.SnapshotFile) snapshotLister, err := restic.MemorizeList(ctx, repo, restic.SnapshotFile)
if err != nil { if err != nil {

View file

@ -78,8 +78,11 @@ func testRewriteMetadata(t *testing.T, metadata snapshotMetadataArgs) {
createBasicRewriteRepo(t, env) createBasicRewriteRepo(t, env)
testRunRewriteExclude(t, env.gopts, []string{}, true, metadata) testRunRewriteExclude(t, env.gopts, []string{}, true, metadata)
repo, _ := OpenRepository(context.TODO(), env.gopts) ctx, repo, unlock, err := openWithReadLock(context.TODO(), env.gopts, false)
snapshots, err := restic.TestLoadAllSnapshots(context.TODO(), repo, nil) rtest.OK(t, err)
defer unlock()
snapshots, err := restic.TestLoadAllSnapshots(ctx, repo, nil)
rtest.OK(t, err) rtest.OK(t, err)
rtest.Assert(t, len(snapshots) == 1, "expected one snapshot, got %v", len(snapshots)) rtest.Assert(t, len(snapshots) == 1, "expected one snapshot, got %v", len(snapshots))
newSnapshot := snapshots[0] newSnapshot := snapshots[0]

View file

@ -59,19 +59,11 @@ func init() {
} }
func runSnapshots(ctx context.Context, opts SnapshotOptions, gopts GlobalOptions, args []string) error { func runSnapshots(ctx context.Context, opts SnapshotOptions, gopts GlobalOptions, args []string) error {
repo, err := OpenRepository(ctx, gopts) ctx, repo, unlock, err := openWithReadLock(ctx, gopts, gopts.NoLock)
if err != nil { if err != nil {
return err return err
} }
defer unlock()
if !gopts.NoLock {
var lock *restic.Lock
lock, ctx, err = lockRepo(ctx, repo, gopts.RetryLock, gopts.JSON)
defer unlockRepo(lock)
if err != nil {
return err
}
}
var snapshots restic.Snapshots var snapshots restic.Snapshots
for sn := range FindFilteredSnapshots(ctx, repo, repo, &opts.SnapshotFilter, args) { for sn := range FindFilteredSnapshots(ctx, repo, repo, &opts.SnapshotFilter, args) {

View file

@ -80,19 +80,11 @@ func runStats(ctx context.Context, opts StatsOptions, gopts GlobalOptions, args
return err return err
} }
repo, err := OpenRepository(ctx, gopts) ctx, repo, unlock, err := openWithReadLock(ctx, gopts, gopts.NoLock)
if err != nil { if err != nil {
return err return err
} }
defer unlock()
if !gopts.NoLock {
var lock *restic.Lock
lock, ctx, err = lockRepo(ctx, repo, gopts.RetryLock, gopts.JSON)
defer unlockRepo(lock)
if err != nil {
return err
}
}
snapshotLister, err := restic.MemorizeList(ctx, repo, restic.SnapshotFile) snapshotLister, err := restic.MemorizeList(ctx, repo, restic.SnapshotFile)
if err != nil { if err != nil {

View file

@ -104,20 +104,12 @@ func runTag(ctx context.Context, opts TagOptions, gopts GlobalOptions, args []st
return errors.Fatal("--set and --add/--remove cannot be given at the same time") return errors.Fatal("--set and --add/--remove cannot be given at the same time")
} }
repo, err := OpenRepository(ctx, gopts)
if err != nil {
return err
}
if !gopts.NoLock {
Verbosef("create exclusive lock for repository\n") Verbosef("create exclusive lock for repository\n")
var lock *restic.Lock ctx, repo, unlock, err := openWithExclusiveLock(ctx, gopts, false)
lock, ctx, err = lockRepoExclusive(ctx, repo, gopts.RetryLock, gopts.JSON)
defer unlockRepo(lock)
if err != nil { if err != nil {
return err return nil
}
} }
defer unlock()
changeCnt := 0 changeCnt := 0
for sn := range FindFilteredSnapshots(ctx, repo, repo, &opts.SnapshotFilter, args) { for sn := range FindFilteredSnapshots(ctx, repo, repo, &opts.SnapshotFilter, args) {

View file

@ -232,47 +232,66 @@ func testSetupBackupData(t testing.TB, env *testEnvironment) string {
} }
func listPacks(gopts GlobalOptions, t *testing.T) restic.IDSet { func listPacks(gopts GlobalOptions, t *testing.T) restic.IDSet {
r, err := OpenRepository(context.TODO(), gopts) ctx, r, unlock, err := openWithReadLock(context.TODO(), gopts, false)
rtest.OK(t, err) rtest.OK(t, err)
defer unlock()
packs := restic.NewIDSet() packs := restic.NewIDSet()
rtest.OK(t, r.List(context.TODO(), restic.PackFile, func(id restic.ID, size int64) error { rtest.OK(t, r.List(ctx, restic.PackFile, func(id restic.ID, size int64) error {
packs.Insert(id) packs.Insert(id)
return nil return nil
})) }))
return packs return packs
} }
func removePacks(gopts GlobalOptions, t testing.TB, remove restic.IDSet) { func listTreePacks(gopts GlobalOptions, t *testing.T) restic.IDSet {
r, err := OpenRepository(context.TODO(), gopts) ctx, r, unlock, err := openWithReadLock(context.TODO(), gopts, false)
rtest.OK(t, err) rtest.OK(t, err)
defer unlock()
rtest.OK(t, r.LoadIndex(ctx, nil))
treePacks := restic.NewIDSet()
r.Index().Each(ctx, func(pb restic.PackedBlob) {
if pb.Type == restic.TreeBlob {
treePacks.Insert(pb.PackID)
}
})
return treePacks
}
func removePacks(gopts GlobalOptions, t testing.TB, remove restic.IDSet) {
ctx, r, unlock, err := openWithExclusiveLock(context.TODO(), gopts, false)
rtest.OK(t, err)
defer unlock()
for id := range remove { for id := range remove {
rtest.OK(t, r.Backend().Remove(context.TODO(), backend.Handle{Type: restic.PackFile, Name: id.String()})) rtest.OK(t, r.Backend().Remove(ctx, backend.Handle{Type: restic.PackFile, Name: id.String()}))
} }
} }
func removePacksExcept(gopts GlobalOptions, t testing.TB, keep restic.IDSet, removeTreePacks bool) { func removePacksExcept(gopts GlobalOptions, t testing.TB, keep restic.IDSet, removeTreePacks bool) {
r, err := OpenRepository(context.TODO(), gopts) ctx, r, unlock, err := openWithExclusiveLock(context.TODO(), gopts, false)
rtest.OK(t, err) rtest.OK(t, err)
defer unlock()
// Get all tree packs // Get all tree packs
rtest.OK(t, r.LoadIndex(context.TODO(), nil)) rtest.OK(t, r.LoadIndex(ctx, nil))
treePacks := restic.NewIDSet() treePacks := restic.NewIDSet()
r.Index().Each(context.TODO(), func(pb restic.PackedBlob) { r.Index().Each(ctx, func(pb restic.PackedBlob) {
if pb.Type == restic.TreeBlob { if pb.Type == restic.TreeBlob {
treePacks.Insert(pb.PackID) treePacks.Insert(pb.PackID)
} }
}) })
// remove all packs containing data blobs // remove all packs containing data blobs
rtest.OK(t, r.List(context.TODO(), restic.PackFile, func(id restic.ID, size int64) error { rtest.OK(t, r.List(ctx, restic.PackFile, func(id restic.ID, size int64) error {
if treePacks.Has(id) != removeTreePacks || keep.Has(id) { if treePacks.Has(id) != removeTreePacks || keep.Has(id) {
return nil return nil
} }
return r.Backend().Remove(context.TODO(), backend.Handle{Type: restic.PackFile, Name: id.String()}) return r.Backend().Remove(ctx, backend.Handle{Type: restic.PackFile, Name: id.String()})
})) }))
} }

View file

@ -154,12 +154,13 @@ func TestFindListOnce(t *testing.T) {
testRunBackup(t, "", []string{filepath.Join(env.testdata, "0", "0", "9", "3")}, opts, env.gopts) testRunBackup(t, "", []string{filepath.Join(env.testdata, "0", "0", "9", "3")}, opts, env.gopts)
thirdSnapshot := restic.NewIDSet(testListSnapshots(t, env.gopts, 3)...) thirdSnapshot := restic.NewIDSet(testListSnapshots(t, env.gopts, 3)...)
repo, err := OpenRepository(context.TODO(), env.gopts) ctx, repo, unlock, err := openWithReadLock(context.TODO(), env.gopts, false)
rtest.OK(t, err) rtest.OK(t, err)
defer unlock()
snapshotIDs := restic.NewIDSet() snapshotIDs := restic.NewIDSet()
// specify the two oldest snapshots explicitly and use "latest" to reference the newest one // specify the two oldest snapshots explicitly and use "latest" to reference the newest one
for sn := range FindFilteredSnapshots(context.TODO(), repo, repo, &restic.SnapshotFilter{}, []string{ for sn := range FindFilteredSnapshots(ctx, repo, repo, &restic.SnapshotFilter{}, []string{
secondSnapshot[0].String(), secondSnapshot[0].String(),
secondSnapshot[1].String()[:8], secondSnapshot[1].String()[:8],
"latest", "latest",

View file

@ -2,316 +2,54 @@ package main
import ( import (
"context" "context"
"fmt"
"sync"
"time"
"github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic"
) )
type lockContext struct { func internalOpenWithLocked(ctx context.Context, gopts GlobalOptions, dryRun bool, exclusive bool) (context.Context, *repository.Repository, func(), error) {
lock *restic.Lock repo, err := OpenRepository(ctx, gopts)
cancel context.CancelFunc if err != nil {
refreshWG sync.WaitGroup return nil, nil, nil, err
} }
var globalLocks struct { unlock := func() {}
locks map[*restic.Lock]*lockContext if !dryRun {
sync.Mutex var lock *repository.Unlocker
sync.Once
}
func lockRepo(ctx context.Context, repo restic.Repository, retryLock time.Duration, json bool) (*restic.Lock, context.Context, error) { lock, ctx, err = repository.Lock(ctx, repo, exclusive, gopts.RetryLock, func(msg string) {
return lockRepository(ctx, repo, false, retryLock, json) if !gopts.JSON {
Verbosef("%s", msg)
} }
}, Warnf)
func lockRepoExclusive(ctx context.Context, repo restic.Repository, retryLock time.Duration, json bool) (*restic.Lock, context.Context, error) { unlock = lock.Unlock
return lockRepository(ctx, repo, true, retryLock, json)
}
var (
retrySleepStart = 5 * time.Second
retrySleepMax = 60 * time.Second
)
func minDuration(a, b time.Duration) time.Duration {
if a <= b {
return a
}
return b
}
// lockRepository wraps the ctx such that it is cancelled when the repository is unlocked
// cancelling the original context also stops the lock refresh
func lockRepository(ctx context.Context, repo restic.Repository, exclusive bool, retryLock time.Duration, json bool) (*restic.Lock, context.Context, error) {
// make sure that a repository is unlocked properly and after cancel() was // make sure that a repository is unlocked properly and after cancel() was
// called by the cleanup handler in global.go // called by the cleanup handler in global.go
globalLocks.Do(func() { AddCleanupHandler(func(code int) (int, error) {
AddCleanupHandler(unlockAll) lock.Unlock()
return code, nil
}) })
lockFn := restic.NewLock if err != nil {
if exclusive { return nil, nil, nil, err
lockFn = restic.NewExclusiveLock
}
var lock *restic.Lock
var err error
retrySleep := minDuration(retrySleepStart, retryLock)
retryMessagePrinted := false
retryTimeout := time.After(retryLock)
retryLoop:
for {
lock, err = lockFn(ctx, repo)
if err != nil && restic.IsAlreadyLocked(err) {
if !retryMessagePrinted {
if !json {
Verbosef("repo already locked, waiting up to %s for the lock\n", retryLock)
}
retryMessagePrinted = true
}
debug.Log("repo already locked, retrying in %v", retrySleep)
retrySleepCh := time.After(retrySleep)
select {
case <-ctx.Done():
return nil, ctx, ctx.Err()
case <-retryTimeout:
debug.Log("repo already locked, timeout expired")
// Last lock attempt
lock, err = lockFn(ctx, repo)
break retryLoop
case <-retrySleepCh:
retrySleep = minDuration(retrySleep*2, retrySleepMax)
} }
} else { } else {
// anything else, either a successful lock or another error repo.SetDryRun()
break retryLoop
}
}
if restic.IsInvalidLock(err) {
return nil, ctx, errors.Fatalf("%v\n\nthe `unlock --remove-all` command can be used to remove invalid locks. Make sure that no other restic process is accessing the repository when running the command", err)
}
if err != nil {
return nil, ctx, fmt.Errorf("unable to create lock in backend: %w", err)
}
debug.Log("create lock %p (exclusive %v)", lock, exclusive)
ctx, cancel := context.WithCancel(ctx)
lockInfo := &lockContext{
lock: lock,
cancel: cancel,
}
lockInfo.refreshWG.Add(2)
refreshChan := make(chan struct{})
forceRefreshChan := make(chan refreshLockRequest)
globalLocks.Lock()
globalLocks.locks[lock] = lockInfo
go refreshLocks(ctx, repo.Backend(), lockInfo, refreshChan, forceRefreshChan)
go monitorLockRefresh(ctx, lockInfo, refreshChan, forceRefreshChan)
globalLocks.Unlock()
return lock, ctx, err
} }
var refreshInterval = 5 * time.Minute return ctx, repo, unlock, nil
// consider a lock refresh failed a bit before the lock actually becomes stale
// the difference allows to compensate for a small time drift between clients.
var refreshabilityTimeout = restic.StaleLockTimeout - refreshInterval*3/2
type refreshLockRequest struct {
result chan bool
} }
func refreshLocks(ctx context.Context, backend backend.Backend, lockInfo *lockContext, refreshed chan<- struct{}, forceRefresh <-chan refreshLockRequest) { func openWithReadLock(ctx context.Context, gopts GlobalOptions, noLock bool) (context.Context, *repository.Repository, func(), error) {
debug.Log("start") // TODO enfore read-only operations once the locking code has moved to the repository
lock := lockInfo.lock return internalOpenWithLocked(ctx, gopts, noLock, false)
ticker := time.NewTicker(refreshInterval)
lastRefresh := lock.Time
defer func() {
ticker.Stop()
// ensure that the context was cancelled before removing the lock
lockInfo.cancel()
// remove the lock from the repo
debug.Log("unlocking repository with lock %v", lock)
if err := lock.Unlock(); err != nil {
debug.Log("error while unlocking: %v", err)
Warnf("error while unlocking: %v", err)
} }
lockInfo.refreshWG.Done() func openWithAppendLock(ctx context.Context, gopts GlobalOptions, dryRun bool) (context.Context, *repository.Repository, func(), error) {
}() // TODO enfore non-exclusive operations once the locking code has moved to the repository
return internalOpenWithLocked(ctx, gopts, dryRun, false)
for {
select {
case <-ctx.Done():
debug.Log("terminate")
return
case req := <-forceRefresh:
debug.Log("trying to refresh stale lock")
// keep on going if our current lock still exists
success := tryRefreshStaleLock(ctx, backend, lock, lockInfo.cancel)
// inform refresh goroutine about forced refresh
select {
case <-ctx.Done():
case req.result <- success:
} }
if success { func openWithExclusiveLock(ctx context.Context, gopts GlobalOptions, dryRun bool) (context.Context, *repository.Repository, func(), error) {
// update lock refresh time return internalOpenWithLocked(ctx, gopts, dryRun, true)
lastRefresh = lock.Time
}
case <-ticker.C:
if time.Since(lastRefresh) > refreshabilityTimeout {
// the lock is too old, wait until the expiry monitor cancels the context
continue
}
debug.Log("refreshing locks")
err := lock.Refresh(context.TODO())
if err != nil {
Warnf("unable to refresh lock: %v\n", err)
} else {
lastRefresh = lock.Time
// inform monitor goroutine about successful refresh
select {
case <-ctx.Done():
case refreshed <- struct{}{}:
}
}
}
}
}
func monitorLockRefresh(ctx context.Context, lockInfo *lockContext, refreshed <-chan struct{}, forceRefresh chan<- refreshLockRequest) {
// time.Now() might use a monotonic timer which is paused during standby
// convert to unix time to ensure we compare real time values
lastRefresh := time.Now().UnixNano()
pollDuration := 1 * time.Second
if refreshInterval < pollDuration {
// require for TestLockFailedRefresh
pollDuration = refreshInterval / 5
}
// timers are paused during standby, which is a problem as the refresh timeout
// _must_ expire if the host was too long in standby. Thus fall back to periodic checks
// https://github.com/golang/go/issues/35012
ticker := time.NewTicker(pollDuration)
defer func() {
ticker.Stop()
lockInfo.cancel()
lockInfo.refreshWG.Done()
}()
var refreshStaleLockResult chan bool
for {
select {
case <-ctx.Done():
debug.Log("terminate expiry monitoring")
return
case <-refreshed:
if refreshStaleLockResult != nil {
// ignore delayed refresh notifications while the stale lock is refreshed
continue
}
lastRefresh = time.Now().UnixNano()
case <-ticker.C:
if time.Now().UnixNano()-lastRefresh < refreshabilityTimeout.Nanoseconds() || refreshStaleLockResult != nil {
continue
}
debug.Log("trying to refreshStaleLock")
// keep on going if our current lock still exists
refreshReq := refreshLockRequest{
result: make(chan bool),
}
refreshStaleLockResult = refreshReq.result
// inform refresh goroutine about forced refresh
select {
case <-ctx.Done():
case forceRefresh <- refreshReq:
}
case success := <-refreshStaleLockResult:
if success {
lastRefresh = time.Now().UnixNano()
refreshStaleLockResult = nil
continue
}
Warnf("Fatal: failed to refresh lock in time\n")
return
}
}
}
func tryRefreshStaleLock(ctx context.Context, be backend.Backend, lock *restic.Lock, cancel context.CancelFunc) bool {
freeze := backend.AsBackend[backend.FreezeBackend](be)
if freeze != nil {
debug.Log("freezing backend")
freeze.Freeze()
defer freeze.Unfreeze()
}
err := lock.RefreshStaleLock(ctx)
if err != nil {
Warnf("failed to refresh stale lock: %v\n", err)
// cancel context while the backend is still frozen to prevent accidental modifications
cancel()
return false
}
return true
}
func unlockRepo(lock *restic.Lock) {
if lock == nil {
return
}
globalLocks.Lock()
lockInfo, exists := globalLocks.locks[lock]
delete(globalLocks.locks, lock)
globalLocks.Unlock()
if !exists {
debug.Log("unable to find lock %v in the global list of locks, ignoring", lock)
return
}
lockInfo.cancel()
lockInfo.refreshWG.Wait()
}
func unlockAll(code int) (int, error) {
globalLocks.Lock()
locks := globalLocks.locks
debug.Log("unlocking %d locks", len(globalLocks.locks))
for _, lockInfo := range globalLocks.locks {
lockInfo.cancel()
}
globalLocks.locks = make(map[*restic.Lock]*lockContext)
globalLocks.Unlock()
for _, lockInfo := range locks {
lockInfo.refreshWG.Wait()
}
return code, nil
}
func init() {
globalLocks.locks = make(map[*restic.Lock]*lockContext)
} }

View file

@ -72,11 +72,9 @@ func assertOnlyMixedPackHints(t *testing.T, hints []error) {
} }
func TestCheckRepo(t *testing.T) { func TestCheckRepo(t *testing.T) {
repodir, cleanup := test.Env(t, checkerTestData) repo, cleanup := repository.TestFromFixture(t, checkerTestData)
defer cleanup() defer cleanup()
repo := repository.TestOpenLocal(t, repodir)
chkr := checker.New(repo, false) chkr := checker.New(repo, false)
hints, errs := chkr.LoadIndex(context.TODO(), nil) hints, errs := chkr.LoadIndex(context.TODO(), nil)
if len(errs) > 0 { if len(errs) > 0 {
@ -92,11 +90,9 @@ func TestCheckRepo(t *testing.T) {
} }
func TestMissingPack(t *testing.T) { func TestMissingPack(t *testing.T) {
repodir, cleanup := test.Env(t, checkerTestData) repo, cleanup := repository.TestFromFixture(t, checkerTestData)
defer cleanup() defer cleanup()
repo := repository.TestOpenLocal(t, repodir)
packHandle := backend.Handle{ packHandle := backend.Handle{
Type: restic.PackFile, Type: restic.PackFile,
Name: "657f7fb64f6a854fff6fe9279998ee09034901eded4e6db9bcee0e59745bbce6", Name: "657f7fb64f6a854fff6fe9279998ee09034901eded4e6db9bcee0e59745bbce6",
@ -123,11 +119,9 @@ func TestMissingPack(t *testing.T) {
} }
func TestUnreferencedPack(t *testing.T) { func TestUnreferencedPack(t *testing.T) {
repodir, cleanup := test.Env(t, checkerTestData) repo, cleanup := repository.TestFromFixture(t, checkerTestData)
defer cleanup() defer cleanup()
repo := repository.TestOpenLocal(t, repodir)
// index 3f1a only references pack 60e0 // index 3f1a only references pack 60e0
packID := "60e0438dcb978ec6860cc1f8c43da648170ee9129af8f650f876bad19f8f788e" packID := "60e0438dcb978ec6860cc1f8c43da648170ee9129af8f650f876bad19f8f788e"
indexHandle := backend.Handle{ indexHandle := backend.Handle{
@ -156,11 +150,9 @@ func TestUnreferencedPack(t *testing.T) {
} }
func TestUnreferencedBlobs(t *testing.T) { func TestUnreferencedBlobs(t *testing.T) {
repodir, cleanup := test.Env(t, checkerTestData) repo, cleanup := repository.TestFromFixture(t, checkerTestData)
defer cleanup() defer cleanup()
repo := repository.TestOpenLocal(t, repodir)
snapshotHandle := backend.Handle{ snapshotHandle := backend.Handle{
Type: restic.SnapshotFile, Type: restic.SnapshotFile,
Name: "51d249d28815200d59e4be7b3f21a157b864dc343353df9d8e498220c2499b02", Name: "51d249d28815200d59e4be7b3f21a157b864dc343353df9d8e498220c2499b02",
@ -195,11 +187,9 @@ func TestUnreferencedBlobs(t *testing.T) {
} }
func TestModifiedIndex(t *testing.T) { func TestModifiedIndex(t *testing.T) {
repodir, cleanup := test.Env(t, checkerTestData) repo, cleanup := repository.TestFromFixture(t, checkerTestData)
defer cleanup() defer cleanup()
repo := repository.TestOpenLocal(t, repodir)
done := make(chan struct{}) done := make(chan struct{})
defer close(done) defer close(done)
@ -274,11 +264,9 @@ func TestModifiedIndex(t *testing.T) {
var checkerDuplicateIndexTestData = filepath.Join("testdata", "duplicate-packs-in-index-test-repo.tar.gz") var checkerDuplicateIndexTestData = filepath.Join("testdata", "duplicate-packs-in-index-test-repo.tar.gz")
func TestDuplicatePacksInIndex(t *testing.T) { func TestDuplicatePacksInIndex(t *testing.T) {
repodir, cleanup := test.Env(t, checkerDuplicateIndexTestData) repo, cleanup := repository.TestFromFixture(t, checkerDuplicateIndexTestData)
defer cleanup() defer cleanup()
repo := repository.TestOpenLocal(t, repodir)
chkr := checker.New(repo, false) chkr := checker.New(repo, false)
hints, errs := chkr.LoadIndex(context.TODO(), nil) hints, errs := chkr.LoadIndex(context.TODO(), nil)
if len(hints) == 0 { if len(hints) == 0 {
@ -342,9 +330,7 @@ func TestCheckerModifiedData(t *testing.T) {
t.Logf("archived as %v", sn.ID().Str()) t.Logf("archived as %v", sn.ID().Str())
beError := &errorBackend{Backend: repo.Backend()} beError := &errorBackend{Backend: repo.Backend()}
checkRepo, err := repository.New(beError, repository.Options{}) checkRepo := repository.TestOpenBackend(t, beError)
test.OK(t, err)
test.OK(t, checkRepo.SearchKey(context.TODO(), test.TestPassword, 5, ""))
chkr := checker.New(checkRepo, false) chkr := checker.New(checkRepo, false)
@ -399,10 +385,8 @@ func (r *loadTreesOnceRepository) LoadTree(ctx context.Context, id restic.ID) (*
} }
func TestCheckerNoDuplicateTreeDecodes(t *testing.T) { func TestCheckerNoDuplicateTreeDecodes(t *testing.T) {
repodir, cleanup := test.Env(t, checkerTestData) repo, cleanup := repository.TestFromFixture(t, checkerTestData)
defer cleanup() defer cleanup()
repo := repository.TestOpenLocal(t, repodir)
checkRepo := &loadTreesOnceRepository{ checkRepo := &loadTreesOnceRepository{
Repository: repo, Repository: repo,
loadedTrees: restic.NewIDSet(), loadedTrees: restic.NewIDSet(),
@ -549,9 +533,7 @@ func TestCheckerBlobTypeConfusion(t *testing.T) {
} }
func loadBenchRepository(t *testing.B) (*checker.Checker, restic.Repository, func()) { func loadBenchRepository(t *testing.B) (*checker.Checker, restic.Repository, func()) {
repodir, cleanup := test.Env(t, checkerTestData) repo, cleanup := repository.TestFromFixture(t, checkerTestData)
repo := repository.TestOpenLocal(t, repodir)
chkr := checker.New(repo, false) chkr := checker.New(repo, false)
hints, errs := chkr.LoadIndex(context.TODO(), nil) hints, errs := chkr.LoadIndex(context.TODO(), nil)

View file

@ -15,11 +15,9 @@ import (
var repoFixture = filepath.Join("..", "repository", "testdata", "test-repo.tar.gz") var repoFixture = filepath.Join("..", "repository", "testdata", "test-repo.tar.gz")
func TestRepositoryForAllIndexes(t *testing.T) { func TestRepositoryForAllIndexes(t *testing.T) {
repodir, cleanup := rtest.Env(t, repoFixture) repo, cleanup := repository.TestFromFixture(t, repoFixture)
defer cleanup() defer cleanup()
repo := repository.TestOpenLocal(t, repodir)
expectedIndexIDs := restic.NewIDSet() expectedIndexIDs := restic.NewIDSet()
rtest.OK(t, repo.List(context.TODO(), restic.IndexFile, func(id restic.ID, size int64) error { rtest.OK(t, repo.List(context.TODO(), restic.IndexFile, func(id restic.ID, size int64) error {
expectedIndexIDs.Insert(id) expectedIndexIDs.Insert(id)

View file

@ -43,11 +43,11 @@ type Key struct {
id restic.ID id restic.ID
} }
// Params tracks the parameters used for the KDF. If not set, it will be // params tracks the parameters used for the KDF. If not set, it will be
// calibrated on the first run of AddKey(). // calibrated on the first run of AddKey().
var Params *crypto.Params var params *crypto.Params
var ( const (
// KDFTimeout specifies the maximum runtime for the KDF. // KDFTimeout specifies the maximum runtime for the KDF.
KDFTimeout = 500 * time.Millisecond KDFTimeout = 500 * time.Millisecond
@ -196,13 +196,13 @@ func LoadKey(ctx context.Context, s *Repository, id restic.ID) (k *Key, err erro
// AddKey adds a new key to an already existing repository. // AddKey adds a new key to an already existing repository.
func AddKey(ctx context.Context, s *Repository, password, username, hostname string, template *crypto.Key) (*Key, error) { func AddKey(ctx context.Context, s *Repository, password, username, hostname string, template *crypto.Key) (*Key, error) {
// make sure we have valid KDF parameters // make sure we have valid KDF parameters
if Params == nil { if params == nil {
p, err := crypto.Calibrate(KDFTimeout, KDFMemory) p, err := crypto.Calibrate(KDFTimeout, KDFMemory)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "Calibrate") return nil, errors.Wrap(err, "Calibrate")
} }
Params = &p params = &p
debug.Log("calibrated KDF parameters are %v", p) debug.Log("calibrated KDF parameters are %v", p)
} }
@ -213,9 +213,9 @@ func AddKey(ctx context.Context, s *Repository, password, username, hostname str
Hostname: hostname, Hostname: hostname,
KDF: "scrypt", KDF: "scrypt",
N: Params.N, N: params.N,
R: Params.R, R: params.R,
P: Params.P, P: params.P,
} }
if newkey.Hostname == "" { if newkey.Hostname == "" {
@ -237,7 +237,7 @@ func AddKey(ctx context.Context, s *Repository, password, username, hostname str
} }
// call KDF to derive user key // call KDF to derive user key
newkey.user, err = crypto.KDF(*Params, newkey.Salt, password) newkey.user, err = crypto.KDF(*params, newkey.Salt, password)
if err != nil { if err != nil {
return nil, err return nil, err
} }

274
internal/repository/lock.go Normal file
View file

@ -0,0 +1,274 @@
package repository
import (
"context"
"fmt"
"sync"
"time"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic"
)
type lockContext struct {
lock *restic.Lock
cancel context.CancelFunc
refreshWG sync.WaitGroup
}
type locker struct {
retrySleepStart time.Duration
retrySleepMax time.Duration
refreshInterval time.Duration
refreshabilityTimeout time.Duration
}
const defaultRefreshInterval = 5 * time.Minute
var lockerInst = &locker{
retrySleepStart: 5 * time.Second,
retrySleepMax: 60 * time.Second,
refreshInterval: defaultRefreshInterval,
// consider a lock refresh failed a bit before the lock actually becomes stale
// the difference allows to compensate for a small time drift between clients.
refreshabilityTimeout: restic.StaleLockTimeout - defaultRefreshInterval*3/2,
}
func Lock(ctx context.Context, repo restic.Repository, exclusive bool, retryLock time.Duration, printRetry func(msg string), logger func(format string, args ...interface{})) (*Unlocker, context.Context, error) {
return lockerInst.Lock(ctx, repo, exclusive, retryLock, printRetry, logger)
}
// Lock wraps the ctx such that it is cancelled when the repository is unlocked
// cancelling the original context also stops the lock refresh
func (l *locker) Lock(ctx context.Context, repo restic.Repository, exclusive bool, retryLock time.Duration, printRetry func(msg string), logger func(format string, args ...interface{})) (*Unlocker, context.Context, error) {
lockFn := restic.NewLock
if exclusive {
lockFn = restic.NewExclusiveLock
}
var lock *restic.Lock
var err error
retrySleep := minDuration(l.retrySleepStart, retryLock)
retryMessagePrinted := false
retryTimeout := time.After(retryLock)
retryLoop:
for {
lock, err = lockFn(ctx, repo)
if err != nil && restic.IsAlreadyLocked(err) {
if !retryMessagePrinted {
printRetry(fmt.Sprintf("repo already locked, waiting up to %s for the lock\n", retryLock))
retryMessagePrinted = true
}
debug.Log("repo already locked, retrying in %v", retrySleep)
retrySleepCh := time.After(retrySleep)
select {
case <-ctx.Done():
return nil, ctx, ctx.Err()
case <-retryTimeout:
debug.Log("repo already locked, timeout expired")
// Last lock attempt
lock, err = lockFn(ctx, repo)
break retryLoop
case <-retrySleepCh:
retrySleep = minDuration(retrySleep*2, l.retrySleepMax)
}
} else {
// anything else, either a successful lock or another error
break retryLoop
}
}
if restic.IsInvalidLock(err) {
return nil, ctx, errors.Fatalf("%v\n\nthe `unlock --remove-all` command can be used to remove invalid locks. Make sure that no other restic process is accessing the repository when running the command", err)
}
if err != nil {
return nil, ctx, fmt.Errorf("unable to create lock in backend: %w", err)
}
debug.Log("create lock %p (exclusive %v)", lock, exclusive)
ctx, cancel := context.WithCancel(ctx)
lockInfo := &lockContext{
lock: lock,
cancel: cancel,
}
lockInfo.refreshWG.Add(2)
refreshChan := make(chan struct{})
forceRefreshChan := make(chan refreshLockRequest)
go l.refreshLocks(ctx, repo.Backend(), lockInfo, refreshChan, forceRefreshChan, logger)
go l.monitorLockRefresh(ctx, lockInfo, refreshChan, forceRefreshChan, logger)
return &Unlocker{lockInfo}, ctx, nil
}
func minDuration(a, b time.Duration) time.Duration {
if a <= b {
return a
}
return b
}
type refreshLockRequest struct {
result chan bool
}
func (l *locker) refreshLocks(ctx context.Context, backend backend.Backend, lockInfo *lockContext, refreshed chan<- struct{}, forceRefresh <-chan refreshLockRequest, logger func(format string, args ...interface{})) {
debug.Log("start")
lock := lockInfo.lock
ticker := time.NewTicker(l.refreshInterval)
lastRefresh := lock.Time
defer func() {
ticker.Stop()
// ensure that the context was cancelled before removing the lock
lockInfo.cancel()
// remove the lock from the repo
debug.Log("unlocking repository with lock %v", lock)
if err := lock.Unlock(); err != nil {
debug.Log("error while unlocking: %v", err)
logger("error while unlocking: %v", err)
}
lockInfo.refreshWG.Done()
}()
for {
select {
case <-ctx.Done():
debug.Log("terminate")
return
case req := <-forceRefresh:
debug.Log("trying to refresh stale lock")
// keep on going if our current lock still exists
success := tryRefreshStaleLock(ctx, backend, lock, lockInfo.cancel, logger)
// inform refresh goroutine about forced refresh
select {
case <-ctx.Done():
case req.result <- success:
}
if success {
// update lock refresh time
lastRefresh = lock.Time
}
case <-ticker.C:
if time.Since(lastRefresh) > l.refreshabilityTimeout {
// the lock is too old, wait until the expiry monitor cancels the context
continue
}
debug.Log("refreshing locks")
err := lock.Refresh(context.TODO())
if err != nil {
logger("unable to refresh lock: %v\n", err)
} else {
lastRefresh = lock.Time
// inform monitor goroutine about successful refresh
select {
case <-ctx.Done():
case refreshed <- struct{}{}:
}
}
}
}
}
func (l *locker) monitorLockRefresh(ctx context.Context, lockInfo *lockContext, refreshed <-chan struct{}, forceRefresh chan<- refreshLockRequest, logger func(format string, args ...interface{})) {
// time.Now() might use a monotonic timer which is paused during standby
// convert to unix time to ensure we compare real time values
lastRefresh := time.Now().UnixNano()
pollDuration := 1 * time.Second
if l.refreshInterval < pollDuration {
// required for TestLockFailedRefresh
pollDuration = l.refreshInterval / 5
}
// timers are paused during standby, which is a problem as the refresh timeout
// _must_ expire if the host was too long in standby. Thus fall back to periodic checks
// https://github.com/golang/go/issues/35012
ticker := time.NewTicker(pollDuration)
defer func() {
ticker.Stop()
lockInfo.cancel()
lockInfo.refreshWG.Done()
}()
var refreshStaleLockResult chan bool
for {
select {
case <-ctx.Done():
debug.Log("terminate expiry monitoring")
return
case <-refreshed:
if refreshStaleLockResult != nil {
// ignore delayed refresh notifications while the stale lock is refreshed
continue
}
lastRefresh = time.Now().UnixNano()
case <-ticker.C:
if time.Now().UnixNano()-lastRefresh < l.refreshabilityTimeout.Nanoseconds() || refreshStaleLockResult != nil {
continue
}
debug.Log("trying to refreshStaleLock")
// keep on going if our current lock still exists
refreshReq := refreshLockRequest{
result: make(chan bool),
}
refreshStaleLockResult = refreshReq.result
// inform refresh goroutine about forced refresh
select {
case <-ctx.Done():
case forceRefresh <- refreshReq:
}
case success := <-refreshStaleLockResult:
if success {
lastRefresh = time.Now().UnixNano()
refreshStaleLockResult = nil
continue
}
logger("Fatal: failed to refresh lock in time\n")
return
}
}
}
func tryRefreshStaleLock(ctx context.Context, be backend.Backend, lock *restic.Lock, cancel context.CancelFunc, logger func(format string, args ...interface{})) bool {
freeze := backend.AsBackend[backend.FreezeBackend](be)
if freeze != nil {
debug.Log("freezing backend")
freeze.Freeze()
defer freeze.Unfreeze()
}
err := lock.RefreshStaleLock(ctx)
if err != nil {
logger("failed to refresh stale lock: %v\n", err)
// cancel context while the backend is still frozen to prevent accidental modifications
cancel()
return false
}
return true
}
type Unlocker struct {
info *lockContext
}
func (l *Unlocker) Unlock() {
l.info.cancel()
l.info.refreshWG.Wait()
}

View file

@ -1,4 +1,4 @@
package main package repository
import ( import (
"context" "context"
@ -10,94 +10,76 @@ import (
"time" "time"
"github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/backend/location"
"github.com/restic/restic/internal/backend/mem" "github.com/restic/restic/internal/backend/mem"
"github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/test" "github.com/restic/restic/internal/test"
rtest "github.com/restic/restic/internal/test"
) )
func openLockTestRepo(t *testing.T, wrapper backendWrapper) (*repository.Repository, func(), *testEnvironment) { type backendWrapper func(r backend.Backend) (backend.Backend, error)
env, cleanup := withTestEnvironment(t)
reg := location.NewRegistry() func openLockTestRepo(t *testing.T, wrapper backendWrapper) restic.Repository {
reg.Register(mem.NewFactory()) be := backend.Backend(mem.New())
env.gopts.backends = reg // initialize repo
env.gopts.Repo = "mem:" TestRepositoryWithBackend(t, be, 0, Options{})
// reopen repository to allow injecting a backend wrapper
if wrapper != nil { if wrapper != nil {
env.gopts.backendTestHook = wrapper var err error
} be, err = wrapper(be)
testRunInit(t, env.gopts) rtest.OK(t, err)
repo, err := OpenRepository(context.TODO(), env.gopts)
test.OK(t, err)
return repo, cleanup, env
} }
func checkedLockRepo(ctx context.Context, t *testing.T, repo restic.Repository, env *testEnvironment) (*restic.Lock, context.Context) { return TestOpenBackend(t, be)
lock, wrappedCtx, err := lockRepo(ctx, repo, env.gopts.RetryLock, env.gopts.JSON) }
func checkedLockRepo(ctx context.Context, t *testing.T, repo restic.Repository, lockerInst *locker, retryLock time.Duration) (*Unlocker, context.Context) {
lock, wrappedCtx, err := lockerInst.Lock(ctx, repo, false, retryLock, func(msg string) {}, func(format string, args ...interface{}) {})
test.OK(t, err) test.OK(t, err)
test.OK(t, wrappedCtx.Err()) test.OK(t, wrappedCtx.Err())
if lock.Stale() { if lock.info.lock.Stale() {
t.Fatal("lock returned stale lock") t.Fatal("lock returned stale lock")
} }
return lock, wrappedCtx return lock, wrappedCtx
} }
func TestLock(t *testing.T) { func TestLock(t *testing.T) {
repo, cleanup, env := openLockTestRepo(t, nil) t.Parallel()
defer cleanup() repo := openLockTestRepo(t, nil)
lock, wrappedCtx := checkedLockRepo(context.Background(), t, repo, env) lock, wrappedCtx := checkedLockRepo(context.Background(), t, repo, lockerInst, 0)
unlockRepo(lock) lock.Unlock()
if wrappedCtx.Err() == nil { if wrappedCtx.Err() == nil {
t.Fatal("unlock did not cancel context") t.Fatal("unlock did not cancel context")
} }
} }
func TestLockCancel(t *testing.T) { func TestLockCancel(t *testing.T) {
repo, cleanup, env := openLockTestRepo(t, nil) t.Parallel()
defer cleanup() repo := openLockTestRepo(t, nil)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
lock, wrappedCtx := checkedLockRepo(ctx, t, repo, env) lock, wrappedCtx := checkedLockRepo(ctx, t, repo, lockerInst, 0)
cancel() cancel()
if wrappedCtx.Err() == nil { if wrappedCtx.Err() == nil {
t.Fatal("canceled parent context did not cancel context") t.Fatal("canceled parent context did not cancel context")
} }
// unlockRepo should not crash // Unlock should not crash
unlockRepo(lock) lock.Unlock()
}
func TestLockUnlockAll(t *testing.T) {
repo, cleanup, env := openLockTestRepo(t, nil)
defer cleanup()
lock, wrappedCtx := checkedLockRepo(context.Background(), t, repo, env)
_, err := unlockAll(0)
test.OK(t, err)
if wrappedCtx.Err() == nil {
t.Fatal("canceled parent context did not cancel context")
}
// unlockRepo should not crash
unlockRepo(lock)
} }
func TestLockConflict(t *testing.T) { func TestLockConflict(t *testing.T) {
repo, cleanup, env := openLockTestRepo(t, nil) t.Parallel()
defer cleanup() repo := openLockTestRepo(t, nil)
repo2, err := OpenRepository(context.TODO(), env.gopts) repo2 := TestOpenBackend(t, repo.Backend())
test.OK(t, err)
lock, _, err := lockRepoExclusive(context.Background(), repo, env.gopts.RetryLock, env.gopts.JSON) lock, _, err := Lock(context.Background(), repo, true, 0, func(msg string) {}, func(format string, args ...interface{}) {})
test.OK(t, err) test.OK(t, err)
defer unlockRepo(lock) defer lock.Unlock()
_, _, err = lockRepo(context.Background(), repo2, env.gopts.RetryLock, env.gopts.JSON) _, _, err = Lock(context.Background(), repo2, false, 0, func(msg string) {}, func(format string, args ...interface{}) {})
if err == nil { if err == nil {
t.Fatal("second lock should have failed") t.Fatal("second lock should have failed")
} }
@ -118,20 +100,19 @@ func (b *writeOnceBackend) Save(ctx context.Context, h backend.Handle, rd backen
} }
func TestLockFailedRefresh(t *testing.T) { func TestLockFailedRefresh(t *testing.T) {
repo, cleanup, env := openLockTestRepo(t, func(r backend.Backend) (backend.Backend, error) { t.Parallel()
repo := openLockTestRepo(t, func(r backend.Backend) (backend.Backend, error) {
return &writeOnceBackend{Backend: r}, nil return &writeOnceBackend{Backend: r}, nil
}) })
defer cleanup()
// reduce locking intervals to be suitable for testing // reduce locking intervals to be suitable for testing
ri, rt := refreshInterval, refreshabilityTimeout li := &locker{
refreshInterval = 20 * time.Millisecond retrySleepStart: lockerInst.retrySleepStart,
refreshabilityTimeout = 100 * time.Millisecond retrySleepMax: lockerInst.retrySleepMax,
defer func() { refreshInterval: 20 * time.Millisecond,
refreshInterval, refreshabilityTimeout = ri, rt refreshabilityTimeout: 100 * time.Millisecond,
}() }
lock, wrappedCtx := checkedLockRepo(context.Background(), t, repo, li, 0)
lock, wrappedCtx := checkedLockRepo(context.Background(), t, repo, env)
select { select {
case <-wrappedCtx.Done(): case <-wrappedCtx.Done():
@ -139,8 +120,8 @@ func TestLockFailedRefresh(t *testing.T) {
case <-time.After(time.Second): case <-time.After(time.Second):
t.Fatal("failed lock refresh did not cause context cancellation") t.Fatal("failed lock refresh did not cause context cancellation")
} }
// unlockRepo should not crash // Unlock should not crash
unlockRepo(lock) lock.Unlock()
} }
type loggingBackend struct { type loggingBackend struct {
@ -156,24 +137,23 @@ func (b *loggingBackend) Save(ctx context.Context, h backend.Handle, rd backend.
} }
func TestLockSuccessfulRefresh(t *testing.T) { func TestLockSuccessfulRefresh(t *testing.T) {
repo, cleanup, env := openLockTestRepo(t, func(r backend.Backend) (backend.Backend, error) { t.Parallel()
repo := openLockTestRepo(t, func(r backend.Backend) (backend.Backend, error) {
return &loggingBackend{ return &loggingBackend{
Backend: r, Backend: r,
t: t, t: t,
}, nil }, nil
}) })
defer cleanup()
t.Logf("test for successful lock refresh %v", time.Now()) t.Logf("test for successful lock refresh %v", time.Now())
// reduce locking intervals to be suitable for testing // reduce locking intervals to be suitable for testing
ri, rt := refreshInterval, refreshabilityTimeout li := &locker{
refreshInterval = 60 * time.Millisecond retrySleepStart: lockerInst.retrySleepStart,
refreshabilityTimeout = 500 * time.Millisecond retrySleepMax: lockerInst.retrySleepMax,
defer func() { refreshInterval: 60 * time.Millisecond,
refreshInterval, refreshabilityTimeout = ri, rt refreshabilityTimeout: 500 * time.Millisecond,
}() }
lock, wrappedCtx := checkedLockRepo(context.Background(), t, repo, li, 0)
lock, wrappedCtx := checkedLockRepo(context.Background(), t, repo, env)
select { select {
case <-wrappedCtx.Done(): case <-wrappedCtx.Done():
@ -186,11 +166,11 @@ func TestLockSuccessfulRefresh(t *testing.T) {
buf = buf[:n] buf = buf[:n]
t.Log(string(buf)) t.Log(string(buf))
case <-time.After(2 * refreshabilityTimeout): case <-time.After(2 * li.refreshabilityTimeout):
// expected lock refresh to work // expected lock refresh to work
} }
// unlockRepo should not crash // Unlock should not crash
unlockRepo(lock) lock.Unlock()
} }
type slowBackend struct { type slowBackend struct {
@ -208,26 +188,26 @@ func (b *slowBackend) Save(ctx context.Context, h backend.Handle, rd backend.Rew
} }
func TestLockSuccessfulStaleRefresh(t *testing.T) { func TestLockSuccessfulStaleRefresh(t *testing.T) {
t.Parallel()
var sb *slowBackend var sb *slowBackend
repo, cleanup, env := openLockTestRepo(t, func(r backend.Backend) (backend.Backend, error) { repo := openLockTestRepo(t, func(r backend.Backend) (backend.Backend, error) {
sb = &slowBackend{Backend: r} sb = &slowBackend{Backend: r}
return sb, nil return sb, nil
}) })
defer cleanup()
t.Logf("test for successful lock refresh %v", time.Now()) t.Logf("test for successful lock refresh %v", time.Now())
// reduce locking intervals to be suitable for testing // reduce locking intervals to be suitable for testing
ri, rt := refreshInterval, refreshabilityTimeout li := &locker{
refreshInterval = 10 * time.Millisecond retrySleepStart: lockerInst.retrySleepStart,
refreshabilityTimeout = 50 * time.Millisecond retrySleepMax: lockerInst.retrySleepMax,
defer func() { refreshInterval: 10 * time.Millisecond,
refreshInterval, refreshabilityTimeout = ri, rt refreshabilityTimeout: 50 * time.Millisecond,
}() }
lock, wrappedCtx := checkedLockRepo(context.Background(), t, repo, env) lock, wrappedCtx := checkedLockRepo(context.Background(), t, repo, li, 0)
// delay lock refreshing long enough that the lock would expire // delay lock refreshing long enough that the lock would expire
sb.m.Lock() sb.m.Lock()
sb.sleep = refreshabilityTimeout + refreshInterval sb.sleep = li.refreshabilityTimeout + li.refreshInterval
sb.m.Unlock() sb.m.Unlock()
select { select {
@ -235,7 +215,7 @@ func TestLockSuccessfulStaleRefresh(t *testing.T) {
// don't call t.Fatal to allow the lock to be properly cleaned up // don't call t.Fatal to allow the lock to be properly cleaned up
t.Error("lock refresh failed", time.Now()) t.Error("lock refresh failed", time.Now())
case <-time.After(refreshabilityTimeout): case <-time.After(li.refreshabilityTimeout):
} }
// reset slow backend // reset slow backend
sb.m.Lock() sb.m.Lock()
@ -248,25 +228,26 @@ func TestLockSuccessfulStaleRefresh(t *testing.T) {
// don't call t.Fatal to allow the lock to be properly cleaned up // don't call t.Fatal to allow the lock to be properly cleaned up
t.Error("lock refresh failed", time.Now()) t.Error("lock refresh failed", time.Now())
case <-time.After(3 * refreshabilityTimeout): case <-time.After(3 * li.refreshabilityTimeout):
// expected lock refresh to work // expected lock refresh to work
} }
// unlockRepo should not crash // Unlock should not crash
unlockRepo(lock) lock.Unlock()
} }
func TestLockWaitTimeout(t *testing.T) { func TestLockWaitTimeout(t *testing.T) {
repo, cleanup, env := openLockTestRepo(t, nil) t.Parallel()
defer cleanup() repo := openLockTestRepo(t, nil)
elock, _, err := lockRepoExclusive(context.TODO(), repo, env.gopts.RetryLock, env.gopts.JSON) elock, _, err := Lock(context.TODO(), repo, true, 0, func(msg string) {}, func(format string, args ...interface{}) {})
test.OK(t, err) test.OK(t, err)
defer elock.Unlock()
retryLock := 200 * time.Millisecond retryLock := 200 * time.Millisecond
start := time.Now() start := time.Now()
lock, _, err := lockRepo(context.TODO(), repo, retryLock, env.gopts.JSON) _, _, err = Lock(context.TODO(), repo, false, retryLock, func(msg string) {}, func(format string, args ...interface{}) {})
duration := time.Since(start) duration := time.Since(start)
test.Assert(t, err != nil, test.Assert(t, err != nil,
@ -275,17 +256,15 @@ func TestLockWaitTimeout(t *testing.T) {
"create normal lock with exclusively locked repo didn't return the correct error") "create normal lock with exclusively locked repo didn't return the correct error")
test.Assert(t, retryLock <= duration && duration < retryLock*3/2, test.Assert(t, retryLock <= duration && duration < retryLock*3/2,
"create normal lock with exclusively locked repo didn't wait for the specified timeout") "create normal lock with exclusively locked repo didn't wait for the specified timeout")
test.OK(t, lock.Unlock())
test.OK(t, elock.Unlock())
} }
func TestLockWaitCancel(t *testing.T) { func TestLockWaitCancel(t *testing.T) {
repo, cleanup, env := openLockTestRepo(t, nil) t.Parallel()
defer cleanup() repo := openLockTestRepo(t, nil)
elock, _, err := lockRepoExclusive(context.TODO(), repo, env.gopts.RetryLock, env.gopts.JSON) elock, _, err := Lock(context.TODO(), repo, true, 0, func(msg string) {}, func(format string, args ...interface{}) {})
test.OK(t, err) test.OK(t, err)
defer elock.Unlock()
retryLock := 200 * time.Millisecond retryLock := 200 * time.Millisecond
cancelAfter := 40 * time.Millisecond cancelAfter := 40 * time.Millisecond
@ -294,7 +273,7 @@ func TestLockWaitCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO()) ctx, cancel := context.WithCancel(context.TODO())
time.AfterFunc(cancelAfter, cancel) time.AfterFunc(cancelAfter, cancel)
lock, _, err := lockRepo(ctx, repo, retryLock, env.gopts.JSON) _, _, err = Lock(ctx, repo, false, retryLock, func(msg string) {}, func(format string, args ...interface{}) {})
duration := time.Since(start) duration := time.Since(start)
test.Assert(t, err != nil, test.Assert(t, err != nil,
@ -303,27 +282,23 @@ func TestLockWaitCancel(t *testing.T) {
"create normal lock with exclusively locked repo didn't return the correct error") "create normal lock with exclusively locked repo didn't return the correct error")
test.Assert(t, cancelAfter <= duration && duration < retryLock-10*time.Millisecond, test.Assert(t, cancelAfter <= duration && duration < retryLock-10*time.Millisecond,
"create normal lock with exclusively locked repo didn't return in time, duration %v", duration) "create normal lock with exclusively locked repo didn't return in time, duration %v", duration)
test.OK(t, lock.Unlock())
test.OK(t, elock.Unlock())
} }
func TestLockWaitSuccess(t *testing.T) { func TestLockWaitSuccess(t *testing.T) {
repo, cleanup, env := openLockTestRepo(t, nil) t.Parallel()
defer cleanup() repo := openLockTestRepo(t, nil)
elock, _, err := lockRepoExclusive(context.TODO(), repo, env.gopts.RetryLock, env.gopts.JSON) elock, _, err := Lock(context.TODO(), repo, true, 0, func(msg string) {}, func(format string, args ...interface{}) {})
test.OK(t, err) test.OK(t, err)
retryLock := 200 * time.Millisecond retryLock := 200 * time.Millisecond
unlockAfter := 40 * time.Millisecond unlockAfter := 40 * time.Millisecond
time.AfterFunc(unlockAfter, func() { time.AfterFunc(unlockAfter, func() {
test.OK(t, elock.Unlock()) elock.Unlock()
}) })
lock, _, err := lockRepo(context.TODO(), repo, retryLock, env.gopts.JSON) lock, _, err := Lock(context.TODO(), repo, false, retryLock, func(msg string) {}, func(format string, args ...interface{}) {})
test.OK(t, err) test.OK(t, err)
lock.Unlock()
test.OK(t, lock.Unlock())
} }

View file

@ -221,10 +221,9 @@ func benchmarkLoadUnpacked(b *testing.B, version uint) {
var repoFixture = filepath.Join("testdata", "test-repo.tar.gz") var repoFixture = filepath.Join("testdata", "test-repo.tar.gz")
func TestRepositoryLoadIndex(t *testing.T) { func TestRepositoryLoadIndex(t *testing.T) {
repodir, cleanup := rtest.Env(t, repoFixture) repo, cleanup := repository.TestFromFixture(t, repoFixture)
defer cleanup() defer cleanup()
repo := repository.TestOpenLocal(t, repodir)
rtest.OK(t, repo.LoadIndex(context.TODO(), nil)) rtest.OK(t, repo.LoadIndex(context.TODO(), nil))
} }
@ -243,7 +242,7 @@ func loadIndex(ctx context.Context, repo restic.LoaderUnpacked, id restic.ID) (*
} }
func TestRepositoryLoadUnpackedBroken(t *testing.T) { func TestRepositoryLoadUnpackedBroken(t *testing.T) {
repodir, cleanup := rtest.Env(t, repoFixture) repo, cleanup := repository.TestFromFixture(t, repoFixture)
defer cleanup() defer cleanup()
data := rtest.Random(23, 12345) data := rtest.Random(23, 12345)
@ -252,7 +251,6 @@ func TestRepositoryLoadUnpackedBroken(t *testing.T) {
// damage buffer // damage buffer
data[0] ^= 0xff data[0] ^= 0xff
repo := repository.TestOpenLocal(t, repodir)
// store broken file // store broken file
err := repo.Backend().Save(context.TODO(), h, backend.NewByteReader(data, nil)) err := repo.Backend().Save(context.TODO(), h, backend.NewByteReader(data, nil))
rtest.OK(t, err) rtest.OK(t, err)
@ -289,10 +287,7 @@ func TestRepositoryLoadUnpackedRetryBroken(t *testing.T) {
be, err := local.Open(context.TODO(), local.Config{Path: repodir, Connections: 2}) be, err := local.Open(context.TODO(), local.Config{Path: repodir, Connections: 2})
rtest.OK(t, err) rtest.OK(t, err)
repo, err := repository.New(&damageOnceBackend{Backend: be}, repository.Options{}) repo := repository.TestOpenBackend(t, &damageOnceBackend{Backend: be})
rtest.OK(t, err)
err = repo.SearchKey(context.TODO(), rtest.TestPassword, 10, "")
rtest.OK(t, err)
rtest.OK(t, repo.LoadIndex(context.TODO(), nil)) rtest.OK(t, repo.LoadIndex(context.TODO(), nil))
} }

View file

@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"os" "os"
"sync"
"testing" "testing"
"github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend"
@ -17,21 +18,22 @@ import (
"github.com/restic/chunker" "github.com/restic/chunker"
) )
// testKDFParams are the parameters for the KDF to be used during testing.
var testKDFParams = crypto.Params{
N: 128,
R: 1,
P: 1,
}
type logger interface { type logger interface {
Logf(format string, args ...interface{}) Logf(format string, args ...interface{})
} }
var paramsOnce sync.Once
// TestUseLowSecurityKDFParameters configures low-security KDF parameters for testing. // TestUseLowSecurityKDFParameters configures low-security KDF parameters for testing.
func TestUseLowSecurityKDFParameters(t logger) { func TestUseLowSecurityKDFParameters(t logger) {
t.Logf("using low-security KDF parameters for test") t.Logf("using low-security KDF parameters for test")
Params = &testKDFParams paramsOnce.Do(func() {
params = &crypto.Params{
N: 128,
R: 1,
P: 1,
}
})
} }
// TestBackend returns a fully configured in-memory backend. // TestBackend returns a fully configured in-memory backend.
@ -39,7 +41,7 @@ func TestBackend(_ testing.TB) backend.Backend {
return mem.New() return mem.New()
} }
const TestChunkerPol = chunker.Pol(0x3DA3358B4DC173) const testChunkerPol = chunker.Pol(0x3DA3358B4DC173)
// TestRepositoryWithBackend returns a repository initialized with a test // TestRepositoryWithBackend returns a repository initialized with a test
// password. If be is nil, an in-memory backend is used. A constant polynomial // password. If be is nil, an in-memory backend is used. A constant polynomial
@ -58,7 +60,7 @@ func TestRepositoryWithBackend(t testing.TB, be backend.Backend, version uint, o
t.Fatalf("TestRepository(): new repo failed: %v", err) t.Fatalf("TestRepository(): new repo failed: %v", err)
} }
cfg := restic.TestCreateConfig(t, TestChunkerPol, version) cfg := restic.TestCreateConfig(t, testChunkerPol, version)
err = repo.init(context.TODO(), test.TestPassword, cfg) err = repo.init(context.TODO(), test.TestPassword, cfg)
if err != nil { if err != nil {
t.Fatalf("TestRepository(): initialize repo failed: %v", err) t.Fatalf("TestRepository(): initialize repo failed: %v", err)
@ -98,8 +100,15 @@ func TestRepositoryWithVersion(t testing.TB, version uint) restic.Repository {
return TestRepositoryWithBackend(t, nil, version, opts) return TestRepositoryWithBackend(t, nil, version, opts)
} }
func TestFromFixture(t testing.TB, repoFixture string) (restic.Repository, func()) {
repodir, cleanup := test.Env(t, repoFixture)
repo := TestOpenLocal(t, repodir)
return repo, cleanup
}
// TestOpenLocal opens a local repository. // TestOpenLocal opens a local repository.
func TestOpenLocal(t testing.TB, dir string) (r restic.Repository) { func TestOpenLocal(t testing.TB, dir string) restic.Repository {
var be backend.Backend var be backend.Backend
be, err := local.Open(context.TODO(), local.Config{Path: dir, Connections: 2}) be, err := local.Open(context.TODO(), local.Config{Path: dir, Connections: 2})
if err != nil { if err != nil {
@ -108,6 +117,10 @@ func TestOpenLocal(t testing.TB, dir string) (r restic.Repository) {
be = retry.New(be, 3, nil, nil) be = retry.New(be, 3, nil, nil)
return TestOpenBackend(t, be)
}
func TestOpenBackend(t testing.TB, be backend.Backend) restic.Repository {
repo, err := New(be, Options{}) repo, err := New(be, Options{})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)

View file

@ -2,6 +2,7 @@ package restic
import ( import (
"context" "context"
"sync"
"testing" "testing"
"github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/errors"
@ -67,12 +68,15 @@ func TestCreateConfig(t testing.TB, pol chunker.Pol, version uint) (cfg Config)
} }
var checkPolynomial = true var checkPolynomial = true
var checkPolynomialOnce sync.Once
// TestDisableCheckPolynomial disables the check that the polynomial used for // TestDisableCheckPolynomial disables the check that the polynomial used for
// the chunker. // the chunker.
func TestDisableCheckPolynomial(t testing.TB) { func TestDisableCheckPolynomial(t testing.TB) {
t.Logf("disabling check of the chunker polynomial") t.Logf("disabling check of the chunker polynomial")
checkPolynomialOnce.Do(func() {
checkPolynomial = false checkPolynomial = false
})
} }
// LoadConfig returns loads, checks and returns the config for a repository. // LoadConfig returns loads, checks and returns the config for a repository.