b9f0f031b6
Closes #2129
162 lines
4.2 KiB
Go
162 lines
4.2 KiB
Go
// Copyright 2018, the Blazer authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
// Package window provides a type for efficiently recording events as they
|
|
// occur over a given span of time. Events added to the window will remain
|
|
// until the time expires.
|
|
package window
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// A Window efficiently records events that have occurred over a span of time
|
|
// extending from some fixed interval ago to now. Events that pass beyond this
|
|
// horizon are discarded.
|
|
type Window struct {
|
|
mu sync.Mutex
|
|
events []interface{}
|
|
res time.Duration
|
|
last time.Time
|
|
reduce Reducer
|
|
forever bool
|
|
e interface{}
|
|
}
|
|
|
|
// A Reducer should take two values from the window and combine them into a
|
|
// third value that will be stored in the window. The values i or j may be
|
|
// nil. The underlying types for both arguments and the output should be
|
|
// identical.
|
|
//
|
|
// If the reducer is any kind of slice or list, then data usage will grow
|
|
// linearly with the number of events added to the window.
|
|
//
|
|
// Reducer will be called on its own output: Reducer(Reducer(x, y), z).
|
|
type Reducer func(i, j interface{}) interface{}
|
|
|
|
// New returns an initialized window for events over the given duration at the
|
|
// given resolution. Windows with tight resolution (i.e., small values for
|
|
// that argument) will be more accurate, at the cost of some memory.
|
|
//
|
|
// A size of 0 means "forever"; old events will never be removed.
|
|
func New(size, resolution time.Duration, r Reducer) *Window {
|
|
if size > 0 {
|
|
return &Window{
|
|
res: resolution,
|
|
events: make([]interface{}, size/resolution),
|
|
reduce: r,
|
|
}
|
|
}
|
|
return &Window{
|
|
forever: true,
|
|
reduce: r,
|
|
}
|
|
}
|
|
|
|
func (w *Window) bucket(now time.Time) int {
|
|
nanos := now.UnixNano()
|
|
abs := nanos / int64(w.res)
|
|
return int(abs) % len(w.events)
|
|
}
|
|
|
|
// sweep keeps the window valid. It needs to be called from every method that
|
|
// views or updates the window, and the caller needs to hold the mutex.
|
|
func (w *Window) sweep(now time.Time) {
|
|
if w.forever {
|
|
return
|
|
}
|
|
defer func() {
|
|
w.last = now
|
|
}()
|
|
|
|
// This compares now and w.last's monotonic clocks.
|
|
diff := now.Sub(w.last)
|
|
if diff < 0 {
|
|
// time went backwards somehow; zero events and return
|
|
for i := range w.events {
|
|
w.events[i] = nil
|
|
}
|
|
return
|
|
}
|
|
last := now.Add(-diff)
|
|
|
|
b := w.bucket(now)
|
|
p := w.bucket(last)
|
|
|
|
if b == p && diff <= w.res {
|
|
// We're in the same bucket as the previous sweep, so all buckets are
|
|
// valid.
|
|
return
|
|
}
|
|
|
|
if diff > w.res*time.Duration(len(w.events)) {
|
|
// We've gone longer than this window measures since the last sweep, just
|
|
// zero the thing and have done.
|
|
for i := range w.events {
|
|
w.events[i] = nil
|
|
}
|
|
return
|
|
}
|
|
|
|
// Expire all invalid buckets. This means buckets not seen since the
|
|
// previous sweep and now, including the current bucket but not including the
|
|
// previous bucket.
|
|
old := int64(last.UnixNano()) / int64(w.res)
|
|
new := int64(now.UnixNano()) / int64(w.res)
|
|
for i := old + 1; i <= new; i++ {
|
|
b := int(i) % len(w.events)
|
|
w.events[b] = nil
|
|
}
|
|
}
|
|
|
|
// Insert adds the given event.
|
|
func (w *Window) Insert(e interface{}) {
|
|
w.insertAt(time.Now(), e)
|
|
}
|
|
|
|
func (w *Window) insertAt(t time.Time, e interface{}) {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
|
|
if w.forever {
|
|
w.e = w.reduce(w.e, e)
|
|
return
|
|
}
|
|
|
|
w.sweep(t)
|
|
w.events[w.bucket(t)] = w.reduce(w.events[w.bucket(t)], e)
|
|
}
|
|
|
|
// Reduce runs the window's reducer over the valid values and returns the
|
|
// result.
|
|
func (w *Window) Reduce() interface{} {
|
|
return w.reducedAt(time.Now())
|
|
}
|
|
|
|
func (w *Window) reducedAt(t time.Time) interface{} {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
|
|
if w.forever {
|
|
return w.e
|
|
}
|
|
|
|
w.sweep(t)
|
|
var n interface{}
|
|
for i := range w.events {
|
|
n = w.reduce(n, w.events[i])
|
|
}
|
|
return n
|
|
}
|