vendor: remove github.com/tsenart/tb

This commit is contained in:
Nick Craig-Wood 2017-07-14 05:34:18 +01:00
parent 62e28d0a72
commit ec6c3f2686
12 changed files with 1 additions and 800 deletions

8
Gopkg.lock generated
View file

@ -187,12 +187,6 @@
packages = ["assert","require"]
revision = "4d4bfba8f1d1027c4fdbe371823030df51419987"
[[projects]]
branch = "master"
name = "github.com/tsenart/tb"
packages = ["."]
revision = "19f4c3d79d2bd67d0911b2e310b999eeea4454c1"
[[projects]]
branch = "master"
name = "github.com/xanzy/ssh-agent"
@ -268,6 +262,6 @@
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "12ff30322557a90caad41c9a96a05947b6a235dbc0a8156c489eba8db776d081"
inputs-digest = "d1197786e4b7133a2e775df76d12dc57d690f2136871f8b0ad3f793c48b4ab08"
solver-name = "gps-cdcl"
solver-version = 1

View file

@ -1,20 +0,0 @@
language: go
sudo: false
go:
- 1.2
- 1.3
- 1.4
- 1.5
- tip
install:
- go get -v golang.org/x/tools/cmd/vet
- go get -v github.com/golang/lint/golint
- go get -d -t -v ./...
- go build -v ./...
script:
- go vet ./...
- golint .
- go test -v -parallel=8 ./...

View file

@ -1,49 +0,0 @@
# Token Bucket (tb) [![Build Status](https://secure.travis-ci.org/tsenart/tb.png)](http://travis-ci.org/tsenart/tb) [![GoDoc](https://godoc.org/github.com/tsenart/tb?status.png)](https://godoc.org/github.com/tsenart/tb)
This package provides a generic lock-free implementation of the "Token bucket"
algorithm where handling of non-conformity is left to the user.
> The token bucket is an algorithm used in packet switched computer networks and telecommunications networks. It can be used to check that data transmissions, in the form of packets, conform to defined limits on bandwidth and burstiness (a measure of the unevenness or variations in the traffic flow)
-- <cite>[Wikipedia](http://en.wikipedia.org/wiki/Token_bucket)</cite>
This implementation of the token bucket generalises its applications beyond packet rate conformance. Hence, the word *generic*. You can use it to throttle any flow over time as long as it can be expressed as a number (bytes/s, requests/s, messages/s, packets/s, potatoes/s, heartbeats/s, etc...).
The *lock-free* part of the description refers to the lock-free programming techniques (CAS loop) used in the core `Bucket` methods (`Take` and `Put`). [Here is](http://preshing.com/20120612/an-introduction-to-lock-free-programming/) a good overview of lock-free programming you can refer to.
All utility pacakges such as [http](http/) and [io](io/) are just wrappers around the core package.
This ought to be your one stop shop for all things **throttling** in Go so feel free to propose missing common functionality.
## Install
```shell
$ go get github.com/tsenart/tb
```
## Usage
Read up the [docs](https://godoc.org/github.com/tsenart/tb) and have a look at some [examples](examples/).
## Licence
```
The MIT License (MIT)
Copyright (c) 2014-2015 Tomás Senart
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
```

View file

@ -1,130 +0,0 @@
package tb
import (
"math"
"sync/atomic"
"time"
)
// Bucket defines a generic lock-free implementation of a Token Bucket.
type Bucket struct {
inc int64
tokens int64
capacity int64
freq time.Duration
closing chan struct{}
}
// NewBucket returns a full Bucket with c capacity and starts a filling
// go-routine which ticks every freq. The number of tokens added on each tick
// is computed dynamically to be even across the duration of a second.
//
// If freq == -1 then the filling go-routine won't be started. Otherwise,
// If freq < 1/c seconds, then it will be adjusted to 1/c seconds.
func NewBucket(c int64, freq time.Duration) *Bucket {
b := &Bucket{tokens: c, capacity: c, closing: make(chan struct{})}
if freq == -1 {
return b
} else if evenFreq := time.Duration(1e9 / c); freq < evenFreq {
freq = evenFreq
}
b.freq = freq
b.inc = int64(math.Floor(.5 + (float64(c) * freq.Seconds())))
go b.fill()
return b
}
// Take attempts to take n tokens out of the bucket.
// If tokens == 0, nothing will be taken.
// If n <= tokens, n tokens will be taken.
// If n > tokens, all tokens will be taken.
//
// This method is thread-safe.
func (b *Bucket) Take(n int64) (taken int64) {
for {
if tokens := atomic.LoadInt64(&b.tokens); tokens == 0 {
return 0
} else if n <= tokens {
if !atomic.CompareAndSwapInt64(&b.tokens, tokens, tokens-n) {
continue
}
return n
} else if atomic.CompareAndSwapInt64(&b.tokens, tokens, 0) { // Spill
return tokens
}
}
}
// Put attempts to add n tokens to the bucket.
// If tokens == capacity, nothing will be added.
// If n <= capacity - tokens, n tokens will be added.
// If n > capacity - tokens, capacity - tokens will be added.
//
// This method is thread-safe.
func (b *Bucket) Put(n int64) (added int64) {
for {
if tokens := atomic.LoadInt64(&b.tokens); tokens == b.capacity {
return 0
} else if left := b.capacity - tokens; n <= left {
if !atomic.CompareAndSwapInt64(&b.tokens, tokens, tokens+n) {
continue
}
return n
} else if atomic.CompareAndSwapInt64(&b.tokens, tokens, b.capacity) {
return left
}
}
}
// Wait waits for n amount of tokens to be available.
// If n tokens are immediatelly available it doesn't sleep.
// Otherwise, it sleeps the minimum amount of time required for the remaining
// tokens to be available. It returns the wait duration.
//
// This method is thread-safe.
func (b *Bucket) Wait(n int64) time.Duration {
var rem int64
if rem = n - b.Take(n); rem == 0 {
return 0
}
var wait time.Duration
for rem > 0 {
sleep := b.wait(rem)
wait += sleep
time.Sleep(sleep)
rem -= b.Take(rem)
}
return wait
}
// Close stops the filling go-routine given it was started.
func (b *Bucket) Close() error {
close(b.closing)
return nil
}
// wait returns the minimum amount of time required for n tokens to be available.
// if n > capacity, n will be adjusted to capacity
func (b *Bucket) wait(n int64) time.Duration {
return time.Duration(int64(math.Ceil(math.Min(float64(n), float64(b.capacity))/float64(b.inc))) *
b.freq.Nanoseconds())
}
func (b *Bucket) fill() {
ticker := time.NewTicker(b.freq)
defer ticker.Stop()
for _ = range ticker.C {
select {
case <-b.closing:
return
default:
b.Put(b.inc)
}
}
}

View file

@ -1,191 +0,0 @@
package tb
import (
"fmt"
"runtime"
"testing"
"time"
)
func TestNewBucket(t *testing.T) {
t.Parallel()
b := NewBucket(10, -1)
b.Take(10)
time.Sleep(100 * time.Millisecond)
if w, g := int64(0), b.Take(1); w != g {
t.Fatal("Expected no filling when freq == -1")
}
}
func TestBucket_Take_single(t *testing.T) {
t.Parallel()
b := NewBucket(10, 0)
defer b.Close()
ex := [...]int64{5, 5, 1, 1, 5, 4, 1, 0}
for i := 0; i < len(ex)-1; i += 2 {
if got, want := b.Take(ex[i]), ex[i+1]; got != want {
t.Errorf("Want: %d, Got: %d", want, got)
}
}
}
func TestBucket_Put_single(t *testing.T) {
t.Parallel()
b := NewBucket(10, 0)
defer b.Close()
b.Take(10)
ex := [...]int64{5, 5, 10, 5, 15, 0}
for i := 0; i < len(ex)-1; i += 2 {
if got, want := b.Put(ex[i]), ex[i+1]; got != want {
t.Errorf("Want: %d, Got: %d", want, got)
}
}
}
func TestBucket_Take_multi(t *testing.T) {
t.Parallel()
b := NewBucket(10, 0)
defer b.Close()
exs := [2][]int64{{4, 4, 2, 2}, {2, 2, 1, 1}}
for i := 0; i < 2; i++ {
go func(i int) {
for j := 0; j < len(exs[i])-1; j += 2 {
if got, want := b.Take(exs[i][j]), exs[i][j+1]; got != want {
t.Errorf("Want: %d, Got: %d", want, got)
}
}
}(i)
}
}
func TestBucket_Put_multi(t *testing.T) {
t.Parallel()
b := NewBucket(10, 0)
defer b.Close()
b.Take(10)
exs := [2][]int64{{4, 4, 2, 2}, {2, 2, 1, 1}}
for i := 0; i < 2; i++ {
go func(i int) {
for j := 0; j < len(exs[i])-1; j += 2 {
if got, want := b.Put(exs[i][j]), exs[i][j+1]; got != want {
t.Errorf("Want: %d, Got: %d", want, got)
}
}
}(i)
}
}
func TestBucket_Take_throughput(t *testing.T) {
t.Parallel()
if testing.Short() {
t.Skip("Skipping test in short mode")
}
runtime.GOMAXPROCS(2)
b := NewBucket(1000, 0)
defer b.Close()
b.Take(1000)
var (
out int64
began = time.Now()
)
for out < 1000 {
out += b.Take(1000 - out)
}
ended := time.Since(began)
if int(ended.Seconds()) != 1 {
t.Errorf("Want 1000 tokens to take 1s. Got: %d", int(ended.Seconds()))
}
}
func BenchmarkBucket_Take_sequential(b *testing.B) {
bucket := NewBucket(int64(b.N), 0)
defer bucket.Close()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bucket.Take(8)
}
}
func BenchmarkBucket_Put_sequential(b *testing.B) {
bucket := NewBucket(int64(b.N), 0)
defer bucket.Close()
bucket.Take(int64(b.N))
b.ResetTimer()
for i := 0; i < b.N; i++ {
bucket.Put(8)
}
}
func TestBucket_Wait(t *testing.T) {
t.Parallel()
cases := map[*Bucket]time.Duration{
NewBucket(250, 100*time.Millisecond): 7 * time.Second,
NewBucket(500, 100*time.Millisecond): 3 * time.Second,
NewBucket(1e3, 500*time.Millisecond): 1 * time.Second,
NewBucket(1e3, 20*time.Millisecond): 1 * time.Second,
NewBucket(1e3, 1*time.Millisecond): 1 * time.Second,
NewBucket(1e3, 0): 1 * time.Second,
NewBucket(2e3, 0): 0,
NewBucket(3e3, 0): 0,
}
errors := make(chan error, len(cases))
for bucket, wait := range cases {
go func(bucket *Bucket, wait time.Duration) {
defer bucket.Close()
start := time.Now()
got := bucket.Wait(2000)
took := time.Since(start)
if int(wait.Seconds()) != int(got.Seconds()) {
errors <- fmt.Errorf("bucket.Wait(2000) with cap=%d, freq=%s: Want: %s, Got %s",
bucket.capacity, bucket.freq, wait, got)
} else if took < wait-time.Second || took > wait+time.Second {
// took is the actual time the bucket.Wait() took
// wait is the time we expected it to take
// if took is more than 1 second different from wait, then return an error
errors <- fmt.Errorf("bucket.Wait(2000) with cap=%d, freq=%s: Waited for %v which isn't within 1 second of %v",
bucket.capacity, bucket.freq, wait, took)
} else {
errors <- nil
}
}(bucket, wait)
}
for i := 0; i < cap(errors); i++ {
if err := <-errors; err != nil {
t.Error(err)
}
}
}
func TestBucket_Close(t *testing.T) {
b := NewBucket(10000, 0)
b.Close()
b.Take(10000)
time.Sleep(10 * time.Millisecond)
if want, got := int64(0), b.Take(1); want != got {
t.Errorf("Want: %d Got: %d", want, got)
}
}

View file

@ -1,42 +0,0 @@
package examples
import (
"github.com/tsenart/tb"
"io"
"log"
"net"
"time"
)
func main() {
ln, err := net.Listen("tcp", ":6789")
if err != nil {
log.Fatal(err)
}
th := tb.NewThrottler(100 * time.Millisecond)
echo := func(conn net.Conn) {
defer conn.Close()
host, port, err := net.SplitHostPort(conn.RemoteAddr().String())
if err != nil {
panic(err)
}
// Throttle to 10 connection per second from the same host
// Handle non-conformity by dropping the connection
if th.Halt(host, 1, 10) {
log.Printf("Throttled %s", host)
return
}
log.Printf("Echoing payload from %s:%s", host, port)
io.Copy(conn, conn)
}
for {
conn, err := ln.Accept()
if err != nil {
log.Fatal(err)
}
go echo(conn)
}
}

View file

@ -1,45 +0,0 @@
package http
import (
"github.com/tsenart/tb"
"net/http"
"time"
)
type roundTripperFunc func(r *http.Request) (*http.Response, error)
func (f roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) {
return f(r)
}
// ByteThrottledRoundTripper wraps another RoundTripper rt,
// throttling all requests to the specified byte rate.
func ByteThrottledRoundTripper(rt http.RoundTripper, rate int64) http.RoundTripper {
freq := time.Duration(1 * time.Millisecond)
bucket := tb.NewBucket(rate, freq)
return roundTripperFunc(func(r *http.Request) (*http.Response, error) {
got := bucket.Take(r.ContentLength)
for got < r.ContentLength {
got += bucket.Take(r.ContentLength - got)
time.Sleep(freq)
}
return rt.RoundTrip(r)
})
}
// ReqThrottledRoundTripper wraps another RoundTripper rt,
// throttling all requests to the specified request rate.
func ReqThrottledRoundTripper(rt http.RoundTripper, rate int64) http.RoundTripper {
freq := time.Duration(1e9 / rate)
bucket := tb.NewBucket(rate, freq)
return roundTripperFunc(func(r *http.Request) (*http.Response, error) {
got := bucket.Take(1)
for got != 1 {
got = bucket.Take(1)
time.Sleep(freq)
}
return rt.RoundTrip(r)
})
}

View file

@ -1,38 +0,0 @@
package http
import (
"github.com/tsenart/tb"
"net"
"net/http"
"time"
)
var byteThrottler = tb.NewThrottler(25 * time.Millisecond)
// ByteThrottledHandler wraps an http.Handler with per host byte throttling to
// the specified byte rate, responding with 429 when throttled.
func ByteThrottledHandler(h http.Handler, rate int64) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
host, _, _ := net.SplitHostPort(r.RemoteAddr)
if byteThrottler.Halt(host, r.ContentLength, rate) {
http.Error(w, "Too many requests", 429)
return
}
h.ServeHTTP(w, r)
})
}
var reqThrottler = tb.NewThrottler(5 * time.Millisecond)
// ReqThrottledHandler wraps an http.Handler with per host request throttling
// to the specified request rate, responding with 429 when throttled.
func ReqThrottledHandler(h http.Handler, rate int64) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
host, _, _ := net.SplitHostPort(r.RemoteAddr)
if reqThrottler.Halt(host, 1, rate) {
http.Error(w, "Too many requests", 429)
return
}
h.ServeHTTP(w, r)
})
}

View file

@ -1,35 +0,0 @@
package io
import (
"github.com/tsenart/tb"
"io"
"time"
)
// NewThrottledWriter is an io.Writer wrapping another io.Writer with
// byte rate throttling, flushing block bytes at a time.
func NewThrottledWriter(rate, block int64, w io.Writer) io.Writer {
return &throttledWriter{rate, block, w, tb.NewBucket(rate, -1)}
}
type throttledWriter struct {
rate, block int64
w io.Writer
b *tb.Bucket
}
func (tw *throttledWriter) Write(p []byte) (n int, err error) {
for wr := 0; wr < len(p); {
var got int64
for got < tw.block {
if got += tw.b.Take(tw.block - got); got != tw.block {
time.Sleep(time.Duration((1e9 / tw.rate) * (tw.block - got)))
}
}
if n, err = tw.w.Write(p[wr : wr+int(got)]); err != nil {
return wr, err
}
wr += n
}
return len(p), nil
}

4
vendor/github.com/tsenart/tb/tb.go generated vendored
View file

@ -1,4 +0,0 @@
// Package tb provides a generic lock-free implementation of the
// Token Bucket algorithm where non-conformity is handled by the user.
// http://en.wikipedia.org/wiki/Token_bucket
package tb

View file

@ -1,124 +0,0 @@
package tb
import (
"math"
"sync"
"time"
)
// Throttler is a thread-safe wrapper around a map of buckets and an easy to
// use API for generic throttling.
type Throttler struct {
mu sync.RWMutex
freq time.Duration
buckets map[string]*Bucket
closing chan struct{}
}
// NewThrottler returns a Throttler with a single filler go-routine for all
// its Buckets which ticks every freq.
// The number of tokens added on each tick for each bucket is computed
// dynamically to be even accross the duration of a second.
//
// If freq <= 0, the filling go-routine won't be started.
func NewThrottler(freq time.Duration) *Throttler {
th := &Throttler{
freq: freq,
buckets: map[string]*Bucket{},
closing: make(chan struct{}),
}
if freq > 0 {
go th.fill(freq)
}
return th
}
// Bucket returns a Bucket with rate capacity, keyed by key.
//
// If a Bucket (key, rate) doesn't exist yet, it is created.
//
// You must call Close when you're done with the Throttler in order to not leak
// a go-routine and a system-timer.
func (t *Throttler) Bucket(key string, rate int64) *Bucket {
t.mu.Lock()
defer t.mu.Unlock()
b, ok := t.buckets[key]
if !ok {
b = NewBucket(rate, -1)
b.inc = int64(math.Floor(.5 + (float64(b.capacity) * t.freq.Seconds())))
b.freq = t.freq
t.buckets[key] = b
}
return b
}
// Wait waits for n amount of tokens to be available.
// If n tokens are immediatelly available it doesn't sleep. Otherwise, it sleeps
// the minimum amount of time required for the remaining tokens to be available.
// It returns the wait duration.
//
// If a Bucket (key, rate) doesn't exist yet, it is created.
// If freq < 1/rate seconds, the effective wait rate won't be correct.
//
// You must call Close when you're done with the Throttler in order to not leak
// a go-routine and a system-timer.
func (t *Throttler) Wait(key string, n, rate int64) time.Duration {
return t.Bucket(key, rate).Wait(n)
}
// Halt returns a bool indicating if the Bucket identified by key and rate has
// n amount of tokens. If it doesn't, the taken tokens are added back to the
// bucket.
//
// If a Bucket (key, rate) doesn't exist yet, it is created.
// If freq < 1/rate seconds, the results won't be correct.
//
// You must call Close when you're done with the Throttler in order to not leak
// a go-routine and a system-timer.
func (t *Throttler) Halt(key string, n, rate int64) bool {
b := t.Bucket(key, rate)
if got := b.Take(n); got != n {
b.Put(got)
return true
}
return false
}
// Close stops filling the Buckets, closing the filling go-routine.
func (t *Throttler) Close() error {
close(t.closing)
t.mu.RLock()
defer t.mu.RUnlock()
for _, b := range t.buckets {
b.Close()
}
return nil
}
func (t *Throttler) fill(freq time.Duration) {
ticker := time.NewTicker(freq)
defer ticker.Stop()
for _ = range ticker.C {
select {
case <-t.closing:
return
default:
}
t.mu.RLock()
for _, b := range t.buckets {
b.Put(b.inc)
}
t.mu.RUnlock()
}
}

View file

@ -1,115 +0,0 @@
package tb
import (
"io"
"log"
"net"
"strconv"
"testing"
"time"
)
func TestThrottler_Bucket(t *testing.T) {
t.Parallel()
th := NewThrottler(0)
defer th.Close()
b := th.Bucket("a", 1000)
ex := [...]int64{100, 100, 1000, 900, 1, 0}
for i := 0; i < len(ex)-1; i += 2 {
if got, want := b.Take(ex[i]), ex[i+1]; got != want {
t.Errorf("Want: %d, Got: %d", want, got)
}
}
for i := 0; i < len(ex)-1; i += 2 {
if got, want := b.Put(ex[i]), ex[i+1]; got != want {
t.Errorf("Want: %d, Got: %d", want, got)
}
}
}
func TestThrottler_Halt(t *testing.T) {
t.Parallel()
th := NewThrottler(0)
defer th.Close()
if th.Halt("a", 1000, 1000) {
t.Fatal("Didn't expect halt")
}
if !th.Halt("a", 1, 1000) {
t.Fatal("Expected halt")
}
if th.Halt("b", 1000, 1000) {
t.Fatal("Didn't expect halt")
}
}
func TestThrottler_Wait(t *testing.T) {
t.Parallel()
th := NewThrottler(1 * time.Millisecond)
defer th.Close()
if wait := th.Wait("a", 1000, 1000); wait > 0 {
t.Fatal("Didn't expect wait")
}
if wait := th.Wait("a", 2000, 1000); int(wait.Seconds()) != 2 {
t.Fatalf("Expected wait of 2s. Got: %s", wait)
}
}
func BenchmarkThrottler_Bucket(b *testing.B) {
keys := make([]string, 10000)
for i := 0; i < len(keys); i++ {
keys[i] = strconv.Itoa(i)
}
th := NewThrottler(1 * time.Millisecond)
defer th.Close()
b.ResetTimer()
for i := 0; i < b.N; i++ {
th.Bucket(keys[i%(len(keys)-1)], 1000)
}
}
func ExampleThrottler() {
ln, err := net.Listen("tcp", ":6789")
if err != nil {
log.Fatal(err)
}
th := NewThrottler(100 * time.Millisecond)
defer th.Close()
echo := func(conn net.Conn) {
defer conn.Close()
host, port, err := net.SplitHostPort(conn.RemoteAddr().String())
if err != nil {
panic(err)
}
// Throttle to 10 connection per second from the same host
// Handle non-conformity by dropping the connection
if th.Halt(host, 1, 10) {
log.Printf("Throttled %s", host)
return
}
log.Printf("Echoing payload from %s:%s", host, port)
io.Copy(conn, conn)
}
for {
conn, err := ln.Accept()
if err != nil {
log.Fatal(err)
}
go echo(conn)
}
}