azure: Support uploading large files

Closes #1822
This commit is contained in:
Alexander Neumann 2018-05-31 21:26:28 +02:00
parent 0fcd9d6926
commit 465700595c
2 changed files with 158 additions and 4 deletions

View file

@ -2,6 +2,7 @@ package azure
import (
"context"
"encoding/base64"
"io"
"io/ioutil"
"net/http"
@ -64,13 +65,13 @@ func open(cfg Config, rt http.RoundTripper) (*Backend, error) {
}
// Open opens the Azure backend at specified container.
func Open(cfg Config, rt http.RoundTripper) (restic.Backend, error) {
func Open(cfg Config, rt http.RoundTripper) (*Backend, error) {
return open(cfg, rt)
}
// Create opens the Azure backend at specified container and creates the container if
// it does not exist yet.
func Create(cfg Config, rt http.RoundTripper) (restic.Backend, error) {
func Create(cfg Config, rt http.RoundTripper) (*Backend, error) {
be, err := open(cfg, rt)
if err != nil {
@ -129,8 +130,18 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe
debug.Log("InsertObject(%v, %v)", be.container.Name, objName)
var err error
if rd.Length() < 256*1024*1024 {
// wrap the reader so that net/http client cannot close the reader
err := be.container.GetBlobReference(objName).CreateBlockBlobFromReader(ioutil.NopCloser(rd), nil)
dataReader := ioutil.NopCloser(rd)
// if it's smaller than 256miB, then just create the file directly from the reader
err = be.container.GetBlobReference(objName).CreateBlockBlobFromReader(dataReader, nil)
} else {
// otherwise use the more complicated method
err = be.saveLarge(ctx, objName, rd)
}
be.sem.ReleaseToken()
debug.Log("%v, err %#v", objName, err)
@ -138,6 +149,55 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe
return errors.Wrap(err, "CreateBlockBlobFromReader")
}
func (be *Backend) saveLarge(ctx context.Context, objName string, rd restic.RewindReader) error {
// create the file on the server
file := be.container.GetBlobReference(objName)
err := file.CreateBlockBlob(nil)
if err != nil {
return errors.Wrap(err, "CreateBlockBlob")
}
// read the data, in 100 MiB chunks
buf := make([]byte, 100*1024*1024)
var blocks []storage.Block
for {
n, err := io.ReadFull(rd, buf)
if err == io.ErrUnexpectedEOF {
err = nil
}
if err == io.EOF {
// end of file reached, no bytes have been read at all
break
}
if err != nil {
return errors.Wrap(err, "ReadFull")
}
buf = buf[:n]
// upload it as a new "block", use the base64 hash for the ID
h := restic.Hash(buf)
id := base64.StdEncoding.EncodeToString(h[:])
debug.Log("PutBlock %v with %d bytes", id, len(buf))
err = file.PutBlock(id, buf, nil)
if err != nil {
return errors.Wrap(err, "PutBlock")
}
blocks = append(blocks, storage.Block{
ID: id,
Status: "Uncommitted",
})
}
debug.Log("uploaded %d parts: %v", len(blocks), blocks)
err = file.PutBlockList(blocks, nil)
debug.Log("PutBlockList returned %v", err)
return errors.Wrap(err, "PutBlockList")
}
// wrapReader wraps an io.ReadCloser to run an additional function on Close.
type wrapReader struct {
io.ReadCloser

View file

@ -1,8 +1,10 @@
package azure_test
import (
"bytes"
"context"
"fmt"
"io"
"os"
"testing"
"time"
@ -122,3 +124,95 @@ func BenchmarkBackendAzure(t *testing.B) {
t.Logf("run tests")
newAzureTestSuite(t).RunBenchmarks(t)
}
func TestUploadLargeFile(t *testing.T) {
if os.Getenv("RESTIC_AZURE_TEST_LARGE_UPLOAD") == "" {
t.Skip("set RESTIC_AZURE_TEST_LARGE_UPLOAD=1 to test large uploads")
return
}
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
if os.Getenv("RESTIC_TEST_AZURE_REPOSITORY") == "" {
t.Skipf("environment variables not available")
return
}
azcfg, err := azure.ParseConfig(os.Getenv("RESTIC_TEST_AZURE_REPOSITORY"))
if err != nil {
if err != nil {
t.Fatal(err)
}
}
cfg := azcfg.(azure.Config)
cfg.AccountName = os.Getenv("RESTIC_TEST_AZURE_ACCOUNT_NAME")
cfg.AccountKey = os.Getenv("RESTIC_TEST_AZURE_ACCOUNT_KEY")
cfg.Prefix = fmt.Sprintf("test-upload-large-%d", time.Now().UnixNano())
tr, err := backend.Transport(backend.TransportOptions{})
if err != nil {
t.Fatal(err)
}
be, err := azure.Create(cfg, tr)
if err != nil {
if err != nil {
t.Fatal(err)
}
}
defer func() {
err := be.Delete(ctx)
if err != nil {
t.Fatal(err)
}
}()
data := rtest.Random(23, 300*1024*1024)
id := restic.Hash(data)
h := restic.Handle{Name: id.String(), Type: restic.DataFile}
t.Logf("hash of %d bytes: %v", len(data), id)
err = be.Save(ctx, h, restic.NewByteReader(data))
if err != nil {
t.Fatal(err)
}
defer func() {
err := be.Remove(ctx, h)
if err != nil {
t.Fatal(err)
}
}()
var tests = []struct {
offset, length int
}{
{0, len(data)},
{23, 1024},
{23 + 100*1024, 500},
{888 + 200*1024, 89999},
{888 + 100*1024*1024, 120 * 1024 * 1024},
}
for _, test := range tests {
t.Run("", func(t *testing.T) {
want := data[test.offset : test.offset+test.length]
buf := make([]byte, test.length)
err = be.Load(ctx, h, test.length, int64(test.offset), func(rd io.Reader) error {
_, err = io.ReadFull(rd, buf)
return err
})
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(buf, want) {
t.Fatalf("wrong bytes returned")
}
})
}
}