Merge pull request #1815 from restic/update-blazer

Update github.com/kurin/blazer
This commit is contained in:
Alexander Neumann 2018-05-25 20:46:35 +02:00
commit 1892b314f8
10 changed files with 320 additions and 164 deletions

4
Gopkg.lock generated
View file

@ -94,8 +94,8 @@
[[projects]]
name = "github.com/kurin/blazer"
packages = ["b2","base","internal/b2assets","internal/b2types","internal/blog","x/window"]
revision = "b7c9cf27cae3aec98c2caaeb5181608bfe05b17c"
version = "v0.3.1"
revision = "3c18ed98a4120a440c8f45d8fbf41d414612a501"
version = "v0.4.2"
[[projects]]
name = "github.com/marstr/guid"

View file

@ -97,20 +97,11 @@ func downloadFile(ctx context.Context, bucket *b2.Bucket, downloads int, src, ds
```go
func printObjects(ctx context.Context, bucket *b2.Bucket) error {
var cur *b2.Cursor
for {
objs, c, err := bucket.ListObjects(ctx, 1000, cur)
if err != nil && err != io.EOF {
return err
}
for _, obj := range objs {
fmt.Println(obj)
}
if err == io.EOF {
return
}
cur = c
iterator := bucket.List()
for iterator.Next() {
fmt.Println(itrator.Object())
}
return iterator.Err()
}
```

View file

@ -501,7 +501,7 @@ const (
Hider
// Folder is a special state given to non-objects that are returned during a
// List*Objects call with a non-empty Delimiter.
// List call with a ListDelimiter option.
Folder
)
@ -574,6 +574,8 @@ func (o *Object) Delete(ctx context.Context) error {
}
// Cursor is passed to ListObjects to return subsequent pages.
//
// DEPRECATED. Will be removed in a future release.
type Cursor struct {
// Prefix limits the listed objects to those that begin with this string.
Prefix string
@ -602,6 +604,8 @@ type Cursor struct {
//
// ListObjects will return io.EOF when there are no objects left in the bucket,
// however it may do so concurrently with the last objects.
//
// DEPRECATED. Will be removed in a future release.
func (b *Bucket) ListObjects(ctx context.Context, count int, c *Cursor) ([]*Object, *Cursor, error) {
if c == nil {
c = &Cursor{}
@ -636,6 +640,8 @@ func (b *Bucket) ListObjects(ctx context.Context, count int, c *Cursor) ([]*Obje
// ListCurrentObjects is similar to ListObjects, except that it returns only
// current, unhidden objects in the bucket.
//
// DEPRECATED. Will be removed in a future release.
func (b *Bucket) ListCurrentObjects(ctx context.Context, count int, c *Cursor) ([]*Object, *Cursor, error) {
if c == nil {
c = &Cursor{}
@ -669,6 +675,8 @@ func (b *Bucket) ListCurrentObjects(ctx context.Context, count int, c *Cursor) (
// ListUnfinishedLargeFiles lists any objects that correspond to large file uploads that haven't been completed.
// This can happen for example when an upload is interrupted.
//
// DEPRECATED. Will be removed in a future release.
func (b *Bucket) ListUnfinishedLargeFiles(ctx context.Context, count int, c *Cursor) ([]*Object, *Cursor, error) {
if c == nil {
c = &Cursor{}

View file

@ -64,21 +64,14 @@ func TestReadWriteLive(t *testing.T) {
t.Error(err)
}
var cur *Cursor
for {
objs, c, err := bucket.ListObjects(ctx, 100, cur)
if err != nil && err != io.EOF {
t.Fatal(err)
iter := bucket.List(ListHidden())
for iter.Next(ctx) {
if err := iter.Object().Delete(ctx); err != nil {
t.Error(err)
}
for _, o := range objs {
if err := o.Delete(ctx); err != nil {
t.Error(err)
}
}
if err == io.EOF {
break
}
cur = c
}
if err := iter.Err(); err != nil {
t.Error(err)
}
}
@ -175,7 +168,7 @@ func TestHideShowLive(t *testing.T) {
t.Fatal(err)
}
got, err := countObjects(ctx, bucket.ListCurrentObjects)
got, err := countObjects(ctx, bucket.List())
if err != nil {
t.Error(err)
}
@ -193,7 +186,7 @@ func TestHideShowLive(t *testing.T) {
t.Fatal(err)
}
got, err = countObjects(ctx, bucket.ListCurrentObjects)
got, err = countObjects(ctx, bucket.List())
if err != nil {
t.Error(err)
}
@ -207,7 +200,7 @@ func TestHideShowLive(t *testing.T) {
}
// count see the object again
got, err = countObjects(ctx, bucket.ListCurrentObjects)
got, err = countObjects(ctx, bucket.List())
if err != nil {
t.Error(err)
}
@ -542,33 +535,37 @@ func TestListObjectsWithPrefix(t *testing.T) {
t.Fatal(err)
}
// This is kind of a hack, but
type lfun func(context.Context, int, *Cursor) ([]*Object, *Cursor, error)
table := []struct {
opts []ListOption
}{
{
opts: []ListOption{
ListPrefix("baz/"),
},
},
{
opts: []ListOption{
ListPrefix("baz/"),
ListHidden(),
},
},
}
for _, f := range []lfun{bucket.ListObjects, bucket.ListCurrentObjects} {
c := &Cursor{
Prefix: "baz/",
}
for _, entry := range table {
iter := bucket.List(entry.opts...)
var res []string
for {
objs, cur, err := f(ctx, 10, c)
if err != nil && err != io.EOF {
t.Fatalf("bucket.ListObjects: %v", err)
for iter.Next(ctx) {
o := iter.Object()
attrs, err := o.Attrs(ctx)
if err != nil {
t.Errorf("(%v).Attrs: %v", o, err)
continue
}
for _, o := range objs {
attrs, err := o.Attrs(ctx)
if err != nil {
t.Errorf("(%v).Attrs: %v", o, err)
continue
}
res = append(res, attrs.Name)
}
if err == io.EOF {
break
}
c = cur
res = append(res, attrs.Name)
}
if iter.Err() != nil {
t.Errorf("iter.Err(): %v", iter.Err())
}
want := []string{"baz/bar"}
if !reflect.DeepEqual(res, want) {
t.Errorf("got %v, want %v", res, want)
@ -746,19 +743,15 @@ func TestAttrsNoRoundtrip(t *testing.T) {
t.Fatal(err)
}
objs, _, err := bucket.ListObjects(ctx, 1, nil)
if err != nil {
t.Fatal(err)
}
if len(objs) != 1 {
t.Fatalf("unexpected objects: got %d, want 1", len(objs))
}
iter := bucket.List()
iter.Next(ctx)
obj := iter.Object()
var trips int
for range bucket.c.Status().table()["1m"] {
trips += 1
trips++
}
attrs, err := objs[0].Attrs(ctx)
attrs, err := obj.Attrs(ctx)
if err != nil {
t.Fatal(err)
}
@ -768,7 +761,7 @@ func TestAttrsNoRoundtrip(t *testing.T) {
var newTrips int
for range bucket.c.Status().table()["1m"] {
newTrips += 1
newTrips++
}
if trips != newTrips {
t.Errorf("Attrs() should not have caused any net traffic, but it did: old %d, new %d", trips, newTrips)
@ -859,13 +852,9 @@ func TestListUnfinishedLargeFiles(t *testing.T) {
if _, err := io.Copy(w, io.LimitReader(zReader{}, 1e6)); err != nil {
t.Fatal(err)
}
// Don't close the writer.
fs, _, err := bucket.ListUnfinishedLargeFiles(ctx, 10, nil)
if err != io.EOF && err != nil {
t.Fatal(err)
}
if len(fs) != 1 {
t.Errorf("ListUnfinishedLargeFiles: got %d, want 1", len(fs))
iter := bucket.List(ListUnfinished())
if !iter.Next(ctx) {
t.Errorf("ListUnfinishedLargeFiles: got none, want 1 (error %v)", iter.Err())
}
}
@ -905,39 +894,12 @@ type object struct {
err error
}
func countObjects(ctx context.Context, f func(context.Context, int, *Cursor) ([]*Object, *Cursor, error)) (int, error) {
func countObjects(ctx context.Context, iter *ObjectIterator) (int, error) {
var got int
ch := listObjects(ctx, f)
for c := range ch {
if c.err != nil {
return 0, c.err
}
for iter.Next(ctx) {
got++
}
return got, nil
}
func listObjects(ctx context.Context, f func(context.Context, int, *Cursor) ([]*Object, *Cursor, error)) <-chan object {
ch := make(chan object)
go func() {
defer close(ch)
var cur *Cursor
for {
objs, c, err := f(ctx, 100, cur)
if err != nil && err != io.EOF {
ch <- object{err: err}
return
}
for _, o := range objs {
ch <- object{o: o}
}
if err == io.EOF {
return
}
cur = c
}
}()
return ch
return got, iter.Err()
}
var defaultTransport = http.DefaultTransport
@ -1042,14 +1004,15 @@ func startLiveTest(ctx context.Context, t *testing.T) (*Bucket, func()) {
}
f := func() {
defer ccport.done()
for c := range listObjects(ctx, bucket.ListObjects) {
if c.err != nil {
continue
}
if err := c.o.Delete(ctx); err != nil {
iter := bucket.List(ListHidden())
for iter.Next(ctx) {
if err := iter.Object().Delete(ctx); err != nil {
t.Error(err)
}
}
if err := iter.Err(); err != nil && !IsNotExist(err) {
t.Errorf("%#v", err)
}
if err := bucket.Delete(ctx); err != nil && !IsNotExist(err) {
t.Error(err)
}

183
vendor/github.com/kurin/blazer/b2/iterator.go generated vendored Normal file
View file

@ -0,0 +1,183 @@
// Copyright 2018, Google
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package b2
import (
"context"
"io"
"sync"
)
// List returns an iterator for selecting objects in a bucket. The default
// behavior, with no options, is to list all currently un-hidden objects.
func (b *Bucket) List(opts ...ListOption) *ObjectIterator {
o := &ObjectIterator{
bucket: b,
}
for _, opt := range opts {
opt(&o.opts)
}
return o
}
// ObjectIterator abtracts away the tricky bits of iterating over a bucket's
// contents.
//
// It is intended to be called in a loop:
// for iter.Next(ctx) {
// obj := iter.Object()
// // act on obj
// }
// if err := iter.Err(); err != nil {
// // handle err
// }
type ObjectIterator struct {
bucket *Bucket
final bool
err error
idx int
c *Cursor
opts objectIteratorOptions
objs []*Object
init sync.Once
l lister
count int
}
type lister func(context.Context, int, *Cursor) ([]*Object, *Cursor, error)
func (o *ObjectIterator) frame(ctx context.Context) error {
objs, c, err := o.l(ctx, o.count, o.c)
if err != nil && err != io.EOF {
if bNotExist.MatchString(err.Error()) {
return b2err{
err: err,
notFoundErr: true,
}
}
return err
}
o.c = c
o.objs = objs
o.idx = 0
if err == io.EOF {
o.final = true
}
return nil
}
// Next advances the iterator to the next object. It should be called before
// any calls to Object(). If Next returns true, then the next call to Object()
// will be valid. Once Next returns false, it is important to check the return
// value of Err().
func (o *ObjectIterator) Next(ctx context.Context) bool {
o.init.Do(func() {
o.count = 1000
switch {
case o.opts.unfinished:
o.l = o.bucket.ListUnfinishedLargeFiles
o.count = 100
case o.opts.hidden:
o.l = o.bucket.ListObjects
default:
o.l = o.bucket.ListCurrentObjects
}
o.c = &Cursor{
Prefix: o.opts.prefix,
Delimiter: o.opts.delimiter,
}
})
if o.err != nil {
return false
}
if o.idx >= len(o.objs) {
if o.final {
o.err = io.EOF
return false
}
if err := o.frame(ctx); err != nil {
o.err = err
return false
}
return o.Next(ctx)
}
o.idx++
return true
}
// Object returns the current object.
func (o *ObjectIterator) Object() *Object {
return o.objs[o.idx-1]
}
// Err returns the current error or nil. If Next() returns false and Err() is
// nil, then all objects have been seen.
func (o *ObjectIterator) Err() error {
if o.err == io.EOF {
return nil
}
return o.err
}
type objectIteratorOptions struct {
hidden bool
unfinished bool
prefix string
delimiter string
}
// A ListOption alters the default behavor of List.
type ListOption func(*objectIteratorOptions)
// ListHidden will include hidden objects in the output.
func ListHidden() ListOption {
return func(o *objectIteratorOptions) {
o.hidden = true
}
}
// ListUnfinished will list unfinished large file operations instead of
// existing objects.
func ListUnfinished() ListOption {
return func(o *objectIteratorOptions) {
o.unfinished = true
}
}
// ListPrefix will restrict the output to objects whose names begin with
// prefix.
func ListPrefix(pfx string) ListOption {
return func(o *objectIteratorOptions) {
o.prefix = pfx
}
}
// ListDelimiter denotes the path separator. If set, object listings will be
// truncated at this character.
//
// For example, if the bucket contains objects foo/bar, foo/baz, and foo,
// then a delimiter of "/" will cause the listing to return "foo" and "foo/".
// Otherwise, the listing would have returned all object names.
//
// Note that objects returned that end in the delimiter may not be actual
// objects, e.g. you cannot read from (or write to, or delete) an object
// "foo/", both because no actual object exists and because B2 disallows object
// names that end with "/". If you want to ensure that all objects returned
// are actual objects, leave this unset.
func ListDelimiter(delimiter string) ListOption {
return func(o *objectIteratorOptions) {
o.delimiter = delimiter
}
}

View file

@ -42,7 +42,7 @@ import (
const (
APIBase = "https://api.backblazeb2.com"
DefaultUserAgent = "blazer/0.3.1"
DefaultUserAgent = "blazer/0.4.2"
)
type b2err struct {

View file

@ -3,8 +3,8 @@ package main
import (
"context"
"fmt"
"io"
"os"
"strings"
"sync"
"github.com/kurin/blazer/b2"
@ -24,12 +24,27 @@ func main() {
fmt.Println(err)
return
}
buckets, err := client.ListBuckets(ctx)
if err != nil {
fmt.Println(err)
return
}
var kill []string
for _, bucket := range buckets {
if strings.HasPrefix(bucket.Name(), fmt.Sprintf("%s-b2-tests-", id)) {
kill = append(kill, bucket.Name())
}
if bucket.Name() == fmt.Sprintf("%s-consistobucket", id) || bucket.Name() == fmt.Sprintf("%s-base-tests", id) {
kill = append(kill, bucket.Name())
}
}
var wg sync.WaitGroup
for _, name := range []string{"consistobucket", "base-tests"} {
for _, name := range kill {
wg.Add(1)
go func(name string) {
defer wg.Done()
if err := killBucket(ctx, client, id, name); err != nil {
fmt.Println("removing", name)
if err := killBucket(ctx, client, name); err != nil {
fmt.Println(err)
}
}(name)
@ -37,8 +52,8 @@ func main() {
wg.Wait()
}
func killBucket(ctx context.Context, client *b2.Client, id, name string) error {
bucket, err := client.NewBucket(ctx, id+"-"+name, nil)
func killBucket(ctx context.Context, client *b2.Client, name string) error {
bucket, err := client.NewBucket(ctx, name, nil)
if b2.IsNotExist(err) {
return nil
}
@ -46,18 +61,11 @@ func killBucket(ctx context.Context, client *b2.Client, id, name string) error {
return err
}
defer bucket.Delete(ctx)
cur := &b2.Cursor{}
for {
os, c, err := bucket.ListObjects(ctx, 1000, cur)
if err != nil && err != io.EOF {
return err
iter := bucket.List(b2.ListHidden())
for iter.Next(ctx) {
if err := iter.Object().Delete(ctx); err != nil {
fmt.Println(err)
}
for _, o := range os {
o.Delete(ctx)
}
if err == io.EOF {
return nil
}
cur = c
}
return iter.Err()
}

View file

@ -2,7 +2,6 @@ package consistent
import (
"context"
"io"
"io/ioutil"
"os"
"strconv"
@ -66,7 +65,7 @@ func TestOperationLive(t *testing.T) {
t.Fatal(err)
}
if n != 100 {
t.Errorf("result: got %d, want 10", n)
t.Errorf("result: got %d, want 100", n)
}
}
@ -142,14 +141,15 @@ func startLiveTest(ctx context.Context, t *testing.T) (*b2.Bucket, func()) {
return nil, nil
}
f := func() {
for c := range listObjects(ctx, bucket.ListObjects) {
if c.err != nil {
continue
}
if err := c.o.Delete(ctx); err != nil {
iter := bucket.List(b2.ListHidden())
for iter.Next(ctx) {
if err := iter.Object().Delete(ctx); err != nil {
t.Error(err)
}
}
if err := iter.Err(); err != nil && !b2.IsNotExist(err) {
t.Error(err)
}
if err := bucket.Delete(ctx); err != nil && !b2.IsNotExist(err) {
t.Error(err)
}
@ -157,29 +157,6 @@ func startLiveTest(ctx context.Context, t *testing.T) (*b2.Bucket, func()) {
return bucket, f
}
func listObjects(ctx context.Context, f func(context.Context, int, *b2.Cursor) ([]*b2.Object, *b2.Cursor, error)) <-chan object {
ch := make(chan object)
go func() {
defer close(ch)
var cur *b2.Cursor
for {
objs, c, err := f(ctx, 100, cur)
if err != nil && err != io.EOF {
ch <- object{err: err}
return
}
for _, o := range objs {
ch <- object{o: o}
}
if err == io.EOF {
return
}
cur = c
}
}()
return ch
}
type object struct {
o *b2.Object
err error

View file

@ -24,7 +24,7 @@ import (
// A Window efficiently records events that have occurred over a span of time
// extending from some fixed interval ago to now. Events that pass beyond this
// horizon effectively "fall off" the back of the window.
// horizon are discarded.
type Window struct {
mu sync.Mutex
events []interface{}
@ -81,16 +81,27 @@ func (w *Window) sweep(now time.Time) {
w.last = now
}()
b := w.bucket(now)
p := w.bucket(w.last)
// This compares now and w.last's monotonic clocks.
diff := now.Sub(w.last)
if diff < 0 {
// time went backwards somehow; zero events and return
for i := range w.events {
w.events[i] = nil
}
return
}
last := now.Add(-diff)
if b == p && now.Sub(w.last) <= w.res {
b := w.bucket(now)
p := w.bucket(last)
if b == p && diff <= w.res {
// We're in the same bucket as the previous sweep, so all buckets are
// valid.
return
}
if now.Sub(w.last) > w.res*time.Duration(len(w.events)) {
if diff > w.res*time.Duration(len(w.events)) {
// We've gone longer than this window measures since the last sweep, just
// zero the thing and have done.
for i := range w.events {
@ -102,10 +113,10 @@ func (w *Window) sweep(now time.Time) {
// Expire all invalid buckets. This means buckets not seen since the
// previous sweep and now, including the current bucket but not including the
// previous bucket.
old := int(w.last.UnixNano()) / int(w.res)
new := int(now.UnixNano()) / int(w.res)
old := int64(last.UnixNano()) / int64(w.res)
new := int64(now.UnixNano()) / int64(w.res)
for i := old + 1; i <= new; i++ {
b := i % len(w.events)
b := int(i) % len(w.events)
w.events[b] = nil
}
}

View file

@ -73,6 +73,21 @@ func TestWindows(t *testing.T) {
want: 6,
reduce: adder,
},
{ // what happens if time goes backwards?
size: time.Minute,
dur: time.Second,
incs: []epair{
{t: time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC), e: 1},
{t: time.Date(2000, 1, 1, 0, 0, 1, 0, time.UTC), e: 1},
{t: time.Date(2000, 1, 1, 0, 0, 2, 0, time.UTC), e: 1},
{t: time.Date(2000, 1, 1, 0, 0, 3, 0, time.UTC), e: 1},
{t: time.Date(2000, 1, 1, 0, 0, 4, 0, time.UTC), e: 1},
{t: time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC), e: 1},
},
look: time.Date(2000, 1, 1, 0, 0, 30, 0, time.UTC),
want: 1,
reduce: adder,
},
}
for _, e := range table {