cache: integrate with Plex websocket

This commit is contained in:
remusb 2018-03-22 21:20:34 +02:00
parent d9c13bff83
commit 0ed0d9a7bc
3 changed files with 121 additions and 102 deletions

View file

@ -337,6 +337,9 @@ func NewFs(name, rootPath string) (fs.Fs, error) {
c := make(chan os.Signal, 1) c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGHUP) signal.Notify(c, syscall.SIGHUP)
atexit.Register(func() { atexit.Register(func() {
if plexURL != "" {
f.plexConnector.closeWebsocket()
}
f.StopBackgroundRunners() f.StopBackgroundRunners()
}) })
go func() { go func() {

View file

@ -146,34 +146,17 @@ func (r *Handle) scaleWorkers(desired int) {
} }
} }
func (r *Handle) requestExternalConfirmation() {
// if there's no external confirmation available
// then we skip this step
if len(r.workers) >= r.cacheFs().totalMaxWorkers ||
!r.cacheFs().plexConnector.isConnected() {
return
}
go r.cacheFs().plexConnector.isPlayingAsync(r.cachedObject, r.confirmReading)
}
func (r *Handle) confirmExternalReading() { func (r *Handle) confirmExternalReading() {
// if we have a max value of workers // if we have a max value of workers
// or there's no external confirmation available // or there's no external confirmation available
// then we skip this step // then we skip this step
if len(r.workers) >= r.cacheFs().totalMaxWorkers || if len(r.workers) > 1 ||
!r.cacheFs().plexConnector.isConnected() { !r.cacheFs().plexConnector.isConnected() {
return return
} }
if !r.cacheFs().plexConnector.isPlaying(r.cachedObject) {
select {
case confirmed := <-r.confirmReading:
if !confirmed {
return
}
default:
return return
} }
fs.Infof(r, "confirmed reading by external reader") fs.Infof(r, "confirmed reading by external reader")
r.scaleWorkers(r.cacheFs().totalMaxWorkers) r.scaleWorkers(r.cacheFs().totalMaxWorkers)
} }
@ -209,8 +192,6 @@ func (r *Handle) queueOffset(offset int64) {
r.seenOffsets[o] = true r.seenOffsets[o] = true
r.preloadQueue <- o r.preloadQueue <- o
} }
r.requestExternalConfirmation()
} }
} }
@ -294,7 +275,6 @@ func (r *Handle) Read(p []byte) (n int, err error) {
// first reading // first reading
if !r.reading { if !r.reading {
r.reading = true r.reading = true
r.requestExternalConfirmation()
} }
// reached EOF // reached EOF
if r.offset >= r.cachedObject.Size() { if r.offset >= r.cachedObject.Size() {

196
backend/cache/plex.go vendored
View file

@ -17,21 +17,49 @@ import (
"github.com/ncw/rclone/fs" "github.com/ncw/rclone/fs"
"github.com/ncw/rclone/fs/config" "github.com/ncw/rclone/fs/config"
"github.com/patrickmn/go-cache"
"golang.org/x/net/websocket"
) )
const ( const (
// defPlexLoginURL is the default URL for Plex login // defPlexLoginURL is the default URL for Plex login
defPlexLoginURL = "https://plex.tv/users/sign_in.json" defPlexLoginURL = "https://plex.tv/users/sign_in.json"
defPlexNotificationURL = "%s/:/websockets/notifications?X-Plex-Token=%s"
) )
// PlaySessionStateNotification is part of the API response of Plex
type PlaySessionStateNotification struct {
SessionKey string `json:"sessionKey"`
GUID string `json:"guid"`
Key string `json:"key"`
ViewOffset int64 `json:"viewOffset"`
State string `json:"state"`
TranscodeSession string `json:"transcodeSession"`
}
// NotificationContainer is part of the API response of Plex
type NotificationContainer struct {
Type string `json:"type"`
Size int `json:"size"`
PlaySessionState []PlaySessionStateNotification `json:"PlaySessionStateNotification"`
}
// PlexNotification is part of the API response of Plex
type PlexNotification struct {
Container NotificationContainer `json:"NotificationContainer"`
}
// plexConnector is managing the cache integration with Plex // plexConnector is managing the cache integration with Plex
type plexConnector struct { type plexConnector struct {
url *url.URL url *url.URL
username string username string
password string password string
token string token string
f *Fs f *Fs
mu sync.Mutex mu sync.Mutex
running bool
runningMu sync.Mutex
stateCache *cache.Cache
} }
// newPlexConnector connects to a Plex server and generates a token // newPlexConnector connects to a Plex server and generates a token
@ -42,11 +70,12 @@ func newPlexConnector(f *Fs, plexURL, username, password string) (*plexConnector
} }
pc := &plexConnector{ pc := &plexConnector{
f: f, f: f,
url: u, url: u,
username: username, username: username,
password: password, password: password,
token: "", token: "",
stateCache: cache.New(time.Hour, time.Minute),
} }
return pc, nil return pc, nil
@ -60,14 +89,80 @@ func newPlexConnectorWithToken(f *Fs, plexURL, token string) (*plexConnector, er
} }
pc := &plexConnector{ pc := &plexConnector{
f: f, f: f,
url: u, url: u,
token: token, token: token,
stateCache: cache.New(time.Hour, time.Minute),
} }
pc.listenWebsocket()
return pc, nil return pc, nil
} }
func (p *plexConnector) closeWebsocket() {
p.runningMu.Lock()
defer p.runningMu.Unlock()
fs.Infof("plex", "stopped Plex watcher")
p.running = false
}
func (p *plexConnector) listenWebsocket() {
u := strings.Replace(p.url.String(), "http://", "ws://", 1)
u = strings.Replace(u, "https://", "ws://", 1)
conn, err := websocket.Dial(fmt.Sprintf(defPlexNotificationURL, strings.TrimRight(u, "/"), p.token),
"", "http://localhost")
if err != nil {
fs.Errorf("plex", "%v", err)
return
}
p.running = true
go func() {
for {
if !p.isConnected() {
break
}
notif := &PlexNotification{}
err := websocket.JSON.Receive(conn, notif)
if err != nil {
fs.Debugf("plex", "%v", err)
time.Sleep(time.Second)
continue
}
// we're only interested in play events
if notif.Container.Type == "playing" {
// we loop through each of them
for _, v := range notif.Container.PlaySessionState {
// event type of playing
if v.State == "playing" {
// if it's not cached get the details and cache them
if _, found := p.stateCache.Get(v.Key); !found {
req, err := http.NewRequest("GET", fmt.Sprintf("%s%s", p.url.String(), v.Key), nil)
if err != nil {
continue
}
p.fillDefaultHeaders(req)
resp, err := http.DefaultClient.Do(req)
if err != nil {
continue
}
var data []byte
data, err = ioutil.ReadAll(resp.Body)
if err != nil {
continue
}
p.stateCache.Set(v.Key, data, cache.DefaultExpiration)
}
} else if v.State == "stopped" {
p.stateCache.Delete(v.Key)
}
}
}
}
}()
}
// fillDefaultHeaders will add common headers to requests // fillDefaultHeaders will add common headers to requests
func (p *plexConnector) fillDefaultHeaders(req *http.Request) { func (p *plexConnector) fillDefaultHeaders(req *http.Request) {
req.Header.Add("X-Plex-Client-Identifier", fmt.Sprintf("rclone (%v)", p.f.String())) req.Header.Add("X-Plex-Client-Identifier", fmt.Sprintf("rclone (%v)", p.f.String()))
@ -115,13 +210,16 @@ func (p *plexConnector) authenticate() error {
config.SaveConfig() config.SaveConfig()
fs.Infof(p.f.Name(), "Connected to Plex server: %v", p.url.String()) fs.Infof(p.f.Name(), "Connected to Plex server: %v", p.url.String())
} }
p.listenWebsocket()
return nil return nil
} }
// isConnected checks if this rclone is authenticated to Plex // isConnected checks if this rclone is authenticated to Plex
func (p *plexConnector) isConnected() bool { func (p *plexConnector) isConnected() bool {
return p.token != "" p.runningMu.Lock()
defer p.runningMu.Unlock()
return p.running
} }
// isConfigured checks if this rclone is configured to use a Plex server // isConfigured checks if this rclone is configured to use a Plex server
@ -142,64 +240,8 @@ func (p *plexConnector) isPlaying(co *Object) bool {
} }
isPlaying := false isPlaying := false
req, err := http.NewRequest("GET", fmt.Sprintf("%s/status/sessions", p.url.String()), nil) for _, v := range p.stateCache.Items() {
if err != nil { if bytes.Contains(v.Object.([]byte), []byte(remote)) {
return false
}
p.fillDefaultHeaders(req)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return false
}
var data map[string]interface{}
err = json.NewDecoder(resp.Body).Decode(&data)
if err != nil {
return false
}
sizeGen, ok := get(data, "MediaContainer", "size")
if !ok {
return false
}
size, ok := sizeGen.(float64)
if !ok || size < float64(1) {
return false
}
videosGen, ok := get(data, "MediaContainer", "Video")
if !ok {
fs.Debugf("plex", "empty videos: %v", data)
return false
}
videos, ok := videosGen.([]interface{})
if !ok || len(videos) < 1 {
fs.Debugf("plex", "empty videos: %v", data)
return false
}
for _, v := range videos {
keyGen, ok := get(v, "key")
if !ok {
fs.Debugf("plex", "failed to find: key")
continue
}
key, ok := keyGen.(string)
if !ok {
fs.Debugf("plex", "failed to understand: key")
continue
}
req, err := http.NewRequest("GET", fmt.Sprintf("%s%s", p.url.String(), key), nil)
if err != nil {
return false
}
p.fillDefaultHeaders(req)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return false
}
var data []byte
data, err = ioutil.ReadAll(resp.Body)
if err != nil {
return false
}
if bytes.Contains(data, []byte(remote)) {
isPlaying = true isPlaying = true
break break
} }
@ -208,12 +250,6 @@ func (p *plexConnector) isPlaying(co *Object) bool {
return isPlaying return isPlaying
} }
func (p *plexConnector) isPlayingAsync(co *Object, response chan bool) {
time.Sleep(time.Second) // FIXME random guess here
res := p.isPlaying(co)
response <- res
}
// adapted from: https://stackoverflow.com/a/28878037 (credit) // adapted from: https://stackoverflow.com/a/28878037 (credit)
func get(m interface{}, path ...interface{}) (interface{}, bool) { func get(m interface{}, path ...interface{}) (interface{}, bool) {
for _, p := range path { for _, p := range path {