From a0b3fd3a3365f35d78835e132fb32f672ea80f91 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20M=C3=B6ller?= Date: Thu, 30 Aug 2018 12:15:30 +0200 Subject: [PATCH] cache: fix worker scale down Ensure that calling scaleWorkers will create/destroy the right amount of workers. --- backend/cache/handle.go | 80 +++++++++++++---------------------------- 1 file changed, 24 insertions(+), 56 deletions(-) diff --git a/backend/cache/handle.go b/backend/cache/handle.go index f60829928..eab1b76a2 100644 --- a/backend/cache/handle.go +++ b/backend/cache/handle.go @@ -49,12 +49,13 @@ type Handle struct { offset int64 seenOffsets map[int64]bool mu sync.Mutex + workersWg sync.WaitGroup confirmReading chan bool - - UseMemory bool - workers []*worker - closed bool - reading bool + workers int + maxWorkerID int + UseMemory bool + closed bool + reading bool } // NewObjectHandle returns a new Handle for an existing Object @@ -95,7 +96,7 @@ func (r *Handle) String() string { // startReadWorkers will start the worker pool func (r *Handle) startReadWorkers() { - if r.hasAtLeastOneWorker() { + if r.workers > 0 { return } totalWorkers := r.cacheFs().opt.TotalWorkers @@ -117,26 +118,27 @@ func (r *Handle) startReadWorkers() { // scaleOutWorkers will increase the worker pool count by the provided amount func (r *Handle) scaleWorkers(desired int) { - current := len(r.workers) + current := r.workers if current == desired { return } if current > desired { // scale in gracefully - for i := 0; i < current-desired; i++ { + for r.workers > desired { r.preloadQueue <- -1 + r.workers-- } } else { // scale out - for i := 0; i < desired-current; i++ { + for r.workers < desired { w := &worker{ r: r, - ch: r.preloadQueue, - id: current + i, + id: r.maxWorkerID, } + r.workersWg.Add(1) + r.workers++ + r.maxWorkerID++ go w.run() - - r.workers = append(r.workers, w) } } // ignore first scale out from 0 @@ -148,7 +150,7 @@ func (r *Handle) scaleWorkers(desired int) { func (r *Handle) confirmExternalReading() { // if we have a max value of workers // then we skip this step - if len(r.workers) > 1 || + if r.workers > 1 || !r.cacheFs().plexConnector.isConfigured() { return } @@ -178,7 +180,7 @@ func (r *Handle) queueOffset(offset int64) { } } - for i := 0; i < len(r.workers); i++ { + for i := 0; i < r.workers; i++ { o := r.preloadOffset + int64(r.cacheFs().opt.ChunkSize)*int64(i) if o < 0 || o >= r.cachedObject.Size() { continue @@ -193,16 +195,6 @@ func (r *Handle) queueOffset(offset int64) { } } -func (r *Handle) hasAtLeastOneWorker() bool { - oneWorker := false - for i := 0; i < len(r.workers); i++ { - if r.workers[i].isRunning() { - oneWorker = true - } - } - return oneWorker -} - // getChunk is called by the FS to retrieve a specific chunk of known start and size from where it can find it // it can be from transient or persistent cache // it will also build the chunk from the cache's specific chunk boundaries and build the final desired chunk in a buffer @@ -243,7 +235,7 @@ func (r *Handle) getChunk(chunkStart int64) ([]byte, error) { // not found in ram or // the worker didn't managed to download the chunk in time so we abort and close the stream if err != nil || len(data) == 0 || !found { - if !r.hasAtLeastOneWorker() { + if r.workers == 0 { fs.Errorf(r, "out of workers") return nil, io.ErrUnexpectedEOF } @@ -304,14 +296,7 @@ func (r *Handle) Close() error { close(r.preloadQueue) r.closed = true // wait for workers to complete their jobs before returning - waitCount := 3 - for i := 0; i < len(r.workers); i++ { - waitIdx := 0 - for r.workers[i].isRunning() && waitIdx < waitCount { - time.Sleep(time.Second) - waitIdx++ - } - } + r.workersWg.Wait() r.memory.db.Flush() fs.Debugf(r, "cache reader closed %v", r.offset) @@ -348,12 +333,9 @@ func (r *Handle) Seek(offset int64, whence int) (int64, error) { } type worker struct { - r *Handle - ch <-chan int64 - rc io.ReadCloser - id int - running bool - mu sync.Mutex + r *Handle + rc io.ReadCloser + id int } // String is a representation of this worker @@ -398,33 +380,19 @@ func (w *worker) reader(offset, end int64, closeOpen bool) (io.ReadCloser, error }) } -func (w *worker) isRunning() bool { - w.mu.Lock() - defer w.mu.Unlock() - return w.running -} - -func (w *worker) setRunning(f bool) { - w.mu.Lock() - defer w.mu.Unlock() - w.running = f -} - // run is the main loop for the worker which receives offsets to preload func (w *worker) run() { var err error var data []byte - defer w.setRunning(false) defer func() { if w.rc != nil { _ = w.rc.Close() - w.setRunning(false) } + w.r.workersWg.Done() }() for { - chunkStart, open := <-w.ch - w.setRunning(true) + chunkStart, open := <-w.r.preloadQueue if chunkStart < 0 || !open { break }