Update github.com/kurin/blazer

Reduces cost-intensive list_files API calls.
This commit is contained in:
Alexander Neumann 2017-06-05 22:40:56 +02:00
parent a9a2798910
commit 29f8f8fe68
9 changed files with 222 additions and 88 deletions

2
vendor/manifest vendored
View file

@ -34,7 +34,7 @@
{
"importpath": "github.com/kurin/blazer",
"repository": "https://github.com/kurin/blazer",
"revision": "48de0a1e4d21fba201aff7fefdf3e5e7735b1439",
"revision": "d1b9d31c8641e46f2651fe564ee9ddb857c1ed29",
"branch": "master"
},
{

View file

@ -28,7 +28,6 @@
package b2
import (
"bytes"
"fmt"
"io"
"net/http"
@ -64,7 +63,10 @@ func NewClient(ctx context.Context, account, key string, opts ...ClientOption) (
}
type clientOptions struct {
transport http.RoundTripper
transport http.RoundTripper
failSomeUploads bool
expireTokens bool
capExceeded bool
}
// A ClientOption allows callers to adjust various per-client settings.
@ -78,6 +80,30 @@ func Transport(rt http.RoundTripper) ClientOption {
}
}
// FailSomeUploads returns a ClientOption that switches on the client's
// failSomeUploads setting, asking the B2 service to fail uploads
// intermittently. It exists mainly to support testing.
func FailSomeUploads() ClientOption {
	return func(opts *clientOptions) {
		opts.failSomeUploads = true
	}
}
// ExpireSomeAuthTokens returns a ClientOption that switches on the client's
// expireTokens setting, asking the B2 service to produce intermittent
// authentication failures.
func ExpireSomeAuthTokens() ClientOption {
	return func(opts *clientOptions) {
		opts.expireTokens = true
	}
}
// ForceCapExceeded returns a ClientOption that switches on the client's
// capExceeded setting, asking the B2 service to apply a cap limit so that
// every upload is treated as if it would exceed the configured B2 capacity.
func ForceCapExceeded() ClientOption {
	return func(opts *clientOptions) {
		opts.capExceeded = true
	}
}
// Bucket is a reference to a B2 bucket.
type Bucket struct {
b beBucketInterface
@ -306,9 +332,11 @@ func (o *Object) Name() string {
// Attrs returns an object's attributes.
func (o *Object) Attrs(ctx context.Context) (*Attrs, error) {
if err := o.ensure(ctx); err != nil {
f, err := o.b.b.downloadFileByName(ctx, o.name, 0, 1)
if err != nil {
return nil, err
}
o.f = o.b.b.file(f.id())
fi, err := o.f.getFileInfo(ctx)
if err != nil {
return nil, err
@ -403,7 +431,7 @@ func (o *Object) NewRangeReader(ctx context.Context, offset, length int64) *Read
cancel: cancel,
o: o,
name: o.name,
chunks: make(map[int]*bytes.Buffer),
chunks: make(map[int]*rchunk),
length: length,
offset: offset,
}

View file

@ -196,8 +196,18 @@ func (t *testBucket) listFileVersions(ctx context.Context, count int, a, b, c, d
}
func (t *testBucket) downloadFileByName(_ context.Context, name string, offset, size int64) (b2FileReaderInterface, error) {
f := t.files[name]
end := int(offset + size)
if end >= len(f) {
end = len(f)
}
if int(offset) >= len(f) {
return nil, errNoMoreContent
}
return &testFileReader{
b: ioutil.NopCloser(bytes.NewBufferString(t.files[name][offset : offset+size])),
b: ioutil.NopCloser(bytes.NewBufferString(f[offset:end])),
s: end - int(offset),
n: name,
}, nil
}
@ -205,7 +215,8 @@ func (t *testBucket) hideFile(context.Context, string) (b2FileInterface, error)
func (t *testBucket) getDownloadAuthorization(context.Context, string, time.Duration) (string, error) {
return "", nil
}
func (t *testBucket) baseURL() string { return "" }
func (t *testBucket) baseURL() string { return "" }
func (t *testBucket) file(id string) b2FileInterface { return nil }
type testURL struct {
files map[string]string
@ -310,12 +321,14 @@ func (t *testFile) deleteFileVersion(context.Context) error {
type testFileReader struct {
b io.ReadCloser
s int64
s int
n string
}
func (t *testFileReader) Read(p []byte) (int, error) { return t.b.Read(p) }
func (t *testFileReader) Close() error { return nil }
func (t *testFileReader) stats() (int, string, string, map[string]string) { return 0, "", "", nil }
func (t *testFileReader) stats() (int, string, string, map[string]string) { return t.s, "", "", nil }
func (t *testFileReader) id() string { return t.n }
type zReader struct{}
@ -569,6 +582,46 @@ func TestReadWrite(t *testing.T) {
}
}
// TestReadRangeReturnsRight creates a private bucket on the in-memory
// testRoot backend, writes an object named "file" via writeFile (called with
// 1e6+42 and 1e8 — presumably the object size and the writer chunk size;
// confirm against writeFile), and then checks that a range reader for 1400
// bytes at offset 200, using a 1000-byte ChunkSize, yields exactly 1400 bytes.
func TestReadRangeReturnsRight(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	root := &testRoot{
		bucketMap: make(map[string]map[string]string),
		errs:      &errCont{},
	}
	client := &Client{backend: &beRoot{b2i: root}}

	bucket, err := client.NewBucket(ctx, bucketName, &BucketAttrs{Type: Private})
	if err != nil {
		t.Fatal(err)
	}
	// Delete the bucket on the way out; a failure is reported but does not
	// abort the test.
	defer func() {
		if derr := bucket.Delete(ctx); derr != nil {
			t.Error(derr)
		}
	}()

	obj, _, err := writeFile(ctx, bucket, "file", 1e6+42, 1e8)
	if err != nil {
		t.Fatal(err)
	}

	reader := obj.NewRangeReader(ctx, 200, 1400)
	reader.ChunkSize = 1000

	copied, err := io.Copy(ioutil.Discard, reader)
	if err != nil {
		t.Error(err)
	}
	if copied != 1400 {
		t.Errorf("NewRangeReader(_, 200, 1400): want 1400, got %d", copied)
	}
}
func TestWriterReturnsError(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)

View file

@ -54,6 +54,7 @@ type beBucketInterface interface {
hideFile(context.Context, string) (beFileInterface, error)
getDownloadAuthorization(context.Context, string, time.Duration) (string, error)
baseURL() string
file(string) beFileInterface
}
type beBucket struct {
@ -110,6 +111,7 @@ type beFileChunk struct {
type beFileReaderInterface interface {
io.ReadCloser
stats() (int, string, string, map[string]string)
id() string
}
type beFileReader struct {
@ -405,6 +407,13 @@ func (b *beBucket) baseURL() string {
return b.b2bucket.baseURL()
}
// file asks the underlying b2 bucket for the file handle with the given id
// and wraps it in a beFile that shares this bucket's ri.
func (b *beBucket) file(id string) beFileInterface {
	inner := b.b2bucket.file(id)
	return &beFile{b2file: inner, ri: b.ri}
}
func (b *beURL) uploadFile(ctx context.Context, r io.ReadSeeker, size int, name, ct, sha1 string, info map[string]string) (beFileInterface, error) {
var file beFileInterface
f := func() error {
@ -602,6 +611,8 @@ func (b *beFileReader) stats() (int, string, string, map[string]string) {
return b.b2fileReader.stats()
}
// id returns the identifier reported by the wrapped b2 file reader.
func (b *beFileReader) id() string {
	return b.b2fileReader.id()
}
func (b *beFileInfo) stats() (string, string, int64, string, map[string]string, string, time.Time) {
return b.name, b.sha, b.size, b.ct, b.info, b.status, b.stamp
}

View file

@ -16,6 +16,7 @@ package b2
import (
"io"
"net/http"
"time"
"github.com/kurin/blazer/base"
@ -50,6 +51,7 @@ type b2BucketInterface interface {
hideFile(context.Context, string) (b2FileInterface, error)
getDownloadAuthorization(context.Context, string, time.Duration) (string, error)
baseURL() string
file(string) b2FileInterface
}
type b2URLInterface interface {
@ -81,6 +83,7 @@ type b2FileChunkInterface interface {
type b2FileReaderInterface interface {
io.ReadCloser
stats() (int, string, string, map[string]string)
id() string
}
type b2FileInfoInterface interface {
@ -138,6 +141,15 @@ func (b *b2Root) authorizeAccount(ctx context.Context, account, key string, opts
if c.transport != nil {
aopts = append(aopts, base.Transport(c.transport))
}
if c.failSomeUploads {
aopts = append(aopts, base.FailSomeUploads())
}
if c.expireTokens {
aopts = append(aopts, base.ExpireSomeAuthTokens())
}
if c.capExceeded {
aopts = append(aopts, base.ForceCapExceeded())
}
nb, err := base.AuthorizeAccount(ctx, account, key, aopts...)
if err != nil {
return err
@ -303,6 +315,9 @@ func (b *b2Bucket) listFileVersions(ctx context.Context, count int, nextName, ne
func (b *b2Bucket) downloadFileByName(ctx context.Context, name string, offset, size int64) (b2FileReaderInterface, error) {
fr, err := b.b.DownloadFileByName(ctx, name, offset, size)
if err != nil {
if code, _ := base.Code(err); code == http.StatusRequestedRangeNotSatisfiable {
return nil, errNoMoreContent
}
return nil, err
}
return &b2FileReader{fr}, nil
@ -324,6 +339,8 @@ func (b *b2Bucket) baseURL() string {
return b.b.BaseURL()
}
// file returns a b2File wrapping the base bucket's File for the given id.
func (b *b2Bucket) file(id string) b2FileInterface {
	wrapped := b.b.File(id)
	return &b2File{wrapped}
}
func (b *b2URL) uploadFile(ctx context.Context, r io.Reader, size int, name, contentType, sha1 string, info map[string]string) (b2FileInterface, error) {
file, err := b.b.UploadFile(ctx, r, size, name, contentType, sha1, info)
if err != nil {
@ -416,6 +433,8 @@ func (b *b2FileReader) stats() (int, string, string, map[string]string) {
return b.b.ContentLength, b.b.ContentType, b.b.SHA1, b.b.Info
}
// id returns the ID field of the underlying base file reader.
func (b *b2FileReader) id() string {
	return b.b.ID
}
func (b *b2FileInfo) stats() (string, string, int64, string, map[string]string, string, time.Time) {
return b.b.Name, b.b.SHA1, b.b.Size, b.b.ContentType, b.b.Info, b.b.Status, b.b.Timestamp
}

View file

@ -25,8 +25,6 @@ import (
"testing"
"time"
"github.com/kurin/blazer/base"
"golang.org/x/net/context"
)
@ -37,16 +35,6 @@ const (
errVar = "B2_TRANSIENT_ERRORS"
)
func init() {
fail := os.Getenv(errVar)
switch fail {
case "", "0", "false":
return
}
base.FailSomeUploads = true
base.ExpireSomeAuthTokens = true
}
func TestReadWriteLive(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
@ -144,6 +132,21 @@ func TestHideShowLive(t *testing.T) {
}
}
// cancelReader wraps an io.Reader and keeps a running count of the bytes read
// through it, invoking a callback once that count reaches a limit.
type cancelReader struct {
	r    io.Reader // wrapped source
	n, l int       // n: bytes read so far; l: threshold at which c is invoked
	c    func()    // callback invoked once n >= l
}

// Read forwards to the wrapped reader and adds the returned byte count to the
// running total. Whenever the total is at least l after a read, c is invoked;
// this happens on every such Read, not only the first one. The wrapped
// reader's result is returned unchanged.
func (c *cancelReader) Read(p []byte) (int, error) {
	got, err := c.r.Read(p)
	if c.n += got; c.n >= c.l {
		c.c()
	}
	return got, err
}
func TestResumeWriter(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
@ -151,18 +154,11 @@ func TestResumeWriter(t *testing.T) {
w := bucket.Object("foo").NewWriter(ctx)
w.ChunkSize = 5e6
r := io.LimitReader(zReader{}, 15e6)
go func() {
// Cancel the context after the first chunk has been written.
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
defer cancel()
for range ticker.C {
if w.cidx > 1 {
return
}
}
}()
r := &cancelReader{
r: io.LimitReader(zReader{}, 15e6),
l: 6e6,
c: cancel,
}
if _, err := io.Copy(w, r); err != context.Canceled {
t.Fatalf("io.Copy: wanted canceled context, got: %v", err)
}
@ -392,6 +388,11 @@ func TestRangeReaderLive(t *testing.T) {
length: 2e6,
size: 1e6,
},
{
offset: 0,
length: 4e6,
size: 3e6,
},
}
for _, e := range table {
@ -418,12 +419,12 @@ func TestRangeReaderLive(t *testing.T) {
continue
}
if read != e.size {
t.Errorf("read %d bytes, wanted %d bytes", read, e.size)
t.Errorf("NewRangeReader(_, %d, %d): read %d bytes, wanted %d bytes", e.offset, e.length, read, e.size)
}
got := fmt.Sprintf("%x", hr.Sum(nil))
want := fmt.Sprintf("%x", hw.Sum(nil))
if got != want {
t.Errorf("bad hash, got %q, want %q", got, want)
t.Errorf("NewRangeReader(_, %d, %d): got %q, want %q", e.offset, e.length, got, want)
}
}
}
@ -661,7 +662,7 @@ func startLiveTest(ctx context.Context, t *testing.T) (*Bucket, func()) {
t.Skipf("B2_ACCOUNT_ID or B2_SECRET_KEY unset; skipping integration tests")
return nil, nil
}
client, err := NewClient(ctx, id, key)
client, err := NewClient(ctx, id, key, FailSomeUploads(), ExpireSomeAuthTokens())
if err != nil {
t.Fatal(err)
return nil, nil

View file

@ -16,6 +16,7 @@ package b2
import (
"bytes"
"errors"
"io"
"sync"
@ -24,6 +25,8 @@ import (
"golang.org/x/net/context"
)
// errNoMoreContent is the sentinel returned by downloadFileByName
// implementations when the requested range starts past the end of the object
// (the service answers such a request with HTTP 416). The reader compares
// against it directly to detect that it has run off the end of the object.
var errNoMoreContent = errors.New("416: out of content")
// Reader reads files from B2.
type Reader struct {
// ConcurrentDownloads is the number of simultaneous downloads to pull from
@ -42,16 +45,15 @@ type Reader struct {
name string
offset int64 // the start of the file
length int64 // the length to read, or -1
size int64 // the end of the file, in absolute terms
csize int // chunk size
read int // amount read
chwid int // chunks written
chrid int // chunks read
chbuf chan *bytes.Buffer
chbuf chan *rchunk
init sync.Once
rmux sync.Mutex // guards rcond
rcond *sync.Cond
chunks map[int]*bytes.Buffer
chunks map[int]*rchunk
emux sync.RWMutex // guards err, believe it or not
err error
@ -60,6 +62,11 @@ type Reader struct {
smap map[int]*meteredReader
}
// rchunk is one downloaded chunk of a Reader: the buffered bytes plus a flag
// saying whether it is the last chunk of the read.
type rchunk struct {
bytes.Buffer
// final is set by the download goroutine when the chunk was shortened to the
// remaining requested length, or when the download returned
// errNoMoreContent. Read ends the stream when it reaches EOF on a final
// chunk.
final bool
}
// Close frees resources associated with the download.
func (r *Reader) Close() error {
r.cancel()
@ -93,7 +100,7 @@ func (r *Reader) getErr() error {
func (r *Reader) thread() {
go func() {
for {
var buf *bytes.Buffer
var buf *rchunk
select {
case b, ok := <-r.chbuf:
if !ok {
@ -109,26 +116,31 @@ func (r *Reader) thread() {
r.rmux.Unlock()
offset := int64(chunkID*r.csize) + r.offset
size := int64(r.csize)
if offset >= r.size {
// Send an empty chunk. This is necessary to prevent a deadlock when
// this is the very first chunk.
if r.length > 0 {
if size > r.length {
buf.final = true
size = r.length
}
r.length -= size
}
redo:
fr, err := r.o.b.b.downloadFileByName(r.ctx, r.name, offset, size)
if err == errNoMoreContent {
// this read generated a 416 so we are entirely past the end of the object
buf.final = true
r.rmux.Lock()
r.chunks[chunkID] = buf
r.rmux.Unlock()
r.rcond.Broadcast()
return
}
if offset+size > r.size {
size = r.size - offset
}
redo:
fr, err := r.o.b.b.downloadFileByName(r.ctx, r.name, offset, size)
if err != nil {
r.setErr(err)
r.rcond.Broadcast()
return
}
mr := &meteredReader{r: &fakeSeeker{fr}, size: int(size)}
rsize, _, _, _ := fr.stats()
mr := &meteredReader{r: &fakeSeeker{fr}, size: int(rsize)}
r.smux.Lock()
r.smap[chunkID] = mr
r.smux.Unlock()
@ -136,9 +148,9 @@ func (r *Reader) thread() {
r.smux.Lock()
r.smap[chunkID] = nil
r.smux.Unlock()
if i < size || err == io.ErrUnexpectedEOF {
if i < int64(rsize) || err == io.ErrUnexpectedEOF {
// Probably the network connection was closed early. Retry.
blog.V(1).Infof("b2 reader %d: got %dB of %dB; retrying", chunkID, i, size)
blog.V(1).Infof("b2 reader %d: got %dB of %dB; retrying", chunkID, i, rsize)
buf.Reset()
goto redo
}
@ -155,8 +167,8 @@ func (r *Reader) thread() {
}()
}
func (r *Reader) curChunk() (*bytes.Buffer, error) {
ch := make(chan *bytes.Buffer)
func (r *Reader) curChunk() (*rchunk, error) {
ch := make(chan *rchunk)
go func() {
r.rmux.Lock()
defer r.rmux.Unlock()
@ -185,17 +197,6 @@ func (r *Reader) initFunc() {
r.smap = make(map[int]*meteredReader)
r.smux.Unlock()
r.o.b.c.addReader(r)
if err := r.o.ensure(r.ctx); err != nil {
r.setErr(err)
return
}
r.size = r.o.f.size()
if r.length >= 0 && r.offset+r.length < r.size {
r.size = r.offset + r.length
}
if r.offset > r.size {
r.offset = r.size
}
r.rcond = sync.NewCond(&r.rmux)
cr := r.ConcurrentDownloads
if cr < 1 {
@ -205,10 +206,10 @@ func (r *Reader) initFunc() {
r.ChunkSize = 1e7
}
r.csize = r.ChunkSize
r.chbuf = make(chan *bytes.Buffer, cr)
r.chbuf = make(chan *rchunk, cr)
for i := 0; i < cr; i++ {
r.thread()
r.chbuf <- &bytes.Buffer{}
r.chbuf <- &rchunk{}
}
}
@ -226,7 +227,7 @@ func (r *Reader) Read(p []byte) (int, error) {
n, err := chunk.Read(p)
r.read += n
if err == io.EOF {
if int64(r.read) >= r.size-r.offset {
if chunk.final {
close(r.chbuf)
r.setErrNoCancel(err)
return n, err

View file

@ -107,7 +107,7 @@ func (w *Writer) setErr(err error) {
w.emux.Lock()
defer w.emux.Unlock()
if w.err == nil {
blog.V(0).Infof("error writing %s: %v", w.name, err)
blog.V(1).Infof("error writing %s: %v", w.name, err)
w.err = err
w.cancel()
}
@ -134,15 +134,15 @@ func (w *Writer) completeChunk(id int) {
var gid int32
func (w *Writer) thread() {
w.wg.Add(1)
go func() {
defer w.wg.Done()
id := atomic.AddInt32(&gid, 1)
fc, err := w.file.getUploadPartURL(w.ctx)
if err != nil {
w.setErr(err)
return
}
w.wg.Add(1)
defer w.wg.Done()
for {
chunk, ok := <-w.ready
if !ok {

View file

@ -208,7 +208,10 @@ func millitime(t int64) time.Time {
}
type b2Options struct {
transport http.RoundTripper
transport http.RoundTripper
failSomeUploads bool
expireTokens bool
capExceeded bool
}
func (o *b2Options) getTransport() http.RoundTripper {
@ -272,20 +275,6 @@ func (rb *requestBody) getBody() io.Reader {
return rb.body
}
var (
// FailSomeUploads causes B2 to return errors, randomly, to some RPCs. It is
// intended to be used for integration testing.
FailSomeUploads = false
// ExpireSomeAuthTokens causes B2 to expire auth tokens frequently, testing
// account reauthentication.
ExpireSomeAuthTokens = false
// ForceCapExceeded causes B2 to reject all uploads with capacity limit
// failures.
ForceCapExceeded = false
)
var reqID int64
func (o *b2Options) makeRequest(ctx context.Context, method, verb, url string, b2req, b2resp interface{}, headers map[string]string, body *requestBody) error {
@ -311,13 +300,13 @@ func (o *b2Options) makeRequest(ctx context.Context, method, verb, url string, b
}
req.Header.Set("X-Blazer-Request-ID", fmt.Sprintf("%d", atomic.AddInt64(&reqID, 1)))
req.Header.Set("X-Blazer-Method", method)
if FailSomeUploads {
if o.failSomeUploads {
req.Header.Add("X-Bz-Test-Mode", "fail_some_uploads")
}
if ExpireSomeAuthTokens {
if o.expireTokens {
req.Header.Add("X-Bz-Test-Mode", "expire_some_account_authorization_tokens")
}
if ForceCapExceeded {
if o.capExceeded {
req.Header.Add("X-Bz-Test-Mode", "force_cap_exceeded")
}
cancel := make(chan struct{})
@ -396,6 +385,30 @@ func Transport(rt http.RoundTripper) AuthOption {
}
}
// FailSomeUploads returns an AuthOption that sets failSomeUploads on the
// request options, asking the B2 service to fail uploads intermittently. It
// exists mainly to support testing.
func FailSomeUploads() AuthOption {
	return func(opts *b2Options) {
		opts.failSomeUploads = true
	}
}
// ExpireSomeAuthTokens returns an AuthOption that sets expireTokens on the
// request options, asking the B2 service to produce intermittent
// authentication failures.
func ExpireSomeAuthTokens() AuthOption {
	return func(opts *b2Options) {
		opts.expireTokens = true
	}
}
// ForceCapExceeded returns an AuthOption that sets capExceeded on the request
// options, asking the B2 service to apply a cap limit so that every upload is
// treated as if it would exceed the configured B2 capacity.
func ForceCapExceeded() AuthOption {
	return func(opts *b2Options) {
		opts.capExceeded = true
	}
}
type LifecycleRule struct {
Prefix string
DaysNewUntilHidden int
@ -604,6 +617,12 @@ type File struct {
b2 *B2
}
// File builds a File that carries only the given id and this bucket's b2
// handle; no other File fields are populated.
func (b *Bucket) File(id string) *File {
	f := new(File)
	f.id, f.b2 = id, b.b2
	return f
}
// UploadFile wraps b2_upload_file.
func (url *URL) UploadFile(ctx context.Context, r io.Reader, size int, name, contentType, sha1 string, info map[string]string) (*File, error) {
headers := map[string]string{
@ -910,6 +929,7 @@ type FileReader struct {
ContentLength int
ContentType string
SHA1 string
ID string
Info map[string]string
}
@ -971,6 +991,7 @@ func (b *Bucket) DownloadFileByName(ctx context.Context, name string, offset, si
return &FileReader{
ReadCloser: reply.resp.Body,
SHA1: reply.resp.Header.Get("X-Bz-Content-Sha1"),
ID: reply.resp.Header.Get("X-Bz-File-Id"),
ContentType: reply.resp.Header.Get("Content-Type"),
ContentLength: int(clen),
Info: info,