[#901] util: implement Release for PseudWorkerPool

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
Evgenii Stratonikov 2021-10-12 14:55:49 +03:00 committed by Alex Vanin
parent 616013cb8a
commit 0beaed2ef4
3 changed files with 87 additions and 5 deletions

View file

@ -60,8 +60,8 @@ type cfg struct {
func defaultCfg() *cfg { func defaultCfg() *cfg {
return &cfg{ return &cfg{
remotePool: new(util.PseudoWorkerPool), remotePool: util.NewPseudoWorkerPool(),
localPool: new(util.PseudoWorkerPool), localPool: util.NewPseudoWorkerPool(),
log: zap.L(), log: zap.L(),
} }
} }

View file

@ -1,5 +1,10 @@
package util package util
import (
"github.com/panjf2000/ants/v2"
"go.uber.org/atomic"
)
// WorkerPool represents the tool for control // WorkerPool represents the tool for control
// the execution of go-routine pool. // the execution of go-routine pool.
type WorkerPool interface { type WorkerPool interface {
@ -9,16 +14,41 @@ type WorkerPool interface {
// Implementation must return any error encountered // Implementation must return any error encountered
// that prevented the function from being queued. // that prevented the function from being queued.
Submit(func()) error Submit(func()) error
// Release releases worker pool resources. All `Submit` calls will
// finish with ErrPoolClosed. It doesn't wait until all submitted
// functions have returned so synchronization must be achieved
// via other means (e.g. sync.WaitGroup).
Release()
} }
// PseudoWorkerPool represents pseudo worker pool which executes submitted job immediately in the caller's routine.. // pseudoWorkerPool represents pseudo worker pool which executes submitted job immediately in the caller's routine.
type PseudoWorkerPool struct{} type pseudoWorkerPool struct {
closed atomic.Bool
}
// ErrPoolClosed is returned when submitting task to a closed pool.
var ErrPoolClosed = ants.ErrPoolClosed
// NewPseudoWorkerPool returns new instance of a synchronous worker pool.
func NewPseudoWorkerPool() WorkerPool {
return &pseudoWorkerPool{}
}
// Submit executes passed function immediately. // Submit executes passed function immediately.
// //
// Always returns nil. // Always returns nil.
func (PseudoWorkerPool) Submit(fn func()) error { func (p *pseudoWorkerPool) Submit(fn func()) error {
if p.closed.Load() {
return ErrPoolClosed
}
fn() fn()
return nil return nil
} }
// Release implements WorkerPool interface.
func (p *pseudoWorkerPool) Release() {
p.closed.Store(true)
}

View file

@ -0,0 +1,52 @@
package util
import (
"sync"
"testing"
"github.com/stretchr/testify/require"
)
func TestSyncWorkerPool(t *testing.T) {
t.Run("submit to released pool", func(t *testing.T) {
p := NewPseudoWorkerPool()
p.Release()
require.Equal(t, ErrPoolClosed, p.Submit(func() {}))
})
t.Run("create and wait", func(t *testing.T) {
p := NewPseudoWorkerPool()
ch1, ch2 := make(chan struct{}), make(chan struct{})
wg := new(sync.WaitGroup)
wg.Add(2)
go func(t *testing.T) {
defer wg.Done()
err := p.Submit(newControlledReturnFunc(ch1))
require.NoError(t, err)
}(t)
go func(t *testing.T) {
defer wg.Done()
err := p.Submit(newControlledReturnFunc(ch2))
require.NoError(t, err)
}(t)
// Make sure functions were submitted.
<-ch1
<-ch2
p.Release()
require.Equal(t, ErrPoolClosed, p.Submit(func() {}))
close(ch1)
close(ch2)
wg.Wait()
})
}
// newControlledReturnFunc returns function which signals in ch after
// it has started and waits for some value in channel to return.
// ch must be unbuffered.
func newControlledReturnFunc(ch chan struct{}) func() {
return func() {
ch <- struct{}{}
<-ch
}
}