rclone/vfs/vfscache/writeback/writeback.go
Nick Craig-Wood e43b5ce5e5 Remove github.com/pkg/errors and replace with std library version
This is possible now that we no longer support go1.12 and brings
rclone into line with standard practices in the Go world.

This also removes errors.New and errors.Errorf from lib/errors and
prefers the stdlib errors package over lib/errors.
2021-11-07 11:53:30 +00:00

465 lines
12 KiB
Go

// Package writeback keeps track of the files which need to be written
// back to storage
package writeback
import (
"container/heap"
"context"
"errors"
"sync"
"sync/atomic"
"time"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/vfs/vfscommon"
)
// maxUploadDelay caps the delay between upload attempts. upload doubles
// an item's delay after each failed try until it reaches this value.
const maxUploadDelay = 5 * time.Minute
// PutFn is the callback an item provides to store its data.
//
// upload calls it with a cancellable context. If it returns an error
// the item is put back on the queue to be retried later.
type PutFn func(context.Context) error
// Handle is returned for callers to keep track of writeback items.
//
// Handles are allocated by SetID, which treats the value 0 as "no
// handle allocated yet".
type Handle uint64
// WriteBack keeps track of the items which need to be written back to the disk at some point
//
// Create one with New. The methods whose names start with an underscore
// are documented as needing mu to be held by their caller.
type WriteBack struct {
ctx context.Context // context given to New - passed to processItems when the timer fires
mu sync.Mutex // taken by the non-underscore methods before they touch the fields below
items writeBackItems // priority queue of *writeBackItem - writeBackItems are in here while awaiting transfer only
lookup map[Handle]*writeBackItem // for getting a *writeBackItem from a Handle - writeBackItems are in here until removed or successfully uploaded
opt *vfscommon.Options // VFS options
timer *time.Timer // timer which calls processItems when the next item expires
expiry time.Time // time the next item expires or IsZero
uploads int // number of uploads in progress
// read and written with atomic
id Handle // id of the last writeBackItem created - incremented by SetID
}
// New makes a new WriteBack which uses the VFS options in opt.
//
// ctx is stored on the WriteBack: it is handed to processItems when
// the timer fires, and once it has been cancelled processItems returns
// without starting any uploads. Cancel it to stop the background
// processing.
func New(ctx context.Context, opt *vfscommon.Options) *WriteBack {
	wb := new(WriteBack)
	wb.ctx = ctx
	wb.opt = opt
	wb.items = writeBackItems{}
	wb.lookup = make(map[Handle]*writeBackItem)
	heap.Init(&wb.items)
	return wb
}
// writeBackItem stores an Item awaiting writeback
//
// These are stored on the items heap when awaiting transfer but
// removed from the items heap when transferring. They remain in the
// lookup map until they are removed or have been uploaded
// successfully.
//
// writeBack.mu must be held to manipulate this
type writeBackItem struct {
name string // name of the item so we don't have to read it from item
id Handle // id of the item
index int // index into the priority queue for update - set to -1 when popped off the queue
expiry time.Time // When this expires we will write it back
uploading bool // True if item is being processed by upload() method
onHeap bool // true if this item is on the items heap
cancel context.CancelFunc // To cancel the upload with - set by processItems when the upload is started
done chan struct{} // closed by upload() when the upload attempt has finished, whether or not it was cancelled
putFn PutFn // To write the object data
tries int // number of times we have tried to upload
delay time.Duration // delay between upload attempts - doubled after a failed try up to maxUploadDelay
}
// writeBackItems implements a priority queue of *writeBackItem by
// implementing heap.Interface.
//
// Items are ordered by expiry, earliest first, with the id used to
// break ties (see Less).
type writeBackItems []*writeBackItem
// Len returns the number of items on the queue (part of heap.Interface).
func (ws writeBackItems) Len() int {
	return len(ws)
}
// Less reports whether item i should be written back before item j
// (part of heap.Interface).
//
// The item with the earlier expiry sorts first. Items with equal
// expiries are ordered by id so that the ordering is unambiguous.
func (ws writeBackItems) Less(i, j int) bool {
	left, right := ws[i], ws[j]
	if !left.expiry.Equal(right.expiry) {
		return left.expiry.Before(right.expiry)
	}
	return left.id < right.id
}
// Swap exchanges the items at positions i and j and records the new
// position in each item's index (part of heap.Interface).
func (ws writeBackItems) Swap(i, j int) {
	first, second := ws[i], ws[j]
	ws[i], ws[j] = second, first
	second.index = i
	first.index = j
}
// Push appends x, which must be a *writeBackItem, to the end of the
// queue and stores its position in its index (part of heap.Interface).
//
// Use heap.Push rather than calling this directly.
func (ws *writeBackItems) Push(x interface{}) {
	wbItem := x.(*writeBackItem)
	wbItem.index = len(*ws)
	*ws = append(*ws, wbItem)
}
// Pop removes the last item from the queue and returns it (part of
// heap.Interface).
//
// The returned item has its index set to -1 to show it is no longer on
// the queue. Use heap.Pop rather than calling this directly.
func (ws *writeBackItems) Pop() interface{} {
	last := len(*ws) - 1
	item := (*ws)[last]
	(*ws)[last] = nil // don't keep the item alive through the backing array
	item.index = -1   // the item no longer has a position on the queue
	*ws = (*ws)[:last]
	return item
}
// _update sets the expiry of item and, if item is currently on the
// queue, re-establishes the heap ordering around it.
//
// Items which have been popped off the queue for upload carry index -1
// (see Pop) and Add can still call this for such an item. heap.Fix is
// therefore only called when item.index really is item's slot in the
// queue, rather than relying on how heap.Fix happens to treat an index
// which is out of range.
//
// call with lock held
func (ws *writeBackItems) _update(item *writeBackItem, expiry time.Time) {
	item.expiry = expiry
	if i := item.index; i >= 0 && i < len(*ws) && (*ws)[i] == item {
		heap.Fix(ws, i)
	}
}
// _newExpiry returns the time at which an item queued now should be
// written back: the current time plus the WriteBack option, or just
// the current time if that option isn't positive.
//
// call with lock held
func (wb *WriteBack) _newExpiry() time.Time {
	now := time.Now()
	if wb.opt.WriteBack <= 0 {
		return now
	}
	return now.Add(wb.opt.WriteBack)
}
// _newItem makes a new writeBackItem called name, adds it to the lookup
// map and pushes it onto the items heap.
//
// If id is 0 a new Handle is allocated for the item with SetID,
// otherwise the item uses id as passed in. The item starts with a
// fresh expiry and a retry delay of the WriteBack option.
//
// call with the lock held
func (wb *WriteBack) _newItem(id Handle, name string) *writeBackItem {
	wb.SetID(&id)
	wbItem := new(writeBackItem)
	wbItem.name = name
	wbItem.expiry = wb._newExpiry()
	wbItem.delay = wb.opt.WriteBack
	wbItem.id = id
	wb._addItem(wbItem)
	wb._pushItem(wbItem)
	return wbItem
}
// _addItem registers wbItem in the lookup map under its id, replacing
// any item already stored under that id.
//
// call with the lock held
func (wb *WriteBack) _addItem(wbItem *writeBackItem) {
	id := wbItem.id
	wb.lookup[id] = wbItem
}
// _delItem removes the entry for wbItem's id from the lookup map. It
// does nothing if there is no such entry.
//
// call with the lock held
func (wb *WriteBack) _delItem(wbItem *writeBackItem) {
	id := wbItem.id
	delete(wb.lookup, id)
}
// _popItem takes the first item, in heap order, off the items heap and
// marks it as no longer being on the heap.
//
// The heap must not be empty - check with _peekItem first.
//
// call with the lock held
func (wb *WriteBack) _popItem() (wbItem *writeBackItem) {
	popped := heap.Pop(&wb.items)
	wbItem = popped.(*writeBackItem)
	wbItem.onHeap = false
	return wbItem
}
// _pushItem puts wbItem onto the items heap and marks it as being on
// the heap. It does nothing if the item is already marked as being on
// the heap.
//
// call with the lock held
func (wb *WriteBack) _pushItem(wbItem *writeBackItem) {
	if wbItem.onHeap {
		return
	}
	heap.Push(&wb.items, wbItem)
	wbItem.onHeap = true
}
// _removeItem takes wbItem off the items heap, wherever it is in the
// heap, and marks it as no longer being on the heap. It does nothing
// if the item isn't marked as being on the heap.
//
// call with the lock held
func (wb *WriteBack) _removeItem(wbItem *writeBackItem) {
	if !wbItem.onHeap {
		return
	}
	heap.Remove(&wb.items, wbItem.index)
	wbItem.onHeap = false
}
// _peekItem returns the item at the front of the items heap, which is
// the one with the earliest expiry, without removing it. It returns
// nil if the heap is empty.
//
// call with the lock held
func (wb *WriteBack) _peekItem() (wbItem *writeBackItem) {
	if len(wb.items) > 0 {
		return wb.items[0]
	}
	return nil
}
// _stopTimer stops the timer which runs the expiries and clears the
// recorded expiry.
//
// A zero expiry means that no timer is scheduled, in which case this
// does nothing.
func (wb *WriteBack) _stopTimer() {
	if wb.expiry.IsZero() {
		return
	}
	wb.expiry = time.Time{}
	if pending := wb.timer; pending != nil {
		pending.Stop()
		wb.timer = nil
	}
}
// _resetTimer makes the timer which runs the expiries fire when the
// item at the front of the heap expires.
//
// If the heap is empty the timer is stopped. If the timer is already
// set for that item's expiry it is left alone. Otherwise any existing
// timer is stopped and a new one started which calls processItems
// when the item expires, or straight away if it has expired already.
func (wb *WriteBack) _resetTimer() {
	next := wb._peekItem()
	if next == nil {
		wb._stopTimer()
		return
	}
	if wb.expiry.Equal(next.expiry) {
		// already scheduled for this expiry
		return
	}
	wb.expiry = next.expiry
	wait := time.Until(next.expiry)
	if wait < 0 {
		wait = 0
	}
	if wb.timer != nil {
		wb.timer.Stop()
	}
	wb.timer = time.AfterFunc(wait, func() {
		wb.processItems(wb.ctx)
	})
}
// SetID sets the Handle pointed to by pid to the next free handle if
// it is zero.
//
// A Handle which is already non zero is left unchanged. The new handle
// is allocated with an atomic increment.
func (wb *WriteBack) SetID(pid *Handle) {
	if *pid != 0 {
		return
	}
	next := atomic.AddUint64((*uint64)(&wb.id), 1)
	*pid = Handle(next)
}
// Add adds an item to the writeback queue or resets its timer if it
// is already there.
//
// If id is 0 then a new item will always be created and the new
// Handle will be returned.
//
// Use SetID to create Handles in advance of calling Add
//
// If modified is false then it doesn't cancel a pending upload if
// there is one as there is no need.
//
// putFn replaces any putFn the item had before.
func (wb *WriteBack) Add(id Handle, name string, modified bool, putFn PutFn) Handle {
	wb.mu.Lock()
	defer wb.mu.Unlock()

	wbItem, known := wb.lookup[id]
	if known {
		if modified && wbItem.uploading {
			// The data has changed since the upload started so
			// cancel the upload
			wb._cancelUpload(wbItem)
		}
		// Kick the timer on
		expiry := wb._newExpiry()
		wb.items._update(wbItem, expiry)
	} else {
		wbItem = wb._newItem(id, name)
	}
	wbItem.putFn = putFn
	wb._resetTimer()
	return wbItem.id
}
// _remove takes the item with the given id out of the writeback queue
// for good. Any upload in progress for it is cancelled, and the item
// is removed from both the items heap and the lookup map.
//
// The timer is reset whether or not the item was found.
//
// It returns true if there was an item with that id.
//
// This should be called with the lock held
func (wb *WriteBack) _remove(id Handle) (found bool) {
	var wbItem *writeBackItem
	if wbItem, found = wb.lookup[id]; found {
		fs.Debugf(wbItem.name, "vfs cache: cancelling writeback (uploading %v) %p item %d", wbItem.uploading, wbItem, wbItem.id)
		if wbItem.uploading {
			// An upload is running so stop it first
			wb._cancelUpload(wbItem)
		}
		// Take the item off the heap and out of the lookup map
		wb._removeItem(wbItem)
		wb._delItem(wbItem)
	}
	wb._resetTimer()
	return found
}
// Remove should be called when a file should be removed from the
// writeback queue. This cancels a writeback if there is one and
// doesn't return the item to the queue.
//
// It returns true if there was an item with the given id.
func (wb *WriteBack) Remove(id Handle) (found bool) {
	wb.mu.Lock()
	defer wb.mu.Unlock()
	found = wb._remove(id)
	return found
}
// Rename should be called when a file might be uploading and it gains
// a new name. This will cancel the upload and put it back in the
// queue.
//
// Any other items in the queue which already have the new name are
// removed, cancelling their uploads if they have one in progress.
//
// If there is no item with the given id this does nothing.
func (wb *WriteBack) Rename(id Handle, name string) {
wb.mu.Lock()
defer wb.mu.Unlock()
wbItem, ok := wb.lookup[id]
if !ok {
return
}
if wbItem.uploading {
// We are uploading already so cancel the upload
//
// Note that _cancelUpload releases the lock while it waits
// for the uploader to finish.
wb._cancelUpload(wbItem)
}
// Check to see if there are any uploads with the existing
// name and remove them
for existingID, existingItem := range wb.lookup {
if existingID != id && existingItem.name == name {
// _remove deletes the entry from wb.lookup while we are ranging over it
wb._remove(existingID)
}
}
wbItem.name = name
// Kick the timer on
wb.items._update(wbItem, wb._newExpiry())
wb._resetTimer()
}
// upload the item - called as a goroutine
//
// processItems has already set wbItem.uploading and incremented
// wb.uploads before starting this.
//
// ctx is the cancellable context made by processItems for this upload
// - it is passed to the item's putFn.
//
// If putFn returns an error the item is pushed back on the heap to be
// retried after wbItem.delay, otherwise it is removed from the lookup
// map. Either way wbItem.done is closed just before this returns to
// wake up anything waiting in _cancelUpload.
func (wb *WriteBack) upload(ctx context.Context, wbItem *writeBackItem) {
wb.mu.Lock()
// this pairs with the wb.mu.Lock() taken after putFn returns
defer wb.mu.Unlock()
putFn := wbItem.putFn
wbItem.tries++
fs.Debugf(wbItem.name, "vfs cache: starting upload")
// don't hold the lock while the data is being transferred
wb.mu.Unlock()
err := putFn(ctx)
wb.mu.Lock()
wbItem.cancel() // cancel context to release resources since store done
wbItem.uploading = false
wb.uploads--
if err != nil {
// FIXME should this have a max number of transfer attempts?
// Back off exponentially up to maxUploadDelay
wbItem.delay *= 2
if wbItem.delay > maxUploadDelay {
wbItem.delay = maxUploadDelay
}
if errors.Is(err, context.Canceled) {
fs.Infof(wbItem.name, "vfs cache: upload canceled")
// Upload was cancelled so reset the delay to the configured
// writeback time rather than backing off
wbItem.delay = wb.opt.WriteBack
} else {
fs.Errorf(wbItem.name, "vfs cache: failed to upload try #%d, will retry in %v: %v", wbItem.tries, wbItem.delay, err)
}
// push the item back on the queue for retry
wb._pushItem(wbItem)
wb.items._update(wbItem, time.Now().Add(wbItem.delay))
} else {
fs.Infof(wbItem.name, "vfs cache: upload succeeded try #%d", wbItem.tries)
// show that we are done with the item
wb._delItem(wbItem)
}
wb._resetTimer()
// signal to _cancelUpload that this upload attempt has finished
close(wbItem.done)
}
// cancel the upload - the item should be on the heap after this returns
//
// This does nothing if the item isn't marked as uploading.
//
// Note that this releases the lock while it waits for the uploader to
// finish, so state protected by the lock may have changed by the time
// it returns.
//
// call with lock held
func (wb *WriteBack) _cancelUpload(wbItem *writeBackItem) {
if !wbItem.uploading {
return
}
fs.Debugf(wbItem.name, "vfs cache: cancelling upload")
if wbItem.cancel != nil {
// Cancel the upload - this may or may not be effective
wbItem.cancel()
// wait for the uploader to finish
//
// we need to wait without the lock otherwise the
// background part will never run.
wb.mu.Unlock()
// upload() closes done as the last thing it does
<-wbItem.done
wb.mu.Lock()
}
// uploading items are not on the heap so add them back
//
// _pushItem does nothing if upload() has already pushed the item
wb._pushItem(wbItem)
fs.Debugf(wbItem.name, "vfs cache: cancelled upload")
}
// cancelUpload cancels the upload of the item with the given id if it
// has one in progress.
//
// It returns true if there was an upload in progress and false if
// there was no such item or it wasn't being uploaded.
func (wb *WriteBack) cancelUpload(id Handle) bool {
	wb.mu.Lock()
	defer wb.mu.Unlock()

	if wbItem, found := wb.lookup[id]; found && wbItem.uploading {
		wb._cancelUpload(wbItem)
		return true
	}
	return false
}
// this uploads as many items as possible
//
// It is called by the timer set in _resetTimer. It starts an upload
// goroutine for each item on the heap which has expired, stopping
// early if the number of uploads in progress reaches the configured
// number of transfers.
//
// ctx is the parent for the cancellable context given to each upload.
//
// It does nothing if wb.ctx has been cancelled.
func (wb *WriteBack) processItems(ctx context.Context) {
wb.mu.Lock()
defer wb.mu.Unlock()
if wb.ctx.Err() != nil {
return
}
resetTimer := true
// loop while the item at the front of the heap has expired
for wbItem := wb._peekItem(); wbItem != nil && time.Until(wbItem.expiry) <= 0; wbItem = wb._peekItem() {
// If reached transfer limit don't restart the timer
if wb.uploads >= fs.GetConfig(context.TODO()).Transfers {
fs.Debugf(wbItem.name, "vfs cache: delaying writeback as --transfers exceeded")
resetTimer = false
break
}
// Pop the item, mark as uploading and start the uploader
wbItem = wb._popItem()
//fs.Debugf(wbItem.name, "uploading = true %p item %p", wbItem, wbItem.item)
wbItem.uploading = true
wb.uploads++
// cancel and done are what _cancelUpload uses to stop the upload and wait for it
newCtx, cancel := context.WithCancel(ctx)
wbItem.cancel = cancel
wbItem.done = make(chan struct{})
go wb.upload(newCtx, wbItem)
}
if resetTimer {
wb._resetTimer()
} else {
// upload() calls _resetTimer when it finishes, which restarts the timer
wb._stopTimer()
}
}
// Stats returns the number of uploads in progress and the number of
// items on the heap awaiting upload.
func (wb *WriteBack) Stats() (uploadsInProgress, uploadsQueued int) {
	wb.mu.Lock()
	defer wb.mu.Unlock()
	uploadsInProgress = wb.uploads
	uploadsQueued = len(wb.items)
	return uploadsInProgress, uploadsQueued
}