Compare commits

...
Sign in to create a new pull request.

3 commits

Author SHA1 Message Date
Nick Craig-Wood
b21ec3db7d rc: add vfs/queue-set-expiry to adjust expiry of items in the VFS queue 2024-06-21 16:08:33 +01:00
Nick Craig-Wood
8df2c9438f rc: add vfs/queue to show the status of the upload queue 2024-06-20 15:34:26 +01:00
Nick Craig-Wood
5c54531d04 vfs: keep a record of the file size in the writeback queue 2024-06-20 15:33:43 +01:00
6 changed files with 339 additions and 25 deletions

109
vfs/rc.go
View file

@ -11,6 +11,7 @@ import (
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/cache"
"github.com/rclone/rclone/fs/rc"
"github.com/rclone/rclone/vfs/vfscache/writeback"
)
const getVFSHelp = `
@ -437,3 +438,111 @@ func rcStats(ctx context.Context, in rc.Params) (out rc.Params, err error) {
}
return vfs.Stats(), nil
}
func init() {
rc.Add(rc.Call{
Path: "vfs/queue",
Title: "Queue info for a VFS.",
Help: strings.ReplaceAll(`
This returns info about the upload queue for the selected VFS.
This is only useful if |--vfs-cache-mode| > off. If you call it when
the |--vfs-cache-mode| is off, it will return an empty result.
{
"queued": // an array of files queued for upload
[
{
"name": "file", // string: name (full path) of the file,
"id": 123, // integer: id of this item in the queue,
"size": 79, // integer: size of the file in bytes
"expiry": 1.5 // float: time until file is eligible for transfer, lowest goes first
"tries": 1, // integer: number of times we have tried to upload
"delay": 5.0, // float: seconds between upload attempts
"uploading": false, // boolean: true if item is being uploaded
},
],
}
The |expiry| time is the time until the file is elegible for being
uploaded in floating point seconds. This may go negative. As rclone
only transfers |--transfers| files at once, only the lowest
|--transfers| expiry times will have |uploading| as |true|. So there
may be files with negative expiry times for which |uploading| is
|false|.
`, "|", "`") + getVFSHelp,
Fn: rcQueue,
})
}
func rcQueue(ctx context.Context, in rc.Params) (out rc.Params, err error) {
vfs, err := getVFS(in)
if err != nil {
return nil, err
}
if vfs.cache == nil {
return nil, nil
}
return vfs.cache.Queue(), nil
}
func init() {
rc.Add(rc.Call{
Path: "vfs/queue-set-expiry",
Title: "Set the expiry time for an item queued for upload.",
Help: strings.ReplaceAll(`
Use this to adjust the |expiry| time for an item in the upload queue.
You will need to read the |id| of the item using |vfs/queue| before
using this call.
You can then set |expiry| to a floating point number of seconds from
now when the item is eligible for upload. If you want the item to be
uploaded as soon as possible then set it to a large negative number (eg
-1000000000). If you want the upload of the item to be delayed
for a long time then set it to a large positive number.
Setting the |expiry| of an item which has already has started uploading
will have no effect - the item will carry on being uploaded.
This will return an error if called with |--vfs-cache-mode| off or if
the |id| passed is not found.
This takes the following parameters
- |fs| - select the VFS in use (optional)
- |id| - a numeric ID as returned from |vfs/queue|
- |expiry| - a new expiry time as floating point seconds
This returns an empty result on success, or an error.
`, "|", "`") + getVFSHelp,
Fn: rcQueueSetExpiry,
})
}
func rcQueueSetExpiry(ctx context.Context, in rc.Params) (out rc.Params, err error) {
vfs, err := getVFS(in)
if err != nil {
return nil, err
}
if vfs.cache == nil {
return nil, rc.NewErrParamInvalid(errors.New("can't call this unless using the VFS cache"))
}
// Read input values
id, err := in.GetInt64("id")
if err != nil {
return nil, err
}
expiry, err := in.GetFloat64("expiry")
if err != nil {
return nil, err
}
// Set expiry
expiryTime := time.Now().Add(time.Duration(float64(time.Second) * expiry))
err = vfs.cache.QueueSetExpiry(writeback.Handle(id), expiryTime)
return nil, err
}

View file

@ -170,6 +170,18 @@ func (c *Cache) Stats() (out rc.Params) {
return out
}
// Queue returns info about the Cache
func (c *Cache) Queue() (out rc.Params) {
out = make(rc.Params)
out["queue"] = c.writeback.Queue()
return out
}
// QueueSetExpiry updates the expiry of a single item in the upload queue
func (c *Cache) QueueSetExpiry(id writeback.Handle, expiry time.Time) error {
return c.writeback.SetExpiry(id, expiry)
}
// createDir creates a directory path, along with any necessary parents
func createDir(dir string) error {
return file.MkdirAll(dir, 0700)

View file

@ -14,6 +14,7 @@ import (
"github.com/rclone/rclone/fs/config"
"github.com/rclone/rclone/fstest"
"github.com/rclone/rclone/lib/diskusage"
"github.com/rclone/rclone/vfs/vfscache/writeback"
"github.com/rclone/rclone/vfs/vfscommon"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -727,3 +728,26 @@ func TestCacheStats(t *testing.T) {
assert.Equal(t, 0, out["uploadsInProgress"])
assert.Equal(t, 0, out["uploadsQueued"])
}
func TestCacheQueue(t *testing.T) {
_, c := newTestCache(t)
out := c.Queue()
// We've checked the contents of queue in the writeback tests
// Just check it is present here
queue, found := out["queue"]
require.True(t, found)
_, ok := queue.([]writeback.QueueInfo)
require.True(t, ok)
}
func TestCacheQueueSetExpiry(t *testing.T) {
_, c := newTestCache(t)
// Check this returns the correct error when called so we know
// it is plumbed in correctly. The actual tests are done in
// writeback.
err := c.QueueSetExpiry(123123, time.Now())
assert.Equal(t, writeback.ErrorIDNotFound, err)
}

View file

@ -728,7 +728,7 @@ func (item *Item) Close(storeFn StoreFn) (err error) {
item.c.writeback.SetID(&item.writeBackID)
id := item.writeBackID
item.mu.Unlock()
item.c.writeback.Add(id, item.name, item.modified, func(ctx context.Context) error {
item.c.writeback.Add(id, item.name, item.info.Size, item.modified, func(ctx context.Context) error {
return item.store(ctx, storeFn)
})
item.mu.Lock()

View file

@ -6,6 +6,7 @@ import (
"container/heap"
"context"
"errors"
"sort"
"sync"
"sync/atomic"
"time"
@ -62,6 +63,7 @@ func New(ctx context.Context, opt *vfscommon.Options) *WriteBack {
// writeBack.mu must be held to manipulate this
type writeBackItem struct {
name string // name of the item so we don't have to read it from item
size int64 // size of the item so we don't have to read it from item
id Handle // id of the item
index int // index into the priority queue for update
expiry time.Time // When this expires we will write it back
@ -135,10 +137,11 @@ func (wb *WriteBack) _newExpiry() time.Time {
// make a new writeBackItem
//
// call with the lock held
func (wb *WriteBack) _newItem(id Handle, name string) *writeBackItem {
func (wb *WriteBack) _newItem(id Handle, name string, size int64) *writeBackItem {
wb.SetID(&id)
wbItem := &writeBackItem{
name: name,
size: size,
expiry: wb._newExpiry(),
delay: wb.opt.WriteBack,
id: id,
@ -256,13 +259,13 @@ func (wb *WriteBack) SetID(pid *Handle) {
//
// If modified is false then it it doesn't cancel a pending upload if
// there is one as there is no need.
func (wb *WriteBack) Add(id Handle, name string, modified bool, putFn PutFn) Handle {
func (wb *WriteBack) Add(id Handle, name string, size int64, modified bool, putFn PutFn) Handle {
wb.mu.Lock()
defer wb.mu.Unlock()
wbItem, ok := wb.lookup[id]
if !ok {
wbItem = wb._newItem(id, name)
wbItem = wb._newItem(id, name, size)
} else {
if wbItem.uploading && modified {
// We are uploading already so cancel the upload
@ -272,6 +275,7 @@ func (wb *WriteBack) Add(id Handle, name string, modified bool, putFn PutFn) Han
wb.items._update(wbItem, wb._newExpiry())
}
wbItem.putFn = putFn
wbItem.size = size
wb._resetTimer()
return wbItem.id
}
@ -463,3 +467,70 @@ func (wb *WriteBack) Stats() (uploadsInProgress, uploadsQueued int) {
defer wb.mu.Unlock()
return wb.uploads, len(wb.items)
}
// QueueInfo is information about an item queued for upload, returned
// by Queue
type QueueInfo struct {
Name string `json:"name"` // name (full path) of the file,
ID Handle `json:"id"` // id of queue item
Size int64 `json:"size"` // integer size of the file in bytes
Expiry float64 `json:"expiry"` // seconds from now which the file is eligible for transfer, oldest goes first
Tries int `json:"tries"` // number of times we have tried to upload
Delay float64 `json:"delay"` // delay between upload attempts (s)
Uploading bool `json:"uploading"` // true if item is being uploaded
}
// Queue return info about the current upload queue
func (wb *WriteBack) Queue() []QueueInfo {
wb.mu.Lock()
defer wb.mu.Unlock()
items := make([]QueueInfo, 0, len(wb.lookup))
now := time.Now()
// Lookup all the items in no particular order
for _, wbItem := range wb.lookup {
items = append(items, QueueInfo{
Name: wbItem.name,
ID: wbItem.id,
Size: wbItem.size,
Expiry: wbItem.expiry.Sub(now).Seconds(),
Tries: wbItem.tries,
Delay: wbItem.delay.Seconds(),
Uploading: wbItem.uploading,
})
}
// Sort by Uploading first then Expiry
sort.Slice(items, func(i, j int) bool {
if items[i].Uploading != items[j].Uploading {
return items[i].Uploading
}
return items[i].Expiry < items[j].Expiry
})
return items
}
// ErrorIDNotFound is returned from SetExpiry when the item is not found
var ErrorIDNotFound = errors.New("id not found in queue")
// SetExpiry sets the expiry time for an item in the writeback queue.
//
// id should be as returned from the Queue call
//
// If the item isn't found then it will return ErrorIDNotFound
func (wb *WriteBack) SetExpiry(id Handle, expiry time.Time) error {
wb.mu.Lock()
defer wb.mu.Unlock()
wbItem, ok := wb.lookup[id]
if !ok {
return ErrorIDNotFound
}
// Update the expiry with the user requested value
wb.items._update(wbItem, expiry)
wb._resetTimer()
return nil
}

View file

@ -13,6 +13,7 @@ import (
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/vfs/vfscommon"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func newTestWriteBack(t *testing.T) (wb *WriteBack, cancel func()) {
@ -122,15 +123,15 @@ func TestWriteBackItemCRUD(t *testing.T) {
// _peekItem empty
assert.Nil(t, wb._peekItem())
wbItem1 := wb._newItem(0, "one")
wbItem1 := wb._newItem(0, "one", 10)
checkOnHeap(t, wb, wbItem1)
checkInLookup(t, wb, wbItem1)
wbItem2 := wb._newItem(0, "two")
wbItem2 := wb._newItem(0, "two", 10)
checkOnHeap(t, wb, wbItem2)
checkInLookup(t, wb, wbItem2)
wbItem3 := wb._newItem(0, "three")
wbItem3 := wb._newItem(0, "three", 10)
checkOnHeap(t, wb, wbItem3)
checkInLookup(t, wb, wbItem3)
@ -201,7 +202,7 @@ func TestWriteBackResetTimer(t *testing.T) {
// Check timer is stopped
assertTimerRunning(t, wb, false)
_ = wb._newItem(0, "three")
_ = wb._newItem(0, "three", 10)
// Reset the timer on an queue with stuff
wb._resetTimer()
@ -297,7 +298,7 @@ func TestWriteBackAddOK(t *testing.T) {
wb.SetID(&inID)
assert.Equal(t, Handle(1), inID)
id := wb.Add(inID, "one", true, pi.put)
id := wb.Add(inID, "one", 10, true, pi.put)
assert.Equal(t, inID, id)
wbItem := wb.lookup[id]
checkOnHeap(t, wb, wbItem)
@ -321,7 +322,7 @@ func TestWriteBackAddFailRetry(t *testing.T) {
pi := newPutItem(t)
id := wb.Add(0, "one", true, pi.put)
id := wb.Add(0, "one", 10, true, pi.put)
wbItem := wb.lookup[id]
checkOnHeap(t, wb, wbItem)
checkInLookup(t, wb, wbItem)
@ -354,8 +355,9 @@ func TestWriteBackAddUpdate(t *testing.T) {
pi := newPutItem(t)
id := wb.Add(0, "one", true, pi.put)
id := wb.Add(0, "one", 10, true, pi.put)
wbItem := wb.lookup[id]
assert.Equal(t, int64(10), wbItem.size) // check size
checkOnHeap(t, wb, wbItem)
checkInLookup(t, wb, wbItem)
assert.Equal(t, "one", wb.string(t))
@ -367,9 +369,10 @@ func TestWriteBackAddUpdate(t *testing.T) {
// Now the upload has started add another one
pi2 := newPutItem(t)
id2 := wb.Add(id, "one", true, pi2.put)
id2 := wb.Add(id, "one", 20, true, pi2.put)
assert.Equal(t, id, id2)
checkOnHeap(t, wb, wbItem) // object awaiting writeback time
assert.Equal(t, int64(20), wbItem.size) // check size has changed
checkOnHeap(t, wb, wbItem) // object awaiting writeback time
checkInLookup(t, wb, wbItem)
// check the previous transfer was cancelled
@ -393,7 +396,7 @@ func TestWriteBackAddUpdateNotModified(t *testing.T) {
pi := newPutItem(t)
id := wb.Add(0, "one", false, pi.put)
id := wb.Add(0, "one", 10, false, pi.put)
wbItem := wb.lookup[id]
checkOnHeap(t, wb, wbItem)
checkInLookup(t, wb, wbItem)
@ -406,7 +409,7 @@ func TestWriteBackAddUpdateNotModified(t *testing.T) {
// Now the upload has started add another one
pi2 := newPutItem(t)
id2 := wb.Add(id, "one", false, pi2.put)
id2 := wb.Add(id, "one", 10, false, pi2.put)
assert.Equal(t, id, id2)
checkNotOnHeap(t, wb, wbItem) // object still being transferred
checkInLookup(t, wb, wbItem)
@ -432,7 +435,7 @@ func TestWriteBackAddUpdateNotStarted(t *testing.T) {
pi := newPutItem(t)
id := wb.Add(0, "one", true, pi.put)
id := wb.Add(0, "one", 10, true, pi.put)
wbItem := wb.lookup[id]
checkOnHeap(t, wb, wbItem)
checkInLookup(t, wb, wbItem)
@ -441,7 +444,7 @@ func TestWriteBackAddUpdateNotStarted(t *testing.T) {
// Immediately add another upload before the first has started
pi2 := newPutItem(t)
id2 := wb.Add(id, "one", true, pi2.put)
id2 := wb.Add(id, "one", 10, true, pi2.put)
assert.Equal(t, id, id2)
checkOnHeap(t, wb, wbItem) // object still awaiting transfer
checkInLookup(t, wb, wbItem)
@ -470,7 +473,7 @@ func TestWriteBackGetStats(t *testing.T) {
pi := newPutItem(t)
wb.Add(0, "one", true, pi.put)
wb.Add(0, "one", 10, true, pi.put)
inProgress, queued := wb.Stats()
assert.Equal(t, queued, 1)
@ -491,6 +494,101 @@ func TestWriteBackGetStats(t *testing.T) {
}
func TestWriteBackQueue(t *testing.T) {
wb, cancel := newTestWriteBack(t)
defer cancel()
pi := newPutItem(t)
id := wb.Add(0, "one", 10, true, pi.put)
queue := wb.Queue()
require.Equal(t, 1, len(queue))
assert.Greater(t, queue[0].Expiry, 0.0)
assert.Less(t, queue[0].Expiry, 1.0)
queue[0].Expiry = 0.0
assert.Equal(t, []QueueInfo{
{
Name: "one",
Size: 10,
Expiry: 0.0,
Tries: 0,
Delay: 0.1,
Uploading: false,
ID: id,
},
}, queue)
<-pi.started
queue = wb.Queue()
require.Equal(t, 1, len(queue))
assert.Less(t, queue[0].Expiry, 0.0)
assert.Greater(t, queue[0].Expiry, -1.0)
queue[0].Expiry = 0.0
assert.Equal(t, []QueueInfo{
{
Name: "one",
Size: 10,
Expiry: 0.0,
Tries: 1,
Delay: 0.1,
Uploading: true,
ID: id,
},
}, queue)
pi.finish(nil) // transfer successful
waitUntilNoTransfers(t, wb)
queue = wb.Queue()
assert.Equal(t, []QueueInfo{}, queue)
}
func TestWriteBackSetExpiry(t *testing.T) {
wb, cancel := newTestWriteBack(t)
defer cancel()
err := wb.SetExpiry(123123123, time.Now())
assert.Equal(t, ErrorIDNotFound, err)
pi := newPutItem(t)
id := wb.Add(0, "one", 10, true, pi.put)
wbItem := wb.lookup[id]
// get the expiry time with locking so we don't cause races
getExpiry := func() time.Time {
wb.mu.Lock()
defer wb.mu.Unlock()
return wbItem.expiry
}
expiry := time.Until(getExpiry()).Seconds()
assert.Greater(t, expiry, 0.0)
assert.Less(t, expiry, 1.0)
newExpiry := time.Now().Add(100 * time.Second)
require.NoError(t, wb.SetExpiry(wbItem.id, newExpiry))
assert.Equal(t, newExpiry, getExpiry())
// This starts the transfer
newExpiry = time.Now().Add(-100 * time.Second)
require.NoError(t, wb.SetExpiry(wbItem.id, newExpiry))
assert.Equal(t, newExpiry, getExpiry())
<-pi.started
expiry = time.Until(getExpiry()).Seconds()
assert.LessOrEqual(t, expiry, -100.0)
pi.finish(nil) // transfer successful
waitUntilNoTransfers(t, wb)
expiry = time.Until(getExpiry()).Seconds()
assert.LessOrEqual(t, expiry, -100.0)
}
// Test queuing more than fs.Config.Transfers
func TestWriteBackMaxQueue(t *testing.T) {
ctx := context.Background()
@ -506,7 +604,7 @@ func TestWriteBackMaxQueue(t *testing.T) {
for i := 0; i < toTransfer; i++ {
pi := newPutItem(t)
pis = append(pis, pi)
wb.Add(0, fmt.Sprintf("number%d", 1), true, pi.put)
wb.Add(0, fmt.Sprintf("number%d", 1), 10, true, pi.put)
}
inProgress, queued := wb.Stats()
@ -551,7 +649,7 @@ func TestWriteBackRename(t *testing.T) {
// add item
pi1 := newPutItem(t)
id := wb.Add(0, "one", true, pi1.put)
id := wb.Add(0, "one", 10, true, pi1.put)
wbItem := wb.lookup[id]
checkOnHeap(t, wb, wbItem)
checkInLookup(t, wb, wbItem)
@ -566,7 +664,7 @@ func TestWriteBackRename(t *testing.T) {
// add item
pi2 := newPutItem(t)
id = wb.Add(id, "two", true, pi2.put)
id = wb.Add(id, "two", 10, true, pi2.put)
wbItem = wb.lookup[id]
checkOnHeap(t, wb, wbItem)
checkInLookup(t, wb, wbItem)
@ -591,9 +689,9 @@ func TestWriteBackRenameDuplicates(t *testing.T) {
wb, cancel := newTestWriteBack(t)
defer cancel()
// add item "one"
// add item "one", 10
pi1 := newPutItem(t)
id1 := wb.Add(0, "one", true, pi1.put)
id1 := wb.Add(0, "one", 10, true, pi1.put)
wbItem1 := wb.lookup[id1]
checkOnHeap(t, wb, wbItem1)
checkInLookup(t, wb, wbItem1)
@ -605,7 +703,7 @@ func TestWriteBackRenameDuplicates(t *testing.T) {
// add item "two"
pi2 := newPutItem(t)
id2 := wb.Add(0, "two", true, pi2.put)
id2 := wb.Add(0, "two", 10, true, pi2.put)
wbItem2 := wb.lookup[id2]
checkOnHeap(t, wb, wbItem2)
checkInLookup(t, wb, wbItem2)
@ -641,7 +739,7 @@ func TestWriteBackCancelUpload(t *testing.T) {
// add item
pi := newPutItem(t)
id := wb.Add(0, "one", true, pi.put)
id := wb.Add(0, "one", 10, true, pi.put)
wbItem := wb.lookup[id]
checkOnHeap(t, wb, wbItem)
checkInLookup(t, wb, wbItem)