forked from TrueCloudLab/rclone
160 lines
4 KiB
Go
160 lines
4 KiB
Go
|
// Copyright 2016 Google Inc. All Rights Reserved.
|
||
|
//
|
||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
// you may not use this file except in compliance with the License.
|
||
|
// You may obtain a copy of the License at
|
||
|
//
|
||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||
|
//
|
||
|
// Unless required by applicable law or agreed to in writing, software
|
||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
// See the License for the specific language governing permissions and
|
||
|
// limitations under the License.
|
||
|
|
||
|
package pubsub
|
||
|
|
||
|
import (
|
||
|
"sync"
|
||
|
"time"
|
||
|
|
||
|
"golang.org/x/net/context"
|
||
|
)
|
||
|
|
||
|
// ackBuffer accumulates ack IDs that are waiting to be acknowledged.
// Once notifications have been switched on (see SendNotifications), a value
// is offered on Dirty each time the buffer goes from empty to non-empty.
type ackBuffer struct {
	// Dirty is signalled when pending ack IDs become available and
	// notifications are enabled. The owner must allocate it before use.
	Dirty chan struct{}
	// Done is closed by the owner when the ackBuffer is no longer needed;
	// closing it releases any notification still waiting to be delivered.
	Done chan struct{}

	mu      sync.Mutex // guards pending and send
	pending []string   // ack IDs added since the last RemoveAll
	send    bool       // whether empty -> non-empty transitions are signalled on Dirty
}
|
||
|
|
||
|
// Add adds ackID to the buffer.
|
||
|
func (buf *ackBuffer) Add(ackID string) {
|
||
|
buf.mu.Lock()
|
||
|
defer buf.mu.Unlock()
|
||
|
buf.pending = append(buf.pending, ackID)
|
||
|
|
||
|
// If we are transitioning into a non-empty notification state.
|
||
|
if buf.send && len(buf.pending) == 1 {
|
||
|
buf.notify()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// RemoveAll removes all ackIDs from the buffer and returns them.
|
||
|
func (buf *ackBuffer) RemoveAll() []string {
|
||
|
buf.mu.Lock()
|
||
|
defer buf.mu.Unlock()
|
||
|
|
||
|
ret := buf.pending
|
||
|
buf.pending = nil
|
||
|
return ret
|
||
|
}
|
||
|
|
||
|
// SendNotifications enables sending dirty notification on empty -> non-empty transitions.
|
||
|
// If the buffer is already non-empty, a notification will be sent immediately.
|
||
|
func (buf *ackBuffer) SendNotifications() {
|
||
|
buf.mu.Lock()
|
||
|
defer buf.mu.Unlock()
|
||
|
|
||
|
buf.send = true
|
||
|
// If we are transitioning into a non-empty notification state.
|
||
|
if len(buf.pending) > 0 {
|
||
|
buf.notify()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (buf *ackBuffer) notify() {
|
||
|
go func() {
|
||
|
select {
|
||
|
case buf.Dirty <- struct{}{}:
|
||
|
case <-buf.Done:
|
||
|
}
|
||
|
}()
|
||
|
}
|
||
|
|
||
|
// acker acks messages in batches.
|
||
|
type acker struct {
|
||
|
s service
|
||
|
Ctx context.Context // The context to use when acknowledging messages.
|
||
|
Sub string // The full name of the subscription.
|
||
|
AckTick <-chan time.Time // AckTick supplies the frequency with which to make ack requests.
|
||
|
|
||
|
// Notify is called with an ack ID after the message with that ack ID
|
||
|
// has been processed. An ackID is considered to have been processed
|
||
|
// if at least one attempt has been made to acknowledge it.
|
||
|
Notify func(string)
|
||
|
|
||
|
ackBuffer
|
||
|
|
||
|
wg sync.WaitGroup
|
||
|
done chan struct{}
|
||
|
}
|
||
|
|
||
|
// Start intiates processing of ackIDs which are added via Add.
|
||
|
// Notify is called with each ackID once it has been processed.
|
||
|
func (a *acker) Start() {
|
||
|
a.done = make(chan struct{})
|
||
|
a.ackBuffer.Dirty = make(chan struct{})
|
||
|
a.ackBuffer.Done = a.done
|
||
|
|
||
|
a.wg.Add(1)
|
||
|
go func() {
|
||
|
defer a.wg.Done()
|
||
|
for {
|
||
|
select {
|
||
|
case <-a.ackBuffer.Dirty:
|
||
|
a.ack(a.ackBuffer.RemoveAll())
|
||
|
case <-a.AckTick:
|
||
|
a.ack(a.ackBuffer.RemoveAll())
|
||
|
case <-a.done:
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
|
||
|
}()
|
||
|
}
|
||
|
|
||
|
// Ack adds an ack id to be acked in the next batch.
|
||
|
func (a *acker) Ack(ackID string) {
|
||
|
a.ackBuffer.Add(ackID)
|
||
|
}
|
||
|
|
||
|
// FastMode switches acker into a mode which acks messages as they arrive, rather than waiting
|
||
|
// for a.AckTick.
|
||
|
func (a *acker) FastMode() {
|
||
|
a.ackBuffer.SendNotifications()
|
||
|
}
|
||
|
|
||
|
// Stop drops all pending messages, and releases resources before returning.
|
||
|
func (a *acker) Stop() {
|
||
|
close(a.done)
|
||
|
a.wg.Wait()
|
||
|
}
|
||
|
|
||
|
const maxAckAttempts = 2
|
||
|
|
||
|
// ack acknowledges the supplied ackIDs.
|
||
|
// After the acknowledgement request has completed (regardless of its success
|
||
|
// or failure), ids will be passed to a.Notify.
|
||
|
func (a *acker) ack(ids []string) {
|
||
|
head, tail := a.s.splitAckIDs(ids)
|
||
|
for len(head) > 0 {
|
||
|
for i := 0; i < maxAckAttempts; i++ {
|
||
|
if a.s.acknowledge(a.Ctx, a.Sub, head) == nil {
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
// NOTE: if retry gives up and returns an error, we simply drop
|
||
|
// those ack IDs. The messages will be redelivered and this is
|
||
|
// a documented behaviour of the API.
|
||
|
head, tail = a.s.splitAckIDs(tail)
|
||
|
}
|
||
|
for _, id := range ids {
|
||
|
a.Notify(id)
|
||
|
}
|
||
|
}
|