forked from TrueCloudLab/restic
add option for setting min pack size
This commit is contained in:
parent
04a8ee80fb
commit
0a6fa602c8
8 changed files with 67 additions and 22 deletions
|
@ -86,7 +86,13 @@ func runInit(opts InitOptions, gopts GlobalOptions, args []string) error {
|
||||||
return errors.Fatalf("create repository at %s failed: %v\n", location.StripPassword(gopts.Repo), err)
|
return errors.Fatalf("create repository at %s failed: %v\n", location.StripPassword(gopts.Repo), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
s := repository.New(be, repository.Options{Compression: gopts.Compression})
|
s, err := repository.New(be, repository.Options{
|
||||||
|
Compression: gopts.Compression,
|
||||||
|
PackSize: gopts.MinPackSize * 1024 * 1024,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
err = s.Init(gopts.ctx, version, gopts.password, chunkerPolynomial)
|
err = s.Init(gopts.ctx, version, gopts.password, chunkerPolynomial)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"runtime"
|
"runtime"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
@ -62,6 +63,7 @@ type GlobalOptions struct {
|
||||||
NoCache bool
|
NoCache bool
|
||||||
CleanupCache bool
|
CleanupCache bool
|
||||||
Compression repository.CompressionMode
|
Compression repository.CompressionMode
|
||||||
|
MinPackSize uint
|
||||||
|
|
||||||
backend.TransportOptions
|
backend.TransportOptions
|
||||||
limiter.Limits
|
limiter.Limits
|
||||||
|
@ -102,6 +104,9 @@ func init() {
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// parse min pack size from env, on error the default value will be used
|
||||||
|
minPackSize, _ := strconv.ParseUint(os.Getenv("RESTIC_MIN_PACKSIZE"), 10, 32)
|
||||||
|
|
||||||
f := cmdRoot.PersistentFlags()
|
f := cmdRoot.PersistentFlags()
|
||||||
f.StringVarP(&globalOptions.Repo, "repo", "r", os.Getenv("RESTIC_REPOSITORY"), "`repository` to backup to or restore from (default: $RESTIC_REPOSITORY)")
|
f.StringVarP(&globalOptions.Repo, "repo", "r", os.Getenv("RESTIC_REPOSITORY"), "`repository` to backup to or restore from (default: $RESTIC_REPOSITORY)")
|
||||||
f.StringVarP(&globalOptions.RepositoryFile, "repository-file", "", os.Getenv("RESTIC_REPOSITORY_FILE"), "`file` to read the repository location from (default: $RESTIC_REPOSITORY_FILE)")
|
f.StringVarP(&globalOptions.RepositoryFile, "repository-file", "", os.Getenv("RESTIC_REPOSITORY_FILE"), "`file` to read the repository location from (default: $RESTIC_REPOSITORY_FILE)")
|
||||||
|
@ -121,6 +126,7 @@ func init() {
|
||||||
f.Var(&globalOptions.Compression, "compression", "compression mode (only available for repository format version 2), one of (auto|off|max)")
|
f.Var(&globalOptions.Compression, "compression", "compression mode (only available for repository format version 2), one of (auto|off|max)")
|
||||||
f.IntVar(&globalOptions.Limits.UploadKb, "limit-upload", 0, "limits uploads to a maximum rate in KiB/s. (default: unlimited)")
|
f.IntVar(&globalOptions.Limits.UploadKb, "limit-upload", 0, "limits uploads to a maximum rate in KiB/s. (default: unlimited)")
|
||||||
f.IntVar(&globalOptions.Limits.DownloadKb, "limit-download", 0, "limits downloads to a maximum rate in KiB/s. (default: unlimited)")
|
f.IntVar(&globalOptions.Limits.DownloadKb, "limit-download", 0, "limits downloads to a maximum rate in KiB/s. (default: unlimited)")
|
||||||
|
f.UintVar(&globalOptions.MinPackSize, "min-packsize", uint(minPackSize), "set min pack size in MiB. (default: $RESTIC_MIN_PACKSIZE)")
|
||||||
f.StringSliceVarP(&globalOptions.Options, "option", "o", []string{}, "set extended option (`key=value`, can be specified multiple times)")
|
f.StringSliceVarP(&globalOptions.Options, "option", "o", []string{}, "set extended option (`key=value`, can be specified multiple times)")
|
||||||
// Use our "generate" command instead of the cobra provided "completion" command
|
// Use our "generate" command instead of the cobra provided "completion" command
|
||||||
cmdRoot.CompletionOptions.DisableDefaultCmd = true
|
cmdRoot.CompletionOptions.DisableDefaultCmd = true
|
||||||
|
@ -440,7 +446,13 @@ func OpenRepository(opts GlobalOptions) (*repository.Repository, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
s := repository.New(be, repository.Options{Compression: opts.Compression})
|
s, err := repository.New(be, repository.Options{
|
||||||
|
Compression: opts.Compression,
|
||||||
|
PackSize: opts.MinPackSize * 1024 * 1024,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
passwordTriesLeft := 1
|
passwordTriesLeft := 1
|
||||||
if stdinIsTerminal() && opts.password == "" {
|
if stdinIsTerminal() && opts.password == "" {
|
||||||
|
|
|
@ -348,7 +348,8 @@ func TestCheckerModifiedData(t *testing.T) {
|
||||||
t.Logf("archived as %v", sn.ID().Str())
|
t.Logf("archived as %v", sn.ID().Str())
|
||||||
|
|
||||||
beError := &errorBackend{Backend: repo.Backend()}
|
beError := &errorBackend{Backend: repo.Backend()}
|
||||||
checkRepo := repository.New(beError, repository.Options{})
|
checkRepo, err := repository.New(beError, repository.Options{})
|
||||||
|
test.OK(t, err)
|
||||||
test.OK(t, checkRepo.SearchKey(context.TODO(), test.TestPassword, 5, ""))
|
test.OK(t, checkRepo.SearchKey(context.TODO(), test.TestPassword, 5, ""))
|
||||||
|
|
||||||
chkr := checker.New(checkRepo, false)
|
chkr := checker.New(checkRepo, false)
|
||||||
|
|
|
@ -34,19 +34,19 @@ type packerManager struct {
|
||||||
key *crypto.Key
|
key *crypto.Key
|
||||||
queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error
|
queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error
|
||||||
|
|
||||||
pm sync.Mutex
|
pm sync.Mutex
|
||||||
packer *Packer
|
packer *Packer
|
||||||
|
packSize uint
|
||||||
}
|
}
|
||||||
|
|
||||||
const minPackSize = 4 * 1024 * 1024
|
|
||||||
|
|
||||||
// newPackerManager returns an new packer manager which writes temporary files
|
// newPackerManager returns an new packer manager which writes temporary files
|
||||||
// to a temporary directory
|
// to a temporary directory
|
||||||
func newPackerManager(key *crypto.Key, tpe restic.BlobType, queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error) *packerManager {
|
func newPackerManager(key *crypto.Key, tpe restic.BlobType, packSize uint, queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error) *packerManager {
|
||||||
return &packerManager{
|
return &packerManager{
|
||||||
tpe: tpe,
|
tpe: tpe,
|
||||||
key: key,
|
key: key,
|
||||||
queueFn: queueFn,
|
queueFn: queueFn,
|
||||||
|
packSize: packSize,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -88,7 +88,7 @@ func (r *packerManager) SaveBlob(ctx context.Context, t restic.BlobType, id rest
|
||||||
}
|
}
|
||||||
|
|
||||||
// if the pack is not full enough, put back to the list
|
// if the pack is not full enough, put back to the list
|
||||||
if packer.Size() < minPackSize {
|
if packer.Size() < r.packSize {
|
||||||
debug.Log("pack is not full enough (%d bytes)", packer.Size())
|
debug.Log("pack is not full enough (%d bytes)", packer.Size())
|
||||||
return size, nil
|
return size, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,7 +31,7 @@ func min(a, b int) int {
|
||||||
}
|
}
|
||||||
|
|
||||||
func fillPacks(t testing.TB, rnd *rand.Rand, pm *packerManager, buf []byte) (bytes int) {
|
func fillPacks(t testing.TB, rnd *rand.Rand, pm *packerManager, buf []byte) (bytes int) {
|
||||||
for i := 0; i < 100; i++ {
|
for i := 0; i < 102; i++ {
|
||||||
l := rnd.Intn(maxBlobSize)
|
l := rnd.Intn(maxBlobSize)
|
||||||
id := randomID(rnd)
|
id := randomID(rnd)
|
||||||
buf = buf[:l]
|
buf = buf[:l]
|
||||||
|
@ -70,7 +70,7 @@ func testPackerManager(t testing.TB) int64 {
|
||||||
rnd := rand.New(rand.NewSource(randomSeed))
|
rnd := rand.New(rand.NewSource(randomSeed))
|
||||||
|
|
||||||
savedBytes := int(0)
|
savedBytes := int(0)
|
||||||
pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, func(ctx context.Context, tp restic.BlobType, p *Packer) error {
|
pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, DefaultPackSize, func(ctx context.Context, tp restic.BlobType, p *Packer) error {
|
||||||
err := p.Finalize()
|
err := p.Finalize()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -104,7 +104,7 @@ func BenchmarkPackerManager(t *testing.B) {
|
||||||
|
|
||||||
for i := 0; i < t.N; i++ {
|
for i := 0; i < t.N; i++ {
|
||||||
rnd.Seed(randomSeed)
|
rnd.Seed(randomSeed)
|
||||||
pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, func(ctx context.Context, t restic.BlobType, p *Packer) error {
|
pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, DefaultPackSize, func(ctx context.Context, t restic.BlobType, p *Packer) error {
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
fillPacks(t, rnd, pm, blobBuf)
|
fillPacks(t, rnd, pm, blobBuf)
|
||||||
|
|
|
@ -28,6 +28,10 @@ import (
|
||||||
|
|
||||||
const MaxStreamBufferSize = 4 * 1024 * 1024
|
const MaxStreamBufferSize = 4 * 1024 * 1024
|
||||||
|
|
||||||
|
const MinPackSize = 4 * 1024 * 1024
|
||||||
|
const DefaultPackSize = 16 * 1024 * 1024
|
||||||
|
const MaxPackSize = 128 * 1024 * 1024
|
||||||
|
|
||||||
// Repository is used to access a repository in a backend.
|
// Repository is used to access a repository in a backend.
|
||||||
type Repository struct {
|
type Repository struct {
|
||||||
be restic.Backend
|
be restic.Backend
|
||||||
|
@ -54,6 +58,7 @@ type Repository struct {
|
||||||
|
|
||||||
type Options struct {
|
type Options struct {
|
||||||
Compression CompressionMode
|
Compression CompressionMode
|
||||||
|
PackSize uint
|
||||||
}
|
}
|
||||||
|
|
||||||
// CompressionMode configures if data should be compressed.
|
// CompressionMode configures if data should be compressed.
|
||||||
|
@ -100,14 +105,23 @@ func (c *CompressionMode) Type() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
// New returns a new repository with backend be.
|
// New returns a new repository with backend be.
|
||||||
func New(be restic.Backend, opts Options) *Repository {
|
func New(be restic.Backend, opts Options) (*Repository, error) {
|
||||||
|
if opts.PackSize == 0 {
|
||||||
|
opts.PackSize = DefaultPackSize
|
||||||
|
}
|
||||||
|
if opts.PackSize > MaxPackSize {
|
||||||
|
return nil, errors.Fatalf("pack size larger than limit of %v MiB", MaxPackSize/1024/1024)
|
||||||
|
} else if opts.PackSize < MinPackSize {
|
||||||
|
return nil, errors.Fatalf("pack size smaller than minimum of %v MiB", MinPackSize/1024/1024)
|
||||||
|
}
|
||||||
|
|
||||||
repo := &Repository{
|
repo := &Repository{
|
||||||
be: be,
|
be: be,
|
||||||
opts: opts,
|
opts: opts,
|
||||||
idx: NewMasterIndex(),
|
idx: NewMasterIndex(),
|
||||||
}
|
}
|
||||||
|
|
||||||
return repo
|
return repo, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// DisableAutoIndexUpdate deactives the automatic finalization and upload of new
|
// DisableAutoIndexUpdate deactives the automatic finalization and upload of new
|
||||||
|
@ -129,6 +143,11 @@ func (r *Repository) Config() restic.Config {
|
||||||
return r.cfg
|
return r.cfg
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MinPackSize return the minimum size of a pack file before uploading
|
||||||
|
func (r *Repository) MinPackSize() uint {
|
||||||
|
return r.opts.PackSize
|
||||||
|
}
|
||||||
|
|
||||||
// UseCache replaces the backend with the wrapped cache.
|
// UseCache replaces the backend with the wrapped cache.
|
||||||
func (r *Repository) UseCache(c *cache.Cache) {
|
func (r *Repository) UseCache(c *cache.Cache) {
|
||||||
if c == nil {
|
if c == nil {
|
||||||
|
@ -497,8 +516,8 @@ func (r *Repository) StartPackUploader(ctx context.Context, wg *errgroup.Group)
|
||||||
innerWg, ctx := errgroup.WithContext(ctx)
|
innerWg, ctx := errgroup.WithContext(ctx)
|
||||||
r.packerWg = innerWg
|
r.packerWg = innerWg
|
||||||
r.uploader = newPackerUploader(ctx, innerWg, r, r.be.Connections())
|
r.uploader = newPackerUploader(ctx, innerWg, r, r.be.Connections())
|
||||||
r.treePM = newPackerManager(r.key, restic.TreeBlob, r.uploader.QueuePacker)
|
r.treePM = newPackerManager(r.key, restic.TreeBlob, r.MinPackSize(), r.uploader.QueuePacker)
|
||||||
r.dataPM = newPackerManager(r.key, restic.DataBlob, r.uploader.QueuePacker)
|
r.dataPM = newPackerManager(r.key, restic.DataBlob, r.MinPackSize(), r.uploader.QueuePacker)
|
||||||
|
|
||||||
wg.Go(func() error {
|
wg.Go(func() error {
|
||||||
return innerWg.Wait()
|
return innerWg.Wait()
|
||||||
|
|
|
@ -52,10 +52,13 @@ func TestRepositoryWithBackend(t testing.TB, be restic.Backend, version uint) (r
|
||||||
be, beCleanup = TestBackend(t)
|
be, beCleanup = TestBackend(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
repo := New(be, Options{})
|
repo, err := New(be, Options{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("TestRepository(): new repo failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
cfg := restic.TestCreateConfig(t, TestChunkerPol, version)
|
cfg := restic.TestCreateConfig(t, TestChunkerPol, version)
|
||||||
err := repo.init(context.TODO(), test.TestPassword, cfg)
|
err = repo.init(context.TODO(), test.TestPassword, cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("TestRepository(): initialize repo failed: %v", err)
|
t.Fatalf("TestRepository(): initialize repo failed: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -104,7 +107,10 @@ func TestOpenLocal(t testing.TB, dir string) (r restic.Repository) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
repo := New(be, Options{})
|
repo, err := New(be, Options{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
err = repo.SearchKey(context.TODO(), test.TestPassword, 10, "")
|
err = repo.SearchKey(context.TODO(), test.TestPassword, 10, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
|
|
@ -25,6 +25,7 @@ type Repository interface {
|
||||||
LookupBlobSize(ID, BlobType) (uint, bool)
|
LookupBlobSize(ID, BlobType) (uint, bool)
|
||||||
|
|
||||||
Config() Config
|
Config() Config
|
||||||
|
MinPackSize() uint
|
||||||
|
|
||||||
// List calls the function fn for each file of type t in the repository.
|
// List calls the function fn for each file of type t in the repository.
|
||||||
// When an error is returned by fn, processing stops and List() returns the
|
// When an error is returned by fn, processing stops and List() returns the
|
||||||
|
|
Loading…
Reference in a new issue