frostfs-observability/logging/lokicore/loki/log.go

109 lines
2.5 KiB
Go
Raw Normal View History

package loki
import (
"errors"
"strconv"
"time"
)
// Send sends the message to the loki server.
// If the client is disabled, it returns error.
// If the entries channel is full, the message is discarded and returns error.
func (client *Client) Send(msg string, timestamp time.Time) error {
if !client.IsEnabled() {
client.missedMessages++
return errors.New("client disabled")
}
if client.addToEntries(timestamp, msg) {
if client.missedMessages > 0 {
client.addToEntries(time.Now(), strconv.FormatInt(client.missedMessages, 10)+" messages missed")
client.missedMessages = 0
}
} else {
client.missedMessages++
return errors.New("channel is full")
}
return nil
}
// addToEntries attempts to add a log entry to the entries channel.
// It returns true if the entry was added successfully, and false otherwise.
func (client *Client) addToEntries(timestamp time.Time, msg string) bool {
select {
case client.entries <- logEntry{
Ts: timestamp,
Line: msg,
}:
return true
default:
return false
}
}
// run manages the sending of log batches to Loki.
// It collects log entries into batches and sends them either when a batch is full,
// or when the maximum wait time has elapsed.
func (client *Client) run() {
if !client.IsEnabled() {
client.drainEntries()
return
}
client.waitGroup.Add(1)
defer client.waitGroup.Done()
var batch []logEntry
maxWait := time.NewTimer(client.config.BatchWait)
for {
select {
case <-client.quit:
for len(client.entries) > 0 {
entry := <-client.entries
batch = append(batch, entry)
}
batch = client.processMissedMessages(batch)
client.processBatch(batch)
return
case entry := <-client.entries:
batch = append(batch, entry)
if len(batch) >= client.config.BatchEntriesNumber {
batch = client.processBatch(batch)
maxWait.Reset(client.config.BatchWait)
}
case <-maxWait.C:
batch = client.processBatch(batch)
maxWait.Reset(client.config.BatchWait)
}
}
}
func (client *Client) drainEntries() {
for len(client.entries) > 0 {
<-client.entries
}
}
func (client *Client) processBatch(batch []logEntry) []logEntry {
if len(batch) > 0 {
client.sendLogs(batch)
batch = batch[:0]
}
return batch
}
func (client *Client) processMissedMessages(batch []logEntry) []logEntry {
if client.missedMessages > 0 {
batch = append(batch, logEntry{
Ts: time.Now(),
Line: strconv.FormatInt(client.missedMessages, 10) + " messages missed",
})
client.missedMessages = 0
}
return batch
}