forked from TrueCloudLab/rclone
c86a55c798
Before this change, if there was an existing file being uploaded when a file was renamed on top of it, then both would be uploaded. This causes a duplicate in Google Drive as both files get uploaded at the same time. This was triggered reliably by LibreOffice saving doc files. This fix removes any duplicates in the upload queue on rename.
664 lines
15 KiB
Go
664 lines
15 KiB
Go
package writeback
|
|
|
|
import (
|
|
"container/heap"
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/pkg/errors"
|
|
"github.com/rclone/rclone/fs"
|
|
"github.com/rclone/rclone/vfs/vfscommon"
|
|
"github.com/stretchr/testify/assert"
|
|
)
|
|
|
|
func newTestWriteBack(t *testing.T) (wb *WriteBack, cancel func()) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
opt := vfscommon.DefaultOpt
|
|
opt.WriteBack = 100 * time.Millisecond
|
|
wb = New(ctx, &opt)
|
|
return wb, cancel
|
|
}
|
|
|
|
// string for debugging - make a copy and pop the items out in order
|
|
func (wb *WriteBack) string(t *testing.T) string {
|
|
wb.mu.Lock()
|
|
defer wb.mu.Unlock()
|
|
ws := wb.items
|
|
|
|
// check indexes OK first
|
|
for i := range ws {
|
|
assert.Equal(t, i, ws[i].index, ws[i].name)
|
|
}
|
|
wsCopy := make(writeBackItems, len(ws))
|
|
// deep copy the elements
|
|
for i := range wsCopy {
|
|
item := *ws[i]
|
|
wsCopy[i] = &item
|
|
}
|
|
// print them
|
|
var out []string
|
|
for wsCopy.Len() > 0 {
|
|
out = append(out, heap.Pop(&wsCopy).(*writeBackItem).name)
|
|
}
|
|
return strings.Join(out, ",")
|
|
}
|
|
|
|
func TestWriteBackItems(t *testing.T) {
|
|
// Test the items heap behaves properly
|
|
now := time.Now()
|
|
wbItem1 := writeBackItem{name: "one", expiry: now.Add(1 * time.Second)}
|
|
wbItem2 := writeBackItem{name: "two", expiry: now.Add(2 * time.Second)}
|
|
wbItem3 := writeBackItem{name: "three", expiry: now.Add(4 * time.Second)}
|
|
|
|
wb := &WriteBack{
|
|
items: writeBackItems{},
|
|
}
|
|
|
|
heap.Init(&wb.items)
|
|
assert.Equal(t, "", wb.string(t))
|
|
heap.Push(&wb.items, &wbItem2)
|
|
assert.Equal(t, "two", wb.string(t))
|
|
heap.Push(&wb.items, &wbItem3)
|
|
assert.Equal(t, "two,three", wb.string(t))
|
|
heap.Push(&wb.items, &wbItem1)
|
|
assert.Equal(t, "one,two,three", wb.string(t))
|
|
|
|
wb.items._update(&wbItem1, now.Add(3*time.Second))
|
|
assert.Equal(t, "two,one,three", wb.string(t))
|
|
|
|
wb.items._update(&wbItem1, now.Add(5*time.Second))
|
|
assert.Equal(t, "two,three,one", wb.string(t))
|
|
|
|
// Set all times the same - should sort in insertion order
|
|
wb.items._update(&wbItem1, now)
|
|
wb.items._update(&wbItem2, now)
|
|
wb.items._update(&wbItem3, now)
|
|
assert.Equal(t, "one,two,three", wb.string(t))
|
|
}
|
|
|
|
func checkOnHeap(t *testing.T, wb *WriteBack, wbItem *writeBackItem) {
|
|
wb.mu.Lock()
|
|
defer wb.mu.Unlock()
|
|
assert.True(t, wbItem.onHeap)
|
|
for i := range wb.items {
|
|
if wb.items[i] == wbItem {
|
|
return
|
|
}
|
|
}
|
|
assert.Failf(t, "expecting %q on heap", wbItem.name)
|
|
}
|
|
|
|
func checkNotOnHeap(t *testing.T, wb *WriteBack, wbItem *writeBackItem) {
|
|
wb.mu.Lock()
|
|
defer wb.mu.Unlock()
|
|
assert.False(t, wbItem.onHeap)
|
|
for i := range wb.items {
|
|
if wb.items[i] == wbItem {
|
|
t.Errorf("not expecting %q on heap", wbItem.name)
|
|
}
|
|
}
|
|
}
|
|
|
|
func checkInLookup(t *testing.T, wb *WriteBack, wbItem *writeBackItem) {
|
|
wb.mu.Lock()
|
|
defer wb.mu.Unlock()
|
|
assert.Equal(t, wbItem, wb.lookup[wbItem.id])
|
|
}
|
|
|
|
func checkNotInLookup(t *testing.T, wb *WriteBack, wbItem *writeBackItem) {
|
|
wb.mu.Lock()
|
|
defer wb.mu.Unlock()
|
|
assert.Nil(t, wb.lookup[wbItem.id])
|
|
}
|
|
|
|
func TestWriteBackItemCRUD(t *testing.T) {
|
|
wb, cancel := newTestWriteBack(t)
|
|
defer cancel()
|
|
|
|
// _peekItem empty
|
|
assert.Nil(t, wb._peekItem())
|
|
|
|
wbItem1 := wb._newItem(0, "one")
|
|
checkOnHeap(t, wb, wbItem1)
|
|
checkInLookup(t, wb, wbItem1)
|
|
|
|
wbItem2 := wb._newItem(0, "two")
|
|
checkOnHeap(t, wb, wbItem2)
|
|
checkInLookup(t, wb, wbItem2)
|
|
|
|
wbItem3 := wb._newItem(0, "three")
|
|
checkOnHeap(t, wb, wbItem3)
|
|
checkInLookup(t, wb, wbItem3)
|
|
|
|
assert.Equal(t, "one,two,three", wb.string(t))
|
|
|
|
// _delItem
|
|
wb._delItem(wbItem2)
|
|
checkOnHeap(t, wb, wbItem2)
|
|
checkNotInLookup(t, wb, wbItem2)
|
|
|
|
// _addItem
|
|
wb._addItem(wbItem2)
|
|
checkOnHeap(t, wb, wbItem2)
|
|
checkInLookup(t, wb, wbItem2)
|
|
|
|
// _popItem
|
|
assert.True(t, wbItem1.onHeap)
|
|
poppedWbItem := wb._popItem()
|
|
assert.Equal(t, wbItem1, poppedWbItem)
|
|
checkNotOnHeap(t, wb, wbItem1)
|
|
checkInLookup(t, wb, wbItem1)
|
|
assert.Equal(t, "two,three", wb.string(t))
|
|
|
|
// _pushItem
|
|
wb._pushItem(wbItem1)
|
|
checkOnHeap(t, wb, wbItem1)
|
|
checkInLookup(t, wb, wbItem1)
|
|
assert.Equal(t, "one,two,three", wb.string(t))
|
|
// push twice
|
|
wb._pushItem(wbItem1)
|
|
assert.Equal(t, "one,two,three", wb.string(t))
|
|
|
|
// _peekItem
|
|
assert.Equal(t, wbItem1, wb._peekItem())
|
|
|
|
// _removeItem
|
|
assert.Equal(t, "one,two,three", wb.string(t))
|
|
wb._removeItem(wbItem2)
|
|
checkNotOnHeap(t, wb, wbItem2)
|
|
checkInLookup(t, wb, wbItem2)
|
|
assert.Equal(t, "one,three", wb.string(t))
|
|
// remove twice
|
|
wb._removeItem(wbItem2)
|
|
checkNotOnHeap(t, wb, wbItem2)
|
|
checkInLookup(t, wb, wbItem2)
|
|
assert.Equal(t, "one,three", wb.string(t))
|
|
}
|
|
|
|
func assertTimerRunning(t *testing.T, wb *WriteBack, running bool) {
|
|
wb.mu.Lock()
|
|
if running {
|
|
assert.NotEqual(t, time.Time{}, wb.expiry)
|
|
assert.NotNil(t, wb.timer)
|
|
} else {
|
|
assert.Equal(t, time.Time{}, wb.expiry)
|
|
assert.Nil(t, wb.timer)
|
|
}
|
|
wb.mu.Unlock()
|
|
}
|
|
|
|
func TestWriteBackResetTimer(t *testing.T) {
|
|
wb, cancel := newTestWriteBack(t)
|
|
defer cancel()
|
|
|
|
// Reset the timer on an empty queue
|
|
wb._resetTimer()
|
|
|
|
// Check timer is stopped
|
|
assertTimerRunning(t, wb, false)
|
|
|
|
_ = wb._newItem(0, "three")
|
|
|
|
// Reset the timer on an queue with stuff
|
|
wb._resetTimer()
|
|
|
|
// Check timer is not stopped
|
|
assertTimerRunning(t, wb, true)
|
|
}
|
|
|
|
// A "transfer" for testing
|
|
type putItem struct {
|
|
wg sync.WaitGroup
|
|
mu sync.Mutex
|
|
t *testing.T
|
|
started chan struct{}
|
|
errChan chan error
|
|
running bool
|
|
cancelled bool
|
|
called bool
|
|
}
|
|
|
|
func newPutItem(t *testing.T) *putItem {
|
|
return &putItem{
|
|
t: t,
|
|
started: make(chan struct{}, 1),
|
|
}
|
|
}
|
|
|
|
// put the object as per PutFn interface
|
|
func (pi *putItem) put(ctx context.Context) (err error) {
|
|
pi.wg.Add(1)
|
|
defer pi.wg.Done()
|
|
|
|
pi.mu.Lock()
|
|
pi.called = true
|
|
if pi.running {
|
|
assert.Fail(pi.t, "upload already running")
|
|
}
|
|
pi.running = true
|
|
pi.errChan = make(chan error, 1)
|
|
pi.mu.Unlock()
|
|
|
|
pi.started <- struct{}{}
|
|
|
|
cancelled := false
|
|
select {
|
|
case err = <-pi.errChan:
|
|
case <-ctx.Done():
|
|
err = ctx.Err()
|
|
cancelled = true
|
|
}
|
|
|
|
pi.mu.Lock()
|
|
pi.running = false
|
|
pi.cancelled = cancelled
|
|
pi.mu.Unlock()
|
|
|
|
return err
|
|
}
|
|
|
|
// finish the "transfer" with the error passed in
|
|
func (pi *putItem) finish(err error) {
|
|
pi.mu.Lock()
|
|
if !pi.running {
|
|
assert.Fail(pi.t, "upload not running")
|
|
}
|
|
pi.mu.Unlock()
|
|
|
|
pi.errChan <- err
|
|
pi.wg.Wait()
|
|
}
|
|
|
|
func waitUntilNoTransfers(t *testing.T, wb *WriteBack) {
|
|
for i := 0; i < 100; i++ {
|
|
wb.mu.Lock()
|
|
uploads := wb.uploads
|
|
wb.mu.Unlock()
|
|
if uploads == 0 {
|
|
return
|
|
}
|
|
time.Sleep(10 * time.Millisecond)
|
|
}
|
|
t.Errorf("failed to wait for transfer to finish")
|
|
}
|
|
|
|
// This is the happy path with everything working
|
|
func TestWriteBackAddOK(t *testing.T) {
|
|
wb, cancel := newTestWriteBack(t)
|
|
defer cancel()
|
|
|
|
pi := newPutItem(t)
|
|
|
|
var inID Handle
|
|
wb.SetID(&inID)
|
|
assert.Equal(t, Handle(1), inID)
|
|
|
|
id := wb.Add(inID, "one", true, pi.put)
|
|
assert.Equal(t, inID, id)
|
|
wbItem := wb.lookup[id]
|
|
checkOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
assert.Equal(t, "one", wb.string(t))
|
|
|
|
<-pi.started
|
|
checkNotOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
|
|
pi.finish(nil) // transfer successful
|
|
waitUntilNoTransfers(t, wb)
|
|
checkNotOnHeap(t, wb, wbItem)
|
|
checkNotInLookup(t, wb, wbItem)
|
|
}
|
|
|
|
// Now test the upload failing and being retried
|
|
func TestWriteBackAddFailRetry(t *testing.T) {
|
|
wb, cancel := newTestWriteBack(t)
|
|
defer cancel()
|
|
|
|
pi := newPutItem(t)
|
|
|
|
id := wb.Add(0, "one", true, pi.put)
|
|
wbItem := wb.lookup[id]
|
|
checkOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
assert.Equal(t, "one", wb.string(t))
|
|
|
|
<-pi.started
|
|
checkNotOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
|
|
pi.finish(errors.New("transfer failed BOOM"))
|
|
waitUntilNoTransfers(t, wb)
|
|
checkOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
|
|
// check the retry
|
|
<-pi.started
|
|
checkNotOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
|
|
pi.finish(nil) // transfer successful
|
|
waitUntilNoTransfers(t, wb)
|
|
checkNotOnHeap(t, wb, wbItem)
|
|
checkNotInLookup(t, wb, wbItem)
|
|
}
|
|
|
|
// Now test the upload being cancelled by another upload being added
|
|
func TestWriteBackAddUpdate(t *testing.T) {
|
|
wb, cancel := newTestWriteBack(t)
|
|
defer cancel()
|
|
|
|
pi := newPutItem(t)
|
|
|
|
id := wb.Add(0, "one", true, pi.put)
|
|
wbItem := wb.lookup[id]
|
|
checkOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
assert.Equal(t, "one", wb.string(t))
|
|
|
|
<-pi.started
|
|
checkNotOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
|
|
// Now the upload has started add another one
|
|
|
|
pi2 := newPutItem(t)
|
|
id2 := wb.Add(id, "one", true, pi2.put)
|
|
assert.Equal(t, id, id2)
|
|
checkOnHeap(t, wb, wbItem) // object awaiting writeback time
|
|
checkInLookup(t, wb, wbItem)
|
|
|
|
// check the previous transfer was cancelled
|
|
assert.True(t, pi.cancelled)
|
|
|
|
// check the retry
|
|
<-pi2.started
|
|
checkNotOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
|
|
pi2.finish(nil) // transfer successful
|
|
waitUntilNoTransfers(t, wb)
|
|
checkNotOnHeap(t, wb, wbItem)
|
|
checkNotInLookup(t, wb, wbItem)
|
|
}
|
|
|
|
// Now test the upload being not cancelled by another upload being added
|
|
func TestWriteBackAddUpdateNotModified(t *testing.T) {
|
|
wb, cancel := newTestWriteBack(t)
|
|
defer cancel()
|
|
|
|
pi := newPutItem(t)
|
|
|
|
id := wb.Add(0, "one", false, pi.put)
|
|
wbItem := wb.lookup[id]
|
|
checkOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
assert.Equal(t, "one", wb.string(t))
|
|
|
|
<-pi.started
|
|
checkNotOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
|
|
// Now the upload has started add another one
|
|
|
|
pi2 := newPutItem(t)
|
|
id2 := wb.Add(id, "one", false, pi2.put)
|
|
assert.Equal(t, id, id2)
|
|
checkNotOnHeap(t, wb, wbItem) // object still being transfered
|
|
checkInLookup(t, wb, wbItem)
|
|
|
|
// Because modified was false above this should not cancel the
|
|
// transfer
|
|
assert.False(t, pi.cancelled)
|
|
|
|
// wait for original transfer to finish
|
|
pi.finish(nil)
|
|
waitUntilNoTransfers(t, wb)
|
|
checkNotOnHeap(t, wb, wbItem)
|
|
checkNotInLookup(t, wb, wbItem)
|
|
|
|
assert.False(t, pi2.called)
|
|
}
|
|
|
|
// Now test the upload being not cancelled by another upload being
|
|
// added because the upload hasn't started yet
|
|
func TestWriteBackAddUpdateNotStarted(t *testing.T) {
|
|
wb, cancel := newTestWriteBack(t)
|
|
defer cancel()
|
|
|
|
pi := newPutItem(t)
|
|
|
|
id := wb.Add(0, "one", true, pi.put)
|
|
wbItem := wb.lookup[id]
|
|
checkOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
assert.Equal(t, "one", wb.string(t))
|
|
|
|
// Immediately add another upload before the first has started
|
|
|
|
pi2 := newPutItem(t)
|
|
id2 := wb.Add(id, "one", true, pi2.put)
|
|
assert.Equal(t, id, id2)
|
|
checkOnHeap(t, wb, wbItem) // object still awaiting transfer
|
|
checkInLookup(t, wb, wbItem)
|
|
|
|
// Wait for the upload to start
|
|
<-pi2.started
|
|
checkNotOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
|
|
// Because modified was false above this should not cancel the
|
|
// transfer
|
|
assert.False(t, pi.cancelled)
|
|
|
|
// wait for new transfer to finish
|
|
pi2.finish(nil)
|
|
waitUntilNoTransfers(t, wb)
|
|
checkNotOnHeap(t, wb, wbItem)
|
|
checkNotInLookup(t, wb, wbItem)
|
|
|
|
assert.False(t, pi.called)
|
|
}
|
|
|
|
func TestWriteBackGetStats(t *testing.T) {
|
|
wb, cancel := newTestWriteBack(t)
|
|
defer cancel()
|
|
|
|
pi := newPutItem(t)
|
|
|
|
wb.Add(0, "one", true, pi.put)
|
|
|
|
inProgress, queued := wb.Stats()
|
|
assert.Equal(t, queued, 1)
|
|
assert.Equal(t, inProgress, 0)
|
|
|
|
<-pi.started
|
|
|
|
inProgress, queued = wb.Stats()
|
|
assert.Equal(t, queued, 0)
|
|
assert.Equal(t, inProgress, 1)
|
|
|
|
pi.finish(nil) // transfer successful
|
|
waitUntilNoTransfers(t, wb)
|
|
|
|
inProgress, queued = wb.Stats()
|
|
assert.Equal(t, queued, 0)
|
|
assert.Equal(t, inProgress, 0)
|
|
|
|
}
|
|
|
|
// Test queuing more than fs.Config.Transfers
|
|
func TestWriteBackMaxQueue(t *testing.T) {
|
|
ctx := context.Background()
|
|
ci := fs.GetConfig(ctx)
|
|
wb, cancel := newTestWriteBack(t)
|
|
defer cancel()
|
|
|
|
maxTransfers := ci.Transfers
|
|
toTransfer := maxTransfers + 2
|
|
|
|
// put toTransfer things in the queue
|
|
pis := []*putItem{}
|
|
for i := 0; i < toTransfer; i++ {
|
|
pi := newPutItem(t)
|
|
pis = append(pis, pi)
|
|
wb.Add(0, fmt.Sprintf("number%d", 1), true, pi.put)
|
|
}
|
|
|
|
inProgress, queued := wb.Stats()
|
|
assert.Equal(t, toTransfer, queued)
|
|
assert.Equal(t, 0, inProgress)
|
|
|
|
// now start the first maxTransfers - this should stop the timer
|
|
for i := 0; i < maxTransfers; i++ {
|
|
<-pis[i].started
|
|
}
|
|
|
|
// timer should be stopped now
|
|
assertTimerRunning(t, wb, false)
|
|
|
|
inProgress, queued = wb.Stats()
|
|
assert.Equal(t, toTransfer-maxTransfers, queued)
|
|
assert.Equal(t, maxTransfers, inProgress)
|
|
|
|
// now finish the the first maxTransfers
|
|
for i := 0; i < maxTransfers; i++ {
|
|
pis[i].finish(nil)
|
|
}
|
|
|
|
// now start and finish the remaining transfers one at a time
|
|
for i := maxTransfers; i < toTransfer; i++ {
|
|
<-pis[i].started
|
|
pis[i].finish(nil)
|
|
}
|
|
waitUntilNoTransfers(t, wb)
|
|
|
|
inProgress, queued = wb.Stats()
|
|
assert.Equal(t, queued, 0)
|
|
assert.Equal(t, inProgress, 0)
|
|
}
|
|
|
|
func TestWriteBackRename(t *testing.T) {
|
|
wb, cancel := newTestWriteBack(t)
|
|
defer cancel()
|
|
|
|
// cancel when not in writeback
|
|
wb.Rename(1, "nonExistent")
|
|
|
|
// add item
|
|
pi1 := newPutItem(t)
|
|
id := wb.Add(0, "one", true, pi1.put)
|
|
wbItem := wb.lookup[id]
|
|
checkOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
assert.Equal(t, wbItem.name, "one")
|
|
|
|
// rename when not uploading
|
|
wb.Rename(id, "two")
|
|
checkOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
assert.False(t, pi1.cancelled)
|
|
assert.Equal(t, wbItem.name, "two")
|
|
|
|
// add item
|
|
pi2 := newPutItem(t)
|
|
id = wb.Add(id, "two", true, pi2.put)
|
|
wbItem = wb.lookup[id]
|
|
checkOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
assert.Equal(t, wbItem.name, "two")
|
|
|
|
// wait for upload to start
|
|
<-pi2.started
|
|
checkNotOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
|
|
// rename when uploading - goes back on heap
|
|
wb.Rename(id, "three")
|
|
checkOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
assert.True(t, pi2.cancelled)
|
|
assert.Equal(t, wbItem.name, "three")
|
|
}
|
|
|
|
// TestWriteBackRenameDuplicates checks that if we rename an entry and
|
|
// make a duplicate, we remove the duplicate.
|
|
func TestWriteBackRenameDuplicates(t *testing.T) {
|
|
wb, cancel := newTestWriteBack(t)
|
|
defer cancel()
|
|
|
|
// add item "one"
|
|
pi1 := newPutItem(t)
|
|
id1 := wb.Add(0, "one", true, pi1.put)
|
|
wbItem1 := wb.lookup[id1]
|
|
checkOnHeap(t, wb, wbItem1)
|
|
checkInLookup(t, wb, wbItem1)
|
|
assert.Equal(t, wbItem1.name, "one")
|
|
|
|
<-pi1.started
|
|
checkNotOnHeap(t, wb, wbItem1)
|
|
checkInLookup(t, wb, wbItem1)
|
|
|
|
// add item "two"
|
|
pi2 := newPutItem(t)
|
|
id2 := wb.Add(0, "two", true, pi2.put)
|
|
wbItem2 := wb.lookup[id2]
|
|
checkOnHeap(t, wb, wbItem2)
|
|
checkInLookup(t, wb, wbItem2)
|
|
assert.Equal(t, wbItem2.name, "two")
|
|
|
|
<-pi2.started
|
|
checkNotOnHeap(t, wb, wbItem2)
|
|
checkInLookup(t, wb, wbItem2)
|
|
|
|
// rename "two" to "one"
|
|
wb.Rename(id2, "one")
|
|
|
|
// check "one" is cancelled and removed from heap and lookup
|
|
checkNotOnHeap(t, wb, wbItem1)
|
|
checkNotInLookup(t, wb, wbItem1)
|
|
assert.True(t, pi1.cancelled)
|
|
assert.Equal(t, wbItem1.name, "one")
|
|
|
|
// check "two" (now called "one"!) has been cancelled and will
|
|
// be retried
|
|
checkOnHeap(t, wb, wbItem2)
|
|
checkInLookup(t, wb, wbItem2)
|
|
assert.True(t, pi2.cancelled)
|
|
assert.Equal(t, wbItem2.name, "one")
|
|
}
|
|
|
|
func TestWriteBackCancelUpload(t *testing.T) {
|
|
wb, cancel := newTestWriteBack(t)
|
|
defer cancel()
|
|
|
|
// cancel when not in writeback
|
|
assert.False(t, wb.cancelUpload(1))
|
|
|
|
// add item
|
|
pi := newPutItem(t)
|
|
id := wb.Add(0, "one", true, pi.put)
|
|
wbItem := wb.lookup[id]
|
|
checkOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
|
|
// cancel when not uploading
|
|
assert.False(t, wb.cancelUpload(id))
|
|
checkOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
|
|
// wait for upload to start
|
|
<-pi.started
|
|
checkNotOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
|
|
// cancel when uploading
|
|
assert.True(t, wb.cancelUpload(id))
|
|
checkOnHeap(t, wb, wbItem)
|
|
checkInLookup(t, wb, wbItem)
|
|
assert.True(t, pi.cancelled)
|
|
}
|