Merge pull request #1767 from restic/update-blazer

Update github.com/kurin/blazer
This commit is contained in:
Alexander Neumann 2018-05-08 22:48:54 +02:00
commit e42b7db008
13 changed files with 902 additions and 48 deletions

6
Gopkg.lock generated
View file

@ -93,9 +93,9 @@
[[projects]]
name = "github.com/kurin/blazer"
packages = ["b2","base","internal/b2types","internal/blog"]
revision = "cd0304efa98725679cf68422cefa328d3d96f2f4"
version = "v0.3.0"
packages = ["b2","base","internal/b2assets","internal/b2types","internal/blog","x/window"]
revision = "b7c9cf27cae3aec98c2caaeb5181608bfe05b17c"
version = "v0.3.1"
[[projects]]
name = "github.com/marstr/guid"

View file

@ -45,7 +45,7 @@ type Client struct {
slock sync.Mutex
sWriters map[string]*Writer
sReaders map[string]*Reader
sMethods map[string]int
sMethods []methodCounter
}
// NewClient creates and returns a new Client with valid B2 service account
@ -55,7 +55,12 @@ func NewClient(ctx context.Context, account, key string, opts ...ClientOption) (
backend: &beRoot{
b2i: &b2Root{},
},
sMethods: make(map[string]int),
sMethods: []methodCounter{
newMethodCounter(time.Minute, time.Second),
newMethodCounter(time.Minute*5, time.Second),
newMethodCounter(time.Hour, time.Minute),
newMethodCounter(0, 0), // forever
},
}
opts = append(opts, client(c))
if err := c.backend.authorizeAccount(ctx, account, key, opts...); err != nil {
@ -73,6 +78,26 @@ type clientOptions struct {
userAgents []string
}
// for testing
func (c clientOptions) eq(o clientOptions) bool {
if c.client != o.client ||
c.transport != o.transport ||
c.failSomeUploads != o.failSomeUploads ||
c.expireTokens != o.expireTokens ||
c.capExceeded != o.capExceeded {
return false
}
if len(c.userAgents) != len(o.userAgents) {
return false
}
for i := range c.userAgents {
if c.userAgents[i] != o.userAgents[i] {
return false
}
}
return true
}
// A ClientOption allows callers to adjust various per-client settings.
type ClientOption func(*clientOptions)
@ -131,17 +156,30 @@ type clientTransport struct {
}
func (ct *clientTransport) RoundTrip(r *http.Request) (*http.Response, error) {
method := r.Header.Get("X-Blazer-Method")
if method != "" && ct.client != nil {
ct.client.slock.Lock()
ct.client.sMethods[method]++
ct.client.slock.Unlock()
}
m := r.Header.Get("X-Blazer-Method")
t := ct.rt
if t == nil {
t = http.DefaultTransport
}
return t.RoundTrip(r)
b := time.Now()
resp, err := t.RoundTrip(r)
e := time.Now()
if err != nil {
return resp, err
}
if m != "" && ct.client != nil {
ct.client.slock.Lock()
m := method{
name: m,
duration: e.Sub(b),
status: resp.StatusCode,
}
for _, counter := range ct.client.sMethods {
counter.record(m)
}
ct.client.slock.Unlock()
}
return resp, nil
}
// Bucket is a reference to a B2 bucket.

View file

@ -37,6 +37,7 @@ type beRootInterface interface {
type beRoot struct {
account, key string
b2i b2RootInterface
options []ClientOption
}
type beBucketInterface interface {
@ -156,13 +157,14 @@ func (r *beRoot) authorizeAccount(ctx context.Context, account, key string, opts
}
r.account = account
r.key = key
r.options = opts
return nil
}
return withBackoff(ctx, r, f)
}
func (r *beRoot) reauthorizeAccount(ctx context.Context) error {
return r.authorizeAccount(ctx, r.account, r.key)
return r.authorizeAccount(ctx, r.account, r.key, r.options...)
}
func (r *beRoot) createBucket(ctx context.Context, name, btype string, info map[string]string, rules []LifecycleRule) (beBucketInterface, error) {

View file

@ -25,7 +25,6 @@ import (
"net/http"
"os"
"reflect"
"sync"
"sync/atomic"
"testing"
"time"
@ -734,26 +733,7 @@ func TestWriteEmpty(t *testing.T) {
}
}
type rtCounter struct {
rt http.RoundTripper
trips int
sync.Mutex
}
func (rt *rtCounter) RoundTrip(r *http.Request) (*http.Response, error) {
rt.Lock()
defer rt.Unlock()
rt.trips++
return rt.rt.RoundTrip(r)
}
func TestAttrsNoRoundtrip(t *testing.T) {
rt := &rtCounter{rt: defaultTransport}
defaultTransport = rt
defer func() {
defaultTransport = rt.rt
}()
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
@ -774,7 +754,10 @@ func TestAttrsNoRoundtrip(t *testing.T) {
t.Fatalf("unexpected objects: got %d, want 1", len(objs))
}
trips := rt.trips
var trips int
for range bucket.c.Status().table()["1m"] {
trips += 1
}
attrs, err := objs[0].Attrs(ctx)
if err != nil {
t.Fatal(err)
@ -783,8 +766,12 @@ func TestAttrsNoRoundtrip(t *testing.T) {
t.Errorf("got the wrong object: got %q, want %q", attrs.Name, smallFileName)
}
if trips != rt.trips {
t.Errorf("Attrs() should not have caused any net traffic, but it did: old %d, new %d", trips, rt.trips)
var newTrips int
for range bucket.c.Status().table()["1m"] {
newTrips += 1
}
if trips != newTrips {
t.Errorf("Attrs() should not have caused any net traffic, but it did: old %d, new %d", trips, newTrips)
}
}
@ -837,8 +824,8 @@ func TestSmallUploadsFewRoundtrips(t *testing.T) {
}
}
si := bucket.c.Status()
getURL := si.MethodCalls["b2_get_upload_url"]
uploadFile := si.MethodCalls["b2_upload_file"]
getURL := si.RPCs[0].CountByMethod()["b2_get_upload_url"]
uploadFile := si.RPCs[0].CountByMethod()["b2_upload_file"]
if getURL >= uploadFile {
t.Errorf("too many calls to b2_get_upload_url")
}
@ -882,6 +869,37 @@ func TestListUnfinishedLargeFiles(t *testing.T) {
}
}
func TestReauthPreservesOptions(t *testing.T) {
ctx := context.Background()
bucket, done := startLiveTest(ctx, t)
defer done()
var first []ClientOption
opts := bucket.r.(*beRoot).options
for _, o := range opts {
first = append(first, o)
}
if err := bucket.r.reauthorizeAccount(ctx); err != nil {
t.Fatalf("reauthorizeAccount: %v", err)
}
second := bucket.r.(*beRoot).options
if len(second) != len(first) {
t.Fatalf("options mismatch: got %d options, wanted %d", len(second), len(first))
}
var f, s clientOptions
for i := range first {
first[i](&f)
second[i](&s)
}
if !f.eq(s) {
t.Errorf("options mismatch: got %v, want %v", s, f)
}
}
type object struct {
o *Object
err error

View file

@ -14,13 +14,83 @@
package b2
import "fmt"
import (
"fmt"
"html/template"
"math"
"net/http"
"sort"
"time"
"github.com/kurin/blazer/internal/b2assets"
"github.com/kurin/blazer/x/window"
)
// StatusInfo reports information about a client.
type StatusInfo struct {
Writers map[string]*WriterStatus
Readers map[string]*ReaderStatus
MethodCalls map[string]int
// Writers contains the status of all current uploads with progress.
Writers map[string]*WriterStatus
// Readers contains the status of all current downloads with progress.
Readers map[string]*ReaderStatus
// RPCs contains information about recently made RPC calls over the last
// minute, five minutes, hour, and for all time.
RPCs map[time.Duration]MethodList
}
// MethodList is an accumulation of RPC calls that have been made over a given
// period of time.
type MethodList []method
// CountByMethod returns the total RPC calls made per method.
func (ml MethodList) CountByMethod() map[string]int {
r := make(map[string]int)
for i := range ml {
r[ml[i].name]++
}
return r
}
type method struct {
name string
duration time.Duration
status int
}
type methodCounter struct {
d time.Duration
w *window.Window
}
func (mc methodCounter) record(m method) {
mc.w.Insert([]method{m})
}
func (mc methodCounter) retrieve() MethodList {
ms := mc.w.Reduce()
return MethodList(ms.([]method))
}
func newMethodCounter(d, res time.Duration) methodCounter {
r := func(i, j interface{}) interface{} {
a, ok := i.([]method)
if !ok {
a = nil
}
b, ok := j.([]method)
if !ok {
b = nil
}
for _, m := range b {
a = append(a, m)
}
return a
}
return methodCounter{
d: d,
w: window.New(d, res, r),
}
}
// WriterStatus reports the status for each writer.
@ -43,9 +113,9 @@ func (c *Client) Status() *StatusInfo {
defer c.slock.Unlock()
si := &StatusInfo{
Writers: make(map[string]*WriterStatus),
Readers: make(map[string]*ReaderStatus),
MethodCalls: make(map[string]int),
Writers: make(map[string]*WriterStatus),
Readers: make(map[string]*ReaderStatus),
RPCs: make(map[time.Duration]MethodList),
}
for name, w := range c.sWriters {
@ -56,13 +126,30 @@ func (c *Client) Status() *StatusInfo {
si.Readers[name] = r.status()
}
for name, n := range c.sMethods {
si.MethodCalls[name] = n
for _, c := range c.sMethods {
si.RPCs[c.d] = c.retrieve()
}
return si
}
func (si *StatusInfo) table() map[string]map[string]int {
r := make(map[string]map[string]int)
for d, c := range si.RPCs {
for _, m := range c {
if _, ok := r[m.name]; !ok {
r[m.name] = make(map[string]int)
}
dur := "all time"
if d > 0 {
dur = d.String()
}
r[m.name][dur]++
}
}
return r
}
func (c *Client) addWriter(w *Writer) {
c.slock.Lock()
defer c.slock.Unlock()
@ -106,3 +193,59 @@ func (c *Client) removeReader(r *Reader) {
delete(c.sReaders, fmt.Sprintf("%s/%s", r.o.b.Name(), r.name))
}
var (
funcMap = template.FuncMap{
"inc": func(i int) int { return i + 1 },
"lookUp": func(m map[string]int, s string) int { return m[s] },
"pRange": func(i int) string {
f := float64(i)
min := int(math.Pow(2, f)) - 1
max := min + int(math.Pow(2, f))
return fmt.Sprintf("%v - %v", time.Duration(min)*time.Millisecond, time.Duration(max)*time.Millisecond)
},
"methods": func(si *StatusInfo) []string {
methods := make(map[string]bool)
for _, ms := range si.RPCs {
for _, m := range ms {
methods[m.name] = true
}
}
var names []string
for name := range methods {
names = append(names, name)
}
sort.Strings(names)
return names
},
"durations": func(si *StatusInfo) []string {
var ds []time.Duration
for d := range si.RPCs {
ds = append(ds, d)
}
sort.Slice(ds, func(i, j int) bool { return ds[i] < ds[j] })
var r []string
for _, d := range ds {
dur := "all time"
if d > 0 {
dur = d.String()
}
r = append(r, dur)
}
return r
},
"table": func(si *StatusInfo) map[string]map[string]int { return si.table() },
}
statusTemplate = template.Must(template.New("status").Funcs(funcMap).Parse(string(b2assets.MustAsset("data/status.html"))))
)
// ServeHTTP serves diagnostic information about the current state of the
// client; essentially everything available from Client.Status()
//
// ServeHTTP satisfies the http.Handler interface. This means that a Client
// can be passed directly to a path via http.Handle (or on a custom ServeMux or
// a custom http.Server).
func (c *Client) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
info := c.Status()
statusTemplate.Execute(rw, info)
}

View file

@ -42,7 +42,7 @@ import (
const (
APIBase = "https://api.backblazeb2.com"
DefaultUserAgent = "blazer/0.3.0"
DefaultUserAgent = "blazer/0.3.1"
)
type b2err struct {

View file

@ -0,0 +1,237 @@
// Code generated by go-bindata.
// sources:
// data/status.html
// DO NOT EDIT!
package b2assets
import (
"bytes"
"compress/gzip"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
"time"
)
func bindataRead(data []byte, name string) ([]byte, error) {
gz, err := gzip.NewReader(bytes.NewBuffer(data))
if err != nil {
return nil, fmt.Errorf("Read %q: %v", name, err)
}
var buf bytes.Buffer
_, err = io.Copy(&buf, gz)
clErr := gz.Close()
if err != nil {
return nil, fmt.Errorf("Read %q: %v", name, err)
}
if clErr != nil {
return nil, err
}
return buf.Bytes(), nil
}
type asset struct {
bytes []byte
info os.FileInfo
}
type bindataFileInfo struct {
name string
size int64
mode os.FileMode
modTime time.Time
}
func (fi bindataFileInfo) Name() string {
return fi.name
}
func (fi bindataFileInfo) Size() int64 {
return fi.size
}
func (fi bindataFileInfo) Mode() os.FileMode {
return fi.mode
}
func (fi bindataFileInfo) ModTime() time.Time {
return fi.modTime
}
func (fi bindataFileInfo) IsDir() bool {
return false
}
func (fi bindataFileInfo) Sys() interface{} {
return nil
}
var _dataStatusHtml = []byte("\x1f\x8b\x08\x00\x00\x09\x6e\x88\x00\xff\xd4\x93\x41\x6f\xe3\x20\x10\x85\xef\xf9\x15\xb3\x56\x8e\x51\x90\x73\x5c\x4d\xb8\xec\xee\x79\xa3\xaa\x52\xd5\x23\x36\xa3\x60\x09\x43\x84\x71\x9a\xc8\xe2\xbf\x57\x18\x83\xa3\xb6\x87\x5e\x7b\xf2\x98\xf7\xe6\xf1\xbe\x03\xf8\xeb\xef\xff\x3f\xcf\xaf\xa7\x7f\xa0\x7c\xaf\xf9\x06\xf3\x87\x84\xe4\x1b\x00\xf4\x9d\xd7\xc4\x9b\x03\xb4\xba\x23\xe3\x61\xf0\xc2\x8f\x03\xb2\x74\xbe\x41\x96\x9c\xd8\x58\x79\x8f\x0b\xd3\xb4\xed\xc9\x2b\x2b\x07\xf8\x7d\x84\x3c\xee\x43\x48\x9a\x1c\x9d\xf0\x9d\x35\xb3\xba\xfe\x14\xdd\x8b\x46\x53\xd4\xd2\x90\xce\x51\xd5\xbc\xb5\xa3\xf1\xd0\xdc\xa1\xb5\x92\x90\xa9\x3a\xb5\x8b\xae\x38\xc5\x65\x27\xcc\x99\x60\xb9\x3e\x66\xe4\x26\x73\x48\x74\xbb\x64\x8d\xa3\xe4\xa5\x69\x08\xc8\xbc\xcc\x52\xc9\xc9\xed\xe6\xa4\x52\x75\xc9\x5a\x43\x3a\x23\xe9\x06\x4b\xf1\x7c\x79\xf1\x7f\xcc\x26\x23\x73\x1b\x96\xeb\xac\xa7\xc8\x0a\x50\x64\x1e\x2f\xda\x0a\x39\x64\xda\x87\x6e\x46\xf4\xb4\x83\xed\x55\xe8\xd8\x6e\xff\xe2\x3a\x4f\xae\x70\xaa\x03\x9f\xa6\x64\x82\x58\x40\x1d\x3e\xc1\x75\x72\x07\xdb\x8b\xb3\xe7\x99\xee\x2a\xf4\xfe\xe4\xec\xd9\xd1\xb0\x02\x46\xb4\x36\x3a\x43\x00\xbc\x2c\x2a\x5c\x85\x1e\xe9\x58\x4d\xd3\xbc\x1d\x42\x05\xbd\xb8\x1d\xab\xba\xe2\xc8\xb2\x89\x63\xe3\x80\x7d\x05\xfd\x80\xaa\x6a\x2e\xed\x9b\xf9\x26\xe1\x13\x09\xf9\xa3\x08\x91\xa5\x17\x81\x2c\xbd\xa8\xf7\x00\x00\x00\xff\xff\xd4\xf0\x90\xb4\x69\x03\x00\x00")
func dataStatusHtmlBytes() ([]byte, error) {
return bindataRead(
_dataStatusHtml,
"data/status.html",
)
}
func dataStatusHtml() (*asset, error) {
bytes, err := dataStatusHtmlBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "data/status.html", size: 873, mode: os.FileMode(436), modTime: time.Unix(1520578750, 0)}
a := &asset{bytes: bytes, info: info}
return a, nil
}
// Asset loads and returns the asset for the given name.
// It returns an error if the asset could not be found or
// could not be loaded.
func Asset(name string) ([]byte, error) {
cannonicalName := strings.Replace(name, "\\", "/", -1)
if f, ok := _bindata[cannonicalName]; ok {
a, err := f()
if err != nil {
return nil, fmt.Errorf("Asset %s can't read by error: %v", name, err)
}
return a.bytes, nil
}
return nil, fmt.Errorf("Asset %s not found", name)
}
// MustAsset is like Asset but panics when Asset would return an error.
// It simplifies safe initialization of global variables.
func MustAsset(name string) []byte {
a, err := Asset(name)
if err != nil {
panic("asset: Asset(" + name + "): " + err.Error())
}
return a
}
// AssetInfo loads and returns the asset info for the given name.
// It returns an error if the asset could not be found or
// could not be loaded.
func AssetInfo(name string) (os.FileInfo, error) {
cannonicalName := strings.Replace(name, "\\", "/", -1)
if f, ok := _bindata[cannonicalName]; ok {
a, err := f()
if err != nil {
return nil, fmt.Errorf("AssetInfo %s can't read by error: %v", name, err)
}
return a.info, nil
}
return nil, fmt.Errorf("AssetInfo %s not found", name)
}
// AssetNames returns the names of the assets.
func AssetNames() []string {
names := make([]string, 0, len(_bindata))
for name := range _bindata {
names = append(names, name)
}
return names
}
// _bindata is a table, holding each asset generator, mapped to its name.
var _bindata = map[string]func() (*asset, error){
"data/status.html": dataStatusHtml,
}
// AssetDir returns the file names below a certain
// directory embedded in the file by go-bindata.
// For example if you run go-bindata on data/... and data contains the
// following hierarchy:
// data/
// foo.txt
// img/
// a.png
// b.png
// then AssetDir("data") would return []string{"foo.txt", "img"}
// AssetDir("data/img") would return []string{"a.png", "b.png"}
// AssetDir("foo.txt") and AssetDir("notexist") would return an error
// AssetDir("") will return []string{"data"}.
func AssetDir(name string) ([]string, error) {
node := _bintree
if len(name) != 0 {
cannonicalName := strings.Replace(name, "\\", "/", -1)
pathList := strings.Split(cannonicalName, "/")
for _, p := range pathList {
node = node.Children[p]
if node == nil {
return nil, fmt.Errorf("Asset %s not found", name)
}
}
}
if node.Func != nil {
return nil, fmt.Errorf("Asset %s not found", name)
}
rv := make([]string, 0, len(node.Children))
for childName := range node.Children {
rv = append(rv, childName)
}
return rv, nil
}
type bintree struct {
Func func() (*asset, error)
Children map[string]*bintree
}
var _bintree = &bintree{nil, map[string]*bintree{
"data": &bintree{nil, map[string]*bintree{
"status.html": &bintree{dataStatusHtml, map[string]*bintree{}},
}},
}}
// RestoreAsset restores an asset under the given directory
func RestoreAsset(dir, name string) error {
data, err := Asset(name)
if err != nil {
return err
}
info, err := AssetInfo(name)
if err != nil {
return err
}
err = os.MkdirAll(_filePath(dir, filepath.Dir(name)), os.FileMode(0755))
if err != nil {
return err
}
err = ioutil.WriteFile(_filePath(dir, name), data, info.Mode())
if err != nil {
return err
}
err = os.Chtimes(_filePath(dir, name), info.ModTime(), info.ModTime())
if err != nil {
return err
}
return nil
}
// RestoreAssets restores an asset under the given directory recursively
func RestoreAssets(dir, name string) error {
children, err := AssetDir(name)
// File
if err != nil {
return RestoreAsset(dir, name)
}
// Dir
for _, child := range children {
err = RestoreAssets(dir, filepath.Join(name, child))
if err != nil {
return err
}
}
return nil
}
func _filePath(dir, name string) string {
cannonicalName := strings.Replace(name, "\\", "/", -1)
return filepath.Join(append([]string{dir}, strings.Split(cannonicalName, "/")...)...)
}

View file

@ -0,0 +1,36 @@
<!DOCTYPE html>
<html>
<head>
<title>b2 client status</title>
</head>
<body>
{{$methods := methods .}}
{{$durations := durations .}}
{{$table := table .}}
<h1>count by code</h1>
<table>
{{range $method := $methods}}
<tr>
<td>{{$method}}</td>
{{range $duration := $durations}}
<td>{{index $table $method $duration}}</td>
{{end}}
</tr>
{{end}}
</table>
<h1>uploads</h1>
{{range $name, $val := .Writers}}
<h2>{{ $name }}</h2>
{{range $id, $prog := $val.Progress}}
{{inc $id}} <progress value="{{$prog}}" max="1"></progress><br />
{{end}}
{{end}}
<h1>downloads</h1>
{{range $name, $val := .Readers}}
<h2>{{ $name }}</h2>
{{range $id, $prog := $val.Progress}}
{{inc $id}} <progress value="{{$prog}}" max="1"></progress><br />
{{end}}
{{end}}
</body>
</html>

View file

@ -0,0 +1,18 @@
// Copyright 2018, Google
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package b2assets contains data required by other libraries in blazer.
package b2assets
//go:generate go-bindata -pkg $GOPACKAGE -o b2assets.go data/

63
vendor/github.com/kurin/blazer/x/window/accum_test.go generated vendored Normal file
View file

@ -0,0 +1,63 @@
// Copyright 2018, Google
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package window_test
import (
"fmt"
"time"
"github.com/kurin/blazer/x/window"
)
type Accumulator struct {
w *window.Window
}
func (a Accumulator) Add(s string) {
a.w.Insert([]string{s})
}
func (a Accumulator) All() []string {
v := a.w.Reduce()
return v.([]string)
}
func NewAccum(size time.Duration) Accumulator {
r := func(i, j interface{}) interface{} {
a, ok := i.([]string)
if !ok {
a = nil
}
b, ok := j.([]string)
if !ok {
b = nil
}
for _, s := range b {
a = append(a, s)
}
return a
}
return Accumulator{w: window.New(size, time.Second, r)}
}
func Example_accumulator() {
a := NewAccum(time.Minute)
a.Add("this")
a.Add("is")
a.Add("that")
fmt.Printf("total: %v\n", a.All())
// Output:
// total: [this is that]
}

View file

@ -0,0 +1,60 @@
// Copyright 2018, Google
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package window_test
import (
"fmt"
"time"
"github.com/kurin/blazer/x/window"
)
type Counter struct {
w *window.Window
}
func (c Counter) Add() {
c.w.Insert(1)
}
func (c Counter) Count() int {
v := c.w.Reduce()
return v.(int)
}
func New(size time.Duration) Counter {
r := func(i, j interface{}) interface{} {
a, ok := i.(int)
if !ok {
a = 0
}
b, ok := j.(int)
if !ok {
b = 0
}
return a + b
}
return Counter{w: window.New(size, time.Second, r)}
}
func Example_counter() {
c := New(time.Minute)
c.Add()
c.Add()
c.Add()
fmt.Printf("total: %d\n", c.Count())
// Output:
// total: 3
}

151
vendor/github.com/kurin/blazer/x/window/window.go generated vendored Normal file
View file

@ -0,0 +1,151 @@
// Copyright 2018, Google
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package window provides a type for efficiently recording events as they
// occur over a given span of time. Events added to the window will remain
// until the time expires.
package window
import (
"sync"
"time"
)
// A Window efficiently records events that have occurred over a span of time
// extending from some fixed interval ago to now. Events that pass beyond this
// horizon effectively "fall off" the back of the window.
type Window struct {
mu sync.Mutex
events []interface{}
res time.Duration
last time.Time
reduce Reducer
forever bool
e interface{}
}
// A Reducer should take two values from the window and combine them into a
// third value that will be stored in the window. The values i or j may be
// nil. The underlying types for both arguments and the output should be
// identical.
//
// If the reducer is any kind of slice or list, then data usage will grow
// linearly with the number of events added to the window.
//
// Reducer will be called on its own output: Reducer(Reducer(x, y), z).
type Reducer func(i, j interface{}) interface{}
// New returns an initialized window for events over the given duration at the
// given resolution. Windows with tight resolution (i.e., small values for
// that argument) will be more accurate, at the cost of some memory.
//
// A size of 0 means "forever"; old events will never be removed.
func New(size, resolution time.Duration, r Reducer) *Window {
if size > 0 {
return &Window{
res: resolution,
events: make([]interface{}, size/resolution),
reduce: r,
}
}
return &Window{
forever: true,
reduce: r,
}
}
func (w *Window) bucket(now time.Time) int {
nanos := now.UnixNano()
abs := nanos / int64(w.res)
return int(abs) % len(w.events)
}
// sweep keeps the window valid. It needs to be called from every method that
// views or updates the window, and the caller needs to hold the mutex.
func (w *Window) sweep(now time.Time) {
if w.forever {
return
}
defer func() {
w.last = now
}()
b := w.bucket(now)
p := w.bucket(w.last)
if b == p && now.Sub(w.last) <= w.res {
// We're in the same bucket as the previous sweep, so all buckets are
// valid.
return
}
if now.Sub(w.last) > w.res*time.Duration(len(w.events)) {
// We've gone longer than this window measures since the last sweep, just
// zero the thing and have done.
for i := range w.events {
w.events[i] = nil
}
return
}
// Expire all invalid buckets. This means buckets not seen since the
// previous sweep and now, including the current bucket but not including the
// previous bucket.
old := int(w.last.UnixNano()) / int(w.res)
new := int(now.UnixNano()) / int(w.res)
for i := old + 1; i <= new; i++ {
b := i % len(w.events)
w.events[b] = nil
}
}
// Insert adds the given event.
func (w *Window) Insert(e interface{}) {
w.insertAt(time.Now(), e)
}
func (w *Window) insertAt(t time.Time, e interface{}) {
w.mu.Lock()
defer w.mu.Unlock()
if w.forever {
w.e = w.reduce(w.e, e)
return
}
w.sweep(t)
w.events[w.bucket(t)] = w.reduce(w.events[w.bucket(t)], e)
}
// Reduce runs the window's reducer over the valid values and returns the
// result.
func (w *Window) Reduce() interface{} {
return w.reducedAt(time.Now())
}
func (w *Window) reducedAt(t time.Time) interface{} {
w.mu.Lock()
defer w.mu.Unlock()
if w.forever {
return w.e
}
w.sweep(t)
var n interface{}
for i := range w.events {
n = w.reduce(n, w.events[i])
}
return n
}

88
vendor/github.com/kurin/blazer/x/window/window_test.go generated vendored Normal file
View file

@ -0,0 +1,88 @@
// Copyright 2018, Google
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package window
import (
"testing"
"time"
)
type epair struct {
e interface{}
t time.Time
}
func adder(i, j interface{}) interface{} {
a, ok := i.(int)
if !ok {
a = 0
}
b, ok := j.(int)
if !ok {
b = 0
}
return a + b
}
func TestWindows(t *testing.T) {
table := []struct {
size, dur time.Duration
incs []epair
look time.Time
reduce Reducer
want interface{}
}{
{
size: time.Minute,
dur: time.Second,
incs: []epair{
// year, month, day, hour, min, sec, nano
{t: time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC), e: 1},
{t: time.Date(2000, 1, 1, 0, 0, 1, 0, time.UTC), e: 1},
{t: time.Date(2000, 1, 1, 0, 0, 2, 0, time.UTC), e: 1},
{t: time.Date(2000, 1, 1, 0, 0, 3, 0, time.UTC), e: 1},
{t: time.Date(2000, 1, 1, 0, 0, 4, 0, time.UTC), e: 1},
{t: time.Date(2000, 1, 1, 0, 0, 5, 0, time.UTC), e: 1},
},
look: time.Date(2000, 1, 1, 0, 1, 0, 0, time.UTC),
want: 5,
reduce: adder,
},
{
incs: []epair{
// year, month, day, hour, min, sec, nano
{t: time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC), e: 1},
{t: time.Date(2000, 1, 1, 0, 0, 1, 0, time.UTC), e: 1},
{t: time.Date(2000, 1, 1, 0, 0, 2, 0, time.UTC), e: 1},
{t: time.Date(2000, 1, 1, 0, 0, 3, 0, time.UTC), e: 1},
{t: time.Date(2000, 1, 1, 0, 0, 4, 0, time.UTC), e: 1},
{t: time.Date(2000, 1, 1, 0, 0, 5, 0, time.UTC), e: 1},
},
want: 6,
reduce: adder,
},
}
for _, e := range table {
w := New(e.size, e.dur, e.reduce)
for _, inc := range e.incs {
w.insertAt(inc.t, inc.e)
}
ct := w.reducedAt(e.look)
if ct != e.want {
t.Errorf("reducedAt(%v) got %v, want %v", e.look, ct, e.want)
}
}
}