From c4f259e68b352fe491004e435c5c288288bb93b8 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Fri, 26 Jul 2024 16:35:52 +0300 Subject: [PATCH] [#3] Clean old jobs from fetcher jobs channel buffer Signed-off-by: Denis Kirillov --- internal/lifecycle/fetcher.go | 11 ++++++ internal/lifecycle/fetcher_test.go | 63 ++++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+) diff --git a/internal/lifecycle/fetcher.go b/internal/lifecycle/fetcher.go index f0abbd1..8b70183 100644 --- a/internal/lifecycle/fetcher.go +++ b/internal/lifecycle/fetcher.go @@ -141,6 +141,8 @@ func (p *JobProvider) startFetchRoutine(ctx context.Context) { p.cancelCurrentFetch() wg.Wait() + p.cleanJobChannel() + epochCtx, p.cancelCurrentFetch = context.WithCancel(ctx) wg.Add(1) @@ -149,6 +151,15 @@ func (p *JobProvider) startFetchRoutine(ctx context.Context) { } } +func (p *JobProvider) cleanJobChannel() { + for len(p.jobChan) != 0 { + select { + case <-p.jobChan: + default: + } + } +} + func (p *JobProvider) handleEpoch(ctx context.Context, epoch uint64, wg *sync.WaitGroup) { defer wg.Done() diff --git a/internal/lifecycle/fetcher_test.go b/internal/lifecycle/fetcher_test.go index 1b75bfb..2f27d9a 100644 --- a/internal/lifecycle/fetcher_test.go +++ b/internal/lifecycle/fetcher_test.go @@ -8,6 +8,7 @@ import ( "io" "sync" "testing" + "time" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" @@ -21,6 +22,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/stretchr/testify/require" + "go.uber.org/zap" "go.uber.org/zap/zaptest" ) @@ -293,6 +295,67 @@ func TestFetcherCancel(t *testing.T) { require.Len(t, res, 1) } +func TestFetcherCleanBuffer(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + log := zaptest.NewLogger(t) + + key, err := keys.NewPrivateKey() + require.NoError(t, err) + + mocks, err := initFetcherMocks(1, 10) + require.NoError(t, err) + + epochCh := make(chan uint64) + f := newJobProvider(ctx, mocks, epochCh, key, log, 11) + + epochCh <- 1 + + for len(f.Jobs()) != 10 { // wait jobs buffer be filled by first epoch works + time.Sleep(100 * time.Millisecond) + } + + mocks, err = initFetcherMocks(1, 11) + require.NoError(t, err) + updateFetcherMocks(f, mocks) + + epochCh <- 2 + close(epochCh) + + for len(f.Jobs()) != 11 { // wait jobs buffer be filled by second epoch works + time.Sleep(100 * time.Millisecond) + } + + for job := range f.Jobs() { + require.Equal(t, uint64(2), job.Epoch, "not all old epoch job is cleaned from buffer") + } +} + +func newJobProvider(ctx context.Context, mocks *fetchersMock, epochCh <-chan uint64, key *keys.PrivateKey, log *zap.Logger, bufferSize int) *JobProvider { + cfg := Config{ + UserFetcher: mocks.userFetcher, + ContainerFetcher: mocks.containerFetcher, + FrostFSFetcher: mocks.configurationFetcher, + CredentialSource: mocks.credentialSource, + TreeFetcher: mocks.treeFetcher, + Settings: &settingsMock{}, + CurrentLifecycler: key, + Logger: log, + EpochChannel: epochCh, + BufferSize: bufferSize, + } + + return NewJobProvider(ctx, cfg) +} + +func updateFetcherMocks(f *JobProvider, mocks *fetchersMock) { + f.userFetcher = mocks.userFetcher + f.containerFetcher = mocks.containerFetcher + f.frostfsFetcher = mocks.configurationFetcher + f.credentialSource = mocks.credentialSource + f.treeFetcher = mocks.treeFetcher +} + type fetchersMock struct { userFetcher *userFetcherMock containerFetcher *containerFetcherMock