Regularly save intermediate indexes

This commit is contained in:
Alexander Neumann 2015-10-12 23:59:17 +02:00
parent 941b7025b6
commit 6fa4be5af2
4 changed files with 116 additions and 7 deletions

View file

@ -9,6 +9,7 @@ import (
"path/filepath" "path/filepath"
"sort" "sort"
"sync" "sync"
"time"
"github.com/restic/chunker" "github.com/restic/chunker"
"github.com/restic/restic/backend" "github.com/restic/restic/backend"
@ -540,6 +541,30 @@ func (j archiveJob) Copy() pipe.Job {
return j.new return j.new
} }
const saveIndexTime = 30 * time.Second
// saveIndexes regularly queries the master index for full indexes and saves them.
func (arch *Archiver) saveIndexes(wg *sync.WaitGroup, done <-chan struct{}) {
defer wg.Done()
ticker := time.NewTicker(saveIndexTime)
defer ticker.Stop()
for {
select {
case <-done:
return
case <-ticker.C:
debug.Log("Archiver.saveIndexes", "saving full indexes")
err := arch.repo.SaveFullIndex()
if err != nil {
debug.Log("Archiver.saveIndexes", "save indexes returned an error: %v", err)
fmt.Fprintf(os.Stderr, "error saving preliminary index: %v\n", err)
}
}
}
}
// Snapshot creates a snapshot of the given paths. If parentID is set, this is // Snapshot creates a snapshot of the given paths. If parentID is set, this is
// used to compare the files to the ones archived at the time this snapshot was // used to compare the files to the ones archived at the time this snapshot was
// taken. // taken.
@ -623,10 +648,20 @@ func (arch *Archiver) Snapshot(p *Progress, paths []string, parentID *backend.ID
go arch.dirWorker(&wg, p, done, dirCh) go arch.dirWorker(&wg, p, done, dirCh)
} }
// run index saver
var wgIndexSaver sync.WaitGroup
stopIndexSaver := make(chan struct{})
wgIndexSaver.Add(1)
go arch.saveIndexes(&wgIndexSaver, stopIndexSaver)
// wait for all workers to terminate // wait for all workers to terminate
debug.Log("Archiver.Snapshot", "wait for workers") debug.Log("Archiver.Snapshot", "wait for workers")
wg.Wait() wg.Wait()
// stop index saver
close(stopIndexSaver)
wgIndexSaver.Wait()
debug.Log("Archiver.Snapshot", "workers terminated") debug.Log("Archiver.Snapshot", "workers terminated")
// receive the top-level tree // receive the top-level tree
@ -681,7 +716,6 @@ func Scan(dirs []string, filter pipe.SelectFunc, p *Progress) (Stat, error) {
for _, dir := range dirs { for _, dir := range dirs {
debug.Log("Scan", "Start for %v", dir) debug.Log("Scan", "Start for %v", dir)
err := filepath.Walk(dir, func(str string, fi os.FileInfo, err error) error { err := filepath.Walk(dir, func(str string, fi os.FileInfo, err error) error {
debug.Log("Scan.Walk", "%v, fi: %v, err: %v", str, fi, err)
// TODO: integrate error reporting // TODO: integrate error reporting
if err != nil { if err != nil {
fmt.Fprintf(os.Stderr, "error for %v: %v\n", str, err) fmt.Fprintf(os.Stderr, "error for %v: %v\n", str, err)

View file

@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"io" "io"
"sync" "sync"
"time"
"github.com/restic/restic/backend" "github.com/restic/restic/backend"
"github.com/restic/restic/crypto" "github.com/restic/restic/crypto"
@ -20,6 +21,7 @@ type Index struct {
final bool // set to true for all indexes read from the backend ("finalized") final bool // set to true for all indexes read from the backend ("finalized")
supersedes backend.IDs supersedes backend.IDs
created time.Time
} }
type indexEntry struct { type indexEntry struct {
@ -32,7 +34,8 @@ type indexEntry struct {
// NewIndex returns a new index. // NewIndex returns a new index.
func NewIndex() *Index { func NewIndex() *Index {
return &Index{ return &Index{
pack: make(map[backend.ID]indexEntry), pack: make(map[backend.ID]indexEntry),
created: time.Now(),
} }
} }
@ -54,6 +57,42 @@ func (idx *Index) Final() bool {
return idx.final return idx.final
} }
const (
indexMinBlobs = 20
indexMaxBlobs = 2000
indexMinAge = 2 * time.Minute
indexMaxAge = 15 * time.Minute
)
// Full returns true iff the index is "full enough" to be saved as a preliminary index.
func (idx *Index) Full() bool {
idx.m.Lock()
defer idx.m.Unlock()
debug.Log("Index.Full", "checking whether index %p is full", idx)
packs := len(idx.pack)
age := time.Now().Sub(idx.created)
if age > indexMaxAge {
debug.Log("Index.Full", "index %p is old enough", idx, age)
return true
}
if packs < indexMinBlobs || age < indexMinAge {
debug.Log("Index.Full", "index %p only has %d packs or is too young (%v)", idx, packs, age)
return false
}
if packs > indexMaxBlobs {
debug.Log("Index.Full", "index %p has %d packs", idx, packs)
return true
}
debug.Log("Index.Full", "index %p is not full", idx)
return false
}
// Store remembers the id and pack in the index. An existing entry will be // Store remembers the id and pack in the index. An existing entry will be
// silently overwritten. // silently overwritten.
func (idx *Index) Store(t pack.BlobType, id backend.ID, pack *backend.ID, offset, length uint) { func (idx *Index) Store(t pack.BlobType, id backend.ID, pack *backend.ID, offset, length uint) {

View file

@ -187,7 +187,33 @@ func (mi *MasterIndex) NotFinalIndexes() []*Index {
} }
} }
debug.Log("MasterIndex.NotFinalIndexes", "saving %d indexes", len(list)) debug.Log("MasterIndex.NotFinalIndexes", "return %d indexes", len(list))
return list
}
// FullIndexes returns all indexes that are full.
func (mi *MasterIndex) FullIndexes() []*Index {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
var list []*Index
debug.Log("MasterIndex.FullIndexes", "checking %d indexes", len(mi.idx))
for _, idx := range mi.idx {
if idx.Final() {
debug.Log("MasterIndex.FullIndexes", "index %p is final", idx)
continue
}
if idx.Full() {
debug.Log("MasterIndex.FullIndexes", "index %p is full", idx)
list = append(list, idx)
} else {
debug.Log("MasterIndex.FullIndexes", "index %p not full", idx)
}
}
debug.Log("MasterIndex.FullIndexes", "return %d indexes", len(list))
return list return list
} }

View file

@ -235,7 +235,7 @@ func (r *Repository) findPacker(size uint) (*pack.Packer, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
debug.Log("Repo.findPacker", "create new pack %p", blob) debug.Log("Repo.findPacker", "create new pack %p for %d bytes", blob, size)
return pack.NewPacker(r.key, blob), nil return pack.NewPacker(r.key, blob), nil
} }
@ -513,9 +513,9 @@ func (bw *BlobWriter) ID() backend.ID {
return bw.id return bw.id
} }
// SaveIndex saves all new indexes in the backend. // saveIndex saves all indexes in the backend.
func (r *Repository) SaveIndex() error { func (r *Repository) saveIndex(indexes ...*Index) error {
for i, idx := range r.idx.NotFinalIndexes() { for i, idx := range indexes {
debug.Log("Repo.SaveIndex", "Saving index %d", i) debug.Log("Repo.SaveIndex", "Saving index %d", i)
blob, err := r.CreateEncryptedBlob(backend.Index) blob, err := r.CreateEncryptedBlob(backend.Index)
@ -541,6 +541,16 @@ func (r *Repository) SaveIndex() error {
return nil return nil
} }
// SaveIndex saves all new indexes in the backend.
func (r *Repository) SaveIndex() error {
return r.saveIndex(r.idx.NotFinalIndexes()...)
}
// SaveFullIndex saves all full indexes in the backend.
func (r *Repository) SaveFullIndex() error {
return r.saveIndex(r.idx.FullIndexes()...)
}
const loadIndexParallelism = 20 const loadIndexParallelism = 20
// LoadIndex loads all index files from the backend in parallel and stores them // LoadIndex loads all index files from the backend in parallel and stores them