frostfs-s3-lifecycler/internal/notificator/limiter.go
Denis Kirillov d8b5cd5fc2 [#3] Add job fetcher
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2024-07-22 13:32:04 +03:00

71 lines
1.4 KiB
Go

package notificator
import (
"context"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/logs"
"go.uber.org/zap"
)
// handlerLimiter is a limiter to make some works to be sequential
// and interrupt previous one if new one is submitted.
type handlerLimiter struct {
ctx context.Context
log *zap.Logger
handler NewEpochHandler
work chan func()
mu sync.Mutex
cancelCurrent context.CancelFunc
}
func newHandlerLimiter(ctx context.Context, handler NewEpochHandler, log *zap.Logger) *handlerLimiter {
hl := &handlerLimiter{
ctx: ctx,
log: log,
handler: handler,
work: make(chan func()),
cancelCurrent: func() {},
}
go hl.start(ctx)
return hl
}
func (h *handlerLimiter) start(ctx context.Context) {
for {
select {
case <-ctx.Done():
close(h.work)
return
case work := <-h.work:
work()
}
}
}
func (h *handlerLimiter) replaceCurrentWorkContext(ctx context.Context) (workCtx context.Context) {
h.mu.Lock()
defer h.mu.Unlock()
h.cancelCurrent()
workCtx, h.cancelCurrent = context.WithCancel(ctx)
return workCtx
}
func (h *handlerLimiter) Handler(e event.Event) {
ee, ok := e.(NewEpochEvent)
if !ok {
return
}
workCtx := h.replaceCurrentWorkContext(h.ctx)
h.log.Debug(logs.NewEpochWasTriggered, zap.Uint64("epoch", ee.Epoch))
h.work <- func() {
h.handler(workCtx, ee)
}
}