diff --git a/pkg/services/object/put/service.go b/pkg/services/object/put/service.go index e6f72cb2e..fd5a3d2ba 100644 --- a/pkg/services/object/put/service.go +++ b/pkg/services/object/put/service.go @@ -60,8 +60,8 @@ type cfg struct { func defaultCfg() *cfg { return &cfg{ - remotePool: new(util.PseudoWorkerPool), - localPool: new(util.PseudoWorkerPool), + remotePool: util.NewPseudoWorkerPool(), + localPool: util.NewPseudoWorkerPool(), log: zap.L(), } } diff --git a/pkg/util/worker_pool.go b/pkg/util/worker_pool.go index 9bf455f45..453f9f3fc 100644 --- a/pkg/util/worker_pool.go +++ b/pkg/util/worker_pool.go @@ -1,5 +1,10 @@ package util +import ( + "github.com/panjf2000/ants/v2" + "go.uber.org/atomic" +) + // WorkerPool represents the tool for control // the execution of go-routine pool. type WorkerPool interface { @@ -9,16 +14,41 @@ type WorkerPool interface { // Implementation must return any error encountered // that prevented the function from being queued. Submit(func()) error + + // Release releases worker pool resources. All `Submit` calls will + // finish with ErrPoolClosed. It doesn't wait until all submitted + // functions have returned so synchronization must be achieved + // via other means (e.g. sync.WaitGroup). + Release() } -// PseudoWorkerPool represents pseudo worker pool which executes submitted job immediately in the caller's routine.. -type PseudoWorkerPool struct{} +// pseudoWorkerPool represents pseudo worker pool which executes submitted job immediately in the caller's routine. +type pseudoWorkerPool struct { + closed atomic.Bool +} + +// ErrPoolClosed is returned when submitting task to a closed pool. +var ErrPoolClosed = ants.ErrPoolClosed + +// NewPseudoWorkerPool returns new instance of a synchronous worker pool. +func NewPseudoWorkerPool() WorkerPool { + return &pseudoWorkerPool{} +} // Submit executes passed function immediately. // // Always returns nil. -func (PseudoWorkerPool) Submit(fn func()) error { +func (p *pseudoWorkerPool) Submit(fn func()) error { + if p.closed.Load() { + return ErrPoolClosed + } + fn() return nil } + +// Release implements WorkerPool interface. +func (p *pseudoWorkerPool) Release() { + p.closed.Store(true) +} diff --git a/pkg/util/worker_pool_test.go b/pkg/util/worker_pool_test.go new file mode 100644 index 000000000..7de635d80 --- /dev/null +++ b/pkg/util/worker_pool_test.go @@ -0,0 +1,52 @@ +package util + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestSyncWorkerPool(t *testing.T) { + t.Run("submit to released pool", func(t *testing.T) { + p := NewPseudoWorkerPool() + p.Release() + require.Equal(t, ErrPoolClosed, p.Submit(func() {})) + }) + t.Run("create and wait", func(t *testing.T) { + p := NewPseudoWorkerPool() + ch1, ch2 := make(chan struct{}), make(chan struct{}) + wg := new(sync.WaitGroup) + wg.Add(2) + go func(t *testing.T) { + defer wg.Done() + err := p.Submit(newControlledReturnFunc(ch1)) + require.NoError(t, err) + }(t) + go func(t *testing.T) { + defer wg.Done() + err := p.Submit(newControlledReturnFunc(ch2)) + require.NoError(t, err) + }(t) + + // Make sure functions were submitted. + <-ch1 + <-ch2 + p.Release() + require.Equal(t, ErrPoolClosed, p.Submit(func() {})) + + close(ch1) + close(ch2) + wg.Wait() + }) +} + +// newControlledReturnFunc returns function which signals in ch after +// it has started and waits for some value in channel to return. +// ch must be unbuffered. +func newControlledReturnFunc(ch chan struct{}) func() { + return func() { + ch <- struct{}{} + <-ch + } +}