Merge pull request #1901 from restic/update-blazer

Update github.com/kurin/blazer
This commit is contained in:
Alexander Neumann 2018-07-22 20:59:52 +02:00
commit 4fc00d4120
26 changed files with 4664 additions and 336 deletions

181
Gopkg.lock generated
View file

@ -4,11 +4,7 @@
[[projects]]
branch = "master"
name = "bazil.org/fuse"
packages = [
".",
"fs",
"fuseutil"
]
packages = [".","fs","fuseutil"]
revision = "371fbbdaa8987b715bdd21d6adc4c9b20155f748"
[[projects]]
@ -19,21 +15,13 @@
[[projects]]
name = "github.com/Azure/azure-sdk-for-go"
packages = [
"storage",
"version"
]
packages = ["storage","version"]
revision = "56332fec5b308fbb6615fa1af6117394cdba186d"
version = "v15.0.0"
[[projects]]
name = "github.com/Azure/go-autorest"
packages = [
"autorest",
"autorest/adal",
"autorest/azure",
"autorest/date"
]
packages = ["autorest","autorest/adal","autorest/azure","autorest/date"]
revision = "ed4b7f5bf1ec0c9ede1fda2681d96771282f2862"
version = "v10.4.0"
@ -81,12 +69,7 @@
[[projects]]
name = "github.com/google/go-cmp"
packages = [
"cmp",
"cmp/internal/diff",
"cmp/internal/function",
"cmp/internal/value"
]
packages = ["cmp","cmp/internal/diff","cmp/internal/function","cmp/internal/value"]
revision = "8099a9787ce5dc5984ed879a3bda47dc730a8e97"
version = "v0.1.0"
@ -110,16 +93,9 @@
[[projects]]
name = "github.com/kurin/blazer"
packages = [
"b2",
"base",
"internal/b2assets",
"internal/b2types",
"internal/blog",
"x/window"
]
revision = "318e9768bf9a0fe52a64b9f8fe74f4f5caef6452"
version = "v0.4.4"
packages = ["b2","base","internal/b2assets","internal/b2types","internal/blog","x/window"]
revision = "7f1134c7489e86be5c924137996d4e421815f48a"
version = "v0.5.0"
[[projects]]
name = "github.com/marstr/guid"
@ -135,15 +111,7 @@
[[projects]]
name = "github.com/minio/minio-go"
packages = [
".",
"pkg/credentials",
"pkg/encrypt",
"pkg/policy",
"pkg/s3signer",
"pkg/s3utils",
"pkg/set"
]
packages = [".","pkg/credentials","pkg/encrypt","pkg/policy","pkg/s3signer","pkg/s3utils","pkg/set"]
revision = "66252c2a3c15f7b90cc8493d497a04ac3b6e3606"
version = "5.0.0"
@ -189,52 +157,6 @@
revision = "db83917be3b88cc307464b7d8a221c173e34a0db"
version = "v0.2.0"
[[projects]]
branch = "master"
name = "github.com/restic/restic"
packages = [
"internal/archiver",
"internal/backend",
"internal/backend/azure",
"internal/backend/b2",
"internal/backend/gs",
"internal/backend/local",
"internal/backend/location",
"internal/backend/mem",
"internal/backend/rclone",
"internal/backend/rest",
"internal/backend/s3",
"internal/backend/sftp",
"internal/backend/swift",
"internal/backend/test",
"internal/cache",
"internal/checker",
"internal/crypto",
"internal/debug",
"internal/errors",
"internal/filter",
"internal/fs",
"internal/fuse",
"internal/hashing",
"internal/index",
"internal/limiter",
"internal/list",
"internal/migrations",
"internal/mock",
"internal/options",
"internal/pack",
"internal/repository",
"internal/restic",
"internal/restorer",
"internal/test",
"internal/textfile",
"internal/ui",
"internal/ui/termstatus",
"internal/walker",
"internal/worker"
]
revision = "bd742ddb692ffeaf5ac24eefdff0c0ba3e7c17fb"
[[projects]]
name = "github.com/russross/blackfriday"
packages = ["."]
@ -255,10 +177,7 @@
[[projects]]
name = "github.com/spf13/cobra"
packages = [
".",
"doc"
]
packages = [".","doc"]
revision = "a1f051bc3eba734da4772d60e2d677f47cf93ef4"
version = "v0.0.2"
@ -271,44 +190,19 @@
[[projects]]
branch = "master"
name = "golang.org/x/crypto"
packages = [
"argon2",
"blake2b",
"curve25519",
"ed25519",
"ed25519/internal/edwards25519",
"internal/chacha20",
"pbkdf2",
"poly1305",
"scrypt",
"ssh",
"ssh/terminal"
]
packages = ["argon2","blake2b","curve25519","ed25519","ed25519/internal/edwards25519","internal/chacha20","pbkdf2","poly1305","scrypt","ssh","ssh/terminal"]
revision = "4ec37c66abab2c7e02ae775328b2ff001c3f025a"
[[projects]]
branch = "master"
name = "golang.org/x/net"
packages = [
"context",
"context/ctxhttp",
"http2",
"http2/hpack",
"idna",
"lex/httplex"
]
packages = ["context","context/ctxhttp","http2","http2/hpack","idna","lex/httplex"]
revision = "6078986fec03a1dcc236c34816c71b0e05018fda"
[[projects]]
branch = "master"
name = "golang.org/x/oauth2"
packages = [
".",
"google",
"internal",
"jws",
"jwt"
]
packages = [".","google","internal","jws","jwt"]
revision = "fdc9e635145ae97e6c2cb777c48305600cf515cb"
[[projects]]
@ -320,65 +214,24 @@
[[projects]]
branch = "master"
name = "golang.org/x/sys"
packages = [
"cpu",
"unix",
"windows"
]
packages = ["cpu","unix","windows"]
revision = "7db1c3b1a98089d0071c84f646ff5c96aad43682"
[[projects]]
name = "golang.org/x/text"
packages = [
"collate",
"collate/build",
"encoding",
"encoding/internal",
"encoding/internal/identifier",
"encoding/unicode",
"internal/colltab",
"internal/gen",
"internal/tag",
"internal/triegen",
"internal/ucd",
"internal/utf8internal",
"language",
"runes",
"secure/bidirule",
"transform",
"unicode/bidi",
"unicode/cldr",
"unicode/norm",
"unicode/rangetable"
]
packages = ["collate","collate/build","encoding","encoding/internal","encoding/internal/identifier","encoding/unicode","internal/colltab","internal/gen","internal/tag","internal/triegen","internal/ucd","internal/utf8internal","language","runes","secure/bidirule","transform","unicode/bidi","unicode/cldr","unicode/norm","unicode/rangetable"]
revision = "f21a4dfb5e38f5895301dc265a8def02365cc3d0"
version = "v0.3.0"
[[projects]]
branch = "master"
name = "google.golang.org/api"
packages = [
"gensupport",
"googleapi",
"googleapi/internal/uritemplates",
"storage/v1"
]
packages = ["gensupport","googleapi","googleapi/internal/uritemplates","storage/v1"]
revision = "dbbc13f71100fa6ece308335445fca6bb0dd5c2f"
[[projects]]
name = "google.golang.org/appengine"
packages = [
".",
"internal",
"internal/app_identity",
"internal/base",
"internal/datastore",
"internal/log",
"internal/modules",
"internal/remote_api",
"internal/urlfetch",
"urlfetch"
]
packages = [".","internal","internal/app_identity","internal/base","internal/datastore","internal/log","internal/modules","internal/remote_api","internal/urlfetch","urlfetch"]
revision = "150dc57a1b433e64154302bdc40b6bb8aefa313a"
version = "v1.0.0"
@ -397,6 +250,6 @@
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "cfab88aa746c1535f17c59e8db9ee2ca6908b840f71d7331de84c722221348d0"
inputs-digest = "a5de339cba7570216b212439b90e1e6c384c94be8342fe7755b7cb66aa0a3440"
solver-name = "gps-cdcl"
solver-version = 1

View file

@ -0,0 +1,9 @@
Enhancement: Update the Backblaze B2 library
We've updated the library we're using for accessing the Backblaze B2 service to
0.5.0 to include support for upcoming so-called "application keys". With this
feature, you can create access credentials for B2 which are restricted to e.g.
a single bucket or even a sub-directory of a bucket.
https://github.com/restic/restic/pull/1901
https://github.com/kurin/blazer

View file

@ -1,4 +1,4 @@
bonfire
bin/bonfire/bonfire
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o

View file

@ -32,6 +32,7 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"regexp"
"strconv"
"sync"
@ -46,6 +47,7 @@ type Client struct {
sWriters map[string]*Writer
sReaders map[string]*Reader
sMethods []methodCounter
opts clientOptions
}
// NewClient creates and returns a new Client with valid B2 service account
@ -63,7 +65,10 @@ func NewClient(ctx context.Context, account, key string, opts ...ClientOption) (
},
}
opts = append(opts, client(c))
if err := c.backend.authorizeAccount(ctx, account, key, opts...); err != nil {
for _, f := range opts {
f(&c.opts)
}
if err := c.backend.authorizeAccount(ctx, account, key, c.opts); err != nil {
return nil, err
}
return c, nil
@ -75,27 +80,9 @@ type clientOptions struct {
failSomeUploads bool
expireTokens bool
capExceeded bool
apiBase string
userAgents []string
}
// for testing
func (c clientOptions) eq(o clientOptions) bool {
if c.client != o.client ||
c.transport != o.transport ||
c.failSomeUploads != o.failSomeUploads ||
c.expireTokens != o.expireTokens ||
c.capExceeded != o.capExceeded {
return false
}
if len(c.userAgents) != len(o.userAgents) {
return false
}
for i := range c.userAgents {
if c.userAgents[i] != o.userAgents[i] {
return false
}
}
return true
writerOpts []WriterOption
}
// A ClientOption allows callers to adjust various per-client settings.
@ -112,6 +99,13 @@ func UserAgent(agent string) ClientOption {
}
}
// APIBase returns a ClientOption specifying the URL root of API requests.
func APIBase(url string) ClientOption {
return func(o *clientOptions) {
o.apiBase = url
}
}
// Transport sets the underlying HTTP transport mechanism. If unset,
// http.DefaultTransport is used.
func Transport(rt http.RoundTripper) ClientOption {
@ -434,7 +428,7 @@ type Attrs struct {
ContentType string // Used on upload, default is "application/octet-stream".
Status ObjectState // Not used on upload.
UploadTimestamp time.Time // Not used on upload.
SHA1 string // Not used on upload. Can be "none" for large files.
SHA1 string // Can be "none" for large files. If set on upload, will be used for large files.
LastModified time.Time // If present, and there are fewer than 10 keys in the Info field, this is saved on upload.
Info map[string]string // Save arbitrary metadata on upload, but limited to 10 keys.
}
@ -474,6 +468,9 @@ func (o *Object) Attrs(ctx context.Context) (*Attrs, error) {
mtime = time.Unix(ms/1e3, (ms%1e3)*1e6)
delete(info, "src_last_modified_millis")
}
if v, ok := info["large_file_sha1"]; ok {
sha = v
}
return &Attrs{
Name: name,
Size: size,
@ -524,14 +521,21 @@ func (o *Object) URL() string {
// overwritten are not deleted, but are "hidden".
//
// Callers must close the writer when finished and check the error status.
func (o *Object) NewWriter(ctx context.Context) *Writer {
func (o *Object) NewWriter(ctx context.Context, opts ...WriterOption) *Writer {
ctx, cancel := context.WithCancel(ctx)
return &Writer{
w := &Writer{
o: o,
name: o.name,
ctx: ctx,
cancel: cancel,
}
for _, f := range o.b.c.opts.writerOpts {
f(w)
}
for _, f := range opts {
f(w)
}
return w
}
// NewRangeReader returns a reader for the given object, reading up to length
@ -760,5 +764,24 @@ func (b *Bucket) getObject(ctx context.Context, name string) (*Object, error) {
// in a private bucket. Only objects that begin with prefix can be accessed.
// The token expires after the given duration.
func (b *Bucket) AuthToken(ctx context.Context, prefix string, valid time.Duration) (string, error) {
return b.b.getDownloadAuthorization(ctx, prefix, valid)
return b.b.getDownloadAuthorization(ctx, prefix, valid, "")
}
// AuthURL returns a URL for the given object with embedded token and,
// possibly, b2ContentDisposition arguments. Leave b2cd blank for no content
// disposition.
func (o *Object) AuthURL(ctx context.Context, valid time.Duration, b2cd string) (*url.URL, error) {
token, err := o.b.b.getDownloadAuthorization(ctx, o.name, valid, b2cd)
if err != nil {
return nil, err
}
urlString := fmt.Sprintf("%s?Authorization=%s", o.URL(), url.QueryEscape(token))
if b2cd != "" {
urlString = fmt.Sprintf("%s&b2ContentDisposition=%s", urlString, url.QueryEscape(b2cd))
}
u, err := url.Parse(urlString)
if err != nil {
return nil, err
}
return u, nil
}

View file

@ -71,7 +71,7 @@ type testRoot struct {
bucketMap map[string]map[string]string
}
func (t *testRoot) authorizeAccount(context.Context, string, string, ...ClientOption) error {
func (t *testRoot) authorizeAccount(context.Context, string, string, clientOptions) error {
t.auths++
return nil
}
@ -108,6 +108,13 @@ func (t *testRoot) transient(err error) bool {
return e.retry || e.reupload || e.backoff > 0
}
func (t *testRoot) createKey(context.Context, string, []string, time.Duration, string, string) (b2KeyInterface, error) {
return nil, nil
}
func (t *testRoot) listKeys(context.Context, int, string) ([]b2KeyInterface, string, error) {
return nil, "", nil
}
func (t *testRoot) createBucket(_ context.Context, name, _ string, _ map[string]string, _ []LifecycleRule) (b2BucketInterface, error) {
if err := t.errs.getError("createBucket"); err != nil {
return nil, err
@ -147,6 +154,7 @@ func (t *testBucket) btype() string { return
func (t *testBucket) attrs() *BucketAttrs { return nil }
func (t *testBucket) deleteBucket(context.Context) error { return nil }
func (t *testBucket) updateBucket(context.Context, *BucketAttrs) error { return nil }
func (t *testBucket) id() string { return "" }
func (t *testBucket) getUploadURL(context.Context) (b2URLInterface, error) {
if err := t.errs.getError("getUploadURL"); err != nil {
@ -221,7 +229,7 @@ func (t *testBucket) downloadFileByName(_ context.Context, name string, offset,
}
func (t *testBucket) hideFile(context.Context, string) (b2FileInterface, error) { return nil, nil }
func (t *testBucket) getDownloadAuthorization(context.Context, string, time.Duration) (string, error) {
func (t *testBucket) getDownloadAuthorization(context.Context, string, time.Duration, string) (string, error) {
return "", nil
}
func (t *testBucket) baseURL() string { return "" }

View file

@ -28,22 +28,25 @@ type beRootInterface interface {
reauth(error) bool
transient(error) bool
reupload(error) bool
authorizeAccount(context.Context, string, string, ...ClientOption) error
authorizeAccount(context.Context, string, string, clientOptions) error
reauthorizeAccount(context.Context) error
createBucket(ctx context.Context, name, btype string, info map[string]string, rules []LifecycleRule) (beBucketInterface, error)
listBuckets(context.Context) ([]beBucketInterface, error)
createKey(context.Context, string, []string, time.Duration, string, string) (beKeyInterface, error)
listKeys(context.Context, int, string) ([]beKeyInterface, string, error)
}
type beRoot struct {
account, key string
b2i b2RootInterface
options []ClientOption
options clientOptions
}
type beBucketInterface interface {
name() string
btype() BucketType
attrs() *BucketAttrs
id() string
updateBucket(context.Context, *BucketAttrs) error
deleteBucket(context.Context) error
getUploadURL(context.Context) (beURLInterface, error)
@ -53,7 +56,7 @@ type beBucketInterface interface {
listUnfinishedLargeFiles(context.Context, int, string) ([]beFileInterface, string, error)
downloadFileByName(context.Context, string, int64, int64) (beFileReaderInterface, error)
hideFile(context.Context, string) (beFileInterface, error)
getDownloadAuthorization(context.Context, string, time.Duration) (string, error)
getDownloadAuthorization(context.Context, string, time.Duration, string) (string, error)
baseURL() string
file(string, string) beFileInterface
}
@ -145,26 +148,39 @@ type beFileInfo struct {
stamp time.Time
}
type beKeyInterface interface {
del(context.Context) error
caps() []string
name() string
expires() time.Time
secret() string
}
type beKey struct {
b2i beRootInterface
k b2KeyInterface
}
func (r *beRoot) backoff(err error) time.Duration { return r.b2i.backoff(err) }
func (r *beRoot) reauth(err error) bool { return r.b2i.reauth(err) }
func (r *beRoot) reupload(err error) bool { return r.b2i.reupload(err) }
func (r *beRoot) transient(err error) bool { return r.b2i.transient(err) }
func (r *beRoot) authorizeAccount(ctx context.Context, account, key string, opts ...ClientOption) error {
func (r *beRoot) authorizeAccount(ctx context.Context, account, key string, c clientOptions) error {
f := func() error {
if err := r.b2i.authorizeAccount(ctx, account, key, opts...); err != nil {
if err := r.b2i.authorizeAccount(ctx, account, key, c); err != nil {
return err
}
r.account = account
r.key = key
r.options = opts
r.options = c
return nil
}
return withBackoff(ctx, r, f)
}
func (r *beRoot) reauthorizeAccount(ctx context.Context) error {
return r.authorizeAccount(ctx, r.account, r.key, r.options...)
return r.authorizeAccount(ctx, r.account, r.key, r.options)
}
func (r *beRoot) createBucket(ctx context.Context, name, btype string, info map[string]string, rules []LifecycleRule) (beBucketInterface, error) {
@ -213,17 +229,58 @@ func (r *beRoot) listBuckets(ctx context.Context) ([]beBucketInterface, error) {
return buckets, nil
}
func (b *beBucket) name() string {
return b.b2bucket.name()
func (r *beRoot) createKey(ctx context.Context, name string, caps []string, valid time.Duration, bucketID string, prefix string) (beKeyInterface, error) {
var k *beKey
f := func() error {
g := func() error {
got, err := r.b2i.createKey(ctx, name, caps, valid, bucketID, prefix)
if err != nil {
return err
}
k = &beKey{
b2i: r,
k: got,
}
return nil
}
return withReauth(ctx, r, g)
}
if err := withBackoff(ctx, r, f); err != nil {
return nil, err
}
return k, nil
}
func (b *beBucket) btype() BucketType {
return BucketType(b.b2bucket.btype())
func (r *beRoot) listKeys(ctx context.Context, max int, next string) ([]beKeyInterface, string, error) {
var keys []beKeyInterface
var cur string
f := func() error {
g := func() error {
got, n, err := r.b2i.listKeys(ctx, max, next)
if err != nil {
return err
}
cur = n
for _, g := range got {
keys = append(keys, &beKey{
b2i: r,
k: g,
})
}
return nil
}
return withReauth(ctx, r, g)
}
if err := withBackoff(ctx, r, f); err != nil {
return nil, "", err
}
return keys, cur, nil
}
func (b *beBucket) attrs() *BucketAttrs {
return b.b2bucket.attrs()
}
func (b *beBucket) name() string { return b.b2bucket.name() }
func (b *beBucket) btype() BucketType { return BucketType(b.b2bucket.btype()) }
func (b *beBucket) attrs() *BucketAttrs { return b.b2bucket.attrs() }
func (b *beBucket) id() string { return b.b2bucket.id() }
func (b *beBucket) updateBucket(ctx context.Context, attrs *BucketAttrs) error {
f := func() error {
@ -412,11 +469,11 @@ func (b *beBucket) hideFile(ctx context.Context, name string) (beFileInterface,
return file, nil
}
func (b *beBucket) getDownloadAuthorization(ctx context.Context, p string, v time.Duration) (string, error) {
func (b *beBucket) getDownloadAuthorization(ctx context.Context, p string, v time.Duration, s string) (string, error) {
var tok string
f := func() error {
g := func() error {
t, err := b.b2bucket.getDownloadAuthorization(ctx, p, v)
t, err := b.b2bucket.getDownloadAuthorization(ctx, p, v, s)
if err != nil {
return err
}
@ -649,6 +706,12 @@ func (b *beFilePart) number() int { return b.b2filePart.number() }
func (b *beFilePart) sha1() string { return b.b2filePart.sha1() }
func (b *beFilePart) size() int64 { return b.b2filePart.size() }
func (b *beKey) del(ctx context.Context) error { return b.k.del(ctx) }
func (b *beKey) caps() []string { return b.k.caps() }
func (b *beKey) name() string { return b.k.name() }
func (b *beKey) expires() time.Time { return b.k.expires() }
func (b *beKey) secret() string { return b.k.secret() }
func jitter(d time.Duration) time.Duration {
f := float64(d)
f /= 50
@ -657,8 +720,8 @@ func jitter(d time.Duration) time.Duration {
}
func getBackoff(d time.Duration) time.Duration {
if d > 15*time.Second {
return d + jitter(d)
if d > 30*time.Second {
return 30*time.Second + jitter(d)
}
return d*2 + jitter(d*2)
}

View file

@ -27,19 +27,22 @@ import (
// the only file in b2 that imports base.
type b2RootInterface interface {
authorizeAccount(context.Context, string, string, ...ClientOption) error
authorizeAccount(context.Context, string, string, clientOptions) error
transient(error) bool
backoff(error) time.Duration
reauth(error) bool
reupload(error) bool
createBucket(context.Context, string, string, map[string]string, []LifecycleRule) (b2BucketInterface, error)
listBuckets(context.Context) ([]b2BucketInterface, error)
createKey(context.Context, string, []string, time.Duration, string, string) (b2KeyInterface, error)
listKeys(context.Context, int, string) ([]b2KeyInterface, string, error)
}
type b2BucketInterface interface {
name() string
btype() string
attrs() *BucketAttrs
id() string
updateBucket(context.Context, *BucketAttrs) error
deleteBucket(context.Context) error
getUploadURL(context.Context) (b2URLInterface, error)
@ -49,7 +52,7 @@ type b2BucketInterface interface {
listUnfinishedLargeFiles(context.Context, int, string) ([]b2FileInterface, string, error)
downloadFileByName(context.Context, string, int64, int64) (b2FileReaderInterface, error)
hideFile(context.Context, string) (b2FileInterface, error)
getDownloadAuthorization(context.Context, string, time.Duration) (string, error)
getDownloadAuthorization(context.Context, string, time.Duration, string) (string, error)
baseURL() string
file(string, string) b2FileInterface
}
@ -96,6 +99,14 @@ type b2FilePartInterface interface {
size() int64
}
type b2KeyInterface interface {
del(context.Context) error
caps() []string
name() string
expires() time.Time
secret() string
}
type b2Root struct {
b *base.B2
}
@ -132,11 +143,11 @@ type b2FilePart struct {
b *base.FilePart
}
func (b *b2Root) authorizeAccount(ctx context.Context, account, key string, opts ...ClientOption) error {
c := &clientOptions{}
for _, f := range opts {
f(c)
type b2Key struct {
b *base.Key
}
func (b *b2Root) authorizeAccount(ctx context.Context, account, key string, c clientOptions) error {
var aopts []base.AuthOption
ct := &clientTransport{client: c.client}
if c.transport != nil {
@ -152,6 +163,9 @@ func (b *b2Root) authorizeAccount(ctx context.Context, account, key string, opts
if c.capExceeded {
aopts = append(aopts, base.ForceCapExceeded())
}
if c.apiBase != "" {
aopts = append(aopts, base.SetAPIBase(c.apiBase))
}
for _, agent := range c.userAgents {
aopts = append(aopts, base.UserAgent(agent))
}
@ -249,6 +263,26 @@ func (b *b2Bucket) updateBucket(ctx context.Context, attrs *BucketAttrs) error {
return err
}
func (b *b2Root) createKey(ctx context.Context, name string, caps []string, valid time.Duration, bucketID string, prefix string) (b2KeyInterface, error) {
k, err := b.b.CreateKey(ctx, name, caps, valid, bucketID, prefix)
if err != nil {
return nil, err
}
return &b2Key{k}, nil
}
func (b *b2Root) listKeys(ctx context.Context, max int, next string) ([]b2KeyInterface, string, error) {
keys, next, err := b.b.ListKeys(ctx, max, next)
if err != nil {
return nil, "", err
}
var k []b2KeyInterface
for _, key := range keys {
k = append(k, &b2Key{key})
}
return k, next, nil
}
func (b *b2Bucket) deleteBucket(ctx context.Context) error {
return b.b.DeleteBucket(ctx)
}
@ -277,6 +311,8 @@ func (b *b2Bucket) attrs() *BucketAttrs {
}
}
func (b *b2Bucket) id() string { return b.b.ID }
func (b *b2Bucket) getUploadURL(ctx context.Context) (b2URLInterface, error) {
url, err := b.b.GetUploadURL(ctx)
if err != nil {
@ -352,8 +388,8 @@ func (b *b2Bucket) hideFile(ctx context.Context, name string) (b2FileInterface,
return &b2File{f}, nil
}
func (b *b2Bucket) getDownloadAuthorization(ctx context.Context, p string, v time.Duration) (string, error) {
return b.b.GetDownloadAuthorization(ctx, p, v)
func (b *b2Bucket) getDownloadAuthorization(ctx context.Context, p string, v time.Duration, s string) (string, error) {
return b.b.GetDownloadAuthorization(ctx, p, v, s)
}
func (b *b2Bucket) baseURL() string {
@ -466,3 +502,9 @@ func (b *b2FileInfo) stats() (string, string, int64, string, map[string]string,
func (b *b2FilePart) number() int { return b.b.Number }
func (b *b2FilePart) sha1() string { return b.b.SHA1 }
func (b *b2FilePart) size() int64 { return b.b.Size }
func (b *b2Key) del(ctx context.Context) error { return b.b.Delete(ctx) }
func (b *b2Key) caps() []string { return b.b.Capabilities }
func (b *b2Key) name() string { return b.b.Name }
func (b *b2Key) expires() time.Time { return b.b.Expires }
func (b *b2Key) secret() string { return b.b.Secret }

View file

@ -22,6 +22,7 @@ import (
"encoding/hex"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"reflect"
@ -29,6 +30,7 @@ import (
"testing"
"time"
"github.com/kurin/blazer/internal/blog"
"github.com/kurin/blazer/x/transport"
)
@ -137,6 +139,9 @@ func TestReaderFromLive(t *testing.T) {
if rn != n {
t.Errorf("Read from B2: got %d bytes, want %d bytes", rn, n)
}
if err, ok := r.Verify(); ok && err != nil {
t.Errorf("Read from B2: %v", err)
}
if err := r.Close(); err != nil {
t.Errorf("r.Close(): %v", err)
}
@ -323,6 +328,7 @@ func TestAttrs(t *testing.T) {
for _, attrs := range attrlist {
o := bucket.Object(e.name)
w := o.NewWriter(ctx).WithAttrs(attrs)
w.ChunkSize = 5e6
if _, err := io.Copy(w, io.LimitReader(zReader{}, e.size)); err != nil {
t.Error(err)
continue
@ -429,6 +435,57 @@ func TestAuthTokLive(t *testing.T) {
}
}
func TestObjAuthTokLive(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
bucket, done := startLiveTest(ctx, t)
defer done()
table := []struct {
obj string
d time.Duration
b2cd string
}{
{
obj: "foo/bar",
d: time.Minute,
},
{
obj: "foo2/thing.pdf",
d: time.Minute,
b2cd: "attachment",
},
{
obj: "foo2/thing.pdf",
d: time.Minute,
b2cd: `attachment; filename="what.png"`,
},
}
for _, e := range table {
fw := bucket.Object(e.obj).NewWriter(ctx)
io.Copy(fw, io.LimitReader(zReader{}, 1e5))
if err := fw.Close(); err != nil {
t.Fatal(err)
}
url, err := bucket.Object(e.obj).AuthURL(ctx, e.d, e.b2cd)
if err != nil {
t.Fatal(err)
}
blog.V(2).Infof("downloading %s", url.String())
frsp, err := http.Get(url.String())
if err != nil {
t.Fatal(err)
}
if frsp.StatusCode != 200 {
t.Fatalf("%s: got %s, want 200", url.String(), frsp.Status)
}
}
}
func TestRangeReaderLive(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, time.Minute)
@ -510,6 +567,9 @@ func TestRangeReaderLive(t *testing.T) {
if got != want {
t.Errorf("NewRangeReader(_, %d, %d): got %q, want %q", e.offset, e.length, got, want)
}
if err, ok := r.Verify(); ok && err != nil {
t.Errorf("NewRangeReader(_, %d, %d): %v", e.offset, e.length, err)
}
}
}
@ -863,29 +923,161 @@ func TestReauthPreservesOptions(t *testing.T) {
bucket, done := startLiveTest(ctx, t)
defer done()
var first []ClientOption
opts := bucket.r.(*beRoot).options
for _, o := range opts {
first = append(first, o)
}
first := bucket.r.(*beRoot).options
if err := bucket.r.reauthorizeAccount(ctx); err != nil {
t.Fatalf("reauthorizeAccount: %v", err)
}
second := bucket.r.(*beRoot).options
if len(second) != len(first) {
t.Fatalf("options mismatch: got %d options, wanted %d", len(second), len(first))
if !reflect.DeepEqual(first, second) {
// Test that they are literally the same set of options, which is an
// implementation detail but is fine for now.
t.Errorf("options mismatch: got %v, want %v", second, first)
}
}
var f, s clientOptions
for i := range first {
first[i](&f)
second[i](&s)
func TestVerifyReader(t *testing.T) {
ctx := context.Background()
bucket, done := startLiveTest(ctx, t)
defer done()
table := []struct {
name string
fakeSHA string
size int64
off, len int64
valid bool
}{
{
name: "first",
size: 100,
off: 0,
len: -1,
valid: true,
},
{
name: "second",
size: 100,
off: 0,
len: 100,
valid: true,
},
{
name: "third",
size: 100,
off: 0,
len: 99,
valid: false,
},
{
name: "fourth",
size: 5e6 + 100,
off: 0,
len: -1,
valid: false,
},
{
name: "fifth",
size: 5e6 + 100,
off: 0,
len: -1,
fakeSHA: "fbc815f2d6518858dec83ccb46263875fc894d88",
valid: true,
},
}
if !f.eq(s) {
t.Errorf("options mismatch: got %v, want %v", s, f)
for _, e := range table {
o := bucket.Object(e.name)
w := o.NewWriter(ctx)
if e.fakeSHA != "" {
w = w.WithAttrs(&Attrs{SHA1: e.fakeSHA})
}
w.ChunkSize = 5e6
if _, err := io.Copy(w, io.LimitReader(zReader{}, e.size)); err != nil {
t.Error(err)
continue
}
if err := w.Close(); err != nil {
t.Error(err)
continue
}
r := o.NewRangeReader(ctx, e.off, e.len)
if _, err := io.Copy(ioutil.Discard, r); err != nil {
t.Error(err)
}
err, ok := r.Verify()
if ok != e.valid {
t.Errorf("%s: bad validity: got %v, want %v", e.name, ok, e.valid)
}
if e.valid && err != nil {
t.Errorf("%s does not verify: %v", e.name, err)
}
}
}
func TestCreateDeleteKey(t *testing.T) {
ctx := context.Background()
bucket, done := startLiveTest(ctx, t)
defer done()
table := []struct {
d time.Duration
e time.Time
bucket bool
cap []string
pfx string
}{
{
cap: []string{"deleteKeys"},
},
{
d: time.Minute,
cap: []string{"deleteKeys"},
pfx: "prefox",
},
{
e: time.Now().Add(time.Minute), // <shrug emojis>
cap: []string{"writeFiles", "listFiles"},
bucket: true,
},
{
d: time.Minute,
cap: []string{"writeFiles", "listFiles"},
pfx: "prefox",
bucket: true,
},
}
for _, e := range table {
var opts []KeyOption
for _, cap := range e.cap {
opts = append(opts, Capability(cap))
}
if e.d != 0 {
opts = append(opts, Lifetime(e.d))
}
if !e.e.IsZero() {
opts = append(opts, Deadline(e.e))
}
var key *Key
if e.bucket {
opts = append(opts, Prefix(e.pfx))
bkey, err := bucket.CreateKey(ctx, "whee", opts...)
if err != nil {
t.Errorf("Bucket.CreateKey(%v, %v): %v", bucket.Name(), e, err)
continue
}
key = bkey
} else {
gkey, err := bucket.c.CreateKey(ctx, "whee", opts...)
if err != nil {
t.Errorf("Client.CreateKey(%v): %v", e, err)
continue
}
key = gkey
}
if err := key.Delete(ctx); err != nil {
t.Errorf("key.Delete(): %v", err)
}
}
}

148
vendor/github.com/kurin/blazer/b2/key.go generated vendored Normal file
View file

@ -0,0 +1,148 @@
// Copyright 2018, Google
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package b2
import (
"context"
"errors"
"io"
"time"
)
// Key is a B2 application key. A Key grants limited access on a global or
// per-bucket basis.
type Key struct {
c *Client
k beKeyInterface
}
// Capabilities returns the list of capabilites granted by this application
// key.
func (k *Key) Capabilities() []string { return k.k.caps() }
// Name returns the user-supplied name of this application key. Key names are
// useless.
func (k *Key) Name() string { return k.k.name() }
// Expires returns the expiration date of this application key.
func (k *Key) Expires() time.Time { return k.k.expires() }
// Delete removes the key from B2.
func (k *Key) Delete(ctx context.Context) error { return k.k.del(ctx) }
// Secret returns the value that should be passed into NewClient(). It is only
// available on newly created keys; it is not available from ListKey
// operations.
func (k *Key) Secret() string { return k.k.secret() }
type keyOptions struct {
caps []string
prefix string
lifetime time.Duration
}
// KeyOption specifies desired properties for application keys.
type KeyOption func(*keyOptions)
// Lifetime requests a key with the given lifetime.
func Lifetime(d time.Duration) KeyOption {
return func(k *keyOptions) {
k.lifetime = d
}
}
// Deadline requests a key that expires after the given date.
func Deadline(t time.Time) KeyOption {
d := t.Sub(time.Now())
return Lifetime(d)
}
// Capability requests a key with the given capability.
func Capability(cap string) KeyOption {
return func(k *keyOptions) {
k.caps = append(k.caps, cap)
}
}
// Prefix limits the requested application key to be valid only for objects
// that begin with prefix. This can only be used when requesting an
// application key within a specific bucket.
func Prefix(prefix string) KeyOption {
return func(k *keyOptions) {
k.prefix = prefix
}
}
// CreateKey creates a global application key that is valid for all buckets in
// this project. The key's secret will only be accessible on the object
// returned from this call.
func (c *Client) CreateKey(ctx context.Context, name string, opts ...KeyOption) (*Key, error) {
var ko keyOptions
for _, o := range opts {
o(&ko)
}
if ko.prefix != "" {
return nil, errors.New("Prefix is not a valid option for global application keys")
}
ki, err := c.backend.createKey(ctx, name, ko.caps, ko.lifetime, "", "")
if err != nil {
return nil, err
}
return &Key{
c: c,
k: ki,
}, nil
}
// ListKeys lists all the keys associated with this project. It takes the
// maximum number of keys it should return in a call, as well as a cursor
// (which should be empty for the initial call). It will return up to count
// keys, as well as the cursor for the next invocation.
//
// ListKeys returns io.EOF when there are no more keys, although it may do so
// concurrently with the final set of keys.
func (c *Client) ListKeys(ctx context.Context, count int, cursor string) ([]*Key, string, error) {
ks, next, err := c.backend.listKeys(ctx, count, cursor)
if err != nil {
return nil, "", err
}
if len(ks) == 0 {
return nil, "", io.EOF
}
var keys []*Key
for _, k := range ks {
keys = append(keys, &Key{
c: c,
k: k,
})
}
return keys, next, nil
}
// CreateKey creates a scoped application key that is valid only for this bucket.
func (b *Bucket) CreateKey(ctx context.Context, name string, opts ...KeyOption) (*Key, error) {
var ko keyOptions
for _, o := range opts {
o(&ko)
}
ki, err := b.r.createKey(ctx, name, ko.caps, ko.lifetime, b.b.id(), ko.prefix)
if err != nil {
return nil, err
}
return &Key{
c: b.c,
k: ki,
}, nil
}

View file

@ -17,9 +17,13 @@ package b2
import (
"bytes"
"context"
"crypto/sha1"
"errors"
"fmt"
"hash"
"io"
"sync"
"time"
"github.com/kurin/blazer/internal/blog"
)
@ -50,9 +54,13 @@ type Reader struct {
chrid int // chunks read
chbuf chan *rchunk
init sync.Once
chunks map[int]*rchunk
vrfy hash.Hash
readOffEnd bool
sha1 string
rmux sync.Mutex // guards rcond
rcond *sync.Cond
chunks map[int]*rchunk
emux sync.RWMutex // guards err, believe it or not
err error
@ -122,10 +130,12 @@ func (r *Reader) thread() {
}
r.length -= size
}
var b backoff
redo:
fr, err := r.o.b.b.downloadFileByName(r.ctx, r.name, offset, size)
if err == errNoMoreContent {
// this read generated a 416 so we are entirely past the end of the object
r.readOffEnd = true
buf.final = true
r.rmux.Lock()
r.chunks[chunkID] = buf
@ -138,7 +148,10 @@ func (r *Reader) thread() {
r.rcond.Broadcast()
return
}
rsize, _, _, _ := fr.stats()
rsize, _, sha1, _ := fr.stats()
if len(sha1) == 40 && r.sha1 != sha1 {
r.sha1 = sha1
}
mr := &meteredReader{r: noopResetter{fr}, size: int(rsize)}
r.smux.Lock()
r.smap[chunkID] = mr
@ -150,7 +163,12 @@ func (r *Reader) thread() {
r.smux.Unlock()
if i < int64(rsize) || err == io.ErrUnexpectedEOF {
// Probably the network connection was closed early. Retry.
blog.V(1).Infof("b2 reader %d: got %dB of %dB; retrying", chunkID, i, rsize)
blog.V(1).Infof("b2 reader %d: got %dB of %dB; retrying after %v", chunkID, i, rsize, b)
if err := b.wait(r.ctx); err != nil {
r.setErr(err)
r.rcond.Broadcast()
return
}
buf.Reset()
goto redo
}
@ -211,13 +229,13 @@ func (r *Reader) initFunc() {
r.thread()
r.chbuf <- &rchunk{}
}
r.vrfy = sha1.New()
}
func (r *Reader) Read(p []byte) (int, error) {
if err := r.getErr(); err != nil {
return 0, err
}
// TODO: check the SHA1 hash here and verify it on Close.
r.init.Do(r.initFunc)
chunk, err := r.curChunk()
if err != nil {
@ -225,6 +243,7 @@ func (r *Reader) Read(p []byte) (int, error) {
return 0, err
}
n, err := chunk.Read(p)
r.vrfy.Write(p[:n]) // Hash.Write never returns an error.
r.read += n
if err == io.EOF {
if chunk.final {
@ -256,38 +275,49 @@ func (r *Reader) status() *ReaderStatus {
return rs
}
// copied from io.Copy, basically.
func copyContext(ctx context.Context, dst io.Writer, src io.Reader) (written int64, err error) {
buf := make([]byte, 32*1024)
for {
if ctx.Err() != nil {
err = ctx.Err()
return
// Verify checks the SHA1 hash on download and compares it to the SHA1 hash
// submitted on upload. If the two differ, this returns an error. If the
// correct hash could not be calculated (if, for example, the entire object was
// not read, or if the object was uploaded as a "large file" and thus the SHA1
// hash was not sent), this returns (nil, false).
func (r *Reader) Verify() (error, bool) {
got := fmt.Sprintf("%x", r.vrfy.Sum(nil))
if r.sha1 == got {
return nil, true
}
nr, er := src.Read(buf)
if nr > 0 {
nw, ew := dst.Write(buf[0:nr])
if nw > 0 {
written += int64(nw)
// TODO: if the exact length of the file is requested AND the checksum is
// bad, this will return (nil, false) instead of (an error, true). This is
// because there's no good way that I can tell to determine that we've hit
// the end of the file without reading off the end. Consider reading N+1
// bytes at the very end to close this hole.
if r.offset > 0 || !r.readOffEnd || len(r.sha1) != 40 {
return nil, false
}
if ew != nil {
err = ew
break
return fmt.Errorf("bad hash: got %v, want %v", got, r.sha1), true
}
if nr != nw {
err = io.ErrShortWrite
break
// strip a writer of any non-Write methods
type onlyWriter struct{ w io.Writer }
func (ow onlyWriter) Write(p []byte) (int, error) { return ow.w.Write(p) }
func copyContext(ctx context.Context, w io.Writer, r io.Reader) (int64, error) {
var n int64
var err error
done := make(chan struct{})
go func() {
if _, ok := w.(*Writer); ok {
w = onlyWriter{w}
}
n, err = io.Copy(w, r)
close(done)
}()
select {
case <-done:
return n, err
case <-ctx.Done():
return 0, ctx.Err()
}
if er == io.EOF {
break
}
if er != nil {
err = er
break
}
}
return written, err
}
type noopResetter struct {
@ -295,3 +325,24 @@ type noopResetter struct {
}
func (noopResetter) Reset() error { return nil }
type backoff time.Duration
func (b *backoff) wait(ctx context.Context) error {
if *b == 0 {
*b = backoff(time.Millisecond)
}
select {
case <-time.After(time.Duration(*b)):
if time.Duration(*b) < time.Second*10 {
*b <<= 1
}
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (b backoff) String() string {
return time.Duration(b).String()
}

View file

@ -476,18 +476,42 @@ func (w *Writer) Close() error {
// WithAttrs sets the writable attributes of the resulting file to given
// values. WithAttrs must be called before the first call to Write.
//
// DEPRECATED: Use WithAttrsOption instead.
func (w *Writer) WithAttrs(attrs *Attrs) *Writer {
w.contentType = attrs.ContentType
w.info = make(map[string]string)
for k, v := range attrs.Info {
w.info[k] = v
}
if len(w.info) < 10 && attrs.SHA1 != "" {
w.info["large_file_sha1"] = attrs.SHA1
}
if len(w.info) < 10 && !attrs.LastModified.IsZero() {
w.info["src_last_modified_millis"] = fmt.Sprintf("%d", attrs.LastModified.UnixNano()/1e6)
}
return w
}
// A WriterOption sets Writer-specific behavior.
type WriterOption func(*Writer)
// WithAttrs attaches the given Attrs to the writer.
func WithAttrsOption(attrs *Attrs) WriterOption {
return func(w *Writer) {
w.WithAttrs(attrs)
}
}
// DefaultWriterOptions returns a ClientOption that will apply the given
// WriterOptions to every Writer. These options can be overridden by passing
// new options to NewWriter.
func DefaultWriterOptions(opts ...WriterOption) ClientOption {
return func(c *clientOptions) {
c.writerOpts = opts
}
}
func (w *Writer) status() *WriterStatus {
w.smux.RLock()
defer w.smux.RUnlock()

View file

@ -42,7 +42,7 @@ import (
const (
APIBase = "https://api.backblazeb2.com"
DefaultUserAgent = "blazer/0.4.4"
DefaultUserAgent = "blazer/0.5.0"
)
type b2err struct {
@ -427,7 +427,7 @@ func AuthorizeAccount(ctx context.Context, account, key string, opts ...AuthOpti
authToken: b2resp.AuthToken,
apiURI: b2resp.URI,
downloadURI: b2resp.DownloadURI,
minPartSize: b2resp.MinPartSize,
minPartSize: b2resp.PartSize,
opts: b2opts,
}, nil
}
@ -479,6 +479,14 @@ func ForceCapExceeded() AuthOption {
}
}
// SetAPIBase returns an AuthOption that uses the given URL as the base for API
// requests.
func SetAPIBase(url string) AuthOption {
return func(o *b2Options) {
o.apiBase = url
}
}
type LifecycleRule struct {
Prefix string
DaysNewUntilHidden int
@ -524,7 +532,7 @@ func (b *B2) CreateBucket(ctx context.Context, name, btype string, info map[stri
Name: name,
Info: b2resp.Info,
LifecycleRules: respRules,
id: b2resp.BucketID,
ID: b2resp.BucketID,
rev: b2resp.Revision,
b2: b,
}, nil
@ -534,7 +542,7 @@ func (b *B2) CreateBucket(ctx context.Context, name, btype string, info map[stri
func (b *Bucket) DeleteBucket(ctx context.Context) error {
b2req := &b2types.DeleteBucketRequest{
AccountID: b.b2.accountID,
BucketID: b.id,
BucketID: b.ID,
}
headers := map[string]string{
"Authorization": b.b2.authToken,
@ -548,7 +556,7 @@ type Bucket struct {
Type string
Info map[string]string
LifecycleRules []LifecycleRule
id string
ID string
rev int
b2 *B2
}
@ -565,7 +573,7 @@ func (b *Bucket) Update(ctx context.Context) (*Bucket, error) {
}
b2req := &b2types.UpdateBucketRequest{
AccountID: b.b2.accountID,
BucketID: b.id,
BucketID: b.ID,
// Name: b.Name,
Type: b.Type,
Info: b.Info,
@ -592,7 +600,7 @@ func (b *Bucket) Update(ctx context.Context) (*Bucket, error) {
Type: b2resp.Type,
Info: b2resp.Info,
LifecycleRules: respRules,
id: b2resp.BucketID,
ID: b2resp.BucketID,
b2: b.b2,
}, nil
}
@ -629,7 +637,7 @@ func (b *B2) ListBuckets(ctx context.Context) ([]*Bucket, error) {
Type: bucket.Type,
Info: bucket.Info,
LifecycleRules: rules,
id: bucket.BucketID,
ID: bucket.BucketID,
rev: bucket.Revision,
b2: b,
})
@ -660,7 +668,7 @@ func (url *URL) Reload(ctx context.Context) error {
// GetUploadURL wraps b2_get_upload_url.
func (b *Bucket) GetUploadURL(ctx context.Context) (*URL, error) {
b2req := &b2types.GetUploadURLRequest{
BucketID: b.id,
BucketID: b.ID,
}
b2resp := &b2types.GetUploadURLResponse{}
headers := map[string]string{
@ -745,7 +753,7 @@ type LargeFile struct {
// StartLargeFile wraps b2_start_large_file.
func (b *Bucket) StartLargeFile(ctx context.Context, name, contentType string, info map[string]string) (*LargeFile, error) {
b2req := &b2types.StartLargeFileRequest{
BucketID: b.id,
BucketID: b.ID,
Name: name,
ContentType: contentType,
Info: info,
@ -927,7 +935,7 @@ func (l *LargeFile) FinishLargeFile(ctx context.Context) (*File, error) {
// ListUnfinishedLargeFiles wraps b2_list_unfinished_large_files.
func (b *Bucket) ListUnfinishedLargeFiles(ctx context.Context, count int, continuation string) ([]*File, string, error) {
b2req := &b2types.ListUnfinishedLargeFilesRequest{
BucketID: b.id,
BucketID: b.ID,
Continuation: continuation,
Count: count,
}
@ -962,7 +970,7 @@ func (b *Bucket) ListFileNames(ctx context.Context, count int, continuation, pre
b2req := &b2types.ListFileNamesRequest{
Count: count,
Continuation: continuation,
BucketID: b.id,
BucketID: b.ID,
Prefix: prefix,
Delimiter: delimiter,
}
@ -1000,7 +1008,7 @@ func (b *Bucket) ListFileNames(ctx context.Context, count int, continuation, pre
// ListFileVersions wraps b2_list_file_versions.
func (b *Bucket) ListFileVersions(ctx context.Context, count int, startName, startID, prefix, delimiter string) ([]*File, string, string, error) {
b2req := &b2types.ListFileVersionsRequest{
BucketID: b.id,
BucketID: b.ID,
Count: count,
StartName: startName,
StartID: startID,
@ -1038,11 +1046,12 @@ func (b *Bucket) ListFileVersions(ctx context.Context, count int, startName, sta
}
// GetDownloadAuthorization wraps b2_get_download_authorization.
func (b *Bucket) GetDownloadAuthorization(ctx context.Context, prefix string, valid time.Duration) (string, error) {
func (b *Bucket) GetDownloadAuthorization(ctx context.Context, prefix string, valid time.Duration, contentDisposition string) (string, error) {
b2req := &b2types.GetDownloadAuthorizationRequest{
BucketID: b.id,
BucketID: b.ID,
Prefix: prefix,
Valid: int(valid.Seconds()),
ContentDisposition: contentDisposition,
}
b2resp := &b2types.GetDownloadAuthorizationResponse{}
headers := map[string]string{
@ -1121,9 +1130,13 @@ func (b *Bucket) DownloadFileByName(ctx context.Context, name string, offset, si
}
info[name] = val
}
sha1 := resp.Header.Get("X-Bz-Content-Sha1")
if sha1 == "none" && info["Large_file_sha1"] != "" {
sha1 = info["Large_file_sha1"]
}
return &FileReader{
ReadCloser: resp.Body,
SHA1: resp.Header.Get("X-Bz-Content-Sha1"),
SHA1: sha1,
ID: resp.Header.Get("X-Bz-File-Id"),
ContentType: resp.Header.Get("Content-Type"),
ContentLength: int(clen),
@ -1134,7 +1147,7 @@ func (b *Bucket) DownloadFileByName(ctx context.Context, name string, offset, si
// HideFile wraps b2_hide_file.
func (b *Bucket) HideFile(ctx context.Context, name string) (*File, error) {
b2req := &b2types.HideFileRequest{
BucketID: b.id,
BucketID: b.ID,
File: name,
}
b2resp := &b2types.HideFileResponse{}
@ -1190,3 +1203,78 @@ func (f *File) GetFileInfo(ctx context.Context) (*FileInfo, error) {
}
return f.Info, nil
}
// Key is a B2 application key.
type Key struct {
ID string
Secret string
Name string
Capabilities []string
Expires time.Time
b2 *B2
}
// CreateKey wraps b2_create_key.
func (b *B2) CreateKey(ctx context.Context, name string, caps []string, valid time.Duration, bucketID string, prefix string) (*Key, error) {
b2req := &b2types.CreateKeyRequest{
AccountID: b.accountID,
Capabilities: caps,
Name: name,
Valid: int(valid.Seconds()),
BucketID: bucketID,
Prefix: prefix,
}
b2resp := &b2types.CreateKeyResponse{}
headers := map[string]string{
"Authorization": b.authToken,
}
if err := b.opts.makeRequest(ctx, "b2_create_key", "POST", b.apiURI+b2types.V1api+"b2_create_key", b2req, b2resp, headers, nil); err != nil {
return nil, err
}
return &Key{
Name: b2resp.Name,
ID: b2resp.ID,
Secret: b2resp.Secret,
Capabilities: b2resp.Capabilities,
Expires: millitime(b2resp.Expires),
b2: b,
}, nil
}
// Delete wraps b2_delete_key.
func (k *Key) Delete(ctx context.Context) error {
b2req := &b2types.DeleteKeyRequest{
KeyID: k.ID,
}
headers := map[string]string{
"Authorization": k.b2.authToken,
}
return k.b2.opts.makeRequest(ctx, "b2_delete_key", "POST", k.b2.apiURI+b2types.V1api+"b2_delete_key", b2req, nil, headers, nil)
}
// ListKeys wraps b2_list_keys.
func (b *B2) ListKeys(ctx context.Context, max int, next string) ([]*Key, string, error) {
b2req := &b2types.ListKeysRequest{
AccountID: b.accountID,
Max: max,
Next: next,
}
headers := map[string]string{
"Authorization": b.authToken,
}
b2resp := &b2types.ListKeysResponse{}
if err := b.opts.makeRequest(ctx, "b2_create_key", "POST", b.apiURI+b2types.V1api+"b2_create_key", b2req, b2resp, headers, nil); err != nil {
return nil, "", err
}
var keys []*Key
for _, key := range b2resp.Keys {
keys = append(keys, &Key{
Name: key.Name,
ID: key.ID,
Capabilities: key.Capabilities,
Expires: millitime(key.Expires),
b2: b,
})
}
return keys, b2resp.Next, nil
}

View file

@ -265,7 +265,7 @@ func TestStorage(t *testing.T) {
}
// b2_get_download_authorization
if _, err := bucket.GetDownloadAuthorization(ctx, "foo/", 24*time.Hour); err != nil {
if _, err := bucket.GetDownloadAuthorization(ctx, "foo/", 24*time.Hour, "attachment"); err != nil {
t.Errorf("failed to get download auth token: %v", err)
}
}
@ -280,7 +280,7 @@ func TestUploadAuthAfterConnectionHang(t *testing.T) {
hung := make(chan struct{})
// An http.RoundTripper that dies after sending ~10k bytes.
// An http.RoundTripper that dies and hangs after sending ~10k bytes.
hang := func() {
close(hung)
select {}
@ -317,7 +317,6 @@ func TestUploadAuthAfterConnectionHang(t *testing.T) {
go func() {
ue.UploadFile(ctx, buf, buf.Len(), smallFileName, "application/octet-stream", smallSHA1, nil)
t.Fatal("this ought not to be reachable")
}()
<-hung

98
vendor/github.com/kurin/blazer/bin/b2keys/b2keys.go generated vendored Normal file
View file

@ -0,0 +1,98 @@
// b2keys is a small utility for managing Backblaze B2 keys.
package main
import (
"context"
"flag"
"fmt"
"os"
"time"
"github.com/google/subcommands"
"github.com/kurin/blazer/b2"
)
const (
apiID = "B2_ACCOUNT_ID"
apiKey = "B2_SECRET_KEY"
)
func main() {
subcommands.Register(&create{}, "")
flag.Parse()
ctx := context.Background()
os.Exit(int(subcommands.Execute(ctx)))
}
type create struct {
d *time.Duration
bucket *string
pfx *string
}
func (c *create) Name() string { return "create" }
func (c *create) Synopsis() string { return "create a new application key" }
func (c *create) Usage() string {
return "b2keys create [-bucket bucket] [-duration duration] [-prefix pfx] name capability [capability ...]"
}
func (c *create) SetFlags(fs *flag.FlagSet) {
c.d = fs.Duration("duration", 0, "the lifetime of the new key")
c.bucket = fs.String("bucket", "", "limit the key to the given bucket")
c.pfx = fs.String("prefix", "", "limit the key to the objects starting with prefix")
}
func (c *create) Execute(ctx context.Context, f *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus {
id := os.Getenv(apiID)
key := os.Getenv(apiKey)
if id == "" || key == "" {
fmt.Fprintf(os.Stderr, "both %s and %s must be set in the environment", apiID, apiKey)
return subcommands.ExitUsageError
}
args := f.Args()
if len(args) < 2 {
fmt.Fprintf(os.Stderr, "%s\n", c.Usage())
return subcommands.ExitUsageError
}
name := args[0]
caps := args[1:]
var opts []b2.KeyOption
if *c.d > 0 {
opts = append(opts, b2.Lifetime(*c.d))
}
if *c.pfx != "" {
opts = append(opts, b2.Prefix(*c.pfx))
}
for _, c := range caps {
opts = append(opts, b2.Capability(c))
}
client, err := b2.NewClient(ctx, id, key, b2.UserAgent("b2keys"))
if err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
return subcommands.ExitFailure
}
var cr creater = client
if *c.bucket != "" {
bucket, err := client.Bucket(ctx, *c.bucket)
if err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
return subcommands.ExitFailure
}
cr = bucket
}
if _, err := cr.CreateKey(ctx, name, opts...); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
return subcommands.ExitFailure
}
return subcommands.ExitSuccess
}
type creater interface {
CreateKey(context.Context, string, ...b2.KeyOption) (*b2.Key, error)
}

43
vendor/github.com/kurin/blazer/bin/bonfire/bonfire.go generated vendored Normal file
View file

@ -0,0 +1,43 @@
package main
import (
"context"
"fmt"
"net/http"
"github.com/kurin/blazer/bonfire"
"github.com/kurin/blazer/internal/pyre"
)
type superManager struct {
*bonfire.LocalBucket
bonfire.FS
}
func main() {
ctx := context.Background()
mux := http.NewServeMux()
fs := bonfire.FS("/tmp/b2")
bm := &bonfire.LocalBucket{Port: 8822}
if err := pyre.RegisterServerOnMux(ctx, &pyre.Server{
Account: bonfire.Localhost(8822),
LargeFile: fs,
Bucket: bm,
}, mux); err != nil {
fmt.Println(err)
return
}
sm := superManager{
LocalBucket: bm,
FS: fs,
}
pyre.RegisterLargeFileManagerOnMux(fs, mux)
pyre.RegisterSimpleFileManagerOnMux(fs, mux)
pyre.RegisterDownloadManagerOnMux(sm, mux)
fmt.Println("ok")
fmt.Println(http.ListenAndServe("localhost:8822", mux))
}

257
vendor/github.com/kurin/blazer/bonfire/bonfire.go generated vendored Normal file
View file

@ -0,0 +1,257 @@
// Copyright 2018, Google
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package bonfire implements the B2 service.
package bonfire
import (
"crypto/sha1"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"sort"
"strconv"
"sync"
"github.com/kurin/blazer/internal/pyre"
)
type FS string
func (f FS) open(fp string) (io.WriteCloser, error) {
if err := os.MkdirAll(filepath.Dir(fp), 0755); err != nil {
return nil, err
}
return os.Create(fp)
}
func (f FS) PartWriter(id string, part int) (io.WriteCloser, error) {
fp := filepath.Join(string(f), id, fmt.Sprintf("%d", part))
return f.open(fp)
}
func (f FS) Writer(bucket, name, id string) (io.WriteCloser, error) {
fp := filepath.Join(string(f), bucket, name, id)
return f.open(fp)
}
func (f FS) Parts(id string) ([]string, error) {
dir := filepath.Join(string(f), id)
file, err := os.Open(dir)
if err != nil {
return nil, err
}
defer file.Close()
fs, err := file.Readdir(0)
if err != nil {
return nil, err
}
shas := make([]string, len(fs)-1)
for _, fi := range fs {
if fi.Name() == "info" {
continue
}
i, err := strconv.ParseInt(fi.Name(), 10, 32)
if err != nil {
return nil, err
}
p, err := os.Open(filepath.Join(dir, fi.Name()))
if err != nil {
return nil, err
}
sha := sha1.New()
if _, err := io.Copy(sha, p); err != nil {
p.Close()
return nil, err
}
p.Close()
shas[int(i)-1] = fmt.Sprintf("%x", sha.Sum(nil))
}
return shas, nil
}
type fi struct {
Name string
Bucket string
}
func (f FS) Start(bucketId, fileName, fileId string, bs []byte) error {
w, err := f.open(filepath.Join(string(f), fileId, "info"))
if err != nil {
return err
}
if err := json.NewEncoder(w).Encode(fi{Name: fileName, Bucket: bucketId}); err != nil {
w.Close()
return err
}
return w.Close()
}
func (f FS) Finish(fileId string) error {
r, err := os.Open(filepath.Join(string(f), fileId, "info"))
if err != nil {
return err
}
defer r.Close()
var info fi
if err := json.NewDecoder(r).Decode(&info); err != nil {
return err
}
shas, err := f.Parts(fileId) // oh my god this is terrible
if err != nil {
return err
}
w, err := f.open(filepath.Join(string(f), info.Bucket, info.Name, fileId))
if err != nil {
return err
}
for i := 1; i <= len(shas); i++ {
r, err := os.Open(filepath.Join(string(f), fileId, fmt.Sprintf("%d", i)))
if err != nil {
w.Close()
return err
}
if _, err := io.Copy(w, r); err != nil {
w.Close()
r.Close()
return err
}
r.Close()
}
if err := w.Close(); err != nil {
return err
}
return os.RemoveAll(filepath.Join(string(f), fileId))
}
func (f FS) ObjectByName(bucket, name string) (pyre.DownloadableObject, error) {
dir := filepath.Join(string(f), bucket, name)
d, err := os.Open(dir)
if err != nil {
return nil, err
}
defer d.Close()
fis, err := d.Readdir(0)
if err != nil {
return nil, err
}
sort.Slice(fis, func(i, j int) bool { return fis[i].ModTime().Before(fis[j].ModTime()) })
o, err := os.Open(filepath.Join(dir, fis[0].Name()))
if err != nil {
return nil, err
}
return do{
o: o,
size: fis[0].Size(),
}, nil
}
type do struct {
size int64
o *os.File
}
func (d do) Size() int64 { return d.size }
func (d do) Reader() io.ReaderAt { return d.o }
func (d do) Close() error { return d.o.Close() }
func (f FS) Get(fileId string) ([]byte, error) { return nil, nil }
type Localhost int
func (l Localhost) String() string { return fmt.Sprintf("http://localhost:%d", l) }
func (l Localhost) UploadHost(id string) (string, error) { return l.String(), nil }
func (Localhost) Authorize(string, string) (string, error) { return "ok", nil }
func (Localhost) CheckCreds(string, string) error { return nil }
func (l Localhost) APIRoot(string) string { return l.String() }
func (l Localhost) DownloadRoot(string) string { return l.String() }
func (Localhost) Sizes(string) (int32, int32) { return 1e5, 1 }
func (l Localhost) UploadPartHost(fileId string) (string, error) { return l.String(), nil }
type LocalBucket struct {
Port int
mux sync.Mutex
b map[string][]byte
nti map[string]string
}
func (lb *LocalBucket) AddBucket(id, name string, bs []byte) error {
lb.mux.Lock()
defer lb.mux.Unlock()
if lb.b == nil {
lb.b = make(map[string][]byte)
}
if lb.nti == nil {
lb.nti = make(map[string]string)
}
lb.b[id] = bs
lb.nti[name] = id
return nil
}
func (lb *LocalBucket) RemoveBucket(id string) error {
lb.mux.Lock()
defer lb.mux.Unlock()
if lb.b == nil {
lb.b = make(map[string][]byte)
}
delete(lb.b, id)
return nil
}
func (lb *LocalBucket) UpdateBucket(id string, rev int, bs []byte) error {
return errors.New("no")
}
func (lb *LocalBucket) ListBuckets(acct string) ([][]byte, error) {
lb.mux.Lock()
defer lb.mux.Unlock()
var bss [][]byte
for _, bs := range lb.b {
bss = append(bss, bs)
}
return bss, nil
}
func (lb *LocalBucket) GetBucket(id string) ([]byte, error) {
lb.mux.Lock()
defer lb.mux.Unlock()
bs, ok := lb.b[id]
if !ok {
return nil, errors.New("not found")
}
return bs, nil
}
func (lb *LocalBucket) GetBucketID(name string) (string, error) {
lb.mux.Lock()
defer lb.mux.Unlock()
id, ok := lb.nti[name]
if !ok {
return "", errors.New("not found")
}
return id, nil
}

View file

@ -34,6 +34,9 @@ type AuthorizeAccountResponse struct {
URI string `json:"apiUrl"`
DownloadURI string `json:"downloadUrl"`
MinPartSize int `json:"minimumPartSize"`
PartSize int `json:"recommendedPartSize"`
AbsMinPartSize int `json:"absoluteMinimumPartSize"`
Capabilities []string `json:"capabilities"`
}
type LifecycleRule struct {
@ -75,13 +78,6 @@ type ListBucketsResponse struct {
type UpdateBucketRequest struct {
AccountID string `json:"accountId"`
BucketID string `json:"bucketId"`
// bucketName is a required field according to
// https://www.backblaze.com/b2/docs/b2_update_bucket.html.
//
// However, actually setting it returns 400: unknown field in
// com.backblaze.modules.b2.data.UpdateBucketRequest: bucketName
//
//Name string `json:"bucketName"`
Type string `json:"bucketType,omitempty"`
Info map[string]string `json:"bucketInfo,omitempty"`
LifecycleRules []LifecycleRule `json:"lifecycleRules,omitempty"`
@ -99,11 +95,7 @@ type GetUploadURLResponse struct {
Token string `json:"authorizationToken"`
}
type UploadFileResponse struct {
FileID string `json:"fileId"`
Timestamp int64 `json:"uploadTimestamp"`
Action string `json:"action"`
}
type UploadFileResponse GetFileInfoResponse
type DeleteFileVersionRequest struct {
Name string `json:"fileName"`
@ -206,20 +198,23 @@ type GetFileInfoRequest struct {
}
type GetFileInfoResponse struct {
FileID string `json:"fileId"`
Name string `json:"fileName"`
SHA1 string `json:"contentSha1"`
Size int64 `json:"contentLength"`
ContentType string `json:"contentType"`
Info map[string]string `json:"fileInfo"`
Action string `json:"action"`
Timestamp int64 `json:"uploadTimestamp"`
FileID string `json:"fileId,omitempty"`
Name string `json:"fileName,omitempty"`
AccountID string `json:"accountId,omitempty"`
BucketID string `json:"bucketId,omitempty"`
Size int64 `json:"contentLength,omitempty"`
SHA1 string `json:"contentSha1,omitempty"`
ContentType string `json:"contentType,omitempty"`
Info map[string]string `json:"fileInfo,omitempty"`
Action string `json:"action,omitempty"`
Timestamp int64 `json:"uploadTimestamp,omitempty"`
}
type GetDownloadAuthorizationRequest struct {
BucketID string `json:"bucketId"`
Prefix string `json:"fileNamePrefix"`
Valid int `json:"validDurationInSeconds"`
ContentDisposition string `json:"b2ContentDisposition,omitempty"`
}
type GetDownloadAuthorizationResponse struct {
@ -238,3 +233,42 @@ type ListUnfinishedLargeFilesResponse struct {
Files []GetFileInfoResponse `json:"files"`
Continuation string `json:"nextFileId"`
}
type CreateKeyRequest struct {
AccountID string `json:"accountId"`
Capabilities []string `json:"capabilities"`
Name string `json:"keyName"`
Valid int `json:"validDurationInSeconds,omitempty"`
BucketID string `json:"bucketId,omitempty"`
Prefix string `json:"namePrefix,omitempty"`
}
type Key struct {
ID string `json:"applicationKeyId"`
Secret string `json:"applicationKey"`
AccountID string `json:"accountId"`
Capabilities []string `json:"capabilities"`
Name string `json:"keyName"`
Expires int64 `json:"expirationTimestamp"`
BucketID string `json:"bucketId"`
Prefix string `json:"namePrefix"`
}
type CreateKeyResponse Key
type DeleteKeyRequest struct {
KeyID string `json:"applicationKeyId"`
}
type DeleteKeyResponse Key
type ListKeysRequest struct {
AccountID string `json:"accountId"`
Max int `json:"maxKeyCount,omitempty"`
Next string `json:"startApplicationKeyId,omitempty"`
}
type ListKeysResponse struct {
Keys []Key `json:"keys"`
Next string `json:"nextApplicationKeyId"`
}

325
vendor/github.com/kurin/blazer/internal/pyre/api.go generated vendored Normal file
View file

@ -0,0 +1,325 @@
// Copyright 2018, Google
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pyre
import (
"context"
"encoding/base64"
"errors"
"fmt"
"net"
"net/http"
"os"
"reflect"
"strings"
"github.com/golang/protobuf/proto"
"github.com/google/uuid"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
pb "github.com/kurin/blazer/internal/pyre/proto"
)
type apiErr struct {
Status int `json:"status"`
Code string `json:"code"`
Message string `json:"message"`
}
func serveMuxOptions() []runtime.ServeMuxOption {
return []runtime.ServeMuxOption{
runtime.WithMarshalerOption("*", &runtime.JSONPb{}),
runtime.WithProtoErrorHandler(func(ctx context.Context, mux *runtime.ServeMux, m runtime.Marshaler, rw http.ResponseWriter, req *http.Request, err error) {
aErr := apiErr{
Status: 400,
Code: "uh oh",
Message: err.Error(),
}
rw.WriteHeader(aErr.Status)
if err := m.NewEncoder(rw).Encode(aErr); err != nil {
fmt.Fprintln(os.Stdout, err)
}
}),
}
}
func getAuth(ctx context.Context) (string, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return "", errors.New("no metadata")
}
data := md.Get("authorization")
if len(data) == 0 {
return "", nil
}
return data[0], nil
}
func RegisterServerOnMux(ctx context.Context, srv *Server, mux *http.ServeMux) error {
rmux := runtime.NewServeMux(serveMuxOptions()...)
l, err := net.Listen("tcp", "localhost:0")
if err != nil {
return err
}
gsrv := grpc.NewServer()
if err := pb.RegisterPyreServiceHandlerFromEndpoint(ctx, rmux, l.Addr().String(), []grpc.DialOption{grpc.WithInsecure()}); err != nil {
return err
}
pb.RegisterPyreServiceServer(gsrv, srv)
mux.Handle("/b2api/v1/", rmux)
go gsrv.Serve(l)
go func() {
<-ctx.Done()
gsrv.GracefulStop()
}()
return nil
}
type AccountManager interface {
Authorize(acct, key string) (string, error)
CheckCreds(token, api string) error
APIRoot(acct string) string
DownloadRoot(acct string) string
UploadPartHost(fileID string) (string, error)
UploadHost(id string) (string, error)
Sizes(acct string) (recommended, minimum int32)
}
type BucketManager interface {
AddBucket(id, name string, bs []byte) error
RemoveBucket(id string) error
UpdateBucket(id string, rev int, bs []byte) error
ListBuckets(acct string) ([][]byte, error)
GetBucket(id string) ([]byte, error)
}
type LargeFileOrganizer interface {
Start(bucketID, fileName, fileID string, bs []byte) error
Get(fileID string) ([]byte, error)
Parts(fileID string) ([]string, error)
Finish(fileID string) error
}
type Server struct {
Account AccountManager
Bucket BucketManager
LargeFile LargeFileOrganizer
List ListManager
}
func (s *Server) AuthorizeAccount(ctx context.Context, req *pb.AuthorizeAccountRequest) (*pb.AuthorizeAccountResponse, error) {
auth, err := getAuth(ctx)
if err != nil {
return nil, err
}
if !strings.HasPrefix(auth, "Basic ") {
return nil, errors.New("basic auth required")
}
auth = strings.TrimPrefix(auth, "Basic ")
bs, err := base64.StdEncoding.DecodeString(auth)
if err != nil {
return nil, err
}
split := strings.Split(string(bs), ":")
if len(split) != 2 {
return nil, errors.New("bad auth")
}
acct, key := split[0], split[1]
token, err := s.Account.Authorize(acct, key)
if err != nil {
return nil, err
}
rec, min := s.Account.Sizes(acct)
return &pb.AuthorizeAccountResponse{
AuthorizationToken: token,
ApiUrl: s.Account.APIRoot(acct),
DownloadUrl: s.Account.DownloadRoot(acct),
RecommendedPartSize: rec,
MinimumPartSize: rec,
AbsoluteMinimumPartSize: min,
}, nil
}
func (s *Server) ListBuckets(ctx context.Context, req *pb.ListBucketsRequest) (*pb.ListBucketsResponse, error) {
resp := &pb.ListBucketsResponse{}
buckets, err := s.Bucket.ListBuckets(req.AccountId)
if err != nil {
return nil, err
}
for _, bs := range buckets {
var bucket pb.Bucket
if err := proto.Unmarshal(bs, &bucket); err != nil {
return nil, err
}
resp.Buckets = append(resp.Buckets, &bucket)
}
return resp, nil
}
func (s *Server) CreateBucket(ctx context.Context, req *pb.Bucket) (*pb.Bucket, error) {
req.BucketId = uuid.New().String()
bs, err := proto.Marshal(req)
if err != nil {
return nil, err
}
if err := s.Bucket.AddBucket(req.BucketId, req.BucketName, bs); err != nil {
return nil, err
}
return req, nil
}
func (s *Server) DeleteBucket(ctx context.Context, req *pb.Bucket) (*pb.Bucket, error) {
bs, err := s.Bucket.GetBucket(req.BucketId)
if err != nil {
return nil, err
}
var bucket pb.Bucket
if err := proto.Unmarshal(bs, &bucket); err != nil {
return nil, err
}
if err := s.Bucket.RemoveBucket(req.BucketId); err != nil {
return nil, err
}
return &bucket, nil
}
func (s *Server) GetUploadUrl(ctx context.Context, req *pb.GetUploadUrlRequest) (*pb.GetUploadUrlResponse, error) {
host, err := s.Account.UploadHost(req.BucketId)
if err != nil {
return nil, err
}
return &pb.GetUploadUrlResponse{
UploadUrl: fmt.Sprintf("%s/b2api/v1/b2_upload_file/%s", host, req.BucketId),
BucketId: req.BucketId,
}, nil
}
func (s *Server) StartLargeFile(ctx context.Context, req *pb.StartLargeFileRequest) (*pb.StartLargeFileResponse, error) {
fileID := uuid.New().String()
resp := &pb.StartLargeFileResponse{
FileId: fileID,
FileName: req.FileName,
BucketId: req.BucketId,
ContentType: req.ContentType,
FileInfo: req.FileInfo,
}
bs, err := proto.Marshal(resp)
if err != nil {
return nil, err
}
if err := s.LargeFile.Start(req.BucketId, req.FileName, fileID, bs); err != nil {
return nil, err
}
return resp, nil
}
func (s *Server) GetUploadPartUrl(ctx context.Context, req *pb.GetUploadPartUrlRequest) (*pb.GetUploadPartUrlResponse, error) {
host, err := s.Account.UploadPartHost(req.FileId)
if err != nil {
return nil, err
}
return &pb.GetUploadPartUrlResponse{
UploadUrl: fmt.Sprintf("%s/b2api/v1/b2_upload_part/%s", host, req.FileId),
}, nil
}
func (s *Server) FinishLargeFile(ctx context.Context, req *pb.FinishLargeFileRequest) (*pb.FinishLargeFileResponse, error) {
parts, err := s.LargeFile.Parts(req.FileId)
if err != nil {
return nil, err
}
if !reflect.DeepEqual(parts, req.PartSha1Array) {
return nil, errors.New("sha1 array mismatch")
}
if err := s.LargeFile.Finish(req.FileId); err != nil {
return nil, err
}
return &pb.FinishLargeFileResponse{}, nil
}
func (s *Server) ListFileVersions(ctx context.Context, req *pb.ListFileVersionsRequest) (*pb.ListFileVersionsResponse, error) {
return nil, nil
}
//type objTuple struct {
// name, version string
//}
type ListManager interface {
// NextN returns the next n objects, sorted by lexicographical order by name,
// beginning at and including, if it exists, fileName. If withPrefix is not
// empty, it only returns names that begin with that prefix. If skipPrefix
// is not empty, then the no files with that prefix are returned. If the two
// conflict, skipPrefix wins (i.e., do not return the entry).
//
// If fewer than n entries are returned, this signifies that no more names
// exist that meet these criteria.
NextN(bucketID, fileName, withPrefix, skipPrefix string, n int) ([]VersionedObject, error)
}
type VersionedObject interface {
Name() string
NextNVersions(begin string, n int) ([]string, error)
}
//func getNextObjects(lm ListManager, bucket, name, prefix, delimiter string, n int) ([]VersionedObject, error) {
// if delimiter == "" {
// return lm.NextN(bucket, name, prefix, "", n)
// }
// afterPfx := strings.TrimPrefix(name, prefix)
// i := strings.Index(afterPfx, delimiter)
// if i == 0 {
//
// }
// if i < 0 {
// return lm.NextN(bucket, name, prefix, "", n)
// }
// skipPfx := name[:len(prefix)+i]
// // TO
//}
//
//func listFileVersions(lm ListManager, bucket, name, version, prefix, delimiter string, n int) ([]objTuple, error) {
// var tups []objTuple
// var got int
// for {
// objs, err := getNextObjects(bucket, name, prefix, delimiter, n-got)
// if err != nil {
// return nil, err
// }
// if len(objs) == 0 {
// break
// }
// for _, o := range objs {
// var begin string
// if len(tups) == 0 {
// begin = version
// }
// vers, err := lm.NextNVersions(begin, n-got)
// if err != nil {
// return nil, err
// }
// got += len(vers)
// for _, ver := range vers {
// tups = append(tups, objTuple{name: o.Name(), version: ver})
// }
// if got >= n {
// return tups[:n], nil
// }
// }
// }
// return tups, nil
//}

View file

@ -0,0 +1,136 @@
// Copyright 2018, Google
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pyre
import (
"fmt"
"io"
"net/http"
"strconv"
"strings"
)
type DownloadableObject interface {
Size() int64
Reader() io.ReaderAt
io.Closer
}
type DownloadManager interface {
ObjectByName(bucketID, name string) (DownloadableObject, error)
GetBucketID(bucket string) (string, error)
GetBucket(id string) ([]byte, error)
}
type downloadServer struct {
dm DownloadManager
}
type downloadRequest struct {
off, n int64
}
func parseDownloadHeaders(r *http.Request) (*downloadRequest, error) {
rang := r.Header.Get("Range")
if rang == "" {
return &downloadRequest{}, nil
}
if !strings.HasPrefix(rang, "bytes=") {
return nil, fmt.Errorf("unknown range format: %q", rang)
}
rang = strings.TrimPrefix(rang, "bytes=")
if !strings.Contains(rang, "-") {
return nil, fmt.Errorf("unknown range format: %q", rang)
}
parts := strings.Split(rang, "-")
off, err := strconv.ParseInt(parts[0], 10, 64)
if err != nil {
return nil, err
}
end, err := strconv.ParseInt(parts[1], 10, 64)
if err != nil {
return nil, err
}
return &downloadRequest{
off: off,
n: (end + 1) - off,
}, nil
}
func (fs *downloadServer) serveWholeObject(rw http.ResponseWriter, obj DownloadableObject) {
rw.Header().Set("Content-Length", fmt.Sprintf("%d", obj.Size()))
sr := io.NewSectionReader(obj.Reader(), 0, obj.Size())
if _, err := io.Copy(rw, sr); err != nil {
http.Error(rw, err.Error(), 503)
fmt.Println("no reader", err)
}
}
func (fs *downloadServer) servePartialObject(rw http.ResponseWriter, obj DownloadableObject, off, len int64) {
if off >= obj.Size() {
http.Error(rw, "hell naw", 416)
fmt.Printf("range not good (%d-%d for %d)\n", off, len, obj.Size())
return
}
if off+len > obj.Size() {
len = obj.Size() - off
}
sr := io.NewSectionReader(obj.Reader(), off, len)
rw.Header().Set("Content-Length", fmt.Sprintf("%d", len))
rw.WriteHeader(206) // this goes after headers are set
if _, err := io.Copy(rw, sr); err != nil {
fmt.Println("bad read:", err)
}
}
func (fs *downloadServer) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
req, err := parseDownloadHeaders(r)
if err != nil {
http.Error(rw, err.Error(), 503)
fmt.Println("weird header")
return
}
path := strings.TrimPrefix(r.URL.Path, "/")
parts := strings.Split(path, "/")
if len(parts) < 3 {
http.Error(rw, err.Error(), 404)
fmt.Println("weird file")
return
}
bucket := parts[1]
bid, err := fs.dm.GetBucketID(bucket)
if err != nil {
http.Error(rw, err.Error(), 503)
fmt.Println("no bucket:", err)
return
}
file := strings.Join(parts[2:], "/")
obj, err := fs.dm.ObjectByName(bid, file)
if err != nil {
http.Error(rw, err.Error(), 503)
fmt.Println("no reader", err)
return
}
defer obj.Close()
if req.off == 0 && req.n == 0 {
fs.serveWholeObject(rw, obj)
return
}
fs.servePartialObject(rw, obj, req.off, req.n)
}
func RegisterDownloadManagerOnMux(d DownloadManager, mux *http.ServeMux) {
mux.Handle("/file/", &downloadServer{dm: d})
}

91
vendor/github.com/kurin/blazer/internal/pyre/large.go generated vendored Normal file
View file

@ -0,0 +1,91 @@
// Copyright 2018, Google
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pyre
import (
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
"strings"
)
const uploadFilePartPrefix = "/b2api/v1/b2_upload_part/"
type LargeFileManager interface {
PartWriter(id string, part int) (io.WriteCloser, error)
}
type largeFileServer struct {
fm LargeFileManager
}
type uploadPartRequest struct {
ID string `json:"fileId"`
Part int `json:"partNumber"`
Size int64 `json:"contentLength"`
Hash string `json:"contentSha1"`
}
func parseUploadPartHeaders(r *http.Request) (uploadPartRequest, error) {
var ur uploadPartRequest
ur.Hash = r.Header.Get("X-Bz-Content-Sha1")
part, err := strconv.ParseInt(r.Header.Get("X-Bz-Part-Number"), 10, 64)
if err != nil {
return ur, err
}
ur.Part = int(part)
size, err := strconv.ParseInt(r.Header.Get("Content-Length"), 10, 64)
if err != nil {
return ur, err
}
ur.Size = size
ur.ID = strings.TrimPrefix(r.URL.Path, uploadFilePartPrefix)
return ur, nil
}
func (fs *largeFileServer) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
req, err := parseUploadPartHeaders(r)
if err != nil {
http.Error(rw, err.Error(), 500)
fmt.Println("oh no")
return
}
w, err := fs.fm.PartWriter(req.ID, req.Part)
if err != nil {
http.Error(rw, err.Error(), 500)
fmt.Println("oh no")
return
}
if _, err := io.Copy(w, io.LimitReader(r.Body, req.Size)); err != nil {
w.Close()
http.Error(rw, err.Error(), 500)
fmt.Println("oh no")
return
}
if err := w.Close(); err != nil {
http.Error(rw, err.Error(), 500)
fmt.Println("oh no")
return
}
if err := json.NewEncoder(rw).Encode(req); err != nil {
fmt.Println("oh no")
}
}
func RegisterLargeFileManagerOnMux(f LargeFileManager, mux *http.ServeMux) {
mux.Handle(uploadFilePartPrefix, &largeFileServer{fm: f})
}

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,484 @@
// Code generated by protoc-gen-grpc-gateway. DO NOT EDIT.
// source: proto/pyre.proto
/*
Package pyre_proto is a reverse proxy.
It translates gRPC into RESTful JSON APIs.
*/
package pyre_proto
import (
"io"
"net/http"
"github.com/golang/protobuf/proto"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/grpc-ecosystem/grpc-gateway/utilities"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/status"
)
var _ codes.Code
var _ io.Reader
var _ status.Status
var _ = runtime.String
var _ = utilities.NewDoubleArray
func request_PyreService_AuthorizeAccount_0(ctx context.Context, marshaler runtime.Marshaler, client PyreServiceClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq AuthorizeAccountRequest
var metadata runtime.ServerMetadata
msg, err := client.AuthorizeAccount(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func request_PyreService_ListBuckets_0(ctx context.Context, marshaler runtime.Marshaler, client PyreServiceClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq ListBucketsRequest
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := client.ListBuckets(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func request_PyreService_CreateBucket_0(ctx context.Context, marshaler runtime.Marshaler, client PyreServiceClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq Bucket
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := client.CreateBucket(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func request_PyreService_DeleteBucket_0(ctx context.Context, marshaler runtime.Marshaler, client PyreServiceClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq Bucket
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := client.DeleteBucket(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func request_PyreService_GetUploadUrl_0(ctx context.Context, marshaler runtime.Marshaler, client PyreServiceClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq GetUploadUrlRequest
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := client.GetUploadUrl(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func request_PyreService_StartLargeFile_0(ctx context.Context, marshaler runtime.Marshaler, client PyreServiceClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq StartLargeFileRequest
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := client.StartLargeFile(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func request_PyreService_GetUploadPartUrl_0(ctx context.Context, marshaler runtime.Marshaler, client PyreServiceClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq GetUploadPartUrlRequest
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := client.GetUploadPartUrl(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func request_PyreService_FinishLargeFile_0(ctx context.Context, marshaler runtime.Marshaler, client PyreServiceClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq FinishLargeFileRequest
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := client.FinishLargeFile(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func request_PyreService_ListFileVersions_0(ctx context.Context, marshaler runtime.Marshaler, client PyreServiceClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq ListFileVersionsRequest
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := client.ListFileVersions(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
// RegisterPyreServiceHandlerFromEndpoint is same as RegisterPyreServiceHandler but
// automatically dials to "endpoint" and closes the connection when "ctx" gets done.
func RegisterPyreServiceHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) (err error) {
conn, err := grpc.Dial(endpoint, opts...)
if err != nil {
return err
}
defer func() {
if err != nil {
if cerr := conn.Close(); cerr != nil {
grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr)
}
return
}
go func() {
<-ctx.Done()
if cerr := conn.Close(); cerr != nil {
grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr)
}
}()
}()
return RegisterPyreServiceHandler(ctx, mux, conn)
}
// RegisterPyreServiceHandler registers the http handlers for service PyreService to "mux".
// The handlers forward requests to the grpc endpoint over "conn".
func RegisterPyreServiceHandler(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error {
return RegisterPyreServiceHandlerClient(ctx, mux, NewPyreServiceClient(conn))
}
// RegisterPyreServiceHandlerClient registers the http handlers for service PyreService
// to "mux". The handlers forward requests to the grpc endpoint over the given implementation of "PyreServiceClient".
// Note: the gRPC framework executes interceptors within the gRPC handler. If the passed in "PyreServiceClient"
// doesn't go through the normal gRPC flow (creating a gRPC client etc.) then it will be up to the passed in
// "PyreServiceClient" to call the correct interceptors.
func RegisterPyreServiceHandlerClient(ctx context.Context, mux *runtime.ServeMux, client PyreServiceClient) error {
mux.Handle("GET", pattern_PyreService_AuthorizeAccount_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
if cn, ok := w.(http.CloseNotifier); ok {
go func(done <-chan struct{}, closed <-chan bool) {
select {
case <-done:
case <-closed:
cancel()
}
}(ctx.Done(), cn.CloseNotify())
}
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
rctx, err := runtime.AnnotateContext(ctx, mux, req)
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_PyreService_AuthorizeAccount_0(rctx, inboundMarshaler, client, req, pathParams)
ctx = runtime.NewServerMetadataContext(ctx, md)
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
forward_PyreService_AuthorizeAccount_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_PyreService_ListBuckets_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
if cn, ok := w.(http.CloseNotifier); ok {
go func(done <-chan struct{}, closed <-chan bool) {
select {
case <-done:
case <-closed:
cancel()
}
}(ctx.Done(), cn.CloseNotify())
}
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
rctx, err := runtime.AnnotateContext(ctx, mux, req)
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_PyreService_ListBuckets_0(rctx, inboundMarshaler, client, req, pathParams)
ctx = runtime.NewServerMetadataContext(ctx, md)
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
forward_PyreService_ListBuckets_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_PyreService_CreateBucket_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
if cn, ok := w.(http.CloseNotifier); ok {
go func(done <-chan struct{}, closed <-chan bool) {
select {
case <-done:
case <-closed:
cancel()
}
}(ctx.Done(), cn.CloseNotify())
}
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
rctx, err := runtime.AnnotateContext(ctx, mux, req)
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_PyreService_CreateBucket_0(rctx, inboundMarshaler, client, req, pathParams)
ctx = runtime.NewServerMetadataContext(ctx, md)
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
forward_PyreService_CreateBucket_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_PyreService_DeleteBucket_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
if cn, ok := w.(http.CloseNotifier); ok {
go func(done <-chan struct{}, closed <-chan bool) {
select {
case <-done:
case <-closed:
cancel()
}
}(ctx.Done(), cn.CloseNotify())
}
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
rctx, err := runtime.AnnotateContext(ctx, mux, req)
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_PyreService_DeleteBucket_0(rctx, inboundMarshaler, client, req, pathParams)
ctx = runtime.NewServerMetadataContext(ctx, md)
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
forward_PyreService_DeleteBucket_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_PyreService_GetUploadUrl_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
if cn, ok := w.(http.CloseNotifier); ok {
go func(done <-chan struct{}, closed <-chan bool) {
select {
case <-done:
case <-closed:
cancel()
}
}(ctx.Done(), cn.CloseNotify())
}
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
rctx, err := runtime.AnnotateContext(ctx, mux, req)
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_PyreService_GetUploadUrl_0(rctx, inboundMarshaler, client, req, pathParams)
ctx = runtime.NewServerMetadataContext(ctx, md)
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
forward_PyreService_GetUploadUrl_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_PyreService_StartLargeFile_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
if cn, ok := w.(http.CloseNotifier); ok {
go func(done <-chan struct{}, closed <-chan bool) {
select {
case <-done:
case <-closed:
cancel()
}
}(ctx.Done(), cn.CloseNotify())
}
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
rctx, err := runtime.AnnotateContext(ctx, mux, req)
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_PyreService_StartLargeFile_0(rctx, inboundMarshaler, client, req, pathParams)
ctx = runtime.NewServerMetadataContext(ctx, md)
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
forward_PyreService_StartLargeFile_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_PyreService_GetUploadPartUrl_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
if cn, ok := w.(http.CloseNotifier); ok {
go func(done <-chan struct{}, closed <-chan bool) {
select {
case <-done:
case <-closed:
cancel()
}
}(ctx.Done(), cn.CloseNotify())
}
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
rctx, err := runtime.AnnotateContext(ctx, mux, req)
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_PyreService_GetUploadPartUrl_0(rctx, inboundMarshaler, client, req, pathParams)
ctx = runtime.NewServerMetadataContext(ctx, md)
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
forward_PyreService_GetUploadPartUrl_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_PyreService_FinishLargeFile_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
if cn, ok := w.(http.CloseNotifier); ok {
go func(done <-chan struct{}, closed <-chan bool) {
select {
case <-done:
case <-closed:
cancel()
}
}(ctx.Done(), cn.CloseNotify())
}
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
rctx, err := runtime.AnnotateContext(ctx, mux, req)
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_PyreService_FinishLargeFile_0(rctx, inboundMarshaler, client, req, pathParams)
ctx = runtime.NewServerMetadataContext(ctx, md)
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
forward_PyreService_FinishLargeFile_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_PyreService_ListFileVersions_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
if cn, ok := w.(http.CloseNotifier); ok {
go func(done <-chan struct{}, closed <-chan bool) {
select {
case <-done:
case <-closed:
cancel()
}
}(ctx.Done(), cn.CloseNotify())
}
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
rctx, err := runtime.AnnotateContext(ctx, mux, req)
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_PyreService_ListFileVersions_0(rctx, inboundMarshaler, client, req, pathParams)
ctx = runtime.NewServerMetadataContext(ctx, md)
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
forward_PyreService_ListFileVersions_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
return nil
}
var (
pattern_PyreService_AuthorizeAccount_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"b2api", "v1", "b2_authorize_account"}, ""))
pattern_PyreService_ListBuckets_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"b2api", "v1", "b2_list_buckets"}, ""))
pattern_PyreService_CreateBucket_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"b2api", "v1", "b2_create_bucket"}, ""))
pattern_PyreService_DeleteBucket_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"b2api", "v1", "b2_delete_bucket"}, ""))
pattern_PyreService_GetUploadUrl_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"b2api", "v1", "b2_get_upload_url"}, ""))
pattern_PyreService_StartLargeFile_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"b2api", "v1", "b2_start_large_file"}, ""))
pattern_PyreService_GetUploadPartUrl_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"b2api", "v1", "b2_get_upload_part_url"}, ""))
pattern_PyreService_FinishLargeFile_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"b2api", "v1", "b2_finish_large_file"}, ""))
pattern_PyreService_ListFileVersions_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"b2api", "v1", "b2_list_file_versions"}, ""))
)
var (
forward_PyreService_AuthorizeAccount_0 = runtime.ForwardResponseMessage
forward_PyreService_ListBuckets_0 = runtime.ForwardResponseMessage
forward_PyreService_CreateBucket_0 = runtime.ForwardResponseMessage
forward_PyreService_DeleteBucket_0 = runtime.ForwardResponseMessage
forward_PyreService_GetUploadUrl_0 = runtime.ForwardResponseMessage
forward_PyreService_StartLargeFile_0 = runtime.ForwardResponseMessage
forward_PyreService_GetUploadPartUrl_0 = runtime.ForwardResponseMessage
forward_PyreService_FinishLargeFile_0 = runtime.ForwardResponseMessage
forward_PyreService_ListFileVersions_0 = runtime.ForwardResponseMessage
)

View file

@ -0,0 +1,335 @@
// Copyright 2018, Google
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
import "google/api/annotations.proto";
package pyre.proto;
message AuthorizeAccountRequest {}
message AuthorizeAccountResponse {
// The identifier for the account.
string account_id = 1;
// An authorization token to use with all calls, other than
// b2_authorize_account, that need an Authorization header. This
// authorization token is valid for at most 24 hours.
string authorization_token = 2;
// The base URL to use for all API calls except for uploading and downloading
// files.
string api_url = 3;
// The base URL to use for downloading files.
string download_url = 4;
// The recommended size for each part of a large file. We recommend using
// this part size for optimal upload performance.
int32 recommended_part_size = 5;
// The smallest possible size of a part of a large file (except the last
// one). This is smaller than the recommended part size. If you use it, you
// may find that it takes longer overall to upload a large file.
int32 absolute_minimum_part_size = 6;
int32 minimum_part_size = 7; // alias for recommended_part_size
}
message ListBucketsRequest {
// The ID of your account.
string account_id = 1;
// When specified, the result will be a list containing just this bucket, if
// it's present in the account, or no buckets if the account does not have a
// bucket with this ID.
string bucket_id = 2;
// When specified, the result will be a list containing just this bucket, if
// it's present in the account, or no buckets if the account does not have a
// bucket with this ID.
string bucket_name = 3;
// If present, B2 will use it as a filter for bucket types returned in the
// list buckets response. If not present, only buckets with bucket types
// "allPublic", "allPrivate" and "snapshot" will be returned. A special
// filter value of ["all"] will return all bucket types.
//
// If present, it must be in the form of a json array of strings containing
// valid bucket types in quotes and separated by a comma. Valid bucket types
// include "allPrivate", "allPublic", "snapshot", and other values added in
// the future.
//
// A bad request error will be returned if "all" is used with other bucket
// types, this field is empty, or invalid bucket types are requested.
repeated string bucket_types = 4;
}
message LifecycleRule {
// After a file is uploaded, the number of days before it can be hidden.
int32 days_from_uploading_to_hiding = 1;
// After a file is hidden, the number of days before it can be deleted.
int32 days_from_hiding_to_deleting = 2;
// The rule applies to files whose names start with this prefix.
string file_name_prefix = 3;
}
message CorsRule {
// A name for humans to recognize the rule in a user interface. Names must be
// unique within a bucket. Names can consist of upper-case and lower-case
// English letters, numbers, and "-". No other characters are allowed. A name
// must be at least 6 characters long, and can be at most 50 characters long.
// These are all allowed names: myPhotosSite, allowAnyHttps,
// backblaze-images. Names that start with "b2-" are reserved for Backblaze
// use.
string cors_rule_name = 1;
// A non-empty list specifying which origins the rule covers. Each value may
// have one of many formats:
//
// * The origin can be fully specified, such as http://www.example.com:8180
// or https://www.example.com:4433.
//
// * The origin can omit a default port, such as https://www.example.com.
//
// * The origin may have a single '*' as part of the domain name, such as
// https://*.example.com, https://*:8443 or https://*.
//
// * The origin may be 'https' to match any origin that uses HTTPS. (This is
// broader than 'https://*' because it matches any port.)
//
// * Finally, the origin can be a single '*' to match any origin.
//
// If any entry is "*", it must be the only entry. There can be at most one
// "https" entry and no entry after it may start with "https:".
repeated string allowed_origins = 2;
// A list specifying which operations the rule allows. At least one value
// must be specified. All values must be from the following list. More values
// may be added to this list at any time.
//
// b2_download_file_by_name
// b2_download_file_by_id
// b2_upload_file
// b2_upload_part
repeated string allowed_operations = 3;
// If present, this is a list of headers that are allowed in a pre-flight
// OPTIONS's request's Access-Control-Request-Headers header value. Each
// value may have one of many formats:
//
// * It may be a complete header name, such as x-bz-content-sha1.
//
// * It may end with an asterisk, such as x-bz-info-*.
//
// * Finally, it may be a single '*' to match any header.
//
// If any entry is "*", it must be the only entry in the list. If this list
// is missing, it is be treated as if it is a list with no entries.
repeated string allowed_headers = 4;
// If present, this is a list of headers that may be exposed to an
// application inside the client (eg. exposed to Javascript in a browser).
// Each entry in the list must be a complete header name (eg.
// "x-bz-content-sha1"). If this list is missing or empty, no headers will be
// exposed.
repeated string expose_headers = 5;
// This specifies the maximum number of seconds that a browser may cache the
// response to a preflight request. The value must not be negative and it
// must not be more than 86,400 seconds (one day).
int32 max_age_seconds = 6;
}
message Bucket {
string account_id = 1;
string bucket_id = 2;
string bucket_name = 3;
string bucket_type = 4;
map<string, string> bucket_info = 5;
repeated CorsRule cores_rules = 6;
repeated LifecycleRule lifecycle_rules = 7;
int32 revision = 8;
}
message ListBucketsResponse {
repeated Bucket buckets = 1;
}
message GetUploadUrlRequest {
string bucket_id = 1;
}
message GetUploadUrlResponse {
string bucket_id = 1;
string upload_url = 2;
string authorization_token = 3;
}
message UploadFileResponse {
string file_id = 1;
string file_name = 2;
string account_id = 3;
string bucket_id = 4;
int32 content_length = 5;
string content_sha1 = 6;
string content_type = 7;
map<string, string> file_info = 8;
string action = 9;
int64 upload_timestamp = 10;
}
message StartLargeFileRequest {
string bucket_id = 1;
string file_name = 2;
string content_type = 3;
map<string, string> file_info = 4;
}
message StartLargeFileResponse {
string file_id = 1;
string file_name = 2;
string account_id = 3;
string bucket_id = 4;
string content_type = 5;
map<string, string> file_info = 6;
int64 upload_timestamp = 7;
}
message GetUploadPartUrlRequest {
string file_id = 1;
}
message GetUploadPartUrlResponse {
string file_id = 1;
string upload_url = 2;
string authorization_token = 3;
}
message FinishLargeFileRequest {
string file_id = 1;
repeated string part_sha1_array = 2;
// string sha1 = 3;
}
message FinishLargeFileResponse {
string file_id = 1;
string file_name = 2;
string account_id = 3;
string bucket_id = 4;
int64 content_length = 5;
string content_sha1 = 6; // always "none"
string content_type = 7;
map<string, string> file_info = 8;
string action = 9;
int64 upload_timestamp = 10;
}
message ListFileVersionsRequest {
string bucket_id = 1;
string start_file_name = 2;
string start_file_id = 3;
int32 max_file_count = 4;
string prefix = 5;
string delimiter = 6;
}
message ListFileVersionsResponse {
repeated File files = 1;
string next_file_name = 2;
string next_file_id = 3;
}
message File {
string file_id = 1;
string file_name = 2;
int64 content_length = 3;
string content_type = 4;
string content_sha1 = 5;
map<string, string> file_info = 6;
string action = 7;
int64 size = 8;
int64 upload_timestamp = 9;
}
service PyreService {
// Used to log in to the B2 API. Returns an authorization token that can be
// used for account-level operations, and a URL that should be used as the
// base URL for subsequent API calls.
rpc AuthorizeAccount(AuthorizeAccountRequest) returns (AuthorizeAccountResponse) {
option (google.api.http) = {
get: "/b2api/v1/b2_authorize_account"
};
}
// Lists buckets associated with an account, in alphabetical order by bucket
// name.
rpc ListBuckets(ListBucketsRequest) returns (ListBucketsResponse) {
option (google.api.http) = {
post: "/b2api/v1/b2_list_buckets"
body: "*"
};
}
// Creates a new bucket. A bucket belongs to the account used to create it.
//
// Buckets can be named. The name must be globally unique. No account can use
// a bucket with the same name. Buckets are assigned a unique bucketId which
// is used when uploading, downloading, or deleting files.
//
// There is a limit of 100 buckets per account.
rpc CreateBucket(Bucket) returns (Bucket) {
option (google.api.http) = {
post: "/b2api/v1/b2_create_bucket"
body: "*"
};
}
// Deletes the bucket specified. Only buckets that contain no version of any
// files can be deleted.
rpc DeleteBucket(Bucket) returns (Bucket) {
option (google.api.http) = {
post: "/b2api/v1/b2_delete_bucket"
body: "*"
};
}
rpc GetUploadUrl(GetUploadUrlRequest) returns (GetUploadUrlResponse) {
option (google.api.http) = {
post: "/b2api/v1/b2_get_upload_url"
body: "*"
};
}
// Prepares for uploading the parts of a large file.
rpc StartLargeFile(StartLargeFileRequest) returns (StartLargeFileResponse) {
option (google.api.http) = {
post: "/b2api/v1/b2_start_large_file"
body: "*"
};
}
// Gets an URL to use for uploading parts of a large file.
rpc GetUploadPartUrl(GetUploadPartUrlRequest) returns (GetUploadPartUrlResponse) {
option (google.api.http) = {
post: "/b2api/v1/b2_get_upload_part_url"
body: "*"
};
}
// Converts the parts that have been uploaded into a single B2 file.
rpc FinishLargeFile(FinishLargeFileRequest) returns (FinishLargeFileResponse) {
option (google.api.http) = {
post: "/b2api/v1/b2_finish_large_file"
body: "*"
};
}
// Lists all of the versions of all of the files contained in one bucket, in
// alphabetical order by file name, and by reverse of date/time uploaded for
// versions of files with the same name.
rpc ListFileVersions(ListFileVersionsRequest) returns (ListFileVersionsResponse) {
option (google.api.http) = {
post: "/b2api/v1/b2_list_file_versions"
body: "*"
};
}
}

20
vendor/github.com/kurin/blazer/internal/pyre/pyre.go generated vendored Normal file
View file

@ -0,0 +1,20 @@
// Copyright 2018, Google
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package pyre provides a gRPC-based implementation of B2, as well as a
// RESTful gateway on top of it.
package pyre
//go:generate protoc -I/usr/local/include -I. -I$GOPATH/src -I$GOPATH/src/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis --grpc-gateway_out=logtostderr=true:. proto/pyre.proto
//go:generate protoc -I/usr/local/include -I. -I$GOPATH/src -I$GOPATH/src/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis --go_out=plugins=grpc:. proto/pyre.proto

109
vendor/github.com/kurin/blazer/internal/pyre/simple.go generated vendored Normal file
View file

@ -0,0 +1,109 @@
// Copyright 2018, Google
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pyre
import (
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"github.com/google/uuid"
"github.com/kurin/blazer/internal/b2types"
)
const uploadFilePrefix = "/b2api/v1/b2_upload_file/"
type SimpleFileManager interface {
Writer(bucket, name, id string) (io.WriteCloser, error)
}
type simpleFileServer struct {
fm SimpleFileManager
}
type uploadRequest struct {
name string
contentType string
size int64
sha1 string
bucket string
info map[string]string
}
func parseUploadHeaders(r *http.Request) (*uploadRequest, error) {
ur := &uploadRequest{info: make(map[string]string)}
ur.name = r.Header.Get("X-Bz-File-Name")
ur.contentType = r.Header.Get("Content-Type")
ur.sha1 = r.Header.Get("X-Bz-Content-Sha1")
size, err := strconv.ParseInt(r.Header.Get("Content-Length"), 10, 64)
if err != nil {
return nil, err
}
ur.size = size
for k := range r.Header {
if !strings.HasPrefix("X-Bz-Info-", k) {
continue
}
name := strings.TrimPrefix("X-Bz-Info-", k)
ur.info[name] = r.Header.Get(k)
}
ur.bucket = strings.TrimPrefix(r.URL.Path, uploadFilePrefix)
return ur, nil
}
func (fs *simpleFileServer) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
req, err := parseUploadHeaders(r)
if err != nil {
http.Error(rw, err.Error(), 500)
fmt.Println("oh no")
return
}
id := uuid.New().String()
w, err := fs.fm.Writer(req.bucket, req.name, id)
if err != nil {
http.Error(rw, err.Error(), 500)
fmt.Println("oh no")
return
}
if _, err := io.Copy(w, io.LimitReader(r.Body, req.size)); err != nil {
w.Close()
http.Error(rw, err.Error(), 500)
fmt.Println("oh no")
return
}
if err := w.Close(); err != nil {
http.Error(rw, err.Error(), 500)
fmt.Println("oh no")
return
}
resp := &b2types.UploadFileResponse{
FileID: id,
Name: req.name,
SHA1: req.sha1,
BucketID: req.bucket,
}
if err := json.NewEncoder(rw).Encode(resp); err != nil {
http.Error(rw, err.Error(), 500)
fmt.Println("oh no")
return
}
}
func RegisterSimpleFileManagerOnMux(f SimpleFileManager, mux *http.ServeMux) {
mux.Handle(uploadFilePrefix, &simpleFileServer{fm: f})
}

View file

@ -30,9 +30,9 @@ func TestOperationLive(t *testing.T) {
wg.Add(1)
i := i
go func() {
var n int
defer wg.Done()
for j := 0; j < 10; j++ {
var n int
if err := g.Operate(ctx, name, func(b []byte) ([]byte, error) {
if len(b) > 0 {
i, err := strconv.Atoi(string(b))