Update github.com/kurin/blazer to 0.3.0

This commit will reduce the number of HTTP requests per file uploaded
from two to one.
This commit is contained in:
Alexander Neumann 2018-02-20 21:01:21 +01:00
parent c99eabfb37
commit 296769355d
7 changed files with 119 additions and 44 deletions

4
Gopkg.lock generated
View file

@ -88,8 +88,8 @@
[[projects]] [[projects]]
name = "github.com/kurin/blazer" name = "github.com/kurin/blazer"
packages = ["b2","base","internal/b2types","internal/blog"] packages = ["b2","base","internal/b2types","internal/blog"]
revision = "5b348b2bdb078b06baa46ab7e12cdff12ee028ab" revision = "cd0304efa98725679cf68422cefa328d3d96f2f4"
version = "v0.2.2" version = "v0.3.0"
[[projects]] [[projects]]
name = "github.com/marstr/guid" name = "github.com/marstr/guid"

View file

@ -45,6 +45,7 @@ type Client struct {
slock sync.Mutex slock sync.Mutex
sWriters map[string]*Writer sWriters map[string]*Writer
sReaders map[string]*Reader sReaders map[string]*Reader
sMethods map[string]int
} }
// NewClient creates and returns a new Client with valid B2 service account // NewClient creates and returns a new Client with valid B2 service account
@ -54,7 +55,9 @@ func NewClient(ctx context.Context, account, key string, opts ...ClientOption) (
backend: &beRoot{ backend: &beRoot{
b2i: &b2Root{}, b2i: &b2Root{},
}, },
sMethods: make(map[string]int),
} }
opts = append(opts, client(c))
if err := c.backend.authorizeAccount(ctx, account, key, opts...); err != nil { if err := c.backend.authorizeAccount(ctx, account, key, opts...); err != nil {
return nil, err return nil, err
} }
@ -62,6 +65,7 @@ func NewClient(ctx context.Context, account, key string, opts ...ClientOption) (
} }
type clientOptions struct { type clientOptions struct {
client *Client
transport http.RoundTripper transport http.RoundTripper
failSomeUploads bool failSomeUploads bool
expireTokens bool expireTokens bool
@ -115,13 +119,38 @@ func ForceCapExceeded() ClientOption {
} }
} }
func client(cl *Client) ClientOption {
return func(c *clientOptions) {
c.client = cl
}
}
type clientTransport struct {
client *Client
rt http.RoundTripper
}
func (ct *clientTransport) RoundTrip(r *http.Request) (*http.Response, error) {
method := r.Header.Get("X-Blazer-Method")
if method != "" && ct.client != nil {
ct.client.slock.Lock()
ct.client.sMethods[method]++
ct.client.slock.Unlock()
}
t := ct.rt
if t == nil {
t = http.DefaultTransport
}
return t.RoundTrip(r)
}
// Bucket is a reference to a B2 bucket. // Bucket is a reference to a B2 bucket.
type Bucket struct { type Bucket struct {
b beBucketInterface b beBucketInterface
r beRootInterface r beRootInterface
c *Client c *Client
urlPool sync.Pool urlPool *urlPool
} }
type BucketType string type BucketType string
@ -189,6 +218,36 @@ func IsNotExist(err error) bool {
return berr.notFoundErr return berr.notFoundErr
} }
const uploadURLPoolSize = 100
type urlPool struct {
ch chan beURLInterface
}
func newURLPool() *urlPool {
return &urlPool{ch: make(chan beURLInterface, uploadURLPoolSize)}
}
func (p *urlPool) get() beURLInterface {
select {
case ue := <-p.ch:
// if the channel has an upload URL available, use that
return ue
default:
// otherwise return nil, a new upload URL needs to be generated
return nil
}
}
func (p *urlPool) put(u beURLInterface) {
select {
case p.ch <- u:
// put the URL back if possible
default:
// if the channel is full, throw it away
}
}
// Bucket returns a bucket if it exists. // Bucket returns a bucket if it exists.
func (c *Client) Bucket(ctx context.Context, name string) (*Bucket, error) { func (c *Client) Bucket(ctx context.Context, name string) (*Bucket, error) {
buckets, err := c.backend.listBuckets(ctx) buckets, err := c.backend.listBuckets(ctx)
@ -201,6 +260,7 @@ func (c *Client) Bucket(ctx context.Context, name string) (*Bucket, error) {
b: bucket, b: bucket,
r: c.backend, r: c.backend,
c: c, c: c,
urlPool: newURLPool(),
}, nil }, nil
} }
} }
@ -224,6 +284,7 @@ func (c *Client) NewBucket(ctx context.Context, name string, attrs *BucketAttrs)
b: bucket, b: bucket,
r: c.backend, r: c.backend,
c: c, c: c,
urlPool: newURLPool(),
}, nil }, nil
} }
} }
@ -238,6 +299,7 @@ func (c *Client) NewBucket(ctx context.Context, name string, attrs *BucketAttrs)
b: b, b: b,
r: c.backend, r: c.backend,
c: c, c: c,
urlPool: newURLPool(),
}, err }, err
} }
@ -253,6 +315,7 @@ func (c *Client) ListBuckets(ctx context.Context) ([]*Bucket, error) {
b: b, b: b,
r: c.backend, r: c.backend,
c: c, c: c,
urlPool: newURLPool(),
}) })
} }
return buckets, nil return buckets, nil

View file

@ -138,9 +138,11 @@ func (b *b2Root) authorizeAccount(ctx context.Context, account, key string, opts
f(c) f(c)
} }
var aopts []base.AuthOption var aopts []base.AuthOption
ct := &clientTransport{client: c.client}
if c.transport != nil { if c.transport != nil {
aopts = append(aopts, base.Transport(c.transport)) ct.rt = c.transport
} }
aopts = append(aopts, base.Transport(ct))
if c.failSomeUploads { if c.failSomeUploads {
aopts = append(aopts, base.FailSomeUploads()) aopts = append(aopts, base.FailSomeUploads())
} }

View file

@ -17,7 +17,9 @@ package b2
import ( import (
"bytes" "bytes"
"context" "context"
"crypto/rand"
"crypto/sha1" "crypto/sha1"
"encoding/hex"
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
@ -735,21 +737,14 @@ func TestWriteEmpty(t *testing.T) {
type rtCounter struct { type rtCounter struct {
rt http.RoundTripper rt http.RoundTripper
trips int trips int
api string
sync.Mutex sync.Mutex
} }
func (rt *rtCounter) RoundTrip(r *http.Request) (*http.Response, error) { func (rt *rtCounter) RoundTrip(r *http.Request) (*http.Response, error) {
rt.Lock() rt.Lock()
defer rt.Unlock() defer rt.Unlock()
resp, err := rt.rt.RoundTrip(r)
if err != nil {
return resp, err
}
if rt.api == "" || r.Header.Get("X-Blazer-Method") == rt.api {
rt.trips++ rt.trips++
} return rt.rt.RoundTrip(r)
return resp, nil
} }
func TestAttrsNoRoundtrip(t *testing.T) { func TestAttrsNoRoundtrip(t *testing.T) {
@ -828,12 +823,6 @@ func TestAttrsNoRoundtrip(t *testing.T) {
}*/ }*/
func TestSmallUploadsFewRoundtrips(t *testing.T) { func TestSmallUploadsFewRoundtrips(t *testing.T) {
rt := &rtCounter{rt: defaultTransport, api: "b2_get_upload_url"}
defaultTransport = rt
defer func() {
defaultTransport = rt.rt
}()
ctx := context.Background() ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel() defer cancel()
@ -847,9 +836,11 @@ func TestSmallUploadsFewRoundtrips(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
} }
if rt.trips > 3 { si := bucket.c.Status()
// Pool is not guaranteed to be valid, so 3 calls allows some slack. getURL := si.MethodCalls["b2_get_upload_url"]
t.Errorf("too many calls to b2_get_upload_url: got %d, want < 3", rt.trips) uploadFile := si.MethodCalls["b2_upload_file"]
if getURL >= uploadFile {
t.Errorf("too many calls to b2_get_upload_url")
} }
} }
@ -1001,6 +992,16 @@ func (cc *ccRC) Close() error {
return cc.ReadCloser.Close() return cc.ReadCloser.Close()
} }
var uniq string
func init() {
b := make([]byte, 4)
if _, err := rand.Read(b); err != nil {
panic(err)
}
uniq = hex.EncodeToString(b)
}
func startLiveTest(ctx context.Context, t *testing.T) (*Bucket, func()) { func startLiveTest(ctx context.Context, t *testing.T) (*Bucket, func()) {
id := os.Getenv(apiID) id := os.Getenv(apiID)
key := os.Getenv(apiKey) key := os.Getenv(apiKey)
@ -1016,7 +1017,7 @@ func startLiveTest(ctx context.Context, t *testing.T) (*Bucket, func()) {
t.Fatal(err) t.Fatal(err)
return nil, nil return nil, nil
} }
bucket, err := client.NewBucket(ctx, id+"-"+bucketName, nil) bucket, err := client.NewBucket(ctx, fmt.Sprintf("%s-%s-%s", id, bucketName, uniq), nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
return nil, nil return nil, nil

View file

@ -20,6 +20,7 @@ import "fmt"
type StatusInfo struct { type StatusInfo struct {
Writers map[string]*WriterStatus Writers map[string]*WriterStatus
Readers map[string]*ReaderStatus Readers map[string]*ReaderStatus
MethodCalls map[string]int
} }
// WriterStatus reports the status for each writer. // WriterStatus reports the status for each writer.
@ -44,6 +45,7 @@ func (c *Client) Status() *StatusInfo {
si := &StatusInfo{ si := &StatusInfo{
Writers: make(map[string]*WriterStatus), Writers: make(map[string]*WriterStatus),
Readers: make(map[string]*ReaderStatus), Readers: make(map[string]*ReaderStatus),
MethodCalls: make(map[string]int),
} }
for name, w := range c.sWriters { for name, w := range c.sWriters {
@ -54,6 +56,10 @@ func (c *Client) Status() *StatusInfo {
si.Readers[name] = r.status() si.Readers[name] = r.status()
} }
for name, n := range c.sMethods {
si.MethodCalls[name] = n
}
return si return si
} }

View file

@ -246,12 +246,12 @@ func (w *Writer) Write(p []byte) (int, error) {
} }
func (w *Writer) getUploadURL(ctx context.Context) (beURLInterface, error) { func (w *Writer) getUploadURL(ctx context.Context) (beURLInterface, error) {
u := w.o.b.urlPool.Get() u := w.o.b.urlPool.get()
if u == nil { if u == nil {
return w.o.b.b.getUploadURL(w.ctx) return w.o.b.b.getUploadURL(w.ctx)
} }
ue := u.(beURLInterface)
return ue, nil return u, nil
} }
func (w *Writer) simpleWriteFile() error { func (w *Writer) simpleWriteFile() error {
@ -261,7 +261,7 @@ func (w *Writer) simpleWriteFile() error {
} }
// This defer needs to be in a func() so that we put whatever the value of ue // This defer needs to be in a func() so that we put whatever the value of ue
// is at function exit. // is at function exit.
defer func() { w.o.b.urlPool.Put(ue) }() defer func() { w.o.b.urlPool.put(ue) }()
sha1 := w.w.Hash() sha1 := w.w.Hash()
ctype := w.contentType ctype := w.contentType
if ctype == "" { if ctype == "" {

View file

@ -42,7 +42,7 @@ import (
const ( const (
APIBase = "https://api.backblazeb2.com" APIBase = "https://api.backblazeb2.com"
DefaultUserAgent = "blazer/0.2.2" DefaultUserAgent = "blazer/0.3.0"
) )
type b2err struct { type b2err struct {
@ -903,6 +903,9 @@ func (l *LargeFile) FinishLargeFile(ctx context.Context) (*File, error) {
} }
b2resp := &b2types.FinishLargeFileResponse{} b2resp := &b2types.FinishLargeFileResponse{}
for k, v := range l.hashes { for k, v := range l.hashes {
if len(b2req.Hashes) < k {
return nil, fmt.Errorf("b2_finish_large_file: invalid index %d", k)
}
b2req.Hashes[k-1] = v b2req.Hashes[k-1] = v
} }
headers := map[string]string{ headers := map[string]string{