Parallelize blob upload/download for restic copy

Currently restic copy will copy each blob from every snapshot serially,
which has performance implications on high-latency backends such as b2.

This commit introduces 8x parallelism for blob downloads/uploads which
can improve restic copy operations up to 8x for repositories with many
small blobs on b2.

This commit also addresses the TODO comment in the copyTree function.

Related work:

A more thorough improvement of the restic copy performance can be found
in PR #3513
This commit is contained in:
Charlotte 🦝 Delenk 2021-12-14 10:33:24 +01:00 committed by Charlotte 🦝 Delenk
parent 882d58abce
commit e2bb384a60
No known key found for this signature in database
GPG key ID: 015E3768A70AFBC5
2 changed files with 42 additions and 18 deletions

View file

@ -0,0 +1,9 @@
Enhancement: Improve restic copy performance by parallelizing IO
Restic copy previously only used a single thread for copying blobs between
repositories, which resulted in limited performance when copying small blobs
to/from a high latency backend (i.e. any remote backend, especially b2).
Copying will now use 8 parallel threads to increase the throughput of the copy
operation.
https://github.com/restic/restic/pull/3593

View file

@ -174,9 +174,12 @@ func similarSnapshots(sna *restic.Snapshot, snb *restic.Snapshot) bool {
return true
}
const numCopyWorkers = 8
func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Repository,
visitedTrees restic.IDSet, rootTreeID restic.ID) error {
idChan := make(chan restic.ID)
wg, ctx := errgroup.WithContext(ctx)
treeStream := restic.StreamTrees(ctx, wg, srcRepo, restic.IDs{rootTreeID}, func(treeID restic.ID) bool {
@ -186,9 +189,9 @@ func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Rep
}, nil)
wg.Go(func() error {
defer close(idChan)
// reused buffer
var buf []byte
for tree := range treeStream {
if tree.Error != nil {
return fmt.Errorf("LoadTree(%v) returned error %v", tree.ID.Str(), tree.Error)
@ -209,32 +212,44 @@ func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Rep
}
}
// TODO: parallelize blob down/upload
for _, entry := range tree.Nodes {
// Recursion into directories is handled by StreamTrees
// Copy the blobs for this file.
for _, blobID := range entry.Content {
// Do we already have this data blob?
if dstRepo.Index().Has(restic.BlobHandle{ID: blobID, Type: restic.DataBlob}) {
continue
}
debug.Log("Copying blob %s\n", blobID.Str())
var err error
buf, err = srcRepo.LoadBlob(ctx, restic.DataBlob, blobID, buf)
if err != nil {
return fmt.Errorf("LoadBlob(%v) returned error %v", blobID, err)
}
_, _, err = dstRepo.SaveBlob(ctx, restic.DataBlob, buf, blobID, false)
if err != nil {
return fmt.Errorf("SaveBlob(%v) returned error %v", blobID, err)
select {
case idChan <- blobID:
case <-ctx.Done():
return ctx.Err()
}
}
}
}
return nil
})
for i := 0; i < numCopyWorkers; i++ {
wg.Go(func() error {
// reused buffer
var buf []byte
for blobID := range idChan {
// Do we already have this data blob?
if dstRepo.Index().Has(restic.BlobHandle{ID: blobID, Type: restic.DataBlob}) {
continue
}
debug.Log("Copying blob %s\n", blobID.Str())
var err error
buf, err = srcRepo.LoadBlob(ctx, restic.DataBlob, blobID, buf)
if err != nil {
return fmt.Errorf("LoadBlob(%v) returned error %v", blobID, err)
}
_, _, err = dstRepo.SaveBlob(ctx, restic.DataBlob, buf, blobID, false)
if err != nil {
return fmt.Errorf("SaveBlob(%v) returned error %v", blobID, err)
}
}
return nil
})
}
return wg.Wait()
}