engine: Add non-blocking send in the shard's notification channel #1238
2 changed files with 4 additions and 0 deletions
|
@ -252,6 +252,7 @@ const (
|
||||||
ShardCantInitializeMetabaseMoveToADegradedreadonlyMode = "can't initialize metabase, move to a degraded-read-only mode"
|
ShardCantInitializeMetabaseMoveToADegradedreadonlyMode = "can't initialize metabase, move to a degraded-read-only mode"
|
||||||
ShardStopEventListenerByClosedEventChannel = "stop event listener by closed `event` channel"
|
ShardStopEventListenerByClosedEventChannel = "stop event listener by closed `event` channel"
|
||||||
ShardStopEventListenerByClosedStopChannel = "stop event listener by closed `stop` channel"
|
ShardStopEventListenerByClosedStopChannel = "stop event listener by closed `stop` channel"
|
||||||
|
ShardEventProcessingInProgress = "event processing is in progress, skip the received"
|
||||||
ShardStopEventListenerByContext = "stop event listener by context"
|
ShardStopEventListenerByContext = "stop event listener by context"
|
||||||
ShardCouldNotSubmitGCJobToWorkerPool = "could not submit GC job to worker pool"
|
ShardCouldNotSubmitGCJobToWorkerPool = "could not submit GC job to worker pool"
|
||||||
ShardGCIsStopped = "GC is stopped"
|
ShardGCIsStopped = "GC is stopped"
|
||||||
|
|
|
@ -340,6 +340,9 @@ func (e *StorageEngine) HandleNewEpoch(ctx context.Context, epoch uint64) {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
case sh.NotificationChannel() <- ev:
|
case sh.NotificationChannel() <- ev:
|
||||||
|
default:
|
||||||
|
e.log.Debug(logs.ShardEventProcessingInProgress,
|
||||||
|
zap.Uint64("epoch", epoch), zap.Stringer("shard", sh.ID()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue