diff --git a/fs/sync/pipe.go b/fs/sync/pipe.go index 60da19841..1e19e39c6 100644 --- a/fs/sync/pipe.go +++ b/fs/sync/pipe.go @@ -1,11 +1,11 @@ package sync import ( - "container/heap" "context" "strings" "sync" + "github.com/aalpar/deheap" "github.com/pkg/errors" "github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs/fserrors" @@ -38,7 +38,7 @@ func newPipe(orderBy string, stats func(items int, totalSize int64), maxBacklog less: less, } if p.less != nil { - heap.Init(p) + deheap.Init(p) } return p, nil } @@ -73,9 +73,6 @@ func (p *pipe) Pop() interface{} { return item } -// Check interface satisfied -var _ heap.Interface = (*pipe)(nil) - // Put an pair into the pipe // // It returns ok = false if the context was cancelled @@ -90,7 +87,7 @@ func (p *pipe) Put(ctx context.Context, pair fs.ObjectPair) (ok bool) { // no order-by p.queue = append(p.queue, pair) } else { - heap.Push(p, pair) + deheap.Push(p, pair) } size := pair.Src.Size() if size > 0 { @@ -129,7 +126,7 @@ func (p *pipe) Get(ctx context.Context) (pair fs.ObjectPair, ok bool) { p.queue[0] = fs.ObjectPair{} // avoid memory leak p.queue = p.queue[1:] } else { - pair = heap.Pop(p).(fs.ObjectPair) + pair = deheap.Pop(p).(fs.ObjectPair) } size := pair.Src.Size() if size > 0 { diff --git a/fs/sync/pipe_test.go b/fs/sync/pipe_test.go index 1ef34f27d..59506bcc1 100644 --- a/fs/sync/pipe_test.go +++ b/fs/sync/pipe_test.go @@ -1,6 +1,7 @@ package sync import ( + "container/heap" "context" "sync" "sync/atomic" @@ -12,6 +13,9 @@ import ( "github.com/stretchr/testify/require" ) +// Check interface satisfied +var _ heap.Interface = (*pipe)(nil) + func TestPipe(t *testing.T) { var queueLength int var queueSize int64