[#3] Clean old jobs from fetcher jobs channel buffer
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
parent
2613504b94
commit
c4f259e68b
2 changed files with 74 additions and 0 deletions
|
@ -141,6 +141,8 @@ func (p *JobProvider) startFetchRoutine(ctx context.Context) {
|
|||
|
||||
p.cancelCurrentFetch()
|
||||
wg.Wait()
|
||||
p.cleanJobChannel()
|
||||
|
||||
epochCtx, p.cancelCurrentFetch = context.WithCancel(ctx)
|
||||
|
||||
wg.Add(1)
|
||||
|
@ -149,6 +151,15 @@ func (p *JobProvider) startFetchRoutine(ctx context.Context) {
|
|||
}
|
||||
}
|
||||
|
||||
func (p *JobProvider) cleanJobChannel() {
|
||||
for len(p.jobChan) != 0 {
|
||||
select {
|
||||
case <-p.jobChan:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *JobProvider) handleEpoch(ctx context.Context, epoch uint64, wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"io"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
|
@ -21,6 +22,7 @@ import (
|
|||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
||||
|
@ -293,6 +295,67 @@ func TestFetcherCancel(t *testing.T) {
|
|||
require.Len(t, res, 1)
|
||||
}
|
||||
|
||||
func TestFetcherCleanBuffer(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
log := zaptest.NewLogger(t)
|
||||
|
||||
key, err := keys.NewPrivateKey()
|
||||
require.NoError(t, err)
|
||||
|
||||
mocks, err := initFetcherMocks(1, 10)
|
||||
require.NoError(t, err)
|
||||
|
||||
epochCh := make(chan uint64)
|
||||
f := newJobProvider(ctx, mocks, epochCh, key, log, 11)
|
||||
|
||||
epochCh <- 1
|
||||
|
||||
for len(f.Jobs()) != 10 { // wait jobs buffer be filled by first epoch works
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
|
||||
mocks, err = initFetcherMocks(1, 11)
|
||||
require.NoError(t, err)
|
||||
updateFetcherMocks(f, mocks)
|
||||
|
||||
epochCh <- 2
|
||||
close(epochCh)
|
||||
|
||||
for len(f.Jobs()) != 11 { // wait jobs buffer be filled by second epoch works
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
|
||||
for job := range f.Jobs() {
|
||||
require.Equal(t, uint64(2), job.Epoch, "not all old epoch job is cleaned from buffer")
|
||||
}
|
||||
}
|
||||
|
||||
func newJobProvider(ctx context.Context, mocks *fetchersMock, epochCh <-chan uint64, key *keys.PrivateKey, log *zap.Logger, bufferSize int) *JobProvider {
|
||||
cfg := Config{
|
||||
UserFetcher: mocks.userFetcher,
|
||||
ContainerFetcher: mocks.containerFetcher,
|
||||
FrostFSFetcher: mocks.configurationFetcher,
|
||||
CredentialSource: mocks.credentialSource,
|
||||
TreeFetcher: mocks.treeFetcher,
|
||||
Settings: &settingsMock{},
|
||||
CurrentLifecycler: key,
|
||||
Logger: log,
|
||||
EpochChannel: epochCh,
|
||||
BufferSize: bufferSize,
|
||||
}
|
||||
|
||||
return NewJobProvider(ctx, cfg)
|
||||
}
|
||||
|
||||
func updateFetcherMocks(f *JobProvider, mocks *fetchersMock) {
|
||||
f.userFetcher = mocks.userFetcher
|
||||
f.containerFetcher = mocks.containerFetcher
|
||||
f.frostfsFetcher = mocks.configurationFetcher
|
||||
f.credentialSource = mocks.credentialSource
|
||||
f.treeFetcher = mocks.treeFetcher
|
||||
}
|
||||
|
||||
type fetchersMock struct {
|
||||
userFetcher *userFetcherMock
|
||||
containerFetcher *containerFetcherMock
|
||||
|
|
Loading…
Reference in a new issue