// Copyright 2016 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package bundler supports bundling (batching) of items. Bundling amortizes an
// action with fixed costs over multiple items. For example, if an API provides
// an RPC that accepts a list of items as input, but clients would prefer
// adding items one at a time, then a Bundler can accept individual items from
// the client and bundle many of them into a single RPC.
//
// This package is experimental and subject to change without notice.
package bundler

import (
	"errors"
	"math"
	"reflect"
	"sync"
	"time"

	"golang.org/x/net/context"
	"golang.org/x/sync/semaphore"
)

// Default values for the Bundler fields of the same name (without the
// "Default" prefix). NewBundler installs them on every Bundler it creates.
const (
	DefaultDelayThreshold       = time.Second
	DefaultBundleCountThreshold = 10
	DefaultBundleByteThreshold  = 1e6 // 1M
	DefaultBufferedByteLimit    = 1e9 // 1G
)

var (
	// ErrOverflow is returned by Add when it cannot reserve room for the item
	// within the Bundler's BufferedByteLimit.
	ErrOverflow = errors.New("bundler reached buffered byte limit")

	// ErrOversizedItem is returned by Add and AddWait when a single item's size
	// exceeds BundleByteLimit, the maximum size of a bundle.
	ErrOversizedItem = errors.New("item size exceeds bundle byte limit")
)

// A Bundler collects items added to it into a bundle until one of its
// thresholds or limits (delay, item count, byte size) is reached, then calls a
// user-provided function to handle the bundle.
//
// Create a Bundler with NewBundler and set the exported fields before calling
// any of its methods.
type Bundler struct {
	// Starting from the time that the first message is added to a bundle, once
	// this delay has passed, handle the bundle. The default is DefaultDelayThreshold.
	DelayThreshold time.Duration

	// Once a bundle has this many items, handle the bundle. Since only one
	// item at a time is added to a bundle, no bundle will exceed this
	// threshold, so it also serves as a limit. The default is
	// DefaultBundleCountThreshold.
	BundleCountThreshold int

	// Once the number of bytes in current bundle reaches this threshold, handle
	// the bundle. The default is DefaultBundleByteThreshold. This triggers handling,
	// but does not cap the total size of a bundle.
	BundleByteThreshold int

	// The maximum size of a bundle, in bytes. Zero means unlimited.
	// An item that would push the current bundle past this limit causes that
	// bundle to be closed first; an item that is larger than the limit on its
	// own is rejected with ErrOversizedItem.
	BundleByteLimit int

	// The maximum number of bytes that the Bundler will keep in memory before
	// returning ErrOverflow. The default is DefaultBufferedByteLimit.
	// The value is read once, when the first method that needs it is called.
	BufferedByteLimit int

	// The maximum number of handler invocations that can be running at once.
	// The default is 1.
	// It must be at least 1: acquire only admits a bundle while fewer than
	// HandlerLimit handlers are active, so a smaller value would leave every
	// bundle waiting.
	HandlerLimit int

	handler       func(interface{}) // called to handle a bundle
	itemSliceZero reflect.Value     // nil (zero value) for slice of items
	flushTimer    *time.Timer       // implements DelayThreshold

	// mu guards curBundle, flushTimer and nextTicket.
	mu        sync.Mutex
	sem       *semaphore.Weighted // enforces BufferedByteLimit
	semOnce   sync.Once
	curBundle bundle // incoming items added to this bundle

	// Each bundle is assigned a unique ticket that determines the order in which the
	// handler is called. The ticket is assigned with mu locked, but waiting for tickets
	// to be handled is done via mu2 and cond, below.
	nextTicket uint64 // next ticket to be assigned

	// mu2 guards nextHandled and active. cond is built on mu2 and is broadcast
	// whenever either of them changes.
	mu2         sync.Mutex
	cond        *sync.Cond
	nextHandled uint64 // next ticket to be handled

	// In this implementation, active uses space proportional to HandlerLimit, and
	// waitUntilAllHandled takes time proportional to HandlerLimit each time an acquire
	// or release occurs, so large values of HandlerLimit may cause performance
	// issues.
	active map[uint64]bool // tickets of bundles actively being handled
}

// bundle is a group of items that is handed to the handler as a unit,
// together with the total of the sizes reported for those items.
type bundle struct {
	items reflect.Value // slice of item type
	size  int           // size in bytes of all items
}

// NewBundler creates a new Bundler.
//
// itemExample is a value of the type that will be bundled. For example, if you
// want to create bundles of *Entry, you could pass &Entry{} for itemExample.
// Only the type of itemExample is used; the value is never added to a bundle.
//
// handler is a function that will be called on each bundle. If itemExample is
// of type T, the argument to handler is of type []T. Handler invocations are
// started in the order in which their bundles were closed. With the default
// HandlerLimit of 1 they run strictly one at a time; setting HandlerLimit to a
// larger value allows up to that many invocations to run concurrently.
//
// The returned Bundler is populated with the Default* thresholds and limits.
// Configure the Bundler by setting its thresholds and limits before calling
// any of its methods.
func NewBundler(itemExample interface{}, handler func(interface{})) *Bundler {
	// A nil slice of the item type; every fresh bundle starts from this value.
	emptyItems := reflect.Zero(reflect.SliceOf(reflect.TypeOf(itemExample)))

	b := &Bundler{
		DelayThreshold:       DefaultDelayThreshold,
		BundleCountThreshold: DefaultBundleCountThreshold,
		BundleByteThreshold:  DefaultBundleByteThreshold,
		BufferedByteLimit:    DefaultBufferedByteLimit,
		HandlerLimit:         1,

		handler:       handler,
		itemSliceZero: emptyItems,
		active:        map[uint64]bool{},
	}
	b.curBundle.items = b.itemSliceZero
	// cond must share mu2 so that waiters observe nextHandled and active
	// consistently.
	b.cond = sync.NewCond(&b.mu2)
	return b
}

// initSemaphores lazily creates the semaphore that enforces
// BufferedByteLimit. Creation is postponed until first use, rather than done
// in NewBundler, because the user may assign the limit after construction.
// Only the first call has any effect.
func (b *Bundler) initSemaphores() {
	create := func() {
		limit := int64(b.BufferedByteLimit)
		b.sem = semaphore.NewWeighted(limit)
	}
	b.semOnce.Do(create)
}

// Add appends item, whose size in bytes is given by size, to the current
// bundle. The bundle is marked for handling and a new one is started whenever
// one of the thresholds or limits is reached.
//
// An item larger than the maximum bundle size (Bundler.BundleByteLimit) can
// never be handled, so Add rejects it with ErrOversizedItem.
//
// Add returns ErrOverflow if accepting the item would exceed the maximum
// memory allowed (Bundler.BufferedByteLimit), or if an AddWait call is
// blocked waiting for memory.
//
// Add never blocks.
func (b *Bundler) Add(item interface{}, size int) error {
	// An item bigger than a whole bundle could never be sent.
	if limit := b.BundleByteLimit; limit > 0 && size > limit {
		return ErrOversizedItem
	}

	// Reserve memory for the item without waiting. TryAcquire also reports
	// false while anything is queued on the semaphore, which is why Add and
	// AddWait should not be mixed on one Bundler.
	b.initSemaphores()
	if reserved := b.sem.TryAcquire(int64(size)); !reserved {
		return ErrOverflow
	}

	b.add(item, size)
	return nil
}

// add places item in the current bundle while holding b.mu. The caller must
// already have reserved size bytes on the semaphore. The bundle is closed
// (handed off for handling) and replaced with an empty one when a threshold
// or limit is reached.
func (b *Bundler) add(item interface{}, size int) {
	b.mu.Lock()
	defer b.mu.Unlock()

	// The item must not push the current bundle past the hard size limit, so
	// close that bundle first and let the item open the next one.
	if limit := b.BundleByteLimit; limit > 0 && b.curBundle.size+size > limit {
		b.startFlushLocked()
	}

	// cur addresses the Bundler's own field, so it always refers to whatever
	// bundle is current, even after startFlushLocked swaps in a fresh one.
	cur := &b.curBundle
	cur.items = reflect.Append(cur.items, reflect.ValueOf(item))
	cur.size += size

	// Arm the delay timer if none is pending. startFlushLocked clears the
	// timer whenever it closes a bundle, so a timer is allocated only for the
	// first item of each bundle. (Reusing one timer via Reset would save a
	// small allocation at the cost of considerably more complex code.)
	if b.flushTimer == nil {
		b.flushTimer = time.AfterFunc(b.DelayThreshold, b.Flush)
	}

	// Close the bundle once it holds exactly the threshold number of items.
	if cur.items.Len() == b.BundleCountThreshold {
		b.startFlushLocked()
	}
	// Close the bundle once its byte size reaches or passes the threshold.
	if cur.size >= b.BundleByteThreshold {
		b.startFlushLocked()
	}
}

// AddWait appends item, whose size in bytes is given by size, to the current
// bundle. The bundle is marked for handling and a new one is started whenever
// one of the thresholds or limits is reached.
//
// An item larger than the maximum bundle size (Bundler.BundleByteLimit) can
// never be handled, so AddWait rejects it with ErrOversizedItem.
//
// If accepting the item would exceed the maximum memory allowed
// (Bundler.BufferedByteLimit), AddWait blocks until space is available or ctx
// is done; in the latter case the error from the semaphore is returned.
//
// NOTE(review): an item larger than BufferedByteLimit can never fit, so
// presumably the call then blocks until ctx is done. Confirm against the
// semaphore package.
//
// Calls to Add and AddWait should not be mixed on the same Bundler.
func (b *Bundler) AddWait(ctx context.Context, item interface{}, size int) error {
	// An item bigger than a whole bundle could never be sent.
	if limit := b.BundleByteLimit; limit > 0 && size > limit {
		return ErrOversizedItem
	}

	// Wait for room within the memory budget. The semaphore serves waiters in
	// FIFO order, so no caller is starved.
	b.initSemaphores()
	err := b.sem.Acquire(ctx, int64(size))
	if err == nil {
		// The reservation is now ours: other goroutines may acquire space of
		// their own, but nothing can take this one away (provided sem.Release
		// is used correctly). Locking the mutex only after Acquire returns is
		// therefore not a race.
		b.add(item, size)
	}
	return err
}

// Flush hands any items still in the current bundle to the handler and
// blocks until every bundle closed so far, including that one, has been
// handled.
func (b *Bundler) Flush() {
	// Close the current bundle and note the ticket boundary under the lock.
	// Every bundle with a ticket below upTo is either finished or active, and
	// those are precisely the ones to wait for.
	upTo := func() uint64 {
		b.mu.Lock()
		defer b.mu.Unlock()
		b.startFlushLocked()
		return b.nextTicket
	}()

	b.initSemaphores()
	b.waitUntilAllHandled(upTo)
}

// startFlushLocked closes the current bundle and schedules it for handling
// on a new goroutine. It must be called with b.mu held. Any pending delay
// timer is cancelled; if the current bundle is empty nothing else happens.
func (b *Bundler) startFlushLocked() {
	if t := b.flushTimer; t != nil {
		t.Stop()
		b.flushTimer = nil
	}
	if b.curBundle.items.Len() == 0 {
		return
	}

	// A non-empty bundle means add has run, and its callers create the
	// semaphore first, so b.sem is safe to use below.
	closed, ticket := b.curBundle, b.nextTicket
	b.curBundle = bundle{items: b.itemSliceZero}
	b.nextTicket = ticket + 1

	go func() {
		defer func() {
			// Return the bundle's bytes to the memory budget before
			// announcing that the ticket is done.
			b.sem.Release(int64(closed.size))
			b.release(ticket)
		}()
		// Wait for this bundle's turn and for a free handler slot.
		b.acquire(ticket)
		b.handler(closed.items.Interface())
	}()
}

// acquire waits for ticket's turn: it returns once ticket is the next one to
// be served and a handler slot is free. For N calls to acquire to return, the
// tickets presented must be exactly those in [0, N). Presenting the same
// ticket twice is not allowed, and a ticket that has already been served
// causes a panic.
func (b *Bundler) acquire(ticket uint64) {
	b.mu2.Lock()
	defer b.mu2.Unlock()

	if b.nextHandled > ticket {
		panic("bundler: acquire: arg too small")
	}
	// Keep waiting while it is someone else's turn or every slot is taken.
	for ticket != b.nextHandled || len(b.active) >= b.HandlerLimit {
		b.cond.Wait()
	}

	// At this point ticket == b.nextHandled (it is our turn) and
	// len(b.active) < b.HandlerLimit (a slot is free): claim the slot and
	// pass the turn on to the following ticket.
	b.nextHandled = ticket + 1
	b.active[ticket] = true

	// Broadcast rather than Signal: only one acquire waiter can proceed, but
	// goroutines blocked in waitUntilAllHandled may need waking as well.
	b.cond.Broadcast()
}

// release gives back the handler slot held by ticket and wakes all waiters.
// Every ticket passed to acquire must later be passed to release, exactly
// once; releasing a ticket that is not active causes a panic.
func (b *Bundler) release(ticket uint64) {
	b.mu2.Lock()
	defer b.mu2.Unlock()

	if b.active[ticket] {
		delete(b.active, ticket)
		b.cond.Broadcast()
		return
	}
	panic("bundler: release: not an active ticket")
}

// waitUntilAllHandled blocks until release has been called for every ticket
// below n, that is, until every bundle with a ticket < n has been handled.
func (b *Bundler) waitUntilAllHandled(n uint64) {
	// Why the loop condition below is sufficient.
	// Terminology: ticket T is "acquired" once acquire(T) has returned, and
	// "released" once release(T) has returned.
	//  1. If T is acquired, so is T-1.
	//     acquire admits only the ticket equal to nextHandled, and
	//     nextHandled advances by exactly one per admission.
	//  2. If nextHandled >= T, then T-1 is acquired.
	//     nextHandled is raised to T only after T-1 has been acquired.
	//  3. If nextHandled >= T, then every ticket below T is acquired.
	//     This follows from 1 and 2.
	//  4. If T is acquired and T is not in active, then T is released.
	//     acquire inserts T into active before returning, and only release
	//     removes it.
	//  5. Write min(active) for the smallest member of active, or infinity
	//     when active is empty. If nextHandled >= T and T <= min(active),
	//     then every ticket below T is released.
	//     By 3, every ticket below T is acquired. Each such ticket is
	//     smaller than min(active) and so is not in active. By 4 it is
	//     therefore released.
	// The loop ends exactly when the premise of 5 holds for T = n.
	b.mu2.Lock()
	defer b.mu2.Unlock()
	for b.nextHandled < n || min(b.active) < n {
		b.cond.Wait()
	}
}

// min reports the smallest key in the set s. For an empty (or nil) set it
// reports math.MaxUint64, which callers treat as "infinity".
func min(s map[uint64]bool) uint64 {
	smallest := uint64(math.MaxUint64)
	for ticket := range s {
		if ticket >= smallest {
			continue
		}
		smallest = ticket
	}
	return smallest
}