forked from TrueCloudLab/restic
61cb1cc6f8
This includes github.com/kurin/blazer 0.2.0, which resolves #1291
522 lines
12 KiB
Go
522 lines
12 KiB
Go
// Copyright 2016, Google
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package b2
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/kurin/blazer/internal/blog"
|
|
)
|
|
|
|
// Writer writes data into Backblaze. It automatically switches to the large
|
|
// file API if the file exceeds ChunkSize bytes. Due to that and other
|
|
// Backblaze API details, there is a large buffer.
|
|
//
|
|
// Changes to public Writer attributes must be made before the first call to
|
|
// Write.
|
|
type Writer struct {
|
|
// ConcurrentUploads is number of different threads sending data concurrently
|
|
// to Backblaze for large files. This can increase performance greatly, as
|
|
// each thread will hit a different endpoint. However, there is a ChunkSize
|
|
// buffer for each thread. Values less than 1 are equivalent to 1.
|
|
ConcurrentUploads int
|
|
|
|
// Resume an upload. If true, and the upload is a large file, and a file of
|
|
// the same name was started but not finished, then assume that we are
|
|
// resuming that file, and don't upload duplicate chunks.
|
|
Resume bool
|
|
|
|
// ChunkSize is the size, in bytes, of each individual part, when writing
|
|
// large files, and also when determining whether to upload a file normally
|
|
// or when to split it into parts. The default is 100M (1e8) The minimum is
|
|
// 5M (5e6); values less than this are not an error, but will fail. The
|
|
// maximum is 5GB (5e9).
|
|
ChunkSize int
|
|
|
|
// UseFileBuffer controls whether to use an in-memory buffer (the default) or
|
|
// scratch space on the file system. If this is true, b2 will save chunks in
|
|
// FileBufferDir.
|
|
UseFileBuffer bool
|
|
|
|
// FileBufferDir specifies the directory where scratch files are kept. If
|
|
// blank, os.TempDir() is used.
|
|
FileBufferDir string
|
|
|
|
contentType string
|
|
info map[string]string
|
|
|
|
csize int
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
ready chan chunk
|
|
wg sync.WaitGroup
|
|
start sync.Once
|
|
once sync.Once
|
|
done sync.Once
|
|
file beLargeFileInterface
|
|
seen map[int]string
|
|
everStarted bool
|
|
newBuffer func() (writeBuffer, error)
|
|
|
|
o *Object
|
|
name string
|
|
|
|
cidx int
|
|
w writeBuffer
|
|
|
|
emux sync.RWMutex
|
|
err error
|
|
|
|
smux sync.RWMutex
|
|
smap map[int]*meteredReader
|
|
}
|
|
|
|
type chunk struct {
|
|
id int
|
|
buf writeBuffer
|
|
}
|
|
|
|
func (w *Writer) setErr(err error) {
|
|
if err == nil || err == io.EOF {
|
|
return
|
|
}
|
|
w.emux.Lock()
|
|
defer w.emux.Unlock()
|
|
if w.err == nil {
|
|
blog.V(1).Infof("error writing %s: %v", w.name, err)
|
|
w.err = err
|
|
w.cancel()
|
|
}
|
|
}
|
|
|
|
func (w *Writer) getErr() error {
|
|
w.emux.RLock()
|
|
defer w.emux.RUnlock()
|
|
return w.err
|
|
}
|
|
|
|
func (w *Writer) registerChunk(id int, r *meteredReader) {
|
|
w.smux.Lock()
|
|
w.smap[id] = r
|
|
w.smux.Unlock()
|
|
}
|
|
|
|
func (w *Writer) completeChunk(id int) {
|
|
w.smux.Lock()
|
|
w.smap[id] = nil
|
|
w.smux.Unlock()
|
|
}
|
|
|
|
var gid int32
|
|
|
|
func (w *Writer) thread() {
|
|
w.wg.Add(1)
|
|
go func() {
|
|
defer w.wg.Done()
|
|
id := atomic.AddInt32(&gid, 1)
|
|
fc, err := w.file.getUploadPartURL(w.ctx)
|
|
if err != nil {
|
|
w.setErr(err)
|
|
return
|
|
}
|
|
for {
|
|
chunk, ok := <-w.ready
|
|
if !ok {
|
|
return
|
|
}
|
|
if sha, ok := w.seen[chunk.id]; ok {
|
|
if sha != chunk.buf.Hash() {
|
|
w.setErr(errors.New("resumable upload was requested, but chunks don't match!"))
|
|
return
|
|
}
|
|
chunk.buf.Close()
|
|
w.completeChunk(chunk.id)
|
|
blog.V(2).Infof("skipping chunk %d", chunk.id)
|
|
continue
|
|
}
|
|
blog.V(2).Infof("thread %d handling chunk %d", id, chunk.id)
|
|
r, err := chunk.buf.Reader()
|
|
if err != nil {
|
|
w.setErr(err)
|
|
return
|
|
}
|
|
mr := &meteredReader{r: r, size: chunk.buf.Len()}
|
|
w.registerChunk(chunk.id, mr)
|
|
sleep := time.Millisecond * 15
|
|
redo:
|
|
n, err := fc.uploadPart(w.ctx, mr, chunk.buf.Hash(), chunk.buf.Len(), chunk.id)
|
|
if n != chunk.buf.Len() || err != nil {
|
|
if w.o.b.r.reupload(err) {
|
|
time.Sleep(sleep)
|
|
sleep *= 2
|
|
if sleep > time.Second*15 {
|
|
sleep = time.Second * 15
|
|
}
|
|
blog.V(1).Infof("b2 writer: wrote %d of %d: error: %v; retrying", n, chunk.buf.Len(), err)
|
|
f, err := w.file.getUploadPartURL(w.ctx)
|
|
if err != nil {
|
|
w.setErr(err)
|
|
w.completeChunk(chunk.id)
|
|
chunk.buf.Close() // TODO: log error
|
|
return
|
|
}
|
|
fc = f
|
|
goto redo
|
|
}
|
|
w.setErr(err)
|
|
w.completeChunk(chunk.id)
|
|
chunk.buf.Close() // TODO: log error
|
|
return
|
|
}
|
|
w.completeChunk(chunk.id)
|
|
chunk.buf.Close() // TODO: log error
|
|
blog.V(2).Infof("chunk %d handled", chunk.id)
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (w *Writer) init() {
|
|
w.start.Do(func() {
|
|
w.everStarted = true
|
|
w.smux.Lock()
|
|
w.smap = make(map[int]*meteredReader)
|
|
w.smux.Unlock()
|
|
w.o.b.c.addWriter(w)
|
|
w.csize = w.ChunkSize
|
|
if w.csize == 0 {
|
|
w.csize = 1e8
|
|
}
|
|
if w.newBuffer == nil {
|
|
w.newBuffer = func() (writeBuffer, error) { return newMemoryBuffer(), nil }
|
|
if w.UseFileBuffer {
|
|
w.newBuffer = func() (writeBuffer, error) { return newFileBuffer(w.FileBufferDir) }
|
|
}
|
|
}
|
|
v, err := w.newBuffer()
|
|
if err != nil {
|
|
w.setErr(err)
|
|
return
|
|
}
|
|
w.w = v
|
|
})
|
|
}
|
|
|
|
// Write satisfies the io.Writer interface.
|
|
func (w *Writer) Write(p []byte) (int, error) {
|
|
w.init()
|
|
if err := w.getErr(); err != nil {
|
|
return 0, err
|
|
}
|
|
left := w.csize - w.w.Len()
|
|
if len(p) < left {
|
|
return w.w.Write(p)
|
|
}
|
|
i, err := w.w.Write(p[:left])
|
|
if err != nil {
|
|
w.setErr(err)
|
|
return i, err
|
|
}
|
|
if err := w.sendChunk(); err != nil {
|
|
w.setErr(err)
|
|
return i, w.getErr()
|
|
}
|
|
k, err := w.Write(p[left:])
|
|
if err != nil {
|
|
w.setErr(err)
|
|
}
|
|
return i + k, err
|
|
}
|
|
|
|
func (w *Writer) simpleWriteFile() error {
|
|
ue, err := w.o.b.b.getUploadURL(w.ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
sha1 := w.w.Hash()
|
|
ctype := w.contentType
|
|
if ctype == "" {
|
|
ctype = "application/octet-stream"
|
|
}
|
|
r, err := w.w.Reader()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
mr := &meteredReader{r: r, size: w.w.Len()}
|
|
w.registerChunk(1, mr)
|
|
defer w.completeChunk(1)
|
|
redo:
|
|
f, err := ue.uploadFile(w.ctx, mr, int(w.w.Len()), w.name, ctype, sha1, w.info)
|
|
if err != nil {
|
|
if w.o.b.r.reupload(err) {
|
|
blog.V(2).Infof("b2 writer: %v; retrying", err)
|
|
u, err := w.o.b.b.getUploadURL(w.ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
ue = u
|
|
goto redo
|
|
}
|
|
return err
|
|
}
|
|
w.o.f = f
|
|
return nil
|
|
}
|
|
|
|
func (w *Writer) getLargeFile() (beLargeFileInterface, error) {
|
|
if !w.Resume {
|
|
ctype := w.contentType
|
|
if ctype == "" {
|
|
ctype = "application/octet-stream"
|
|
}
|
|
return w.o.b.b.startLargeFile(w.ctx, w.name, ctype, w.info)
|
|
}
|
|
next := 1
|
|
seen := make(map[int]string)
|
|
var size int64
|
|
var fi beFileInterface
|
|
for {
|
|
cur := &Cursor{name: w.name}
|
|
objs, _, err := w.o.b.ListObjects(w.ctx, 1, cur)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(objs) < 1 || objs[0].name != w.name {
|
|
w.Resume = false
|
|
return w.getLargeFile()
|
|
}
|
|
fi = objs[0].f
|
|
parts, n, err := fi.listParts(w.ctx, next, 100)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
next = n
|
|
for _, p := range parts {
|
|
seen[p.number()] = p.sha1()
|
|
size += p.size()
|
|
}
|
|
if len(parts) == 0 {
|
|
break
|
|
}
|
|
if next == 0 {
|
|
break
|
|
}
|
|
}
|
|
w.seen = make(map[int]string) // copy the map
|
|
for id, sha := range seen {
|
|
w.seen[id] = sha
|
|
}
|
|
return fi.compileParts(size, seen), nil
|
|
}
|
|
|
|
func (w *Writer) sendChunk() error {
|
|
var err error
|
|
w.once.Do(func() {
|
|
lf, e := w.getLargeFile()
|
|
if e != nil {
|
|
err = e
|
|
return
|
|
}
|
|
w.file = lf
|
|
w.ready = make(chan chunk)
|
|
if w.ConcurrentUploads < 1 {
|
|
w.ConcurrentUploads = 1
|
|
}
|
|
for i := 0; i < w.ConcurrentUploads; i++ {
|
|
w.thread()
|
|
}
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
select {
|
|
case w.ready <- chunk{
|
|
id: w.cidx + 1,
|
|
buf: w.w,
|
|
}:
|
|
case <-w.ctx.Done():
|
|
return w.ctx.Err()
|
|
}
|
|
w.cidx++
|
|
v, err := w.newBuffer()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
w.w = v
|
|
return nil
|
|
}
|
|
|
|
// ReadFrom reads all of r into w, returning the first error or no error if r
|
|
// returns io.EOF. If r is also an io.Seeker, ReadFrom will stream r directly
|
|
// over the wire instead of buffering it locally. This reduces memory usage.
|
|
//
|
|
// Do not issue multiple calls to ReadFrom, or mix ReadFrom and Write. If you
|
|
// have multiple readers you want to concatenate into the same B2 object, use
|
|
// an io.MultiReader.
|
|
//
|
|
// Note that io.Copy will automatically choose to use ReadFrom.
|
|
//
|
|
// ReadFrom currently doesn't handle w.Resume; if w.Resume is true, ReadFrom
|
|
// will act as if r is not an io.Seeker.
|
|
func (w *Writer) ReadFrom(r io.Reader) (int64, error) {
|
|
rs, ok := r.(io.ReadSeeker)
|
|
if !ok || w.Resume {
|
|
return copyContext(w.ctx, w, r)
|
|
}
|
|
blog.V(2).Info("streaming without buffer")
|
|
size, err := rs.Seek(0, io.SeekEnd)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
var ra io.ReaderAt
|
|
if rat, ok := r.(io.ReaderAt); ok {
|
|
ra = rat
|
|
} else {
|
|
ra = enReaderAt(rs)
|
|
}
|
|
var offset int64
|
|
var wrote int64
|
|
w.newBuffer = func() (writeBuffer, error) {
|
|
left := size - offset
|
|
if left <= 0 {
|
|
// We're done sending real chunks; send empty chunks from now on so that
|
|
// Close() works.
|
|
w.newBuffer = func() (writeBuffer, error) { return newMemoryBuffer(), nil }
|
|
w.w = newMemoryBuffer()
|
|
return nil, io.EOF
|
|
}
|
|
csize := int64(w.csize)
|
|
if left < csize {
|
|
csize = left
|
|
}
|
|
nb := newNonBuffer(ra, offset, csize)
|
|
wrote += csize // TODO: this is kind of a total lie
|
|
offset += csize
|
|
return nb, nil
|
|
}
|
|
w.init()
|
|
if size < int64(w.csize) {
|
|
// the magic happens on w.Close()
|
|
return size, nil
|
|
}
|
|
for {
|
|
if err := w.sendChunk(); err != nil {
|
|
if err != io.EOF {
|
|
return wrote, err
|
|
}
|
|
return wrote, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// Close satisfies the io.Closer interface. It is critical to check the return
|
|
// value of Close for all writers.
|
|
func (w *Writer) Close() error {
|
|
w.done.Do(func() {
|
|
if !w.everStarted {
|
|
return
|
|
}
|
|
defer w.o.b.c.removeWriter(w)
|
|
defer func() {
|
|
if err := w.w.Close(); err != nil {
|
|
// this is non-fatal, but alarming
|
|
blog.V(1).Infof("close %s: %v", w.name, err)
|
|
}
|
|
}()
|
|
if w.cidx == 0 {
|
|
w.setErr(w.simpleWriteFile())
|
|
return
|
|
}
|
|
if w.w.Len() > 0 {
|
|
if err := w.sendChunk(); err != nil {
|
|
w.setErr(err)
|
|
return
|
|
}
|
|
}
|
|
close(w.ready)
|
|
w.wg.Wait()
|
|
f, err := w.file.finishLargeFile(w.ctx)
|
|
if err != nil {
|
|
w.setErr(err)
|
|
return
|
|
}
|
|
w.o.f = f
|
|
})
|
|
return w.getErr()
|
|
}
|
|
|
|
// WithAttrs sets the writable attributes of the resulting file to given
|
|
// values. WithAttrs must be called before the first call to Write.
|
|
func (w *Writer) WithAttrs(attrs *Attrs) *Writer {
|
|
w.contentType = attrs.ContentType
|
|
w.info = make(map[string]string)
|
|
for k, v := range attrs.Info {
|
|
w.info[k] = v
|
|
}
|
|
if len(w.info) < 10 && !attrs.LastModified.IsZero() {
|
|
w.info["src_last_modified_millis"] = fmt.Sprintf("%d", attrs.LastModified.UnixNano()/1e6)
|
|
}
|
|
return w
|
|
}
|
|
|
|
func (w *Writer) status() *WriterStatus {
|
|
w.smux.RLock()
|
|
defer w.smux.RUnlock()
|
|
|
|
ws := &WriterStatus{
|
|
Progress: make([]float64, len(w.smap)),
|
|
}
|
|
|
|
for i := 1; i <= len(w.smap); i++ {
|
|
ws.Progress[i-1] = w.smap[i].done()
|
|
}
|
|
|
|
return ws
|
|
}
|
|
|
|
type meteredReader struct {
|
|
read int64
|
|
size int
|
|
r readResetter
|
|
mux sync.Mutex
|
|
}
|
|
|
|
func (mr *meteredReader) Read(p []byte) (int, error) {
|
|
mr.mux.Lock()
|
|
defer mr.mux.Unlock()
|
|
n, err := mr.r.Read(p)
|
|
mr.read += int64(n)
|
|
return n, err
|
|
}
|
|
|
|
func (mr *meteredReader) Reset() error {
|
|
mr.mux.Lock()
|
|
defer mr.mux.Unlock()
|
|
mr.read = 0
|
|
return mr.r.Reset()
|
|
}
|
|
|
|
func (mr *meteredReader) done() float64 {
|
|
if mr == nil {
|
|
return 1
|
|
}
|
|
read := float64(atomic.LoadInt64(&mr.read))
|
|
return read / float64(mr.size)
|
|
}
|