Merge pull request #1827 from restic/azure-large-files
azure: Support uploading large files
This commit is contained in:
commit
d49ca42771
3 changed files with 167 additions and 4 deletions
9
changelog/unreleased/issue-1822
Normal file
9
changelog/unreleased/issue-1822
Normal file
|
@ -0,0 +1,9 @@
|
|||
Bugfix: Allow uploading large files to MS Azure
|
||||
|
||||
Sometimes, restic creates files to be uploaded to the repository which are
|
||||
quite large, e.g. when saving directories with many entries or very large
|
||||
files. The MS Azure API does not allow uploading files larger that 256MiB
|
||||
directly, rather restic needs to upload them in blocks of 100MiB. This is now
|
||||
implemented.
|
||||
|
||||
https://github.com/restic/restic/issues/1822
|
|
@ -2,6 +2,7 @@ package azure
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
|
@ -64,13 +65,13 @@ func open(cfg Config, rt http.RoundTripper) (*Backend, error) {
|
|||
}
|
||||
|
||||
// Open opens the Azure backend at specified container.
|
||||
func Open(cfg Config, rt http.RoundTripper) (restic.Backend, error) {
|
||||
func Open(cfg Config, rt http.RoundTripper) (*Backend, error) {
|
||||
return open(cfg, rt)
|
||||
}
|
||||
|
||||
// Create opens the Azure backend at specified container and creates the container if
|
||||
// it does not exist yet.
|
||||
func Create(cfg Config, rt http.RoundTripper) (restic.Backend, error) {
|
||||
func Create(cfg Config, rt http.RoundTripper) (*Backend, error) {
|
||||
be, err := open(cfg, rt)
|
||||
|
||||
if err != nil {
|
||||
|
@ -129,8 +130,18 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe
|
|||
|
||||
debug.Log("InsertObject(%v, %v)", be.container.Name, objName)
|
||||
|
||||
// wrap the reader so that net/http client cannot close the reader
|
||||
err := be.container.GetBlobReference(objName).CreateBlockBlobFromReader(ioutil.NopCloser(rd), nil)
|
||||
var err error
|
||||
if rd.Length() < 256*1024*1024 {
|
||||
// wrap the reader so that net/http client cannot close the reader
|
||||
dataReader := ioutil.NopCloser(rd)
|
||||
|
||||
// if it's smaller than 256miB, then just create the file directly from the reader
|
||||
err = be.container.GetBlobReference(objName).CreateBlockBlobFromReader(dataReader, nil)
|
||||
} else {
|
||||
// otherwise use the more complicated method
|
||||
err = be.saveLarge(ctx, objName, rd)
|
||||
|
||||
}
|
||||
|
||||
be.sem.ReleaseToken()
|
||||
debug.Log("%v, err %#v", objName, err)
|
||||
|
@ -138,6 +149,55 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe
|
|||
return errors.Wrap(err, "CreateBlockBlobFromReader")
|
||||
}
|
||||
|
||||
func (be *Backend) saveLarge(ctx context.Context, objName string, rd restic.RewindReader) error {
|
||||
// create the file on the server
|
||||
file := be.container.GetBlobReference(objName)
|
||||
err := file.CreateBlockBlob(nil)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "CreateBlockBlob")
|
||||
}
|
||||
|
||||
// read the data, in 100 MiB chunks
|
||||
buf := make([]byte, 100*1024*1024)
|
||||
var blocks []storage.Block
|
||||
|
||||
for {
|
||||
n, err := io.ReadFull(rd, buf)
|
||||
if err == io.ErrUnexpectedEOF {
|
||||
err = nil
|
||||
}
|
||||
if err == io.EOF {
|
||||
// end of file reached, no bytes have been read at all
|
||||
break
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "ReadFull")
|
||||
}
|
||||
|
||||
buf = buf[:n]
|
||||
|
||||
// upload it as a new "block", use the base64 hash for the ID
|
||||
h := restic.Hash(buf)
|
||||
id := base64.StdEncoding.EncodeToString(h[:])
|
||||
debug.Log("PutBlock %v with %d bytes", id, len(buf))
|
||||
err = file.PutBlock(id, buf, nil)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "PutBlock")
|
||||
}
|
||||
|
||||
blocks = append(blocks, storage.Block{
|
||||
ID: id,
|
||||
Status: "Uncommitted",
|
||||
})
|
||||
}
|
||||
|
||||
debug.Log("uploaded %d parts: %v", len(blocks), blocks)
|
||||
err = file.PutBlockList(blocks, nil)
|
||||
debug.Log("PutBlockList returned %v", err)
|
||||
return errors.Wrap(err, "PutBlockList")
|
||||
}
|
||||
|
||||
// wrapReader wraps an io.ReadCloser to run an additional function on Close.
|
||||
type wrapReader struct {
|
||||
io.ReadCloser
|
||||
|
|
|
@ -1,8 +1,10 @@
|
|||
package azure_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -122,3 +124,95 @@ func BenchmarkBackendAzure(t *testing.B) {
|
|||
t.Logf("run tests")
|
||||
newAzureTestSuite(t).RunBenchmarks(t)
|
||||
}
|
||||
|
||||
func TestUploadLargeFile(t *testing.T) {
|
||||
if os.Getenv("RESTIC_AZURE_TEST_LARGE_UPLOAD") == "" {
|
||||
t.Skip("set RESTIC_AZURE_TEST_LARGE_UPLOAD=1 to test large uploads")
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.TODO())
|
||||
defer cancel()
|
||||
|
||||
if os.Getenv("RESTIC_TEST_AZURE_REPOSITORY") == "" {
|
||||
t.Skipf("environment variables not available")
|
||||
return
|
||||
}
|
||||
|
||||
azcfg, err := azure.ParseConfig(os.Getenv("RESTIC_TEST_AZURE_REPOSITORY"))
|
||||
if err != nil {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
cfg := azcfg.(azure.Config)
|
||||
cfg.AccountName = os.Getenv("RESTIC_TEST_AZURE_ACCOUNT_NAME")
|
||||
cfg.AccountKey = os.Getenv("RESTIC_TEST_AZURE_ACCOUNT_KEY")
|
||||
cfg.Prefix = fmt.Sprintf("test-upload-large-%d", time.Now().UnixNano())
|
||||
|
||||
tr, err := backend.Transport(backend.TransportOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
be, err := azure.Create(cfg, tr)
|
||||
if err != nil {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
defer func() {
|
||||
err := be.Delete(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}()
|
||||
|
||||
data := rtest.Random(23, 300*1024*1024)
|
||||
id := restic.Hash(data)
|
||||
h := restic.Handle{Name: id.String(), Type: restic.DataFile}
|
||||
|
||||
t.Logf("hash of %d bytes: %v", len(data), id)
|
||||
|
||||
err = be.Save(ctx, h, restic.NewByteReader(data))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer func() {
|
||||
err := be.Remove(ctx, h)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}()
|
||||
|
||||
var tests = []struct {
|
||||
offset, length int
|
||||
}{
|
||||
{0, len(data)},
|
||||
{23, 1024},
|
||||
{23 + 100*1024, 500},
|
||||
{888 + 200*1024, 89999},
|
||||
{888 + 100*1024*1024, 120 * 1024 * 1024},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run("", func(t *testing.T) {
|
||||
want := data[test.offset : test.offset+test.length]
|
||||
|
||||
buf := make([]byte, test.length)
|
||||
err = be.Load(ctx, h, test.length, int64(test.offset), func(rd io.Reader) error {
|
||||
_, err = io.ReadFull(rd, buf)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if !bytes.Equal(buf, want) {
|
||||
t.Fatalf("wrong bytes returned")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue