forked from TrueCloudLab/rclone
Compare commits
3 commits
tcl/master
...
fix-vfs-qu
Author | SHA1 | Date | |
---|---|---|---|
|
b21ec3db7d | ||
|
8df2c9438f | ||
|
5c54531d04 |
6 changed files with 339 additions and 25 deletions
109
vfs/rc.go
109
vfs/rc.go
|
@ -11,6 +11,7 @@ import (
|
|||
"github.com/rclone/rclone/fs"
|
||||
"github.com/rclone/rclone/fs/cache"
|
||||
"github.com/rclone/rclone/fs/rc"
|
||||
"github.com/rclone/rclone/vfs/vfscache/writeback"
|
||||
)
|
||||
|
||||
const getVFSHelp = `
|
||||
|
@ -437,3 +438,111 @@ func rcStats(ctx context.Context, in rc.Params) (out rc.Params, err error) {
|
|||
}
|
||||
return vfs.Stats(), nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
rc.Add(rc.Call{
|
||||
Path: "vfs/queue",
|
||||
Title: "Queue info for a VFS.",
|
||||
Help: strings.ReplaceAll(`
|
||||
This returns info about the upload queue for the selected VFS.
|
||||
|
||||
This is only useful if |--vfs-cache-mode| > off. If you call it when
|
||||
the |--vfs-cache-mode| is off, it will return an empty result.
|
||||
|
||||
{
|
||||
"queued": // an array of files queued for upload
|
||||
[
|
||||
{
|
||||
"name": "file", // string: name (full path) of the file,
|
||||
"id": 123, // integer: id of this item in the queue,
|
||||
"size": 79, // integer: size of the file in bytes
|
||||
"expiry": 1.5 // float: time until file is eligible for transfer, lowest goes first
|
||||
"tries": 1, // integer: number of times we have tried to upload
|
||||
"delay": 5.0, // float: seconds between upload attempts
|
||||
"uploading": false, // boolean: true if item is being uploaded
|
||||
},
|
||||
],
|
||||
}
|
||||
|
||||
The |expiry| time is the time until the file is elegible for being
|
||||
uploaded in floating point seconds. This may go negative. As rclone
|
||||
only transfers |--transfers| files at once, only the lowest
|
||||
|--transfers| expiry times will have |uploading| as |true|. So there
|
||||
may be files with negative expiry times for which |uploading| is
|
||||
|false|.
|
||||
|
||||
`, "|", "`") + getVFSHelp,
|
||||
Fn: rcQueue,
|
||||
})
|
||||
}
|
||||
|
||||
func rcQueue(ctx context.Context, in rc.Params) (out rc.Params, err error) {
|
||||
vfs, err := getVFS(in)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if vfs.cache == nil {
|
||||
return nil, nil
|
||||
}
|
||||
return vfs.cache.Queue(), nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
rc.Add(rc.Call{
|
||||
Path: "vfs/queue-set-expiry",
|
||||
Title: "Set the expiry time for an item queued for upload.",
|
||||
Help: strings.ReplaceAll(`
|
||||
|
||||
Use this to adjust the |expiry| time for an item in the upload queue.
|
||||
You will need to read the |id| of the item using |vfs/queue| before
|
||||
using this call.
|
||||
|
||||
You can then set |expiry| to a floating point number of seconds from
|
||||
now when the item is eligible for upload. If you want the item to be
|
||||
uploaded as soon as possible then set it to a large negative number (eg
|
||||
-1000000000). If you want the upload of the item to be delayed
|
||||
for a long time then set it to a large positive number.
|
||||
|
||||
Setting the |expiry| of an item which has already has started uploading
|
||||
will have no effect - the item will carry on being uploaded.
|
||||
|
||||
This will return an error if called with |--vfs-cache-mode| off or if
|
||||
the |id| passed is not found.
|
||||
|
||||
This takes the following parameters
|
||||
|
||||
- |fs| - select the VFS in use (optional)
|
||||
- |id| - a numeric ID as returned from |vfs/queue|
|
||||
- |expiry| - a new expiry time as floating point seconds
|
||||
|
||||
This returns an empty result on success, or an error.
|
||||
|
||||
`, "|", "`") + getVFSHelp,
|
||||
Fn: rcQueueSetExpiry,
|
||||
})
|
||||
}
|
||||
|
||||
func rcQueueSetExpiry(ctx context.Context, in rc.Params) (out rc.Params, err error) {
|
||||
vfs, err := getVFS(in)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if vfs.cache == nil {
|
||||
return nil, rc.NewErrParamInvalid(errors.New("can't call this unless using the VFS cache"))
|
||||
}
|
||||
|
||||
// Read input values
|
||||
id, err := in.GetInt64("id")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
expiry, err := in.GetFloat64("expiry")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Set expiry
|
||||
expiryTime := time.Now().Add(time.Duration(float64(time.Second) * expiry))
|
||||
err = vfs.cache.QueueSetExpiry(writeback.Handle(id), expiryTime)
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -170,6 +170,18 @@ func (c *Cache) Stats() (out rc.Params) {
|
|||
return out
|
||||
}
|
||||
|
||||
// Queue returns info about the Cache
|
||||
func (c *Cache) Queue() (out rc.Params) {
|
||||
out = make(rc.Params)
|
||||
out["queue"] = c.writeback.Queue()
|
||||
return out
|
||||
}
|
||||
|
||||
// QueueSetExpiry updates the expiry of a single item in the upload queue
|
||||
func (c *Cache) QueueSetExpiry(id writeback.Handle, expiry time.Time) error {
|
||||
return c.writeback.SetExpiry(id, expiry)
|
||||
}
|
||||
|
||||
// createDir creates a directory path, along with any necessary parents
|
||||
func createDir(dir string) error {
|
||||
return file.MkdirAll(dir, 0700)
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
"github.com/rclone/rclone/fs/config"
|
||||
"github.com/rclone/rclone/fstest"
|
||||
"github.com/rclone/rclone/lib/diskusage"
|
||||
"github.com/rclone/rclone/vfs/vfscache/writeback"
|
||||
"github.com/rclone/rclone/vfs/vfscommon"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
@ -727,3 +728,26 @@ func TestCacheStats(t *testing.T) {
|
|||
assert.Equal(t, 0, out["uploadsInProgress"])
|
||||
assert.Equal(t, 0, out["uploadsQueued"])
|
||||
}
|
||||
|
||||
func TestCacheQueue(t *testing.T) {
|
||||
_, c := newTestCache(t)
|
||||
|
||||
out := c.Queue()
|
||||
|
||||
// We've checked the contents of queue in the writeback tests
|
||||
// Just check it is present here
|
||||
queue, found := out["queue"]
|
||||
require.True(t, found)
|
||||
_, ok := queue.([]writeback.QueueInfo)
|
||||
require.True(t, ok)
|
||||
}
|
||||
|
||||
func TestCacheQueueSetExpiry(t *testing.T) {
|
||||
_, c := newTestCache(t)
|
||||
|
||||
// Check this returns the correct error when called so we know
|
||||
// it is plumbed in correctly. The actual tests are done in
|
||||
// writeback.
|
||||
err := c.QueueSetExpiry(123123, time.Now())
|
||||
assert.Equal(t, writeback.ErrorIDNotFound, err)
|
||||
}
|
||||
|
|
|
@ -728,7 +728,7 @@ func (item *Item) Close(storeFn StoreFn) (err error) {
|
|||
item.c.writeback.SetID(&item.writeBackID)
|
||||
id := item.writeBackID
|
||||
item.mu.Unlock()
|
||||
item.c.writeback.Add(id, item.name, item.modified, func(ctx context.Context) error {
|
||||
item.c.writeback.Add(id, item.name, item.info.Size, item.modified, func(ctx context.Context) error {
|
||||
return item.store(ctx, storeFn)
|
||||
})
|
||||
item.mu.Lock()
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"container/heap"
|
||||
"context"
|
||||
"errors"
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
@ -62,6 +63,7 @@ func New(ctx context.Context, opt *vfscommon.Options) *WriteBack {
|
|||
// writeBack.mu must be held to manipulate this
|
||||
type writeBackItem struct {
|
||||
name string // name of the item so we don't have to read it from item
|
||||
size int64 // size of the item so we don't have to read it from item
|
||||
id Handle // id of the item
|
||||
index int // index into the priority queue for update
|
||||
expiry time.Time // When this expires we will write it back
|
||||
|
@ -135,10 +137,11 @@ func (wb *WriteBack) _newExpiry() time.Time {
|
|||
// make a new writeBackItem
|
||||
//
|
||||
// call with the lock held
|
||||
func (wb *WriteBack) _newItem(id Handle, name string) *writeBackItem {
|
||||
func (wb *WriteBack) _newItem(id Handle, name string, size int64) *writeBackItem {
|
||||
wb.SetID(&id)
|
||||
wbItem := &writeBackItem{
|
||||
name: name,
|
||||
size: size,
|
||||
expiry: wb._newExpiry(),
|
||||
delay: wb.opt.WriteBack,
|
||||
id: id,
|
||||
|
@ -256,13 +259,13 @@ func (wb *WriteBack) SetID(pid *Handle) {
|
|||
//
|
||||
// If modified is false then it it doesn't cancel a pending upload if
|
||||
// there is one as there is no need.
|
||||
func (wb *WriteBack) Add(id Handle, name string, modified bool, putFn PutFn) Handle {
|
||||
func (wb *WriteBack) Add(id Handle, name string, size int64, modified bool, putFn PutFn) Handle {
|
||||
wb.mu.Lock()
|
||||
defer wb.mu.Unlock()
|
||||
|
||||
wbItem, ok := wb.lookup[id]
|
||||
if !ok {
|
||||
wbItem = wb._newItem(id, name)
|
||||
wbItem = wb._newItem(id, name, size)
|
||||
} else {
|
||||
if wbItem.uploading && modified {
|
||||
// We are uploading already so cancel the upload
|
||||
|
@ -272,6 +275,7 @@ func (wb *WriteBack) Add(id Handle, name string, modified bool, putFn PutFn) Han
|
|||
wb.items._update(wbItem, wb._newExpiry())
|
||||
}
|
||||
wbItem.putFn = putFn
|
||||
wbItem.size = size
|
||||
wb._resetTimer()
|
||||
return wbItem.id
|
||||
}
|
||||
|
@ -463,3 +467,70 @@ func (wb *WriteBack) Stats() (uploadsInProgress, uploadsQueued int) {
|
|||
defer wb.mu.Unlock()
|
||||
return wb.uploads, len(wb.items)
|
||||
}
|
||||
|
||||
// QueueInfo is information about an item queued for upload, returned
|
||||
// by Queue
|
||||
type QueueInfo struct {
|
||||
Name string `json:"name"` // name (full path) of the file,
|
||||
ID Handle `json:"id"` // id of queue item
|
||||
Size int64 `json:"size"` // integer size of the file in bytes
|
||||
Expiry float64 `json:"expiry"` // seconds from now which the file is eligible for transfer, oldest goes first
|
||||
Tries int `json:"tries"` // number of times we have tried to upload
|
||||
Delay float64 `json:"delay"` // delay between upload attempts (s)
|
||||
Uploading bool `json:"uploading"` // true if item is being uploaded
|
||||
}
|
||||
|
||||
// Queue return info about the current upload queue
|
||||
func (wb *WriteBack) Queue() []QueueInfo {
|
||||
wb.mu.Lock()
|
||||
defer wb.mu.Unlock()
|
||||
|
||||
items := make([]QueueInfo, 0, len(wb.lookup))
|
||||
now := time.Now()
|
||||
|
||||
// Lookup all the items in no particular order
|
||||
for _, wbItem := range wb.lookup {
|
||||
items = append(items, QueueInfo{
|
||||
Name: wbItem.name,
|
||||
ID: wbItem.id,
|
||||
Size: wbItem.size,
|
||||
Expiry: wbItem.expiry.Sub(now).Seconds(),
|
||||
Tries: wbItem.tries,
|
||||
Delay: wbItem.delay.Seconds(),
|
||||
Uploading: wbItem.uploading,
|
||||
})
|
||||
}
|
||||
|
||||
// Sort by Uploading first then Expiry
|
||||
sort.Slice(items, func(i, j int) bool {
|
||||
if items[i].Uploading != items[j].Uploading {
|
||||
return items[i].Uploading
|
||||
}
|
||||
return items[i].Expiry < items[j].Expiry
|
||||
})
|
||||
|
||||
return items
|
||||
}
|
||||
|
||||
// ErrorIDNotFound is returned from SetExpiry when the item is not found
|
||||
var ErrorIDNotFound = errors.New("id not found in queue")
|
||||
|
||||
// SetExpiry sets the expiry time for an item in the writeback queue.
|
||||
//
|
||||
// id should be as returned from the Queue call
|
||||
//
|
||||
// If the item isn't found then it will return ErrorIDNotFound
|
||||
func (wb *WriteBack) SetExpiry(id Handle, expiry time.Time) error {
|
||||
wb.mu.Lock()
|
||||
defer wb.mu.Unlock()
|
||||
|
||||
wbItem, ok := wb.lookup[id]
|
||||
if !ok {
|
||||
return ErrorIDNotFound
|
||||
}
|
||||
|
||||
// Update the expiry with the user requested value
|
||||
wb.items._update(wbItem, expiry)
|
||||
wb._resetTimer()
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ import (
|
|||
"github.com/rclone/rclone/fs"
|
||||
"github.com/rclone/rclone/vfs/vfscommon"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func newTestWriteBack(t *testing.T) (wb *WriteBack, cancel func()) {
|
||||
|
@ -122,15 +123,15 @@ func TestWriteBackItemCRUD(t *testing.T) {
|
|||
// _peekItem empty
|
||||
assert.Nil(t, wb._peekItem())
|
||||
|
||||
wbItem1 := wb._newItem(0, "one")
|
||||
wbItem1 := wb._newItem(0, "one", 10)
|
||||
checkOnHeap(t, wb, wbItem1)
|
||||
checkInLookup(t, wb, wbItem1)
|
||||
|
||||
wbItem2 := wb._newItem(0, "two")
|
||||
wbItem2 := wb._newItem(0, "two", 10)
|
||||
checkOnHeap(t, wb, wbItem2)
|
||||
checkInLookup(t, wb, wbItem2)
|
||||
|
||||
wbItem3 := wb._newItem(0, "three")
|
||||
wbItem3 := wb._newItem(0, "three", 10)
|
||||
checkOnHeap(t, wb, wbItem3)
|
||||
checkInLookup(t, wb, wbItem3)
|
||||
|
||||
|
@ -201,7 +202,7 @@ func TestWriteBackResetTimer(t *testing.T) {
|
|||
// Check timer is stopped
|
||||
assertTimerRunning(t, wb, false)
|
||||
|
||||
_ = wb._newItem(0, "three")
|
||||
_ = wb._newItem(0, "three", 10)
|
||||
|
||||
// Reset the timer on an queue with stuff
|
||||
wb._resetTimer()
|
||||
|
@ -297,7 +298,7 @@ func TestWriteBackAddOK(t *testing.T) {
|
|||
wb.SetID(&inID)
|
||||
assert.Equal(t, Handle(1), inID)
|
||||
|
||||
id := wb.Add(inID, "one", true, pi.put)
|
||||
id := wb.Add(inID, "one", 10, true, pi.put)
|
||||
assert.Equal(t, inID, id)
|
||||
wbItem := wb.lookup[id]
|
||||
checkOnHeap(t, wb, wbItem)
|
||||
|
@ -321,7 +322,7 @@ func TestWriteBackAddFailRetry(t *testing.T) {
|
|||
|
||||
pi := newPutItem(t)
|
||||
|
||||
id := wb.Add(0, "one", true, pi.put)
|
||||
id := wb.Add(0, "one", 10, true, pi.put)
|
||||
wbItem := wb.lookup[id]
|
||||
checkOnHeap(t, wb, wbItem)
|
||||
checkInLookup(t, wb, wbItem)
|
||||
|
@ -354,8 +355,9 @@ func TestWriteBackAddUpdate(t *testing.T) {
|
|||
|
||||
pi := newPutItem(t)
|
||||
|
||||
id := wb.Add(0, "one", true, pi.put)
|
||||
id := wb.Add(0, "one", 10, true, pi.put)
|
||||
wbItem := wb.lookup[id]
|
||||
assert.Equal(t, int64(10), wbItem.size) // check size
|
||||
checkOnHeap(t, wb, wbItem)
|
||||
checkInLookup(t, wb, wbItem)
|
||||
assert.Equal(t, "one", wb.string(t))
|
||||
|
@ -367,9 +369,10 @@ func TestWriteBackAddUpdate(t *testing.T) {
|
|||
// Now the upload has started add another one
|
||||
|
||||
pi2 := newPutItem(t)
|
||||
id2 := wb.Add(id, "one", true, pi2.put)
|
||||
id2 := wb.Add(id, "one", 20, true, pi2.put)
|
||||
assert.Equal(t, id, id2)
|
||||
checkOnHeap(t, wb, wbItem) // object awaiting writeback time
|
||||
assert.Equal(t, int64(20), wbItem.size) // check size has changed
|
||||
checkOnHeap(t, wb, wbItem) // object awaiting writeback time
|
||||
checkInLookup(t, wb, wbItem)
|
||||
|
||||
// check the previous transfer was cancelled
|
||||
|
@ -393,7 +396,7 @@ func TestWriteBackAddUpdateNotModified(t *testing.T) {
|
|||
|
||||
pi := newPutItem(t)
|
||||
|
||||
id := wb.Add(0, "one", false, pi.put)
|
||||
id := wb.Add(0, "one", 10, false, pi.put)
|
||||
wbItem := wb.lookup[id]
|
||||
checkOnHeap(t, wb, wbItem)
|
||||
checkInLookup(t, wb, wbItem)
|
||||
|
@ -406,7 +409,7 @@ func TestWriteBackAddUpdateNotModified(t *testing.T) {
|
|||
// Now the upload has started add another one
|
||||
|
||||
pi2 := newPutItem(t)
|
||||
id2 := wb.Add(id, "one", false, pi2.put)
|
||||
id2 := wb.Add(id, "one", 10, false, pi2.put)
|
||||
assert.Equal(t, id, id2)
|
||||
checkNotOnHeap(t, wb, wbItem) // object still being transferred
|
||||
checkInLookup(t, wb, wbItem)
|
||||
|
@ -432,7 +435,7 @@ func TestWriteBackAddUpdateNotStarted(t *testing.T) {
|
|||
|
||||
pi := newPutItem(t)
|
||||
|
||||
id := wb.Add(0, "one", true, pi.put)
|
||||
id := wb.Add(0, "one", 10, true, pi.put)
|
||||
wbItem := wb.lookup[id]
|
||||
checkOnHeap(t, wb, wbItem)
|
||||
checkInLookup(t, wb, wbItem)
|
||||
|
@ -441,7 +444,7 @@ func TestWriteBackAddUpdateNotStarted(t *testing.T) {
|
|||
// Immediately add another upload before the first has started
|
||||
|
||||
pi2 := newPutItem(t)
|
||||
id2 := wb.Add(id, "one", true, pi2.put)
|
||||
id2 := wb.Add(id, "one", 10, true, pi2.put)
|
||||
assert.Equal(t, id, id2)
|
||||
checkOnHeap(t, wb, wbItem) // object still awaiting transfer
|
||||
checkInLookup(t, wb, wbItem)
|
||||
|
@ -470,7 +473,7 @@ func TestWriteBackGetStats(t *testing.T) {
|
|||
|
||||
pi := newPutItem(t)
|
||||
|
||||
wb.Add(0, "one", true, pi.put)
|
||||
wb.Add(0, "one", 10, true, pi.put)
|
||||
|
||||
inProgress, queued := wb.Stats()
|
||||
assert.Equal(t, queued, 1)
|
||||
|
@ -491,6 +494,101 @@ func TestWriteBackGetStats(t *testing.T) {
|
|||
|
||||
}
|
||||
|
||||
func TestWriteBackQueue(t *testing.T) {
|
||||
wb, cancel := newTestWriteBack(t)
|
||||
defer cancel()
|
||||
|
||||
pi := newPutItem(t)
|
||||
|
||||
id := wb.Add(0, "one", 10, true, pi.put)
|
||||
|
||||
queue := wb.Queue()
|
||||
require.Equal(t, 1, len(queue))
|
||||
assert.Greater(t, queue[0].Expiry, 0.0)
|
||||
assert.Less(t, queue[0].Expiry, 1.0)
|
||||
queue[0].Expiry = 0.0
|
||||
assert.Equal(t, []QueueInfo{
|
||||
{
|
||||
Name: "one",
|
||||
Size: 10,
|
||||
Expiry: 0.0,
|
||||
Tries: 0,
|
||||
Delay: 0.1,
|
||||
Uploading: false,
|
||||
ID: id,
|
||||
},
|
||||
}, queue)
|
||||
|
||||
<-pi.started
|
||||
|
||||
queue = wb.Queue()
|
||||
require.Equal(t, 1, len(queue))
|
||||
assert.Less(t, queue[0].Expiry, 0.0)
|
||||
assert.Greater(t, queue[0].Expiry, -1.0)
|
||||
queue[0].Expiry = 0.0
|
||||
assert.Equal(t, []QueueInfo{
|
||||
{
|
||||
Name: "one",
|
||||
Size: 10,
|
||||
Expiry: 0.0,
|
||||
Tries: 1,
|
||||
Delay: 0.1,
|
||||
Uploading: true,
|
||||
ID: id,
|
||||
},
|
||||
}, queue)
|
||||
|
||||
pi.finish(nil) // transfer successful
|
||||
waitUntilNoTransfers(t, wb)
|
||||
|
||||
queue = wb.Queue()
|
||||
assert.Equal(t, []QueueInfo{}, queue)
|
||||
}
|
||||
|
||||
func TestWriteBackSetExpiry(t *testing.T) {
|
||||
wb, cancel := newTestWriteBack(t)
|
||||
defer cancel()
|
||||
|
||||
err := wb.SetExpiry(123123123, time.Now())
|
||||
assert.Equal(t, ErrorIDNotFound, err)
|
||||
|
||||
pi := newPutItem(t)
|
||||
|
||||
id := wb.Add(0, "one", 10, true, pi.put)
|
||||
wbItem := wb.lookup[id]
|
||||
|
||||
// get the expiry time with locking so we don't cause races
|
||||
getExpiry := func() time.Time {
|
||||
wb.mu.Lock()
|
||||
defer wb.mu.Unlock()
|
||||
return wbItem.expiry
|
||||
}
|
||||
|
||||
expiry := time.Until(getExpiry()).Seconds()
|
||||
assert.Greater(t, expiry, 0.0)
|
||||
assert.Less(t, expiry, 1.0)
|
||||
|
||||
newExpiry := time.Now().Add(100 * time.Second)
|
||||
require.NoError(t, wb.SetExpiry(wbItem.id, newExpiry))
|
||||
assert.Equal(t, newExpiry, getExpiry())
|
||||
|
||||
// This starts the transfer
|
||||
newExpiry = time.Now().Add(-100 * time.Second)
|
||||
require.NoError(t, wb.SetExpiry(wbItem.id, newExpiry))
|
||||
assert.Equal(t, newExpiry, getExpiry())
|
||||
|
||||
<-pi.started
|
||||
|
||||
expiry = time.Until(getExpiry()).Seconds()
|
||||
assert.LessOrEqual(t, expiry, -100.0)
|
||||
|
||||
pi.finish(nil) // transfer successful
|
||||
waitUntilNoTransfers(t, wb)
|
||||
|
||||
expiry = time.Until(getExpiry()).Seconds()
|
||||
assert.LessOrEqual(t, expiry, -100.0)
|
||||
}
|
||||
|
||||
// Test queuing more than fs.Config.Transfers
|
||||
func TestWriteBackMaxQueue(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
@ -506,7 +604,7 @@ func TestWriteBackMaxQueue(t *testing.T) {
|
|||
for i := 0; i < toTransfer; i++ {
|
||||
pi := newPutItem(t)
|
||||
pis = append(pis, pi)
|
||||
wb.Add(0, fmt.Sprintf("number%d", 1), true, pi.put)
|
||||
wb.Add(0, fmt.Sprintf("number%d", 1), 10, true, pi.put)
|
||||
}
|
||||
|
||||
inProgress, queued := wb.Stats()
|
||||
|
@ -551,7 +649,7 @@ func TestWriteBackRename(t *testing.T) {
|
|||
|
||||
// add item
|
||||
pi1 := newPutItem(t)
|
||||
id := wb.Add(0, "one", true, pi1.put)
|
||||
id := wb.Add(0, "one", 10, true, pi1.put)
|
||||
wbItem := wb.lookup[id]
|
||||
checkOnHeap(t, wb, wbItem)
|
||||
checkInLookup(t, wb, wbItem)
|
||||
|
@ -566,7 +664,7 @@ func TestWriteBackRename(t *testing.T) {
|
|||
|
||||
// add item
|
||||
pi2 := newPutItem(t)
|
||||
id = wb.Add(id, "two", true, pi2.put)
|
||||
id = wb.Add(id, "two", 10, true, pi2.put)
|
||||
wbItem = wb.lookup[id]
|
||||
checkOnHeap(t, wb, wbItem)
|
||||
checkInLookup(t, wb, wbItem)
|
||||
|
@ -591,9 +689,9 @@ func TestWriteBackRenameDuplicates(t *testing.T) {
|
|||
wb, cancel := newTestWriteBack(t)
|
||||
defer cancel()
|
||||
|
||||
// add item "one"
|
||||
// add item "one", 10
|
||||
pi1 := newPutItem(t)
|
||||
id1 := wb.Add(0, "one", true, pi1.put)
|
||||
id1 := wb.Add(0, "one", 10, true, pi1.put)
|
||||
wbItem1 := wb.lookup[id1]
|
||||
checkOnHeap(t, wb, wbItem1)
|
||||
checkInLookup(t, wb, wbItem1)
|
||||
|
@ -605,7 +703,7 @@ func TestWriteBackRenameDuplicates(t *testing.T) {
|
|||
|
||||
// add item "two"
|
||||
pi2 := newPutItem(t)
|
||||
id2 := wb.Add(0, "two", true, pi2.put)
|
||||
id2 := wb.Add(0, "two", 10, true, pi2.put)
|
||||
wbItem2 := wb.lookup[id2]
|
||||
checkOnHeap(t, wb, wbItem2)
|
||||
checkInLookup(t, wb, wbItem2)
|
||||
|
@ -641,7 +739,7 @@ func TestWriteBackCancelUpload(t *testing.T) {
|
|||
|
||||
// add item
|
||||
pi := newPutItem(t)
|
||||
id := wb.Add(0, "one", true, pi.put)
|
||||
id := wb.Add(0, "one", 10, true, pi.put)
|
||||
wbItem := wb.lookup[id]
|
||||
checkOnHeap(t, wb, wbItem)
|
||||
checkInLookup(t, wb, wbItem)
|
||||
|
|
Loading…
Add table
Reference in a new issue