// Package writeback keeps track of the files which need to be written
// back to storage
package writeback

import (
	"container/heap"
	"context"
	"errors"
	"sync"
	"sync/atomic"
	"time"

	"github.com/rclone/rclone/fs"
	"github.com/rclone/rclone/vfs/vfscommon"
)

const (
	// maxUploadDelay is the ceiling for the retry delay of an item.
	// The delay is doubled after each failed upload and clamped to
	// this value.
	maxUploadDelay = 5 * time.Minute // max delay between upload attempts
)

// PutFn is the function an item provides to store its data.
//
// It is called without the WriteBack lock held and is passed a
// context which is cancelled if the upload should be abandoned.
// Returning a non-nil error causes the item to be queued again for a
// later retry.
type PutFn func(context.Context) error

// Handle is returned for callers to keep track of writeback items
//
// A zero Handle means "not allocated yet" - SetID and Add replace it
// with a newly allocated Handle.
type Handle uint64

// WriteBack keeps track of the items which need to be written back to the disk at some point
//
// ctx is the context passed to New - once it is cancelled
// processItems stops starting uploads. mu is held while the items,
// lookup, timer, expiry and uploads fields are read or written.
type WriteBack struct {
	ctx     context.Context
	mu      sync.Mutex
	items   writeBackItems            // priority queue of *writeBackItem - writeBackItems are in here while awaiting transfer only
	lookup  map[Handle]*writeBackItem // for getting a *writeBackItem from a Handle - writeBackItems are in here until uploaded successfully or removed
	opt     *vfscommon.Options        // VFS options
	timer   *time.Timer               // next scheduled time for the uploader
	expiry  time.Time                 // time the next item expires or IsZero
	uploads int                       // number of uploads in progress

	// read and written with atomic
	id Handle // id of the last writeBackItem created
}

// New makes a new WriteBack with an empty queue.
//
// opt supplies the VFS options, in particular the WriteBack delay.
//
// Cancel ctx to stop the background processing.
func New(ctx context.Context, opt *vfscommon.Options) *WriteBack {
	wb := new(WriteBack)
	wb.ctx = ctx
	wb.opt = opt
	wb.lookup = make(map[Handle]*writeBackItem)
	wb.items = writeBackItems{}
	heap.Init(&wb.items)
	return wb
}

// writeBackItem stores an Item awaiting writeback
//
// These are stored on the items heap when awaiting transfer but
// removed from the items heap when transferring. They remain in the
// lookup map until the upload succeeds or the item is removed.
//
// writeBack.mu must be held to manipulate this
type writeBackItem struct {
	name      string             // name of the item so we don't have to read it from item
	id        Handle             // id of the item
	index     int                // index into the priority queue for update
	expiry    time.Time          // When this expires we will write it back
	uploading bool               // True if item is being processed by upload() method
	onHeap    bool               // true if this item is on the items heap
	cancel    context.CancelFunc // To cancel the upload with
	done      chan struct{}      // closed when the cancellation completes
	putFn     PutFn              // To write the object data
	tries     int                // number of times we have tried to upload
	delay     time.Duration      // delay between upload attempts
}

// writeBackItems implements a priority queue of *writeBackItem by
// implementing heap.Interface. The item which expires first sorts
// first.
type writeBackItems []*writeBackItem

// Len returns the number of items in the queue.
func (ws writeBackItems) Len() int { return len(ws) }

// Less reports whether item i should be uploaded before item j.
//
// Items are ordered by expiry with the id used to break ties so the
// ordering is total.
func (ws writeBackItems) Less(i, j int) bool {
	a, b := ws[i], ws[j]
	if !a.expiry.Equal(b.expiry) {
		return a.expiry.Before(b.expiry)
	}
	// If times are equal then use ID to disambiguate
	return a.id < b.id
}

// Swap exchanges items i and j, keeping their index fields in step
// with their new positions.
func (ws writeBackItems) Swap(i, j int) {
	ws[i], ws[j] = ws[j], ws[i]
	ws[i].index = i
	ws[j].index = j
}

// Push appends x, which must be a *writeBackItem, to the queue and
// records its position in its index field.
//
// This is for the use of container/heap - use heap.Push to add items.
func (ws *writeBackItems) Push(x interface{}) {
	n := len(*ws)
	item := x.(*writeBackItem)
	item.index = n
	*ws = append(*ws, item)
}

// Pop removes and returns the last item in the queue, setting its
// index to -1 to show it is no longer queued.
//
// This is for the use of container/heap - use heap.Pop or heap.Remove
// to take items off.
func (ws *writeBackItems) Pop() interface{} {
	old := *ws
	n := len(old)
	item := old[n-1]
	old[n-1] = nil  // avoid memory leak
	item.index = -1 // for safety
	*ws = old[0 : n-1]
	return item
}

// _update modifies the expiry of an Item and, if the item is in the
// queue, restores the heap ordering.
//
// Items which are being uploaded are not in the queue (their index
// is -1) but may still have their expiry updated. heap.Fix must only
// be given the index of an element which really is in the queue, so
// it is skipped unless item is found at item.index.
//
// call with lock held
func (ws *writeBackItems) _update(item *writeBackItem, expiry time.Time) {
	item.expiry = expiry
	queue := *ws
	if i := item.index; i >= 0 && i < len(queue) && queue[i] == item {
		heap.Fix(ws, i)
	}
}

// _newExpiry returns the time at which an item touched now should be
// written back: the current time plus the WriteBack option, or just
// the current time if the option isn't positive.
//
// call with lock held
func (wb *WriteBack) _newExpiry() time.Time {
	now := time.Now()
	if delay := wb.opt.WriteBack; delay > 0 {
		return now.Add(delay)
	}
	return now
}

// _newItem makes a new writeBackItem called name, registers it in the
// lookup map and queues it on the heap.
//
// If id is zero a new Handle is allocated for the item.
//
// call with the lock held
func (wb *WriteBack) _newItem(id Handle, name string) *writeBackItem {
	// Allocate a Handle if the caller didn't supply one
	wb.SetID(&id)

	fresh := new(writeBackItem)
	fresh.id = id
	fresh.name = name
	fresh.expiry = wb._newExpiry()
	fresh.delay = wb.opt.WriteBack

	// Make it findable by Handle and schedule it for upload
	wb._addItem(fresh)
	wb._pushItem(fresh)
	return fresh
}

// _addItem stores wbItem in the lookup map under its id, replacing
// any item already stored with that id.
//
// call with the lock held
func (wb *WriteBack) _addItem(wbItem *writeBackItem) {
	id := wbItem.id
	wb.lookup[id] = wbItem
}

// _delItem removes the entry for wbItem's id from the lookup map. It
// does nothing if there is no such entry.
//
// call with the lock held
func (wb *WriteBack) _delItem(wbItem *writeBackItem) {
	id := wbItem.id
	delete(wb.lookup, id)
}

// _popItem takes the item with the earliest expiry off the items heap
// and marks it as no longer being on the heap.
//
// The heap must not be empty.
//
// call with the lock held
func (wb *WriteBack) _popItem() (wbItem *writeBackItem) {
	popped := heap.Pop(&wb.items)
	wbItem = popped.(*writeBackItem)
	wbItem.onHeap = false
	return wbItem
}

// _pushItem puts wbItem onto the items heap unless it is already
// there.
//
// call with the lock held
func (wb *WriteBack) _pushItem(wbItem *writeBackItem) {
	if wbItem.onHeap {
		// already queued - nothing to do
		return
	}
	heap.Push(&wb.items, wbItem)
	wbItem.onHeap = true
}

// _removeItem takes wbItem off the items heap if it is on it.
//
// call with the lock held
func (wb *WriteBack) _removeItem(wbItem *writeBackItem) {
	if !wbItem.onHeap {
		// not queued - nothing to do
		return
	}
	heap.Remove(&wb.items, wbItem.index)
	wbItem.onHeap = false
}

// _peekItem returns the item at the head of the items heap, that is
// the one which expires first, without removing it. It returns nil
// if the heap is empty.
//
// call with the lock held
func (wb *WriteBack) _peekItem() (wbItem *writeBackItem) {
	if len(wb.items) > 0 {
		wbItem = wb.items[0]
	}
	return wbItem
}

// _stopTimer stops the timer which runs the expiries and clears the
// scheduled expiry time.
//
// A zero expiry means nothing is scheduled so there is nothing to do.
//
// call with the lock held
func (wb *WriteBack) _stopTimer() {
	if !wb.expiry.IsZero() {
		var unscheduled time.Time
		wb.expiry = unscheduled
		if t := wb.timer; t != nil {
			t.Stop()
			wb.timer = nil
		}
	}
}

// _resetTimer makes the timer which runs the expiries agree with the
// head of the items heap.
//
// If the heap is empty the timer is stopped. If the timer is already
// scheduled for the expiry of the head item it is left alone.
// Otherwise it is replaced with one which calls processItems when the
// head item expires, or straight away if it has already expired.
//
// call with the lock held
func (wb *WriteBack) _resetTimer() {
	head := wb._peekItem()
	if head == nil {
		wb._stopTimer()
		return
	}
	if wb.expiry.Equal(head.expiry) {
		// already scheduled for this time
		return
	}
	wb.expiry = head.expiry

	// Never schedule in the past
	wait := time.Until(head.expiry)
	if wait < 0 {
		wait = 0
	}

	// Replace any existing timer with a new one
	if wb.timer != nil {
		wb.timer.Stop()
	}
	wb.timer = time.AfterFunc(wait, func() {
		wb.processItems(wb.ctx)
	})
}

// SetID sets the Handle pointed to, if it is zero, to the next
// unused Handle. A non-zero Handle is left unchanged.
//
// The next Handle is allocated atomically so this does not need the
// lock.
func (wb *WriteBack) SetID(pid *Handle) {
	if *pid != 0 {
		return
	}
	next := atomic.AddUint64((*uint64)(&wb.id), 1)
	*pid = Handle(next)
}

// Add adds an item to the writeback queue or resets its timer if it
// is already there.
//
// If id is 0 then a new item will always be created and the new
// Handle will be returned.
//
// Use SetID to create Handles in advance of calling Add
//
// If modified is false then it doesn't cancel a pending upload if
// there is one as there is no need.
//
// putFn replaces any function previously stored for the item. The
// Handle of the item is returned.
func (wb *WriteBack) Add(id Handle, name string, modified bool, putFn PutFn) Handle {
	wb.mu.Lock()
	defer wb.mu.Unlock()

	wbItem, known := wb.lookup[id]
	switch {
	case !known:
		wbItem = wb._newItem(id, name)
	default:
		// The data has changed so an upload in progress is
		// uploading stale data - cancel it
		if modified && wbItem.uploading {
			wb._cancelUpload(wbItem)
		}
		// Kick the timer on
		wb.items._update(wbItem, wb._newExpiry())
	}
	wbItem.putFn = putFn
	wb._resetTimer()
	return wbItem.id
}

// _remove takes the item with the given id out of the writeback
// queue. This cancels a writeback if there is one in progress and
// doesn't return the item to the queue.
//
// The timer is reset whether or not the item was found. It returns
// whether the item was found.
//
// This should be called with the lock held
func (wb *WriteBack) _remove(id Handle) (found bool) {
	wbItem, found := wb.lookup[id]
	if !found {
		wb._resetTimer()
		return false
	}
	fs.Debugf(wbItem.name, "vfs cache: cancelling writeback (uploading %v) %p item %d", wbItem.uploading, wbItem, wbItem.id)
	// If we are uploading already then cancel the upload - this
	// puts the item back on the heap
	if wbItem.uploading {
		wb._cancelUpload(wbItem)
	}
	// Forget the item - take it off the heap and out of the lookup map
	wb._removeItem(wbItem)
	wb._delItem(wbItem)
	wb._resetTimer()
	return true
}

// Remove takes the item with the given id out of the writeback
// queue. This cancels a writeback if there is one in progress and
// doesn't return the item to the queue.
//
// It returns whether the item was found.
func (wb *WriteBack) Remove(id Handle) (found bool) {
	wb.mu.Lock()
	defer wb.mu.Unlock()

	found = wb._remove(id)
	return found
}

// Rename should be called when a file might be uploading and it gains
// a new name. This will cancel the upload and put it back in the
// queue with a new expiry time.
//
// Any other items which already have the new name are removed from
// the queue. If id isn't known this does nothing.
func (wb *WriteBack) Rename(id Handle, name string) {
	wb.mu.Lock()
	defer wb.mu.Unlock()

	wbItem, known := wb.lookup[id]
	if !known {
		return
	}

	// An upload in progress is going to the old name so cancel it
	if wbItem.uploading {
		wb._cancelUpload(wbItem)
	}

	// Remove any other items which are queued under the new name
	for otherID, other := range wb.lookup {
		if otherID == id || other.name != name {
			continue
		}
		wb._remove(otherID)
	}

	wbItem.name = name
	// Kick the timer on
	wb.items._update(wbItem, wb._newExpiry())

	wb._resetTimer()
}

// upload the item - called as a goroutine
//
// processItems will already have taken the item off the heap, marked
// it as uploading, incremented wb.uploads and set wbItem.cancel and
// wbItem.done.
//
// putFn is run with ctx and without the lock held. If it succeeds the
// item is removed from the lookup map. If it fails the item is pushed
// back on the heap to be retried after wbItem.delay, which is doubled
// on each failure up to maxUploadDelay. If it was cancelled the delay
// is set back to the WriteBack option instead.
//
// wbItem.done is closed last of all to release anything waiting for
// the upload to finish.
func (wb *WriteBack) upload(ctx context.Context, wbItem *writeBackItem) {
	// minRetryDelay is the smallest delay used before retrying a
	// failed upload. The delay starts from the WriteBack option and
	// doubling it when it is zero (or negative) would leave it
	// unchanged, retrying a failing upload continuously.
	const minRetryDelay = time.Second

	wb.mu.Lock()
	defer wb.mu.Unlock()
	putFn := wbItem.putFn
	wbItem.tries++

	fs.Debugf(wbItem.name, "vfs cache: starting upload")

	// Do the transfer without the lock so the rest of the
	// WriteBack, in particular _cancelUpload, can run meanwhile
	wb.mu.Unlock()
	err := putFn(ctx)
	wb.mu.Lock()

	wbItem.cancel() // cancel context to release resources since store done

	wbItem.uploading = false
	wb.uploads--

	if err != nil {
		// FIXME should this have a max number of transfer attempts?
		wbItem.delay *= 2
		if wbItem.delay <= 0 {
			wbItem.delay = minRetryDelay
		}
		if wbItem.delay > maxUploadDelay {
			wbItem.delay = maxUploadDelay
		}
		if errors.Is(err, context.Canceled) {
			fs.Infof(wbItem.name, "vfs cache: upload canceled")
			// Upload was cancelled so reset timer
			wbItem.delay = wb.opt.WriteBack
		} else {
			fs.Errorf(wbItem.name, "vfs cache: failed to upload try #%d, will retry in %v: %v", wbItem.tries, wbItem.delay, err)
		}
		// push the item back on the queue for retry
		wb._pushItem(wbItem)
		wb.items._update(wbItem, time.Now().Add(wbItem.delay))
	} else {
		fs.Infof(wbItem.name, "vfs cache: upload succeeded try #%d", wbItem.tries)
		// show that we are done with the item
		wb._delItem(wbItem)
	}
	wb._resetTimer()
	// Signal anything waiting in _cancelUpload that we have finished
	close(wbItem.done)
}

// cancel the upload - the item should be on the heap after this returns
//
// If wbItem isn't uploading this does nothing. Otherwise it cancels
// the context of the upload and waits for the upload goroutine to
// close wbItem.done before pushing the item back on the heap.
//
// The lock is released while waiting, so the state protected by it
// may have been changed by the time this returns.
//
// NOTE(review): if putFn returned without error before it noticed the
// cancellation then upload will have deleted the item from the lookup
// map, but the item is still pushed back on the heap here - confirm
// that callers cope with an item which is queued but which can no
// longer be found by its Handle.
//
// call with lock held
func (wb *WriteBack) _cancelUpload(wbItem *writeBackItem) {
	if !wbItem.uploading {
		return
	}
	fs.Debugf(wbItem.name, "vfs cache: cancelling upload")
	if wbItem.cancel != nil {
		// Cancel the upload - this may or may not be effective
		wbItem.cancel()
		// wait for the uploader to finish
		//
		// we need to wait without the lock otherwise the
		// background part will never run.
		wb.mu.Unlock()
		<-wbItem.done
		wb.mu.Lock()
	}
	// uploading items are not on the heap so add them back
	//
	// if the upload failed or was cancelled then upload will have
	// pushed the item already and this does nothing
	wb._pushItem(wbItem)
	fs.Debugf(wbItem.name, "vfs cache: cancelled upload")
}

// cancelUpload cancels the upload of the item with the given id if
// there is one in progress, waiting for it to stop.
//
// It returns true if there was an upload in progress and false if the
// id wasn't found or the item wasn't uploading.
func (wb *WriteBack) cancelUpload(id Handle) bool {
	wb.mu.Lock()
	defer wb.mu.Unlock()

	if wbItem, known := wb.lookup[id]; known && wbItem.uploading {
		wb._cancelUpload(wbItem)
		return true
	}
	return false
}

// processItems starts uploads for as many expired items as possible.
//
// Each expired item is taken off the heap, marked as uploading and
// uploaded in its own goroutine with a cancellable context derived
// from ctx. When the number of uploads in progress reaches the
// configured number of transfers, the timer is stopped and the
// remaining items are left on the heap. Otherwise the timer is reset
// for the next item to expire.
//
// This does nothing if the context given to New has been cancelled.
func (wb *WriteBack) processItems(ctx context.Context) {
	wb.mu.Lock()
	defer wb.mu.Unlock()

	if wb.ctx.Err() != nil {
		return
	}

	for {
		next := wb._peekItem()
		if next == nil || time.Until(next.expiry) > 0 {
			// nothing left which is ready to upload
			break
		}
		// If reached transfer limit don't restart the timer
		if wb.uploads >= fs.GetConfig(context.TODO()).Transfers {
			fs.Debugf(next.name, "vfs cache: delaying writeback as --transfers exceeded")
			wb._stopTimer()
			return
		}
		// Pop the item, mark as uploading and start the uploader
		wbItem := wb._popItem()
		uploadCtx, cancel := context.WithCancel(ctx)
		wbItem.uploading = true
		wbItem.cancel = cancel
		wbItem.done = make(chan struct{})
		wb.uploads++
		go wb.upload(uploadCtx, wbItem)
	}

	wb._resetTimer()
}

// Stats returns the number of uploads in progress and the number of
// items queued on the heap awaiting upload.
func (wb *WriteBack) Stats() (uploadsInProgress, uploadsQueued int) {
	wb.mu.Lock()
	defer wb.mu.Unlock()

	uploadsInProgress = wb.uploads
	uploadsQueued = len(wb.items)
	return uploadsInProgress, uploadsQueued
}