101 lines
1.9 KiB
Go
101 lines
1.9 KiB
Go
|
package sync
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"sync"
|
||
|
|
||
|
"github.com/ncw/rclone/fs"
|
||
|
)
|
||
|
|
||
|
// pipe provides an unbounded channel like experience
|
||
|
//
|
||
|
// Note unlike channels these aren't strictly ordered.
|
||
|
type pipe struct {
|
||
|
mu sync.Mutex
|
||
|
c chan struct{}
|
||
|
queue []fs.ObjectPair
|
||
|
closed bool
|
||
|
totalSize int64
|
||
|
stats func(items int, totalSize int64)
|
||
|
}
|
||
|
|
||
|
func newPipe(stats func(items int, totalSize int64), maxBacklog int) *pipe {
|
||
|
return &pipe{
|
||
|
c: make(chan struct{}, maxBacklog),
|
||
|
stats: stats,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Put an pair into the pipe
|
||
|
//
|
||
|
// It returns ok = false if the context was cancelled
|
||
|
//
|
||
|
// It will panic if you call it after Close()
|
||
|
func (p *pipe) Put(ctx context.Context, pair fs.ObjectPair) (ok bool) {
|
||
|
if ctx.Err() != nil {
|
||
|
return false
|
||
|
}
|
||
|
p.mu.Lock()
|
||
|
p.queue = append(p.queue, pair)
|
||
|
size := pair.Src.Size()
|
||
|
if size > 0 {
|
||
|
p.totalSize += size
|
||
|
}
|
||
|
p.stats(len(p.queue), p.totalSize)
|
||
|
p.mu.Unlock()
|
||
|
select {
|
||
|
case <-ctx.Done():
|
||
|
return false
|
||
|
case p.c <- struct{}{}:
|
||
|
}
|
||
|
return true
|
||
|
}
|
||
|
|
||
|
// Get a pair from the pipe
|
||
|
//
|
||
|
// It returns ok = false if the context was cancelled or Close() has
|
||
|
// been called.
|
||
|
func (p *pipe) Get(ctx context.Context) (pair fs.ObjectPair, ok bool) {
|
||
|
if ctx.Err() != nil {
|
||
|
return
|
||
|
}
|
||
|
select {
|
||
|
case <-ctx.Done():
|
||
|
return
|
||
|
case _, ok = <-p.c:
|
||
|
if !ok {
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
p.mu.Lock()
|
||
|
pair, p.queue = p.queue[0], p.queue[1:]
|
||
|
size := pair.Src.Size()
|
||
|
if size > 0 {
|
||
|
p.totalSize -= size
|
||
|
}
|
||
|
if p.totalSize < 0 {
|
||
|
p.totalSize = 0
|
||
|
}
|
||
|
p.stats(len(p.queue), p.totalSize)
|
||
|
p.mu.Unlock()
|
||
|
return pair, true
|
||
|
}
|
||
|
|
||
|
// Stats reads the number of items in the queue and the totalSize
|
||
|
func (p *pipe) Stats() (items int, totalSize int64) {
|
||
|
p.mu.Lock()
|
||
|
items, totalSize = len(p.queue), p.totalSize
|
||
|
p.mu.Unlock()
|
||
|
return items, totalSize
|
||
|
}
|
||
|
|
||
|
// Close the pipe
|
||
|
//
|
||
|
// Writes to a closed pipe will panic as will double closing a pipe
|
||
|
func (p *pipe) Close() {
|
||
|
p.mu.Lock()
|
||
|
close(p.c)
|
||
|
p.closed = true
|
||
|
p.mu.Unlock()
|
||
|
}
|