cb7a461287
--max-backlog controls the queue length. Add statistics for the check/upload/rename queues. This means that checking can complete before the uploads which will give rclone the ability to show exactly what is outstanding.
100 lines
1.9 KiB
Go
100 lines
1.9 KiB
Go
package sync
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
|
|
"github.com/ncw/rclone/fs"
|
|
)
|
|
|
|
// pipe provides an unbounded channel like experience
|
|
//
|
|
// Note unlike channels these aren't strictly ordered.
|
|
type pipe struct {
|
|
mu sync.Mutex
|
|
c chan struct{}
|
|
queue []fs.ObjectPair
|
|
closed bool
|
|
totalSize int64
|
|
stats func(items int, totalSize int64)
|
|
}
|
|
|
|
func newPipe(stats func(items int, totalSize int64), maxBacklog int) *pipe {
|
|
return &pipe{
|
|
c: make(chan struct{}, maxBacklog),
|
|
stats: stats,
|
|
}
|
|
}
|
|
|
|
// Put an pair into the pipe
|
|
//
|
|
// It returns ok = false if the context was cancelled
|
|
//
|
|
// It will panic if you call it after Close()
|
|
func (p *pipe) Put(ctx context.Context, pair fs.ObjectPair) (ok bool) {
|
|
if ctx.Err() != nil {
|
|
return false
|
|
}
|
|
p.mu.Lock()
|
|
p.queue = append(p.queue, pair)
|
|
size := pair.Src.Size()
|
|
if size > 0 {
|
|
p.totalSize += size
|
|
}
|
|
p.stats(len(p.queue), p.totalSize)
|
|
p.mu.Unlock()
|
|
select {
|
|
case <-ctx.Done():
|
|
return false
|
|
case p.c <- struct{}{}:
|
|
}
|
|
return true
|
|
}
|
|
|
|
// Get a pair from the pipe
|
|
//
|
|
// It returns ok = false if the context was cancelled or Close() has
|
|
// been called.
|
|
func (p *pipe) Get(ctx context.Context) (pair fs.ObjectPair, ok bool) {
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case _, ok = <-p.c:
|
|
if !ok {
|
|
return
|
|
}
|
|
}
|
|
p.mu.Lock()
|
|
pair, p.queue = p.queue[0], p.queue[1:]
|
|
size := pair.Src.Size()
|
|
if size > 0 {
|
|
p.totalSize -= size
|
|
}
|
|
if p.totalSize < 0 {
|
|
p.totalSize = 0
|
|
}
|
|
p.stats(len(p.queue), p.totalSize)
|
|
p.mu.Unlock()
|
|
return pair, true
|
|
}
|
|
|
|
// Stats reads the number of items in the queue and the totalSize
|
|
func (p *pipe) Stats() (items int, totalSize int64) {
|
|
p.mu.Lock()
|
|
items, totalSize = len(p.queue), p.totalSize
|
|
p.mu.Unlock()
|
|
return items, totalSize
|
|
}
|
|
|
|
// Close the pipe
|
|
//
|
|
// Writes to a closed pipe will panic as will double closing a pipe
|
|
func (p *pipe) Close() {
|
|
p.mu.Lock()
|
|
close(p.c)
|
|
p.closed = true
|
|
p.mu.Unlock()
|
|
}
|