615 lines
14 KiB
Go
615 lines
14 KiB
Go
package writeback
|
|
|
|
import (
|
|
"container/heap"
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/pkg/errors"
|
|
"github.com/rclone/rclone/fs"
|
|
"github.com/rclone/rclone/vfs/vfscommon"
|
|
"github.com/stretchr/testify/assert"
|
|
)
|
|
|
|
func newTestWriteBack(t *testing.T) (wb *WriteBack, cancel func()) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
opt := vfscommon.DefaultOpt
|
|
opt.WriteBack = 100 * time.Millisecond
|
|
wb = New(ctx, &opt)
|
|
return wb, cancel
|
|
}
|
|
|
|
// string for debugging - make a copy and pop the items out in order
|
|
func (wb *WriteBack) string(t *testing.T) string {
|
|
wb.mu.Lock()
|
|
defer wb.mu.Unlock()
|
|
ws := wb.items
|
|
|
|
// check indexes OK first
|
|
for i := range ws {
|
|
assert.Equal(t, i, ws[i].index, ws[i].name)
|
|
}
|
|
wsCopy := make(writeBackItems, len(ws))
|
|
// deep copy the elements
|
|
for i := range wsCopy {
|
|
item := *ws[i]
|
|
wsCopy[i] = &item
|
|
}
|
|
// print them
|
|
var out []string
|
|
for wsCopy.Len() > 0 {
|
|
out = append(out, heap.Pop(&wsCopy).(*writeBackItem).name)
|
|
}
|
|
return strings.Join(out, ",")
|
|
}
|
|
|
|
func TestWriteBackItems(t *testing.T) {
|
|
// Test the items heap behaves properly
|
|
now := time.Now()
|
|
wbItem1 := writeBackItem{name: "one", expiry: now.Add(1 * time.Second)}
|
|
wbItem2 := writeBackItem{name: "two", expiry: now.Add(2 * time.Second)}
|
|
wbItem3 := writeBackItem{name: "three", expiry: now.Add(4 * time.Second)}
|
|
|
|
wb := &WriteBack{
|
|
items: writeBackItems{},
|
|
}
|
|
|
|
heap.Init(&wb.items)
|
|
assert.Equal(t, "", wb.string(t))
|
|
heap.Push(&wb.items, &wbItem2)
|
|
assert.Equal(t, "two", wb.string(t))
|
|
heap.Push(&wb.items, &wbItem3)
|
|
assert.Equal(t, "two,three", wb.string(t))
|
|
heap.Push(&wb.items, &wbItem1)
|
|
assert.Equal(t, "one,two,three", wb.string(t))
|
|
|
|
wb.items._update(&wbItem1, now.Add(3*time.Second))
|
|
assert.Equal(t, "two,one,three", wb.string(t))
|
|
|
|
wb.items._update(&wbItem1, now.Add(5*time.Second))
|
|
assert.Equal(t, "two,three,one", wb.string(t))
|
|
|
|
// Set all times the same - should sort in insertion order
|
|
wb.items._update(&wbItem1, now)
|
|
wb.items._update(&wbItem2, now)
|
|
wb.items._update(&wbItem3, now)
|
|
assert.Equal(t, "one,two,three", wb.string(t))
|
|
}
|
|
|
|
func checkOnHeap(t *testing.T, wb *WriteBack, wbItem *writeBackItem) {
|
|
wb.mu.Lock()
|
|
defer wb.mu.Unlock()
|
|
assert.True(t, wbItem.onHeap)
|
|
for i := range wb.items {
|
|
if wb.items[i] == wbItem {
|
|
return
|
|
}
|
|
}
|
|
assert.Failf(t, "expecting %q on heap", wbItem.name)
|
|
}
|
|
|
|
func checkNotOnHeap(t *testing.T, wb *WriteBack, wbItem *writeBackItem) {
|
|
wb.mu.Lock()
|
|
defer wb.mu.Unlock()
|
|
assert.False(t, wbItem.onHeap)
|
|
for i := range wb.items {
|
|
if wb.items[i] == wbItem {
|
|
t.Errorf("not expecting %q on heap", wbItem.name)
|
|
}
|
|
}
|
|
}
|
|
|
|
func checkInLookup(t *testing.T, wb *WriteBack, wbItem *writeBackItem) {
|
|
wb.mu.Lock()
|
|
defer wb.mu.Unlock()
|
|
assert.Equal(t, wbItem, wb.lookup[wbItem.id])
|
|
}
|
|
|
|
func checkNotInLookup(t *testing.T, wb *WriteBack, wbItem *writeBackItem) {
|
|
wb.mu.Lock()
|
|
defer wb.mu.Unlock()
|
|
assert.Nil(t, wb.lookup[wbItem.id])
|
|
}
|
|
|
|
func TestWriteBackItemCRUD(t *testing.T) {
|
|
wb, cancel := newTestWriteBack(t)
|
|
defer cancel()
|
|
|
|
// _peekItem empty
|
|
assert.Nil(t, wb._peekItem())
|
|
|
|
wbItem1 := wb._newItem(0, "one")
|
|
checkOnHeap(t, wb, wbItem1)
|
|
checkInLookup(t, wb, wbItem1)
|
|
|
|
wbItem2 := wb._newItem(0, "two")
|
|
checkOnHeap(t, wb, wbItem2)
|
|
checkInLookup(t, wb, wbItem2)
|
|
|
|
wbItem3 := wb._newItem(0, "three")
|
|
checkOnHeap(t, wb, wbItem3)
|
|
checkInLookup(t, wb, wbItem3)
|
|
|
|
assert.Equal(t, "one,two,three", wb.string(t))
|
|
|
|
// _delItem
|
|
wb._delItem(wbItem2)
|
|
checkOnHeap(t, wb, wbItem2)
|
|
checkNotInLookup(t, wb, wbItem2)
|
|
|
|
// _addItem
|
|
wb._addItem(wbItem2)
|
|
checkOnHeap(t, wb, wbItem2)
|
|
checkInLookup(t, wb, wbItem2)
|
|
|
|
// _popItem
|
|
assert.True(t, wbItem1.onHeap)
|
|
poppedWbItem := wb._popItem()
|
|
assert.Equal(t, wbItem1, poppedWbItem)
|
|
checkNotOnHeap(t, wb, wbItem1)
|
|
checkInLookup(t, wb, wbItem1)
|
|
assert.Equal(t, "two,three", wb.string(t))
|
|
|
|
// _pushItem
|
|
wb._pushItem(wbItem1)
|
|
checkOnHeap(t, wb, wbItem1)
|
|
checkInLookup(t, wb, wbItem1)
|
|
assert.Equal(t, "one,two,three", wb.string(t))
|
|
// push twice
|
|
wb._pushItem(wbItem1)
|
|
assert.Equal(t, "one,two,three", wb.string(t))
|
|
|
|
// _peekItem
|
|
assert.Equal(t, wbItem1, wb._peekItem())
|
|
|
|
// _removeItem
|
|
assert.Equal(t, "one,two,three", wb.string(t))
|
|
wb._removeItem(wbItem2)
|
|
checkNotOnHeap(t, wb, wbItem2)
|
|
checkInLookup(t, wb, wbItem2)
|
|
assert.Equal(t, "one,three", wb.string(t))
|
|
// remove twice
|
|
wb._removeItem(wbItem2)
|
|
checkNotOnHeap(t, wb, wbItem2)
|
|
checkInLookup(t, wb, wbItem2)
|
|
assert.Equal(t, "one,three", wb.string(t))
|
|
}
|
|
|
|
func assertTimerRunning(t *testing.T, wb *WriteBack, running bool) {
|
|
wb.mu.Lock()
|
|
if running {
|
|
assert.NotEqual(t, time.Time{}, wb.expiry)
|
|
assert.NotNil(t, wb.timer)
|
|
} else {
|
|
assert.Equal(t, time.Time{}, wb.expiry)
|
|
assert.Nil(t, wb.timer)
|
|
}
|
|
wb.mu.Unlock()
|
|
}
|
|
|
|
func TestWriteBackResetTimer(t *testing.T) {
|
|
wb, cancel := newTestWriteBack(t)
|
|
defer cancel()
|
|
|
|
// Reset the timer on an empty queue
|
|
wb._resetTimer()
|
|
|
|
// Check timer is stopped
|
|
assertTimerRunning(t, wb, false)
|
|
|
|
_ = wb._newItem(0, "three")
|
|
|
|
// Reset the timer on an queue with stuff
|
|
wb._resetTimer()
|
|
|
|
// Check timer is not stopped
|
|
assertTimerRunning(t, wb, true)
|
|
}
|
|
|
|
// A "transfer" for testing
|
|
type putItem struct {
|
|
wg sync.WaitGroup
|
|
mu sync.Mutex
|
|
t *testing.T
|
|
started chan struct{}
|
|
errChan chan error
|
|
running bool
|
|
cancelled bool
|
|
called bool
|
|
}
|
|
|
|
func newPutItem(t *testing.T) *putItem {
|
|
return &putItem{
|
|
t: t,
|
|
started: make(chan struct{}, 1),
|
|
}
|
|
}
|
|
|
|
// put the object as per PutFn interface
|
|
func (pi *putItem) put(ctx context.Context) (err error) {
|
|
pi.wg.Add(1)
|
|
defer pi.wg.Done()
|
|
|
|
pi.mu.Lock()
|
|
pi.called = true
|
|
if pi.running {
|
|
assert.Fail(pi.t, "upload already running")
|
|
}
|
|
pi.running = true
|
|
pi.errChan = make(chan error, 1)
|
|
pi.mu.Unlock()
|
|
|
|
pi.started <- struct{}{}
|
|
|
|
cancelled := false
|
|
select {
|
|
case err = <-pi.errChan:
|
|
case <-ctx.Done():
|
|
err = ctx.Err()
|
|
cancelled = true
|
|
}
|
|
|
|
pi.mu.Lock()
|
|
pi.running = false
|
|
pi.cancelled = cancelled
|
|
pi.mu.Unlock()
|
|
|
|
return err
|
|
}
|
|
|
|
// finish the "transfer" with the error passed in
|
|
func (pi *putItem) finish(err error) {
|
|
pi.mu.Lock()
|
|
if !pi.running {
|
|
assert.Fail(pi.t, "upload not running")
|
|
}
|
|
pi.mu.Unlock()
|
|
|
|
pi.errChan <- err
|
|
pi.wg.Wait()
|
|
}
|
|
|
|
func waitUntilNoTransfers(t *testing.T, wb *WriteBack) {
|
|
for i := 0; i < 100; i++ {
|
|
wb.mu.Lock()
|
|
uploads := wb.uploads
|
|
wb.mu.Unlock()
|
|
if uploads == 0 {
|
|
return
|
|
}
|
|
time.Sleep(10 * time.Millisecond)
|
|
}
|
|
t.Errorf("failed to wait for transfer to finish")
|
|
}
|
|
|
|
// This is the happy path with everything working
|
|
func TestWriteBackAddOK(t *testing.T) {
|
|
wb, cancel := newTestWriteBack(t)
|
|
defer cancel()
|
|
|
|
pi := newPutItem(t)
|
|
|
|
var inID Handle
|
|
wb.SetID(&inID)
|
|
assert.Equal(t, Handle(1), inID)
|
|
|
|
id := wb.Add(inID, "one", true, pi.put)
|
|
assert.Equal(t, inID, id)
|
|
wbItem := wb.lookup[id]
|
|
checkOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
assert.Equal(t, "one", wb.string(t))
|
|
|
|
<-pi.started
|
|
checkNotOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
|
|
pi.finish(nil) // transfer successful
|
|
waitUntilNoTransfers(t, wb)
|
|
checkNotOnHeap(t, wb, wbItem)
|
|
checkNotInLookup(t, wb, wbItem)
|
|
}
|
|
|
|
// Now test the upload failing and being retried
|
|
func TestWriteBackAddFailRetry(t *testing.T) {
|
|
wb, cancel := newTestWriteBack(t)
|
|
defer cancel()
|
|
|
|
pi := newPutItem(t)
|
|
|
|
id := wb.Add(0, "one", true, pi.put)
|
|
wbItem := wb.lookup[id]
|
|
checkOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
assert.Equal(t, "one", wb.string(t))
|
|
|
|
<-pi.started
|
|
checkNotOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
|
|
pi.finish(errors.New("transfer failed BOOM"))
|
|
waitUntilNoTransfers(t, wb)
|
|
checkOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
|
|
// check the retry
|
|
<-pi.started
|
|
checkNotOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
|
|
pi.finish(nil) // transfer successful
|
|
waitUntilNoTransfers(t, wb)
|
|
checkNotOnHeap(t, wb, wbItem)
|
|
checkNotInLookup(t, wb, wbItem)
|
|
}
|
|
|
|
// Now test the upload being cancelled by another upload being added
|
|
func TestWriteBackAddUpdate(t *testing.T) {
|
|
wb, cancel := newTestWriteBack(t)
|
|
defer cancel()
|
|
|
|
pi := newPutItem(t)
|
|
|
|
id := wb.Add(0, "one", true, pi.put)
|
|
wbItem := wb.lookup[id]
|
|
checkOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
assert.Equal(t, "one", wb.string(t))
|
|
|
|
<-pi.started
|
|
checkNotOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
|
|
// Now the upload has started add another one
|
|
|
|
pi2 := newPutItem(t)
|
|
id2 := wb.Add(id, "one", true, pi2.put)
|
|
assert.Equal(t, id, id2)
|
|
checkOnHeap(t, wb, wbItem) // object awaiting writeback time
|
|
checkInLookup(t, wb, wbItem)
|
|
|
|
// check the previous transfer was cancelled
|
|
assert.True(t, pi.cancelled)
|
|
|
|
// check the retry
|
|
<-pi2.started
|
|
checkNotOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
|
|
pi2.finish(nil) // transfer successful
|
|
waitUntilNoTransfers(t, wb)
|
|
checkNotOnHeap(t, wb, wbItem)
|
|
checkNotInLookup(t, wb, wbItem)
|
|
}
|
|
|
|
// Now test the upload being not cancelled by another upload being added
|
|
func TestWriteBackAddUpdateNotModified(t *testing.T) {
|
|
wb, cancel := newTestWriteBack(t)
|
|
defer cancel()
|
|
|
|
pi := newPutItem(t)
|
|
|
|
id := wb.Add(0, "one", false, pi.put)
|
|
wbItem := wb.lookup[id]
|
|
checkOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
assert.Equal(t, "one", wb.string(t))
|
|
|
|
<-pi.started
|
|
checkNotOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
|
|
// Now the upload has started add another one
|
|
|
|
pi2 := newPutItem(t)
|
|
id2 := wb.Add(id, "one", false, pi2.put)
|
|
assert.Equal(t, id, id2)
|
|
checkNotOnHeap(t, wb, wbItem) // object still being transfered
|
|
checkInLookup(t, wb, wbItem)
|
|
|
|
// Because modified was false above this should not cancel the
|
|
// transfer
|
|
assert.False(t, pi.cancelled)
|
|
|
|
// wait for original transfer to finish
|
|
pi.finish(nil)
|
|
waitUntilNoTransfers(t, wb)
|
|
checkNotOnHeap(t, wb, wbItem)
|
|
checkNotInLookup(t, wb, wbItem)
|
|
|
|
assert.False(t, pi2.called)
|
|
}
|
|
|
|
// Now test the upload being not cancelled by another upload being
|
|
// added because the upload hasn't started yet
|
|
func TestWriteBackAddUpdateNotStarted(t *testing.T) {
|
|
wb, cancel := newTestWriteBack(t)
|
|
defer cancel()
|
|
|
|
pi := newPutItem(t)
|
|
|
|
id := wb.Add(0, "one", true, pi.put)
|
|
wbItem := wb.lookup[id]
|
|
checkOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
assert.Equal(t, "one", wb.string(t))
|
|
|
|
// Immediately add another upload before the first has started
|
|
|
|
pi2 := newPutItem(t)
|
|
id2 := wb.Add(id, "one", true, pi2.put)
|
|
assert.Equal(t, id, id2)
|
|
checkOnHeap(t, wb, wbItem) // object still awaiting transfer
|
|
checkInLookup(t, wb, wbItem)
|
|
|
|
// Wait for the upload to start
|
|
<-pi2.started
|
|
checkNotOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
|
|
// Because modified was false above this should not cancel the
|
|
// transfer
|
|
assert.False(t, pi.cancelled)
|
|
|
|
// wait for new transfer to finish
|
|
pi2.finish(nil)
|
|
waitUntilNoTransfers(t, wb)
|
|
checkNotOnHeap(t, wb, wbItem)
|
|
checkNotInLookup(t, wb, wbItem)
|
|
|
|
assert.False(t, pi.called)
|
|
}
|
|
|
|
func TestWriteBackGetStats(t *testing.T) {
|
|
wb, cancel := newTestWriteBack(t)
|
|
defer cancel()
|
|
|
|
pi := newPutItem(t)
|
|
|
|
wb.Add(0, "one", true, pi.put)
|
|
|
|
inProgress, queued := wb.Stats()
|
|
assert.Equal(t, queued, 1)
|
|
assert.Equal(t, inProgress, 0)
|
|
|
|
<-pi.started
|
|
|
|
inProgress, queued = wb.Stats()
|
|
assert.Equal(t, queued, 0)
|
|
assert.Equal(t, inProgress, 1)
|
|
|
|
pi.finish(nil) // transfer successful
|
|
waitUntilNoTransfers(t, wb)
|
|
|
|
inProgress, queued = wb.Stats()
|
|
assert.Equal(t, queued, 0)
|
|
assert.Equal(t, inProgress, 0)
|
|
|
|
}
|
|
|
|
// Test queuing more than fs.Config.Transfers
|
|
func TestWriteBackMaxQueue(t *testing.T) {
|
|
wb, cancel := newTestWriteBack(t)
|
|
defer cancel()
|
|
|
|
maxTransfers := fs.Config.Transfers
|
|
toTransfer := maxTransfers + 2
|
|
|
|
// put toTransfer things in the queue
|
|
pis := []*putItem{}
|
|
for i := 0; i < toTransfer; i++ {
|
|
pi := newPutItem(t)
|
|
pis = append(pis, pi)
|
|
wb.Add(0, fmt.Sprintf("number%d", 1), true, pi.put)
|
|
}
|
|
|
|
inProgress, queued := wb.Stats()
|
|
assert.Equal(t, toTransfer, queued)
|
|
assert.Equal(t, 0, inProgress)
|
|
|
|
// now start the first maxTransfers - this should stop the timer
|
|
for i := 0; i < maxTransfers; i++ {
|
|
<-pis[i].started
|
|
}
|
|
|
|
// timer should be stopped now
|
|
assertTimerRunning(t, wb, false)
|
|
|
|
inProgress, queued = wb.Stats()
|
|
assert.Equal(t, toTransfer-maxTransfers, queued)
|
|
assert.Equal(t, maxTransfers, inProgress)
|
|
|
|
// now finish the the first maxTransfers
|
|
for i := 0; i < maxTransfers; i++ {
|
|
pis[i].finish(nil)
|
|
}
|
|
|
|
// now start and finish the remaining transfers one at a time
|
|
for i := maxTransfers; i < toTransfer; i++ {
|
|
<-pis[i].started
|
|
pis[i].finish(nil)
|
|
}
|
|
waitUntilNoTransfers(t, wb)
|
|
|
|
inProgress, queued = wb.Stats()
|
|
assert.Equal(t, queued, 0)
|
|
assert.Equal(t, inProgress, 0)
|
|
}
|
|
|
|
func TestWriteBackRename(t *testing.T) {
|
|
wb, cancel := newTestWriteBack(t)
|
|
defer cancel()
|
|
|
|
// cancel when not in writeback
|
|
wb.Rename(1, "nonExistent")
|
|
|
|
// add item
|
|
pi1 := newPutItem(t)
|
|
id := wb.Add(0, "one", true, pi1.put)
|
|
wbItem := wb.lookup[id]
|
|
checkOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
assert.Equal(t, wbItem.name, "one")
|
|
|
|
// rename when not uploading
|
|
wb.Rename(id, "two")
|
|
checkOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
assert.False(t, pi1.cancelled)
|
|
assert.Equal(t, wbItem.name, "two")
|
|
|
|
// add item
|
|
pi2 := newPutItem(t)
|
|
id = wb.Add(id, "two", true, pi2.put)
|
|
wbItem = wb.lookup[id]
|
|
checkOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
assert.Equal(t, wbItem.name, "two")
|
|
|
|
// wait for upload to start
|
|
<-pi2.started
|
|
checkNotOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
|
|
// rename when uploading - goes back on heap
|
|
wb.Rename(id, "three")
|
|
checkOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
assert.True(t, pi2.cancelled)
|
|
assert.Equal(t, wbItem.name, "three")
|
|
}
|
|
|
|
func TestWriteBackCancelUpload(t *testing.T) {
|
|
wb, cancel := newTestWriteBack(t)
|
|
defer cancel()
|
|
|
|
// cancel when not in writeback
|
|
assert.False(t, wb.cancelUpload(1))
|
|
|
|
// add item
|
|
pi := newPutItem(t)
|
|
id := wb.Add(0, "one", true, pi.put)
|
|
wbItem := wb.lookup[id]
|
|
checkOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
|
|
// cancel when not uploading
|
|
assert.False(t, wb.cancelUpload(id))
|
|
checkOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
|
|
// wait for upload to start
|
|
<-pi.started
|
|
checkNotOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
|
|
// cancel when uploading
|
|
assert.True(t, wb.cancelUpload(id))
|
|
checkOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
assert.True(t, pi.cancelled)
|
|
}
|