cache: delay Plex connection to the first read handle - fixes #1903
This commit is contained in:
parent
829dd1ad25
commit
84701e376a
3 changed files with 45 additions and 31 deletions
15
cache/cache.go
vendored
15
cache/cache.go
vendored
|
@ -301,14 +301,11 @@ func NewFs(name, rpath string) (fs.Fs, error) {
|
||||||
|
|
||||||
f.plexConnector = &plexConnector{}
|
f.plexConnector = &plexConnector{}
|
||||||
if plexURL != "" {
|
if plexURL != "" {
|
||||||
usingPlex := false
|
|
||||||
|
|
||||||
if plexToken != "" {
|
if plexToken != "" {
|
||||||
f.plexConnector, err = newPlexConnectorWithToken(f, plexURL, plexToken)
|
f.plexConnector, err = newPlexConnectorWithToken(f, plexURL, plexToken)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrapf(err, "failed to connect to the Plex API %v", plexURL)
|
return nil, errors.Wrapf(err, "failed to connect to the Plex API %v", plexURL)
|
||||||
}
|
}
|
||||||
usingPlex = true
|
|
||||||
} else {
|
} else {
|
||||||
plexUsername := fs.ConfigFileGet(name, "plex_username")
|
plexUsername := fs.ConfigFileGet(name, "plex_username")
|
||||||
plexPassword := fs.ConfigFileGet(name, "plex_password")
|
plexPassword := fs.ConfigFileGet(name, "plex_password")
|
||||||
|
@ -321,20 +318,8 @@ func NewFs(name, rpath string) (fs.Fs, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrapf(err, "failed to connect to the Plex API %v", plexURL)
|
return nil, errors.Wrapf(err, "failed to connect to the Plex API %v", plexURL)
|
||||||
}
|
}
|
||||||
if f.plexConnector.token != "" {
|
|
||||||
fs.ConfigFileSet(name, "plex_token", f.plexConnector.token)
|
|
||||||
fs.SaveConfig()
|
|
||||||
}
|
|
||||||
usingPlex = true
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if usingPlex {
|
|
||||||
fs.Infof(name, "Connected to Plex server: %v", plexURL)
|
|
||||||
// when connected to a Plex server we default to 1 worker (Plex scans all the time)
|
|
||||||
// and leave max workers as a setting to scale out the workers on demand during playback
|
|
||||||
f.totalWorkers = 1
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
dbPath := *cacheDbPath
|
dbPath := *cacheDbPath
|
||||||
|
|
16
cache/handle.go
vendored
16
cache/handle.go
vendored
|
@ -70,7 +70,21 @@ func (r *Handle) startReadWorkers() {
|
||||||
if r.hasAtLeastOneWorker() {
|
if r.hasAtLeastOneWorker() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
r.scaleWorkers(r.cacheFs().totalWorkers)
|
totalWorkers := r.cacheFs().totalWorkers
|
||||||
|
|
||||||
|
if r.cacheFs().plexConnector.isConfigured() {
|
||||||
|
if !r.cacheFs().plexConnector.isConnected() {
|
||||||
|
err := r.cacheFs().plexConnector.authenticate()
|
||||||
|
if err != nil {
|
||||||
|
fs.Infof(r, "failed to authenticate to Plex: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if r.cacheFs().plexConnector.isConnected() {
|
||||||
|
totalWorkers = 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
r.scaleWorkers(totalWorkers)
|
||||||
}
|
}
|
||||||
|
|
||||||
// scaleOutWorkers will increase the worker pool count by the provided amount
|
// scaleOutWorkers will increase the worker pool count by the provided amount
|
||||||
|
|
45
cache/plex.go
vendored
45
cache/plex.go
vendored
|
@ -10,6 +10,8 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/ncw/rclone/fs"
|
"github.com/ncw/rclone/fs"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -20,9 +22,12 @@ const (
|
||||||
|
|
||||||
// plexConnector is managing the cache integration with Plex
|
// plexConnector is managing the cache integration with Plex
|
||||||
type plexConnector struct {
|
type plexConnector struct {
|
||||||
url *url.URL
|
url *url.URL
|
||||||
token string
|
username string
|
||||||
f *Fs
|
password string
|
||||||
|
token string
|
||||||
|
f *Fs
|
||||||
|
mu sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// newPlexConnector connects to a Plex server and generates a token
|
// newPlexConnector connects to a Plex server and generates a token
|
||||||
|
@ -33,14 +38,11 @@ func newPlexConnector(f *Fs, plexURL, username, password string) (*plexConnector
|
||||||
}
|
}
|
||||||
|
|
||||||
pc := &plexConnector{
|
pc := &plexConnector{
|
||||||
f: f,
|
f: f,
|
||||||
url: u,
|
url: u,
|
||||||
token: "",
|
username: username,
|
||||||
}
|
password: password,
|
||||||
|
token: "",
|
||||||
err = pc.authenticate(username, password)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return pc, nil
|
return pc, nil
|
||||||
|
@ -74,10 +76,13 @@ func (p *plexConnector) fillDefaultHeaders(req *http.Request) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// authenticate will generate a token based on a username/password
|
// authenticate will generate a token based on a username/password
|
||||||
func (p *plexConnector) authenticate(username, password string) error {
|
func (p *plexConnector) authenticate() error {
|
||||||
|
p.mu.Lock()
|
||||||
|
defer p.mu.Unlock()
|
||||||
|
|
||||||
form := url.Values{}
|
form := url.Values{}
|
||||||
form.Set("user[login]", username)
|
form.Set("user[login]", p.username)
|
||||||
form.Add("user[password]", password)
|
form.Add("user[password]", p.password)
|
||||||
req, err := http.NewRequest("POST", defPlexLoginURL, strings.NewReader(form.Encode()))
|
req, err := http.NewRequest("POST", defPlexLoginURL, strings.NewReader(form.Encode()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -101,15 +106,25 @@ func (p *plexConnector) authenticate(username, password string) error {
|
||||||
return fmt.Errorf("failed to obtain token: %v", data)
|
return fmt.Errorf("failed to obtain token: %v", data)
|
||||||
}
|
}
|
||||||
p.token = token
|
p.token = token
|
||||||
|
if p.token != "" {
|
||||||
|
fs.ConfigFileSet(p.f.Name(), "plex_token", p.token)
|
||||||
|
fs.SaveConfig()
|
||||||
|
fs.Infof(p.f.Name(), "Connected to Plex server: %v", p.url.String())
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// isConnected checks if this Plex
|
// isConnected checks if this rclone is authenticated to Plex
|
||||||
func (p *plexConnector) isConnected() bool {
|
func (p *plexConnector) isConnected() bool {
|
||||||
return p.token != ""
|
return p.token != ""
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// isConfigured checks if this rclone is configured to use a Plex server
|
||||||
|
func (p *plexConnector) isConfigured() bool {
|
||||||
|
return p.url != nil
|
||||||
|
}
|
||||||
|
|
||||||
func (p *plexConnector) isPlaying(co *Object) bool {
|
func (p *plexConnector) isPlaying(co *Object) bool {
|
||||||
isPlaying := false
|
isPlaying := false
|
||||||
req, err := http.NewRequest("GET", fmt.Sprintf("%s/status/sessions", p.url.String()), nil)
|
req, err := http.NewRequest("GET", fmt.Sprintf("%s/status/sessions", p.url.String()), nil)
|
||||||
|
|
Loading…
Reference in a new issue