forked from TrueCloudLab/restic
Merge pull request #3731 from metalsp0rk/feature/min-packsize-flag
Feature: min packsize flag
This commit is contained in:
commit
2930a102de
17 changed files with 191 additions and 34 deletions
12
changelog/unreleased/issue-2291
Normal file
12
changelog/unreleased/issue-2291
Normal file
|
@ -0,0 +1,12 @@
|
||||||
|
Enhancement: Allow pack size customization
|
||||||
|
|
||||||
|
Restic now uses a target pack size of 16 MiB by default. It can be customized
|
||||||
|
using the `--pack-size size` option. Supported pack sizes range between 4 and
|
||||||
|
128 MiB.
|
||||||
|
|
||||||
|
It is possible to migrate an existing repository to _larger_ pack files using
|
||||||
|
`prune --repack-small`. This will rewrite every pack file which is
|
||||||
|
significantly smaller than the target size.
|
||||||
|
|
||||||
|
https://github.com/restic/restic/issues/2291
|
||||||
|
https://github.com/restic/restic/pull/3731
|
|
@ -86,7 +86,13 @@ func runInit(opts InitOptions, gopts GlobalOptions, args []string) error {
|
||||||
return errors.Fatalf("create repository at %s failed: %v\n", location.StripPassword(gopts.Repo), err)
|
return errors.Fatalf("create repository at %s failed: %v\n", location.StripPassword(gopts.Repo), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
s := repository.New(be, repository.Options{Compression: gopts.Compression})
|
s, err := repository.New(be, repository.Options{
|
||||||
|
Compression: gopts.Compression,
|
||||||
|
PackSize: gopts.PackSize * 1024 * 1024,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
err = s.Init(gopts.ctx, version, gopts.password, chunkerPolynomial)
|
err = s.Init(gopts.ctx, version, gopts.password, chunkerPolynomial)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -52,6 +52,7 @@ type PruneOptions struct {
|
||||||
MaxRepackBytes uint64
|
MaxRepackBytes uint64
|
||||||
|
|
||||||
RepackCachableOnly bool
|
RepackCachableOnly bool
|
||||||
|
RepackSmall bool
|
||||||
RepackUncompressed bool
|
RepackUncompressed bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -70,6 +71,7 @@ func addPruneOptions(c *cobra.Command) {
|
||||||
f.StringVar(&pruneOptions.MaxUnused, "max-unused", "5%", "tolerate given `limit` of unused data (absolute value in bytes with suffixes k/K, m/M, g/G, t/T, a value in % or the word 'unlimited')")
|
f.StringVar(&pruneOptions.MaxUnused, "max-unused", "5%", "tolerate given `limit` of unused data (absolute value in bytes with suffixes k/K, m/M, g/G, t/T, a value in % or the word 'unlimited')")
|
||||||
f.StringVar(&pruneOptions.MaxRepackSize, "max-repack-size", "", "maximum `size` to repack (allowed suffixes: k/K, m/M, g/G, t/T)")
|
f.StringVar(&pruneOptions.MaxRepackSize, "max-repack-size", "", "maximum `size` to repack (allowed suffixes: k/K, m/M, g/G, t/T)")
|
||||||
f.BoolVar(&pruneOptions.RepackCachableOnly, "repack-cacheable-only", false, "only repack packs which are cacheable")
|
f.BoolVar(&pruneOptions.RepackCachableOnly, "repack-cacheable-only", false, "only repack packs which are cacheable")
|
||||||
|
f.BoolVar(&pruneOptions.RepackSmall, "repack-small", false, "repack pack files below 80%% of target pack size")
|
||||||
f.BoolVar(&pruneOptions.RepackUncompressed, "repack-uncompressed", false, "repack all uncompressed data")
|
f.BoolVar(&pruneOptions.RepackUncompressed, "repack-uncompressed", false, "repack all uncompressed data")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -422,7 +424,14 @@ func decidePackAction(ctx context.Context, opts PruneOptions, gopts GlobalOption
|
||||||
repackPacks := restic.NewIDSet()
|
repackPacks := restic.NewIDSet()
|
||||||
|
|
||||||
var repackCandidates []packInfoWithID
|
var repackCandidates []packInfoWithID
|
||||||
|
var repackSmallCandidates []packInfoWithID
|
||||||
repoVersion := repo.Config().Version
|
repoVersion := repo.Config().Version
|
||||||
|
// only repack very small files by default
|
||||||
|
targetPackSize := repo.PackSize() / 25
|
||||||
|
if opts.RepackSmall {
|
||||||
|
// consider files with at least 80% of the target size as large enough
|
||||||
|
targetPackSize = repo.PackSize() / 5 * 4
|
||||||
|
}
|
||||||
|
|
||||||
// loop over all packs and decide what to do
|
// loop over all packs and decide what to do
|
||||||
bar := newProgressMax(!gopts.Quiet, uint64(len(indexPack)), "packs processed")
|
bar := newProgressMax(!gopts.Quiet, uint64(len(indexPack)), "packs processed")
|
||||||
|
@ -477,8 +486,12 @@ func decidePackAction(ctx context.Context, opts PruneOptions, gopts GlobalOption
|
||||||
stats.packs.keep++
|
stats.packs.keep++
|
||||||
|
|
||||||
case p.unusedBlobs == 0 && p.tpe != restic.InvalidBlob && !mustCompress:
|
case p.unusedBlobs == 0 && p.tpe != restic.InvalidBlob && !mustCompress:
|
||||||
// All blobs in pack are used and not mixed => keep pack!
|
if packSize >= int64(targetPackSize) {
|
||||||
stats.packs.keep++
|
// All blobs in pack are used and not mixed => keep pack!
|
||||||
|
stats.packs.keep++
|
||||||
|
} else {
|
||||||
|
repackSmallCandidates = append(repackSmallCandidates, packInfoWithID{ID: id, packInfo: p})
|
||||||
|
}
|
||||||
|
|
||||||
default:
|
default:
|
||||||
// all other packs are candidates for repacking
|
// all other packs are candidates for repacking
|
||||||
|
@ -521,11 +534,19 @@ func decidePackAction(ctx context.Context, opts PruneOptions, gopts GlobalOption
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(repackSmallCandidates) < 10 {
|
||||||
|
// too few small files to be worth the trouble, this also prevents endlessly repacking
|
||||||
|
// if there is just a single pack file below the target size
|
||||||
|
stats.packs.keep += uint(len(repackSmallCandidates))
|
||||||
|
} else {
|
||||||
|
repackCandidates = append(repackCandidates, repackSmallCandidates...)
|
||||||
|
}
|
||||||
|
|
||||||
// Sort repackCandidates such that packs with highest ratio unused/used space are picked first.
|
// Sort repackCandidates such that packs with highest ratio unused/used space are picked first.
|
||||||
// This is equivalent to sorting by unused / total space.
|
// This is equivalent to sorting by unused / total space.
|
||||||
// Instead of unused[i] / used[i] > unused[j] / used[j] we use
|
// Instead of unused[i] / used[i] > unused[j] / used[j] we use
|
||||||
// unused[i] * used[j] > unused[j] * used[i] as uint32*uint32 < uint64
|
// unused[i] * used[j] > unused[j] * used[i] as uint32*uint32 < uint64
|
||||||
// Morover packs containing trees are sorted to the beginning
|
// Moreover packs containing trees and too small packs are sorted to the beginning
|
||||||
sort.Slice(repackCandidates, func(i, j int) bool {
|
sort.Slice(repackCandidates, func(i, j int) bool {
|
||||||
pi := repackCandidates[i].packInfo
|
pi := repackCandidates[i].packInfo
|
||||||
pj := repackCandidates[j].packInfo
|
pj := repackCandidates[j].packInfo
|
||||||
|
@ -534,6 +555,10 @@ func decidePackAction(ctx context.Context, opts PruneOptions, gopts GlobalOption
|
||||||
return true
|
return true
|
||||||
case pj.tpe != restic.DataBlob && pi.tpe == restic.DataBlob:
|
case pj.tpe != restic.DataBlob && pi.tpe == restic.DataBlob:
|
||||||
return false
|
return false
|
||||||
|
case pi.unusedSize+pi.usedSize < uint64(targetPackSize) && pj.unusedSize+pj.usedSize >= uint64(targetPackSize):
|
||||||
|
return true
|
||||||
|
case pj.unusedSize+pj.usedSize < uint64(targetPackSize) && pi.unusedSize+pi.usedSize >= uint64(targetPackSize):
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
return pi.unusedSize*pj.usedSize > pj.unusedSize*pi.usedSize
|
return pi.unusedSize*pj.usedSize > pj.unusedSize*pi.usedSize
|
||||||
})
|
})
|
||||||
|
@ -552,6 +577,7 @@ func decidePackAction(ctx context.Context, opts PruneOptions, gopts GlobalOption
|
||||||
for _, p := range repackCandidates {
|
for _, p := range repackCandidates {
|
||||||
reachedUnusedSizeAfter := (stats.size.unused-stats.size.remove-stats.size.repackrm < maxUnusedSizeAfter)
|
reachedUnusedSizeAfter := (stats.size.unused-stats.size.remove-stats.size.repackrm < maxUnusedSizeAfter)
|
||||||
reachedRepackSize := stats.size.repack+p.unusedSize+p.usedSize >= opts.MaxRepackBytes
|
reachedRepackSize := stats.size.repack+p.unusedSize+p.usedSize >= opts.MaxRepackBytes
|
||||||
|
packIsLargeEnough := p.unusedSize+p.usedSize >= uint64(targetPackSize)
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
case reachedRepackSize:
|
case reachedRepackSize:
|
||||||
|
@ -561,7 +587,7 @@ func decidePackAction(ctx context.Context, opts PruneOptions, gopts GlobalOption
|
||||||
// repacking non-data packs / uncompressed-trees is only limited by repackSize
|
// repacking non-data packs / uncompressed-trees is only limited by repackSize
|
||||||
repack(p.ID, p.packInfo)
|
repack(p.ID, p.packInfo)
|
||||||
|
|
||||||
case reachedUnusedSizeAfter:
|
case reachedUnusedSizeAfter && packIsLargeEnough:
|
||||||
// for all other packs stop repacking if tolerated unused size is reached.
|
// for all other packs stop repacking if tolerated unused size is reached.
|
||||||
stats.packs.keep++
|
stats.packs.keep++
|
||||||
|
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"runtime"
|
"runtime"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
@ -62,6 +63,7 @@ type GlobalOptions struct {
|
||||||
NoCache bool
|
NoCache bool
|
||||||
CleanupCache bool
|
CleanupCache bool
|
||||||
Compression repository.CompressionMode
|
Compression repository.CompressionMode
|
||||||
|
PackSize uint
|
||||||
|
|
||||||
backend.TransportOptions
|
backend.TransportOptions
|
||||||
limiter.Limits
|
limiter.Limits
|
||||||
|
@ -102,6 +104,9 @@ func init() {
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// parse target pack size from env, on error the default value will be used
|
||||||
|
targetPackSize, _ := strconv.ParseUint(os.Getenv("RESTIC_PACK_SIZE"), 10, 32)
|
||||||
|
|
||||||
f := cmdRoot.PersistentFlags()
|
f := cmdRoot.PersistentFlags()
|
||||||
f.StringVarP(&globalOptions.Repo, "repo", "r", os.Getenv("RESTIC_REPOSITORY"), "`repository` to backup to or restore from (default: $RESTIC_REPOSITORY)")
|
f.StringVarP(&globalOptions.Repo, "repo", "r", os.Getenv("RESTIC_REPOSITORY"), "`repository` to backup to or restore from (default: $RESTIC_REPOSITORY)")
|
||||||
f.StringVarP(&globalOptions.RepositoryFile, "repository-file", "", os.Getenv("RESTIC_REPOSITORY_FILE"), "`file` to read the repository location from (default: $RESTIC_REPOSITORY_FILE)")
|
f.StringVarP(&globalOptions.RepositoryFile, "repository-file", "", os.Getenv("RESTIC_REPOSITORY_FILE"), "`file` to read the repository location from (default: $RESTIC_REPOSITORY_FILE)")
|
||||||
|
@ -121,6 +126,7 @@ func init() {
|
||||||
f.Var(&globalOptions.Compression, "compression", "compression mode (only available for repository format version 2), one of (auto|off|max)")
|
f.Var(&globalOptions.Compression, "compression", "compression mode (only available for repository format version 2), one of (auto|off|max)")
|
||||||
f.IntVar(&globalOptions.Limits.UploadKb, "limit-upload", 0, "limits uploads to a maximum rate in KiB/s. (default: unlimited)")
|
f.IntVar(&globalOptions.Limits.UploadKb, "limit-upload", 0, "limits uploads to a maximum rate in KiB/s. (default: unlimited)")
|
||||||
f.IntVar(&globalOptions.Limits.DownloadKb, "limit-download", 0, "limits downloads to a maximum rate in KiB/s. (default: unlimited)")
|
f.IntVar(&globalOptions.Limits.DownloadKb, "limit-download", 0, "limits downloads to a maximum rate in KiB/s. (default: unlimited)")
|
||||||
|
f.UintVar(&globalOptions.PackSize, "pack-size", uint(targetPackSize), "set target pack size in MiB, created pack files may be larger (default: $RESTIC_PACK_SIZE)")
|
||||||
f.StringSliceVarP(&globalOptions.Options, "option", "o", []string{}, "set extended option (`key=value`, can be specified multiple times)")
|
f.StringSliceVarP(&globalOptions.Options, "option", "o", []string{}, "set extended option (`key=value`, can be specified multiple times)")
|
||||||
// Use our "generate" command instead of the cobra provided "completion" command
|
// Use our "generate" command instead of the cobra provided "completion" command
|
||||||
cmdRoot.CompletionOptions.DisableDefaultCmd = true
|
cmdRoot.CompletionOptions.DisableDefaultCmd = true
|
||||||
|
@ -440,7 +446,13 @@ func OpenRepository(opts GlobalOptions) (*repository.Repository, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
s := repository.New(be, repository.Options{Compression: opts.Compression})
|
s, err := repository.New(be, repository.Options{
|
||||||
|
Compression: opts.Compression,
|
||||||
|
PackSize: opts.PackSize * 1024 * 1024,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
passwordTriesLeft := 1
|
passwordTriesLeft := 1
|
||||||
if stdinIsTerminal() && opts.password == "" {
|
if stdinIsTerminal() && opts.password == "" {
|
||||||
|
|
|
@ -1611,6 +1611,11 @@ func testPruneVariants(t *testing.T, unsafeNoSpaceRecovery bool) {
|
||||||
checkOpts := CheckOptions{ReadData: true}
|
checkOpts := CheckOptions{ReadData: true}
|
||||||
testPrune(t, opts, checkOpts)
|
testPrune(t, opts, checkOpts)
|
||||||
})
|
})
|
||||||
|
t.Run("Small", func(t *testing.T) {
|
||||||
|
opts := PruneOptions{MaxUnused: "unlimited", RepackSmall: true}
|
||||||
|
checkOpts := CheckOptions{ReadData: true, CheckUnused: true}
|
||||||
|
testPrune(t, opts, checkOpts)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func testPrune(t *testing.T, pruneOpts PruneOptions, checkOpts CheckOptions) {
|
func testPrune(t *testing.T, pruneOpts PruneOptions, checkOpts CheckOptions) {
|
||||||
|
|
|
@ -554,6 +554,7 @@ environment variables. The following lists these environment variables:
|
||||||
RESTIC_CACHE_DIR Location of the cache directory
|
RESTIC_CACHE_DIR Location of the cache directory
|
||||||
RESTIC_COMPRESSION Compression mode (only available for repository format version 2)
|
RESTIC_COMPRESSION Compression mode (only available for repository format version 2)
|
||||||
RESTIC_PROGRESS_FPS Frames per second by which the progress bar is updated
|
RESTIC_PROGRESS_FPS Frames per second by which the progress bar is updated
|
||||||
|
RESTIC_PACK_SIZE Target size for pack files
|
||||||
|
|
||||||
TMPDIR Location for temporary files
|
TMPDIR Location for temporary files
|
||||||
|
|
||||||
|
|
|
@ -8,6 +8,7 @@
|
||||||
- for subsections
|
- for subsections
|
||||||
^ for subsubsections
|
^ for subsubsections
|
||||||
" for paragraphs
|
" for paragraphs
|
||||||
|
|
||||||
########################
|
########################
|
||||||
Tuning Backup Parameters
|
Tuning Backup Parameters
|
||||||
########################
|
########################
|
||||||
|
@ -48,3 +49,26 @@ which will compress very fast), ``max`` (which will trade backup speed and CPU u
|
||||||
slightly better compression), or ``off`` (which disables compression). Each setting is
|
slightly better compression), or ``off`` (which disables compression). Each setting is
|
||||||
only applied for the single run of restic. The option can also be set via the environment
|
only applied for the single run of restic. The option can also be set via the environment
|
||||||
variable ``RESTIC_COMPRESSION``.
|
variable ``RESTIC_COMPRESSION``.
|
||||||
|
|
||||||
|
|
||||||
|
Pack Size
|
||||||
|
=========
|
||||||
|
|
||||||
|
In certain instances, such as very large repositories (in the TiB range) or very fast
|
||||||
|
upload connections, it is desirable to use larger pack sizes to reduce the number of
|
||||||
|
files in the repository and improve upload performance. Notable examples are OpenStack
|
||||||
|
Swift and some Google Drive Team accounts, where there are hard limits on the total
|
||||||
|
number of files. Larger pack sizes can also improve the backup speed for a repository
|
||||||
|
stored on a local HDD. This can be achieved by either using the ``--pack-size`` option
|
||||||
|
or defining the ``$RESTIC_PACK_SIZE`` environment variable. Restic currently defaults
|
||||||
|
to a 16 MiB pack size.
|
||||||
|
|
||||||
|
The side effect of increasing the pack size is requiring more disk space for temporary pack
|
||||||
|
files created before uploading. The space must be available in the system default temp
|
||||||
|
directory, unless overwritten by setting the ``$TMPDIR`` environment variable. In addition,
|
||||||
|
depending on the backend the memory usage can also increase by a similar amount. Restic
|
||||||
|
requires temporary space according to the pack size, multiplied by the number
|
||||||
|
of backend connections plus one. For example, if the backend uses 5 connections (the default
|
||||||
|
for most backends), with a target pack size of 64 MiB, you'll need a *minimum* of 384 MiB
|
||||||
|
of space in the temp directory. A bit of tuning may be required to strike a balance between
|
||||||
|
resource usage at the backup client and the number of pack files in the repository.
|
||||||
|
|
|
@ -56,6 +56,7 @@ Usage help is available:
|
||||||
--key-hint key key ID of key to try decrypting first (default: $RESTIC_KEY_HINT)
|
--key-hint key key ID of key to try decrypting first (default: $RESTIC_KEY_HINT)
|
||||||
--limit-download int limits downloads to a maximum rate in KiB/s. (default: unlimited)
|
--limit-download int limits downloads to a maximum rate in KiB/s. (default: unlimited)
|
||||||
--limit-upload int limits uploads to a maximum rate in KiB/s. (default: unlimited)
|
--limit-upload int limits uploads to a maximum rate in KiB/s. (default: unlimited)
|
||||||
|
--pack-size uint set target pack size in MiB. (default: $RESTIC_PACK_SIZE)
|
||||||
--no-cache do not use a local cache
|
--no-cache do not use a local cache
|
||||||
--no-lock do not lock the repository, this allows some operations on read-only repositories
|
--no-lock do not lock the repository, this allows some operations on read-only repositories
|
||||||
-o, --option key=value set extended option (key=value, can be specified multiple times)
|
-o, --option key=value set extended option (key=value, can be specified multiple times)
|
||||||
|
@ -128,6 +129,7 @@ command:
|
||||||
--key-hint key key ID of key to try decrypting first (default: $RESTIC_KEY_HINT)
|
--key-hint key key ID of key to try decrypting first (default: $RESTIC_KEY_HINT)
|
||||||
--limit-download int limits downloads to a maximum rate in KiB/s. (default: unlimited)
|
--limit-download int limits downloads to a maximum rate in KiB/s. (default: unlimited)
|
||||||
--limit-upload int limits uploads to a maximum rate in KiB/s. (default: unlimited)
|
--limit-upload int limits uploads to a maximum rate in KiB/s. (default: unlimited)
|
||||||
|
--pack-size uint set target pack size in MiB. (default: $RESTIC_PACK_SIZE)
|
||||||
--no-cache do not use a local cache
|
--no-cache do not use a local cache
|
||||||
--no-lock do not lock the repository, this allows some operations on read-only repositories
|
--no-lock do not lock the repository, this allows some operations on read-only repositories
|
||||||
-o, --option key=value set extended option (key=value, can be specified multiple times)
|
-o, --option key=value set extended option (key=value, can be specified multiple times)
|
||||||
|
|
|
@ -291,6 +291,8 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe
|
||||||
opts.ContentType = "application/octet-stream"
|
opts.ContentType = "application/octet-stream"
|
||||||
// the only option with the high-level api is to let the library handle the checksum computation
|
// the only option with the high-level api is to let the library handle the checksum computation
|
||||||
opts.SendContentMd5 = true
|
opts.SendContentMd5 = true
|
||||||
|
// only use multipart uploads for very large files
|
||||||
|
opts.PartSize = 200 * 1024 * 1024
|
||||||
|
|
||||||
debug.Log("PutObject(%v, %v, %v)", be.cfg.Bucket, objName, rd.Length())
|
debug.Log("PutObject(%v, %v, %v)", be.cfg.Bucket, objName, rd.Length())
|
||||||
info, err := be.client.PutObject(ctx, be.cfg.Bucket, objName, ioutil.NopCloser(rd), int64(rd.Length()), opts)
|
info, err := be.client.PutObject(ctx, be.cfg.Bucket, objName, ioutil.NopCloser(rd), int64(rd.Length()), opts)
|
||||||
|
|
|
@ -348,7 +348,8 @@ func TestCheckerModifiedData(t *testing.T) {
|
||||||
t.Logf("archived as %v", sn.ID().Str())
|
t.Logf("archived as %v", sn.ID().Str())
|
||||||
|
|
||||||
beError := &errorBackend{Backend: repo.Backend()}
|
beError := &errorBackend{Backend: repo.Backend()}
|
||||||
checkRepo := repository.New(beError, repository.Options{})
|
checkRepo, err := repository.New(beError, repository.Options{})
|
||||||
|
test.OK(t, err)
|
||||||
test.OK(t, checkRepo.SearchKey(context.TODO(), test.TestPassword, 5, ""))
|
test.OK(t, checkRepo.SearchKey(context.TODO(), test.TestPassword, 5, ""))
|
||||||
|
|
||||||
chkr := checker.New(checkRepo, false)
|
chkr := checker.New(checkRepo, false)
|
||||||
|
|
|
@ -157,6 +157,13 @@ func (p *Packer) Count() int {
|
||||||
return len(p.blobs)
|
return len(p.blobs)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HeaderFull returns true if the pack header is full.
|
||||||
|
func (p *Packer) HeaderFull() bool {
|
||||||
|
p.m.Lock()
|
||||||
|
defer p.m.Unlock()
|
||||||
|
return headerSize+uint(len(p.blobs)+1)*entrySize > MaxHeaderSize
|
||||||
|
}
|
||||||
|
|
||||||
// Blobs returns the slice of blobs that have been written.
|
// Blobs returns the slice of blobs that have been written.
|
||||||
func (p *Packer) Blobs() []restic.Blob {
|
func (p *Packer) Blobs() []restic.Blob {
|
||||||
p.m.Lock()
|
p.m.Lock()
|
||||||
|
|
|
@ -34,19 +34,19 @@ type packerManager struct {
|
||||||
key *crypto.Key
|
key *crypto.Key
|
||||||
queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error
|
queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error
|
||||||
|
|
||||||
pm sync.Mutex
|
pm sync.Mutex
|
||||||
packer *Packer
|
packer *Packer
|
||||||
|
packSize uint
|
||||||
}
|
}
|
||||||
|
|
||||||
const minPackSize = 4 * 1024 * 1024
|
|
||||||
|
|
||||||
// newPackerManager returns an new packer manager which writes temporary files
|
// newPackerManager returns an new packer manager which writes temporary files
|
||||||
// to a temporary directory
|
// to a temporary directory
|
||||||
func newPackerManager(key *crypto.Key, tpe restic.BlobType, queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error) *packerManager {
|
func newPackerManager(key *crypto.Key, tpe restic.BlobType, packSize uint, queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error) *packerManager {
|
||||||
return &packerManager{
|
return &packerManager{
|
||||||
tpe: tpe,
|
tpe: tpe,
|
||||||
key: key,
|
key: key,
|
||||||
queueFn: queueFn,
|
queueFn: queueFn,
|
||||||
|
packSize: packSize,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -87,8 +87,8 @@ func (r *packerManager) SaveBlob(ctx context.Context, t restic.BlobType, id rest
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// if the pack is not full enough, put back to the list
|
// if the pack and header is not full enough, put back to the list
|
||||||
if packer.Size() < minPackSize {
|
if packer.Size() < r.packSize && !packer.HeaderFull() {
|
||||||
debug.Log("pack is not full enough (%d bytes)", packer.Size())
|
debug.Log("pack is not full enough (%d bytes)", packer.Size())
|
||||||
return size, nil
|
return size, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,7 +31,7 @@ func min(a, b int) int {
|
||||||
}
|
}
|
||||||
|
|
||||||
func fillPacks(t testing.TB, rnd *rand.Rand, pm *packerManager, buf []byte) (bytes int) {
|
func fillPacks(t testing.TB, rnd *rand.Rand, pm *packerManager, buf []byte) (bytes int) {
|
||||||
for i := 0; i < 100; i++ {
|
for i := 0; i < 102; i++ {
|
||||||
l := rnd.Intn(maxBlobSize)
|
l := rnd.Intn(maxBlobSize)
|
||||||
id := randomID(rnd)
|
id := randomID(rnd)
|
||||||
buf = buf[:l]
|
buf = buf[:l]
|
||||||
|
@ -70,7 +70,7 @@ func testPackerManager(t testing.TB) int64 {
|
||||||
rnd := rand.New(rand.NewSource(randomSeed))
|
rnd := rand.New(rand.NewSource(randomSeed))
|
||||||
|
|
||||||
savedBytes := int(0)
|
savedBytes := int(0)
|
||||||
pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, func(ctx context.Context, tp restic.BlobType, p *Packer) error {
|
pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, DefaultPackSize, func(ctx context.Context, tp restic.BlobType, p *Packer) error {
|
||||||
err := p.Finalize()
|
err := p.Finalize()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -104,7 +104,7 @@ func BenchmarkPackerManager(t *testing.B) {
|
||||||
|
|
||||||
for i := 0; i < t.N; i++ {
|
for i := 0; i < t.N; i++ {
|
||||||
rnd.Seed(randomSeed)
|
rnd.Seed(randomSeed)
|
||||||
pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, func(ctx context.Context, t restic.BlobType, p *Packer) error {
|
pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, DefaultPackSize, func(ctx context.Context, t restic.BlobType, p *Packer) error {
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
fillPacks(t, rnd, pm, blobBuf)
|
fillPacks(t, rnd, pm, blobBuf)
|
||||||
|
|
|
@ -28,6 +28,10 @@ import (
|
||||||
|
|
||||||
const MaxStreamBufferSize = 4 * 1024 * 1024
|
const MaxStreamBufferSize = 4 * 1024 * 1024
|
||||||
|
|
||||||
|
const MinPackSize = 4 * 1024 * 1024
|
||||||
|
const DefaultPackSize = 16 * 1024 * 1024
|
||||||
|
const MaxPackSize = 128 * 1024 * 1024
|
||||||
|
|
||||||
// Repository is used to access a repository in a backend.
|
// Repository is used to access a repository in a backend.
|
||||||
type Repository struct {
|
type Repository struct {
|
||||||
be restic.Backend
|
be restic.Backend
|
||||||
|
@ -54,6 +58,7 @@ type Repository struct {
|
||||||
|
|
||||||
type Options struct {
|
type Options struct {
|
||||||
Compression CompressionMode
|
Compression CompressionMode
|
||||||
|
PackSize uint
|
||||||
}
|
}
|
||||||
|
|
||||||
// CompressionMode configures if data should be compressed.
|
// CompressionMode configures if data should be compressed.
|
||||||
|
@ -100,14 +105,23 @@ func (c *CompressionMode) Type() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
// New returns a new repository with backend be.
|
// New returns a new repository with backend be.
|
||||||
func New(be restic.Backend, opts Options) *Repository {
|
func New(be restic.Backend, opts Options) (*Repository, error) {
|
||||||
|
if opts.PackSize == 0 {
|
||||||
|
opts.PackSize = DefaultPackSize
|
||||||
|
}
|
||||||
|
if opts.PackSize > MaxPackSize {
|
||||||
|
return nil, errors.Fatalf("pack size larger than limit of %v MiB", MaxPackSize/1024/1024)
|
||||||
|
} else if opts.PackSize < MinPackSize {
|
||||||
|
return nil, errors.Fatalf("pack size smaller than minimum of %v MiB", MinPackSize/1024/1024)
|
||||||
|
}
|
||||||
|
|
||||||
repo := &Repository{
|
repo := &Repository{
|
||||||
be: be,
|
be: be,
|
||||||
opts: opts,
|
opts: opts,
|
||||||
idx: NewMasterIndex(),
|
idx: NewMasterIndex(),
|
||||||
}
|
}
|
||||||
|
|
||||||
return repo
|
return repo, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// DisableAutoIndexUpdate deactives the automatic finalization and upload of new
|
// DisableAutoIndexUpdate deactives the automatic finalization and upload of new
|
||||||
|
@ -129,6 +143,11 @@ func (r *Repository) Config() restic.Config {
|
||||||
return r.cfg
|
return r.cfg
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PackSize return the target size of a pack file when uploading
|
||||||
|
func (r *Repository) PackSize() uint {
|
||||||
|
return r.opts.PackSize
|
||||||
|
}
|
||||||
|
|
||||||
// UseCache replaces the backend with the wrapped cache.
|
// UseCache replaces the backend with the wrapped cache.
|
||||||
func (r *Repository) UseCache(c *cache.Cache) {
|
func (r *Repository) UseCache(c *cache.Cache) {
|
||||||
if c == nil {
|
if c == nil {
|
||||||
|
@ -497,8 +516,8 @@ func (r *Repository) StartPackUploader(ctx context.Context, wg *errgroup.Group)
|
||||||
innerWg, ctx := errgroup.WithContext(ctx)
|
innerWg, ctx := errgroup.WithContext(ctx)
|
||||||
r.packerWg = innerWg
|
r.packerWg = innerWg
|
||||||
r.uploader = newPackerUploader(ctx, innerWg, r, r.be.Connections())
|
r.uploader = newPackerUploader(ctx, innerWg, r, r.be.Connections())
|
||||||
r.treePM = newPackerManager(r.key, restic.TreeBlob, r.uploader.QueuePacker)
|
r.treePM = newPackerManager(r.key, restic.TreeBlob, r.PackSize(), r.uploader.QueuePacker)
|
||||||
r.dataPM = newPackerManager(r.key, restic.DataBlob, r.uploader.QueuePacker)
|
r.dataPM = newPackerManager(r.key, restic.DataBlob, r.PackSize(), r.uploader.QueuePacker)
|
||||||
|
|
||||||
wg.Go(func() error {
|
wg.Go(func() error {
|
||||||
return innerWg.Wait()
|
return innerWg.Wait()
|
||||||
|
@ -812,6 +831,9 @@ func (r *Repository) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte
|
||||||
|
|
||||||
type BackendLoadFn func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error
|
type BackendLoadFn func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error
|
||||||
|
|
||||||
|
// Skip sections with more than 4MB unused blobs
|
||||||
|
const maxUnusedRange = 4 * 1024 * 1024
|
||||||
|
|
||||||
// StreamPack loads the listed blobs from the specified pack file. The plaintext blob is passed to
|
// StreamPack loads the listed blobs from the specified pack file. The plaintext blob is passed to
|
||||||
// the handleBlobFn callback or an error if decryption failed or the blob hash does not match. In
|
// the handleBlobFn callback or an error if decryption failed or the blob hash does not match. In
|
||||||
// case of download errors handleBlobFn might be called multiple times for the same blob. If the
|
// case of download errors handleBlobFn might be called multiple times for the same blob. If the
|
||||||
|
@ -825,6 +847,29 @@ func StreamPack(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, pack
|
||||||
sort.Slice(blobs, func(i, j int) bool {
|
sort.Slice(blobs, func(i, j int) bool {
|
||||||
return blobs[i].Offset < blobs[j].Offset
|
return blobs[i].Offset < blobs[j].Offset
|
||||||
})
|
})
|
||||||
|
|
||||||
|
lowerIdx := 0
|
||||||
|
lastPos := blobs[0].Offset
|
||||||
|
for i := 0; i < len(blobs); i++ {
|
||||||
|
if blobs[i].Offset < lastPos {
|
||||||
|
// don't wait for streamPackPart to fail
|
||||||
|
return errors.Errorf("overlapping blobs in pack %v", packID)
|
||||||
|
}
|
||||||
|
if blobs[i].Offset-lastPos > maxUnusedRange {
|
||||||
|
// load everything up to the skipped file section
|
||||||
|
err := streamPackPart(ctx, beLoad, key, packID, blobs[lowerIdx:i], handleBlobFn)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
lowerIdx = i
|
||||||
|
}
|
||||||
|
lastPos = blobs[i].Offset + blobs[i].Length
|
||||||
|
}
|
||||||
|
// load remainder
|
||||||
|
return streamPackPart(ctx, beLoad, key, packID, blobs[lowerIdx:], handleBlobFn)
|
||||||
|
}
|
||||||
|
|
||||||
|
func streamPackPart(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
|
||||||
h := restic.Handle{Type: restic.PackFile, Name: packID.String(), ContainedBlobType: restic.DataBlob}
|
h := restic.Handle{Type: restic.PackFile, Name: packID.String(), ContainedBlobType: restic.DataBlob}
|
||||||
|
|
||||||
dataStart := blobs[0].Offset
|
dataStart := blobs[0].Offset
|
||||||
|
|
|
@ -455,17 +455,19 @@ func testStreamPack(t *testing.T, version uint) {
|
||||||
}
|
}
|
||||||
|
|
||||||
blobSizes := []int{
|
blobSizes := []int{
|
||||||
|
5522811,
|
||||||
10,
|
10,
|
||||||
5231,
|
5231,
|
||||||
18812,
|
18812,
|
||||||
123123,
|
123123,
|
||||||
|
13522811,
|
||||||
12301,
|
12301,
|
||||||
892242,
|
892242,
|
||||||
28616,
|
28616,
|
||||||
13351,
|
13351,
|
||||||
252287,
|
252287,
|
||||||
188883,
|
188883,
|
||||||
2522811,
|
3522811,
|
||||||
18883,
|
18883,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -481,6 +483,7 @@ func testStreamPack(t *testing.T, version uint) {
|
||||||
|
|
||||||
packfileBlobs, packfile := buildPackfileWithoutHeader(t, blobSizes, &key, compress)
|
packfileBlobs, packfile := buildPackfileWithoutHeader(t, blobSizes, &key, compress)
|
||||||
|
|
||||||
|
loadCalls := 0
|
||||||
load := func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
|
load := func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
|
||||||
data := packfile
|
data := packfile
|
||||||
|
|
||||||
|
@ -495,6 +498,7 @@ func testStreamPack(t *testing.T, version uint) {
|
||||||
}
|
}
|
||||||
|
|
||||||
data = data[:length]
|
data = data[:length]
|
||||||
|
loadCalls++
|
||||||
|
|
||||||
return fn(bytes.NewReader(data))
|
return fn(bytes.NewReader(data))
|
||||||
|
|
||||||
|
@ -504,19 +508,20 @@ func testStreamPack(t *testing.T, version uint) {
|
||||||
t.Run("regular", func(t *testing.T) {
|
t.Run("regular", func(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
blobs []restic.Blob
|
blobs []restic.Blob
|
||||||
|
calls int
|
||||||
}{
|
}{
|
||||||
{packfileBlobs[1:2]},
|
{packfileBlobs[1:2], 1},
|
||||||
{packfileBlobs[2:5]},
|
{packfileBlobs[2:5], 1},
|
||||||
{packfileBlobs[2:8]},
|
{packfileBlobs[2:8], 1},
|
||||||
{[]restic.Blob{
|
{[]restic.Blob{
|
||||||
packfileBlobs[0],
|
packfileBlobs[0],
|
||||||
packfileBlobs[8],
|
|
||||||
packfileBlobs[4],
|
packfileBlobs[4],
|
||||||
}},
|
packfileBlobs[2],
|
||||||
|
}, 1},
|
||||||
{[]restic.Blob{
|
{[]restic.Blob{
|
||||||
packfileBlobs[0],
|
packfileBlobs[0],
|
||||||
packfileBlobs[len(packfileBlobs)-1],
|
packfileBlobs[len(packfileBlobs)-1],
|
||||||
}},
|
}, 2},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
|
@ -542,6 +547,7 @@ func testStreamPack(t *testing.T, version uint) {
|
||||||
wantBlobs[blob.ID] = 1
|
wantBlobs[blob.ID] = 1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
loadCalls = 0
|
||||||
err = repository.StreamPack(ctx, load, &key, restic.ID{}, test.blobs, handleBlob)
|
err = repository.StreamPack(ctx, load, &key, restic.ID{}, test.blobs, handleBlob)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
@ -550,6 +556,7 @@ func testStreamPack(t *testing.T, version uint) {
|
||||||
if !cmp.Equal(wantBlobs, gotBlobs) {
|
if !cmp.Equal(wantBlobs, gotBlobs) {
|
||||||
t.Fatal(cmp.Diff(wantBlobs, gotBlobs))
|
t.Fatal(cmp.Diff(wantBlobs, gotBlobs))
|
||||||
}
|
}
|
||||||
|
rtest.Equals(t, test.calls, loadCalls)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
|
@ -52,10 +52,13 @@ func TestRepositoryWithBackend(t testing.TB, be restic.Backend, version uint) (r
|
||||||
be, beCleanup = TestBackend(t)
|
be, beCleanup = TestBackend(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
repo := New(be, Options{})
|
repo, err := New(be, Options{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("TestRepository(): new repo failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
cfg := restic.TestCreateConfig(t, TestChunkerPol, version)
|
cfg := restic.TestCreateConfig(t, TestChunkerPol, version)
|
||||||
err := repo.init(context.TODO(), test.TestPassword, cfg)
|
err = repo.init(context.TODO(), test.TestPassword, cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("TestRepository(): initialize repo failed: %v", err)
|
t.Fatalf("TestRepository(): initialize repo failed: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -104,7 +107,10 @@ func TestOpenLocal(t testing.TB, dir string) (r restic.Repository) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
repo := New(be, Options{})
|
repo, err := New(be, Options{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
err = repo.SearchKey(context.TODO(), test.TestPassword, 10, "")
|
err = repo.SearchKey(context.TODO(), test.TestPassword, 10, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
|
|
@ -25,6 +25,7 @@ type Repository interface {
|
||||||
LookupBlobSize(ID, BlobType) (uint, bool)
|
LookupBlobSize(ID, BlobType) (uint, bool)
|
||||||
|
|
||||||
Config() Config
|
Config() Config
|
||||||
|
PackSize() uint
|
||||||
|
|
||||||
// List calls the function fn for each file of type t in the repository.
|
// List calls the function fn for each file of type t in the repository.
|
||||||
// When an error is returned by fn, processing stops and List() returns the
|
// When an error is returned by fn, processing stops and List() returns the
|
||||||
|
|
Loading…
Reference in a new issue