Merge pull request #3717 from MichaelEischer/fix-stuck-repack

Fix stuck repack step
This commit is contained in:
Alexander Neumann 2022-04-30 09:50:43 +02:00 committed by GitHub
commit 4e1ef7804a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 137 additions and 36 deletions

View file

@ -0,0 +1,11 @@
Enhancement: Stream data in check and prune commands
`check --read-data` and `prune` previously downloaded data files into temporary
files, which could end up being written to disk. This could cause a large amount
of data to be written to disk. The pack files are now streamed, which no longer
requires temporary files. Please note that uploads during `backup` and `prune`
still require temporary files.
https://github.com/restic/restic/pull/3484
https://github.com/restic/restic/issues/3710
https://github.com/restic/restic/pull/3717

View file

@ -132,6 +132,10 @@ func runPrune(opts PruneOptions, gopts GlobalOptions) error {
return err return err
} }
if repo.Backend().Connections() < 2 {
return errors.Fatal("prune requires a backend connection limit of at least two")
}
lock, err := lockRepoExclusive(gopts.ctx, repo) lock, err := lockRepoExclusive(gopts.ctx, repo)
defer unlockRepo(lock) defer unlockRepo(lock)
if err != nil { if err != nil {

View file

@ -24,6 +24,7 @@ import (
type Backend struct { type Backend struct {
accountName string accountName string
container *storage.Container container *storage.Container
connections uint
sem *backend.Semaphore sem *backend.Semaphore
prefix string prefix string
listMaxItems int listMaxItems int
@ -55,6 +56,7 @@ func open(cfg Config, rt http.RoundTripper) (*Backend, error) {
be := &Backend{ be := &Backend{
container: service.GetContainerReference(cfg.Container), container: service.GetContainerReference(cfg.Container),
accountName: cfg.AccountName, accountName: cfg.AccountName,
connections: cfg.Connections,
sem: sem, sem: sem,
prefix: cfg.Prefix, prefix: cfg.Prefix,
Layout: &backend.DefaultLayout{ Layout: &backend.DefaultLayout{
@ -109,6 +111,10 @@ func (be *Backend) Join(p ...string) string {
return path.Join(p...) return path.Join(p...)
} }
// Connections returns the connection limit stored in the backend's
// connections field, which open fills in from cfg.Connections.
func (be *Backend) Connections() uint {
return be.connections
}
// Location returns this backend's location (the container name). // Location returns this backend's location (the container name).
func (be *Backend) Location() string { func (be *Backend) Location() string {
return be.Join(be.container.Name, be.prefix) return be.Join(be.container.Name, be.prefix)

View file

@ -133,6 +133,10 @@ func (be *b2Backend) SetListMaxItems(i int) {
be.listMaxItems = i be.listMaxItems = i
} }
// Connections returns the connection limit taken from the backend's
// configuration (be.cfg.Connections).
func (be *b2Backend) Connections() uint {
return be.cfg.Connections
}
// Location returns the location for the backend. // Location returns the location for the backend.
func (be *b2Backend) Location() string { func (be *b2Backend) Location() string {
return be.cfg.Bucket return be.cfg.Bucket

View file

@ -45,6 +45,10 @@ func (be *Backend) Remove(ctx context.Context, h restic.Handle) error {
return nil return nil
} }
// Connections returns the connection limit reported by the backend held in
// be.b; the call is forwarded unchanged.
func (be *Backend) Connections() uint {
return be.b.Connections()
}
// Location returns the location of the backend. // Location returns the location of the backend.
func (be *Backend) Location() string { func (be *Backend) Location() string {
return "DRY:" + be.b.Location() return "DRY:" + be.b.Location()

View file

@ -34,6 +34,7 @@ import (
type Backend struct { type Backend struct {
gcsClient *storage.Client gcsClient *storage.Client
projectID string projectID string
connections uint
sem *backend.Semaphore sem *backend.Semaphore
bucketName string bucketName string
bucket *storage.BucketHandle bucket *storage.BucketHandle
@ -104,6 +105,7 @@ func open(cfg Config, rt http.RoundTripper) (*Backend, error) {
be := &Backend{ be := &Backend{
gcsClient: gcsClient, gcsClient: gcsClient,
projectID: cfg.ProjectID, projectID: cfg.ProjectID,
connections: cfg.Connections,
sem: sem, sem: sem,
bucketName: cfg.Bucket, bucketName: cfg.Bucket,
bucket: gcsClient.Bucket(cfg.Bucket), bucket: gcsClient.Bucket(cfg.Bucket),
@ -185,6 +187,10 @@ func (be *Backend) Join(p ...string) string {
return path.Join(p...) return path.Join(p...)
} }
// Connections returns the connection limit stored in the backend's
// connections field, which open fills in from cfg.Connections.
func (be *Backend) Connections() uint {
return be.connections
}
// Location returns this backend's location (the bucket name). // Location returns this backend's location (the bucket name).
func (be *Backend) Location() string { func (be *Backend) Location() string {
return be.Join(be.bucketName, be.prefix) return be.Join(be.bucketName, be.prefix)

View file

@ -25,17 +25,26 @@ var _ restic.Backend = &MemoryBackend{}
var errNotFound = errors.New("not found") var errNotFound = errors.New("not found")
const connectionCount = 2
// MemoryBackend is a mock backend that uses a map for storing all data in // MemoryBackend is a mock backend that uses a map for storing all data in
// memory. This should only be used for tests. // memory. This should only be used for tests.
type MemoryBackend struct { type MemoryBackend struct {
data memMap data memMap
m sync.Mutex m sync.Mutex
sem *backend.Semaphore
} }
// New returns a new backend that saves all data in a map in memory. // New returns a new backend that saves all data in a map in memory.
func New() *MemoryBackend { func New() *MemoryBackend {
sem, err := backend.NewSemaphore(connectionCount)
if err != nil {
panic(err)
}
be := &MemoryBackend{ be := &MemoryBackend{
data: make(memMap), data: make(memMap),
sem: sem,
} }
debug.Log("created new memory backend") debug.Log("created new memory backend")
@ -45,6 +54,9 @@ func New() *MemoryBackend {
// Test returns whether a file exists. // Test returns whether a file exists.
func (be *MemoryBackend) Test(ctx context.Context, h restic.Handle) (bool, error) { func (be *MemoryBackend) Test(ctx context.Context, h restic.Handle) (bool, error) {
be.sem.GetToken()
defer be.sem.ReleaseToken()
be.m.Lock() be.m.Lock()
defer be.m.Unlock() defer be.m.Unlock()
@ -68,6 +80,9 @@ func (be *MemoryBackend) Save(ctx context.Context, h restic.Handle, rd restic.Re
return backoff.Permanent(err) return backoff.Permanent(err)
} }
be.sem.GetToken()
defer be.sem.ReleaseToken()
be.m.Lock() be.m.Lock()
defer be.m.Unlock() defer be.m.Unlock()
@ -120,6 +135,7 @@ func (be *MemoryBackend) openReader(ctx context.Context, h restic.Handle, length
return nil, backoff.Permanent(err) return nil, backoff.Permanent(err)
} }
be.sem.GetToken()
be.m.Lock() be.m.Lock()
defer be.m.Unlock() defer be.m.Unlock()
@ -131,15 +147,18 @@ func (be *MemoryBackend) openReader(ctx context.Context, h restic.Handle, length
debug.Log("Load %v offset %v len %v", h, offset, length) debug.Log("Load %v offset %v len %v", h, offset, length)
if offset < 0 { if offset < 0 {
be.sem.ReleaseToken()
return nil, errors.New("offset is negative") return nil, errors.New("offset is negative")
} }
if _, ok := be.data[h]; !ok { if _, ok := be.data[h]; !ok {
be.sem.ReleaseToken()
return nil, errNotFound return nil, errNotFound
} }
buf := be.data[h] buf := be.data[h]
if offset > int64(len(buf)) { if offset > int64(len(buf)) {
be.sem.ReleaseToken()
return nil, errors.New("offset beyond end of file") return nil, errors.New("offset beyond end of file")
} }
@ -148,18 +167,21 @@ func (be *MemoryBackend) openReader(ctx context.Context, h restic.Handle, length
buf = buf[:length] buf = buf[:length]
} }
return ioutil.NopCloser(bytes.NewReader(buf)), ctx.Err() return be.sem.ReleaseTokenOnClose(ioutil.NopCloser(bytes.NewReader(buf)), nil), ctx.Err()
} }
// Stat returns information about a file in the backend. // Stat returns information about a file in the backend.
func (be *MemoryBackend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) { func (be *MemoryBackend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) {
be.m.Lock()
defer be.m.Unlock()
if err := h.Valid(); err != nil { if err := h.Valid(); err != nil {
return restic.FileInfo{}, backoff.Permanent(err) return restic.FileInfo{}, backoff.Permanent(err)
} }
be.sem.GetToken()
defer be.sem.ReleaseToken()
be.m.Lock()
defer be.m.Unlock()
h.ContainedBlobType = restic.InvalidBlob h.ContainedBlobType = restic.InvalidBlob
if h.Type == restic.ConfigFile { if h.Type == restic.ConfigFile {
h.Name = "" h.Name = ""
@ -177,6 +199,9 @@ func (be *MemoryBackend) Stat(ctx context.Context, h restic.Handle) (restic.File
// Remove deletes a file from the backend. // Remove deletes a file from the backend.
func (be *MemoryBackend) Remove(ctx context.Context, h restic.Handle) error { func (be *MemoryBackend) Remove(ctx context.Context, h restic.Handle) error {
be.sem.GetToken()
defer be.sem.ReleaseToken()
be.m.Lock() be.m.Lock()
defer be.m.Unlock() defer be.m.Unlock()
@ -229,6 +254,10 @@ func (be *MemoryBackend) List(ctx context.Context, t restic.FileType, fn func(re
return ctx.Err() return ctx.Err()
} }
// Connections returns the fixed connection limit of the memory backend. This
// is the connectionCount constant, the same value New passes to
// backend.NewSemaphore.
func (be *MemoryBackend) Connections() uint {
return connectionCount
}
// Location returns the location of the backend (RAM). // Location returns the location of the backend (RAM).
func (be *MemoryBackend) Location() string { func (be *MemoryBackend) Location() string {
return "RAM" return "RAM"

View file

@ -30,6 +30,7 @@ var _ restic.Backend = &Backend{}
// Backend uses the REST protocol to access data stored on a server. // Backend uses the REST protocol to access data stored on a server.
type Backend struct { type Backend struct {
url *url.URL url *url.URL
connections uint
sem *backend.Semaphore sem *backend.Semaphore
client *http.Client client *http.Client
backend.Layout backend.Layout
@ -60,6 +61,7 @@ func Open(cfg Config, rt http.RoundTripper) (*Backend, error) {
url: cfg.URL, url: cfg.URL,
client: client, client: client,
Layout: &backend.RESTLayout{URL: url, Join: path.Join}, Layout: &backend.RESTLayout{URL: url, Join: path.Join},
connections: cfg.Connections,
sem: sem, sem: sem,
} }
@ -105,6 +107,10 @@ func Create(ctx context.Context, cfg Config, rt http.RoundTripper) (*Backend, er
return be, nil return be, nil
} }
// Connections returns the connection limit stored in the backend's
// connections field, which Open fills in from cfg.Connections.
func (b *Backend) Connections() uint {
return b.connections
}
// Location returns this backend's location (the server's URL). // Location returns this backend's location (the server's URL).
func (b *Backend) Location() string { func (b *Backend) Location() string {
return b.url.String() return b.url.String()

View file

@ -255,6 +255,10 @@ func (be *Backend) ReadDir(ctx context.Context, dir string) (list []os.FileInfo,
return list, nil return list, nil
} }
// Connections returns the connection limit taken from the backend's
// configuration (be.cfg.Connections).
func (be *Backend) Connections() uint {
return be.cfg.Connections
}
// Location returns this backend's location (the bucket name). // Location returns this backend's location (the bucket name).
func (be *Backend) Location() string { func (be *Backend) Location() string {
return be.Join(be.cfg.Bucket, be.cfg.Prefix) return be.Join(be.cfg.Bucket, be.cfg.Prefix)

View file

@ -25,6 +25,7 @@ import (
// beSwift is a backend which stores the data on a swift endpoint. // beSwift is a backend which stores the data on a swift endpoint.
type beSwift struct { type beSwift struct {
conn *swift.Connection conn *swift.Connection
connections uint
sem *backend.Semaphore sem *backend.Semaphore
container string // Container name container string // Container name
prefix string // Prefix of object names in the container prefix string // Prefix of object names in the container
@ -68,6 +69,7 @@ func Open(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backend
Transport: rt, Transport: rt,
}, },
connections: cfg.Connections,
sem: sem, sem: sem,
container: cfg.Container, container: cfg.Container,
prefix: cfg.Prefix, prefix: cfg.Prefix,
@ -113,6 +115,10 @@ func (be *beSwift) createContainer(ctx context.Context, policy string) error {
return be.conn.ContainerCreate(ctx, be.container, h) return be.conn.ContainerCreate(ctx, be.container, h)
} }
// Connections returns the connection limit stored in the backend's
// connections field, which Open fills in from cfg.Connections.
func (be *beSwift) Connections() uint {
return be.connections
}
// Location returns this backend's location (the container name). // Location returns this backend's location (the container name).
func (be *beSwift) Location() string { func (be *beSwift) Location() string {
return be.container return be.container

View file

@ -20,6 +20,7 @@ type Backend struct {
RemoveFn func(ctx context.Context, h restic.Handle) error RemoveFn func(ctx context.Context, h restic.Handle) error
TestFn func(ctx context.Context, h restic.Handle) (bool, error) TestFn func(ctx context.Context, h restic.Handle) (bool, error)
DeleteFn func(ctx context.Context) error DeleteFn func(ctx context.Context) error
ConnectionsFn func() uint
LocationFn func() string LocationFn func() string
HasherFn func() hash.Hash HasherFn func() hash.Hash
} }
@ -39,6 +40,14 @@ func (m *Backend) Close() error {
return m.CloseFn() return m.CloseFn()
} }
// Connections reports the connection limit of the mock backend. When a
// ConnectionsFn callback is set, its result is returned; otherwise the mock
// falls back to a limit of 2.
func (m *Backend) Connections() uint {
	if fn := m.ConnectionsFn; fn != nil {
		return fn()
	}

	// No callback configured: use the default limit.
	return 2
}
// Location returns a location string. // Location returns a location string.
func (m *Backend) Location() string { func (m *Backend) Location() string {
if m.LocationFn == nil { if m.LocationFn == nil {

View file

@ -5,6 +5,7 @@ import (
"sync" "sync"
"github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/ui/progress" "github.com/restic/restic/internal/ui/progress"
@ -23,6 +24,10 @@ const numRepackWorkers = 8
func Repack(ctx context.Context, repo restic.Repository, dstRepo restic.Repository, packs restic.IDSet, keepBlobs restic.BlobSet, p *progress.Counter) (obsoletePacks restic.IDSet, err error) { func Repack(ctx context.Context, repo restic.Repository, dstRepo restic.Repository, packs restic.IDSet, keepBlobs restic.BlobSet, p *progress.Counter) (obsoletePacks restic.IDSet, err error) {
debug.Log("repacking %d packs while keeping %d blobs", len(packs), len(keepBlobs)) debug.Log("repacking %d packs while keeping %d blobs", len(packs), len(keepBlobs))
if repo == dstRepo && dstRepo.Backend().Connections() < 2 {
return nil, errors.Fatal("repack step requires a backend connection limit of at least two")
}
var keepMutex sync.Mutex var keepMutex sync.Mutex
wg, wgCtx := errgroup.WithContext(ctx) wg, wgCtx := errgroup.WithContext(ctx)
@ -86,7 +91,11 @@ func Repack(ctx context.Context, repo restic.Repository, dstRepo restic.Reposito
return nil return nil
} }
for i := 0; i < numRepackWorkers; i++ { connectionLimit := dstRepo.Backend().Connections() - 1
if connectionLimit > numRepackWorkers {
connectionLimit = numRepackWorkers
}
for i := 0; i < int(connectionLimit); i++ {
wg.Go(worker) wg.Go(worker)
} }

View file

@ -18,6 +18,9 @@ type Backend interface {
// repository. // repository.
Location() string Location() string
// Connections returns the maximum number of concurrent backend operations.
Connections() uint
// Hasher may return a hash function for calculating a content hash for the backend // Hasher may return a hash function for calculating a content hash for the backend
Hasher() hash.Hash Hasher() hash.Hash