box: add polling support
This commit is contained in:
parent
0bd0a992a4
commit
a603efeaf4
2 changed files with 326 additions and 28 deletions
|
@ -63,7 +63,7 @@ var _ error = (*Error)(nil)
|
|||
// ItemFields are the fields needed for FileInfo
|
||||
var ItemFields = "type,id,sequence_id,etag,sha1,name,size,created_at,modified_at,content_created_at,content_modified_at,item_status,shared_link,owned_by"
|
||||
|
||||
// Types of things in Item
|
||||
// Types of things in Item/ItemMini
|
||||
const (
|
||||
ItemTypeFolder = "folder"
|
||||
ItemTypeFile = "file"
|
||||
|
@ -72,20 +72,31 @@ const (
|
|||
ItemStatusDeleted = "deleted"
|
||||
)
|
||||
|
||||
// ItemMini is a subset of the elements in a full Item returned by some API calls
|
||||
type ItemMini struct {
|
||||
Type string `json:"type"`
|
||||
ID string `json:"id"`
|
||||
SequenceID int64 `json:"sequence_id,string"`
|
||||
Etag string `json:"etag"`
|
||||
SHA1 string `json:"sha1"`
|
||||
Name string `json:"name"`
|
||||
}
|
||||
|
||||
// Item describes a folder or a file as returned by Get Folder Items and others
|
||||
type Item struct {
|
||||
Type string `json:"type"`
|
||||
ID string `json:"id"`
|
||||
SequenceID string `json:"sequence_id"`
|
||||
Etag string `json:"etag"`
|
||||
SHA1 string `json:"sha1"`
|
||||
Name string `json:"name"`
|
||||
Size float64 `json:"size"` // box returns this in xEyy format for very large numbers - see #2261
|
||||
CreatedAt Time `json:"created_at"`
|
||||
ModifiedAt Time `json:"modified_at"`
|
||||
ContentCreatedAt Time `json:"content_created_at"`
|
||||
ContentModifiedAt Time `json:"content_modified_at"`
|
||||
ItemStatus string `json:"item_status"` // active, trashed if the file has been moved to the trash, and deleted if the file has been permanently deleted
|
||||
Type string `json:"type"`
|
||||
ID string `json:"id"`
|
||||
SequenceID int64 `json:"sequence_id,string"`
|
||||
Etag string `json:"etag"`
|
||||
SHA1 string `json:"sha1"`
|
||||
Name string `json:"name"`
|
||||
Size float64 `json:"size"` // box returns this in xEyy format for very large numbers - see #2261
|
||||
CreatedAt Time `json:"created_at"`
|
||||
ModifiedAt Time `json:"modified_at"`
|
||||
ContentCreatedAt Time `json:"content_created_at"`
|
||||
ContentModifiedAt Time `json:"content_modified_at"`
|
||||
ItemStatus string `json:"item_status"` // active, trashed if the file has been moved to the trash, and deleted if the file has been permanently deleted
|
||||
Parent ItemMini `json:"parent"`
|
||||
SharedLink struct {
|
||||
URL string `json:"url,omitempty"`
|
||||
Access string `json:"access,omitempty"`
|
||||
|
@ -281,3 +292,30 @@ type User struct {
|
|||
Address string `json:"address"`
|
||||
AvatarURL string `json:"avatar_url"`
|
||||
}
|
||||
|
||||
// FileTreeChangeEventTypes are the events that can require cache invalidation
|
||||
var FileTreeChangeEventTypes = map[string]struct{}{
|
||||
"ITEM_COPY": {},
|
||||
"ITEM_CREATE": {},
|
||||
"ITEM_MAKE_CURRENT_VERSION": {},
|
||||
"ITEM_MODIFY": {},
|
||||
"ITEM_MOVE": {},
|
||||
"ITEM_RENAME": {},
|
||||
"ITEM_TRASH": {},
|
||||
"ITEM_UNDELETE_VIA_TRASH": {},
|
||||
"ITEM_UPLOAD": {},
|
||||
}
|
||||
|
||||
// Event is an array element in the response returned from /events
|
||||
type Event struct {
|
||||
EventType string `json:"event_type"`
|
||||
EventID string `json:"event_id"`
|
||||
Source Item `json:"source"`
|
||||
}
|
||||
|
||||
// Events is returned from /events
|
||||
type Events struct {
|
||||
ChunkSize int64 `json:"chunk_size"`
|
||||
Entries []Event `json:"entries"`
|
||||
NextStreamPosition int64 `json:"next_stream_position"`
|
||||
}
|
||||
|
|
|
@ -264,17 +264,26 @@ type Options struct {
|
|||
OwnedBy string `config:"owned_by"`
|
||||
}
|
||||
|
||||
// ItemMeta defines metadata we cache for each Item ID
|
||||
type ItemMeta struct {
|
||||
SequenceID int64 // the most recent event processed for this item
|
||||
ParentID string // ID of the parent directory of this item
|
||||
Name string // leaf name of this item
|
||||
}
|
||||
|
||||
// Fs represents a remote box
|
||||
type Fs struct {
|
||||
name string // name of this remote
|
||||
root string // the path we are working on
|
||||
opt Options // parsed options
|
||||
features *fs.Features // optional features
|
||||
srv *rest.Client // the connection to the server
|
||||
dirCache *dircache.DirCache // Map of directory path to directory id
|
||||
pacer *fs.Pacer // pacer for API calls
|
||||
tokenRenewer *oauthutil.Renew // renew the token on expiry
|
||||
uploadToken *pacer.TokenDispenser // control concurrency
|
||||
name string // name of this remote
|
||||
root string // the path we are working on
|
||||
opt Options // parsed options
|
||||
features *fs.Features // optional features
|
||||
srv *rest.Client // the connection to the server
|
||||
dirCache *dircache.DirCache // Map of directory path to directory id
|
||||
pacer *fs.Pacer // pacer for API calls
|
||||
tokenRenewer *oauthutil.Renew // renew the token on expiry
|
||||
uploadToken *pacer.TokenDispenser // control concurrency
|
||||
itemMetaCacheMu *sync.Mutex // protects itemMetaCache
|
||||
itemMetaCache map[string]ItemMeta // map of Item ID to selected metadata
|
||||
}
|
||||
|
||||
// Object describes a box object
|
||||
|
@ -422,12 +431,14 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
|
|||
|
||||
ci := fs.GetConfig(ctx)
|
||||
f := &Fs{
|
||||
name: name,
|
||||
root: root,
|
||||
opt: *opt,
|
||||
srv: rest.NewClient(client).SetRoot(rootURL),
|
||||
pacer: fs.NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
|
||||
uploadToken: pacer.NewTokenDispenser(ci.Transfers),
|
||||
name: name,
|
||||
root: root,
|
||||
opt: *opt,
|
||||
srv: rest.NewClient(client).SetRoot(rootURL),
|
||||
pacer: fs.NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
|
||||
uploadToken: pacer.NewTokenDispenser(ci.Transfers),
|
||||
itemMetaCacheMu: new(sync.Mutex),
|
||||
itemMetaCache: make(map[string]ItemMeta),
|
||||
}
|
||||
f.features = (&fs.Features{
|
||||
CaseInsensitive: true,
|
||||
|
@ -682,6 +693,17 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e
|
|||
}
|
||||
entries = append(entries, o)
|
||||
}
|
||||
|
||||
// Cache some metadata for this Item to help us process events later
|
||||
// on. In particular, the box event API does not provide the old path
|
||||
// of the Item when it is renamed/deleted/moved/etc.
|
||||
f.itemMetaCacheMu.Lock()
|
||||
cachedItemMeta, found := f.itemMetaCache[info.ID]
|
||||
if !found || cachedItemMeta.SequenceID < info.SequenceID {
|
||||
f.itemMetaCache[info.ID] = ItemMeta{SequenceID: info.SequenceID, ParentID: directoryID, Name: info.Name}
|
||||
}
|
||||
f.itemMetaCacheMu.Unlock()
|
||||
|
||||
return false
|
||||
})
|
||||
if err != nil {
|
||||
|
@ -1152,6 +1174,244 @@ func (f *Fs) CleanUp(ctx context.Context) (err error) {
|
|||
return err
|
||||
}
|
||||
|
||||
// ChangeNotify calls the passed function with a path that has had changes.
|
||||
// If the implementation uses polling, it should adhere to the given interval.
|
||||
//
|
||||
// Automatically restarts itself in case of unexpected behavior of the remote.
|
||||
//
|
||||
// Close the returned channel to stop being notified.
|
||||
func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryType), pollIntervalChan <-chan time.Duration) {
|
||||
go func() {
|
||||
// get the `stream_position` early so all changes from now on get processed
|
||||
streamPosition, err := f.changeNotifyStreamPosition(ctx)
|
||||
if err != nil {
|
||||
fs.Infof(f, "Failed to get StreamPosition: %s", err)
|
||||
}
|
||||
|
||||
var ticker *time.Ticker
|
||||
var tickerC <-chan time.Time
|
||||
for {
|
||||
select {
|
||||
case pollInterval, ok := <-pollIntervalChan:
|
||||
if !ok {
|
||||
if ticker != nil {
|
||||
ticker.Stop()
|
||||
}
|
||||
return
|
||||
}
|
||||
if ticker != nil {
|
||||
ticker.Stop()
|
||||
ticker, tickerC = nil, nil
|
||||
}
|
||||
if pollInterval != 0 {
|
||||
ticker = time.NewTicker(pollInterval)
|
||||
tickerC = ticker.C
|
||||
}
|
||||
case <-tickerC:
|
||||
if streamPosition == "" {
|
||||
streamPosition, err = f.changeNotifyStreamPosition(ctx)
|
||||
if err != nil {
|
||||
fs.Infof(f, "Failed to get StreamPosition: %s", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
streamPosition, err = f.changeNotifyRunner(ctx, notifyFunc, streamPosition)
|
||||
if err != nil {
|
||||
fs.Infof(f, "Change notify listener failure: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (f *Fs) changeNotifyStreamPosition(ctx context.Context) (streamPosition string, err error) {
|
||||
opts := rest.Opts{
|
||||
Method: "GET",
|
||||
Path: "/events",
|
||||
Parameters: fieldsValue(),
|
||||
}
|
||||
opts.Parameters.Set("stream_position", "now")
|
||||
opts.Parameters.Set("stream_type", "changes")
|
||||
|
||||
var result api.Events
|
||||
var resp *http.Response
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
resp, err = f.srv.CallJSON(ctx, &opts, nil, &result)
|
||||
return shouldRetry(ctx, resp, err)
|
||||
})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return strconv.FormatInt(result.NextStreamPosition, 10), nil
|
||||
}
|
||||
|
||||
// Attempts to construct the full path for an object, given the ID of its
|
||||
// parent directory and the name of the object.
|
||||
//
|
||||
// Can return "" if the parentID is not currently in the directory cache.
|
||||
func (f *Fs) getFullPath(parentID string, childName string) (fullPath string) {
|
||||
fullPath = ""
|
||||
name := f.opt.Enc.ToStandardName(childName)
|
||||
if parentID != "" {
|
||||
if parentDir, ok := f.dirCache.GetInv(parentID); ok {
|
||||
if len(parentDir) > 0 {
|
||||
fullPath = parentDir + "/" + name
|
||||
} else {
|
||||
fullPath = name
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// No parent, this object is at the root
|
||||
fullPath = name
|
||||
}
|
||||
return fullPath
|
||||
}
|
||||
|
||||
func (f *Fs) changeNotifyRunner(ctx context.Context, notifyFunc func(string, fs.EntryType), streamPosition string) (nextStreamPosition string, err error) {
|
||||
nextStreamPosition = streamPosition
|
||||
|
||||
// box can send duplicate Event IDs; filter any in a single notify run
|
||||
processedEventIDs := make(map[string]bool)
|
||||
|
||||
for {
|
||||
limit := f.opt.ListChunk
|
||||
|
||||
// box only allows a max of 500 events
|
||||
if limit > 500 {
|
||||
limit = 500
|
||||
}
|
||||
|
||||
opts := rest.Opts{
|
||||
Method: "GET",
|
||||
Path: "/events",
|
||||
Parameters: fieldsValue(),
|
||||
}
|
||||
opts.Parameters.Set("stream_position", nextStreamPosition)
|
||||
opts.Parameters.Set("stream_type", "changes")
|
||||
opts.Parameters.Set("limit", strconv.Itoa(limit))
|
||||
|
||||
var result api.Events
|
||||
var resp *http.Response
|
||||
fs.Debugf(f, "Checking for changes on remote (next_stream_position: %q)", nextStreamPosition)
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
resp, err = f.srv.CallJSON(ctx, &opts, nil, &result)
|
||||
return shouldRetry(ctx, resp, err)
|
||||
})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if result.ChunkSize != int64(len(result.Entries)) {
|
||||
return "", fmt.Errorf("invalid response to event request, chunk_size (%v) not equal to number of entries (%v)", result.ChunkSize, len(result.Entries))
|
||||
}
|
||||
|
||||
nextStreamPosition = strconv.FormatInt(result.NextStreamPosition, 10)
|
||||
if result.ChunkSize == 0 {
|
||||
return nextStreamPosition, nil
|
||||
}
|
||||
|
||||
type pathToClear struct {
|
||||
path string
|
||||
entryType fs.EntryType
|
||||
}
|
||||
var pathsToClear []pathToClear
|
||||
newEventIDs := 0
|
||||
for _, entry := range result.Entries {
|
||||
if entry.EventID == "" || processedEventIDs[entry.EventID] { // missing Event ID, or already saw this one
|
||||
continue
|
||||
}
|
||||
processedEventIDs[entry.EventID] = true
|
||||
newEventIDs++
|
||||
|
||||
if entry.Source.ID == "" { // missing File or Folder ID
|
||||
continue
|
||||
}
|
||||
if entry.Source.Type != api.ItemTypeFile && entry.Source.Type != api.ItemTypeFolder { // event is not for a file or folder
|
||||
continue
|
||||
}
|
||||
|
||||
// Only interested in event types that result in a file tree change
|
||||
if _, found := api.FileTreeChangeEventTypes[entry.EventType]; !found {
|
||||
continue
|
||||
}
|
||||
|
||||
f.itemMetaCacheMu.Lock()
|
||||
itemMeta, cachedItemMetaFound := f.itemMetaCache[entry.Source.ID]
|
||||
if cachedItemMetaFound {
|
||||
if itemMeta.SequenceID >= entry.Source.SequenceID {
|
||||
// Item in the cache has the same or newer SequenceID than
|
||||
// this event. Ignore this event, it must be old.
|
||||
f.itemMetaCacheMu.Unlock()
|
||||
continue
|
||||
}
|
||||
|
||||
// This event is newer. Delete its entry from the cache,
|
||||
// we'll notify about its change below, then it's up to a
|
||||
// future list operation to repopulate the cache.
|
||||
delete(f.itemMetaCache, entry.Source.ID)
|
||||
}
|
||||
f.itemMetaCacheMu.Unlock()
|
||||
|
||||
entryType := fs.EntryDirectory
|
||||
if entry.Source.Type == api.ItemTypeFile {
|
||||
entryType = fs.EntryObject
|
||||
}
|
||||
|
||||
// The box event only includes the new path for the object (e.g.
|
||||
// the path after the object was moved). If there was an old path
|
||||
// saved in our cache, it must be cleared.
|
||||
if cachedItemMetaFound {
|
||||
path := f.getFullPath(itemMeta.ParentID, itemMeta.Name)
|
||||
if path != "" {
|
||||
pathsToClear = append(pathsToClear, pathToClear{path: path, entryType: entryType})
|
||||
}
|
||||
|
||||
// If this is a directory, also delete it from the dir cache.
|
||||
// This will effectively invalidate the item metadata cache
|
||||
// entries for all descendents of this directory, since we
|
||||
// will no longer be able to construct a full path for them.
|
||||
// This is exactly what we want, since we don't want to notify
|
||||
// on the paths of these descendents if one of their ancestors
|
||||
// has been renamed/deleted.
|
||||
if entry.Source.Type == api.ItemTypeFolder {
|
||||
f.dirCache.FlushDir(path)
|
||||
}
|
||||
}
|
||||
|
||||
// If the item is "active", then it is not trashed or deleted, so
|
||||
// it potentially has a valid parent.
|
||||
//
|
||||
// Construct the new path of the object, based on the Parent ID
|
||||
// and its name. If we get an empty result, it means we don't
|
||||
// currently know about this object so notification is unnecessary.
|
||||
if entry.Source.ItemStatus == api.ItemStatusActive {
|
||||
path := f.getFullPath(entry.Source.Parent.ID, entry.Source.Name)
|
||||
if path != "" {
|
||||
pathsToClear = append(pathsToClear, pathToClear{path: path, entryType: entryType})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// box can sometimes repeatedly return the same Event IDs within a
|
||||
// short period of time. If it stops giving us new ones, treat it
|
||||
// the same as if it returned us none at all.
|
||||
if newEventIDs == 0 {
|
||||
return nextStreamPosition, nil
|
||||
}
|
||||
|
||||
notifiedPaths := make(map[string]bool)
|
||||
for _, p := range pathsToClear {
|
||||
if _, ok := notifiedPaths[p.path]; ok {
|
||||
continue
|
||||
}
|
||||
notifiedPaths[p.path] = true
|
||||
notifyFunc(p.path, p.entryType)
|
||||
}
|
||||
fs.Debugf(f, "Received %v events, resulting in %v paths and %v notifications", len(result.Entries), len(pathsToClear), len(notifiedPaths))
|
||||
}
|
||||
}
|
||||
|
||||
// DirCacheFlush resets the directory cache - used in testing as an
|
||||
// optional interface
|
||||
func (f *Fs) DirCacheFlush() {
|
||||
|
|
Loading…
Reference in a new issue