108 lines
2.5 KiB
Go
108 lines
2.5 KiB
Go
package loki
|
|
|
|
import (
|
|
"errors"
|
|
"strconv"
|
|
"time"
|
|
)
|
|
|
|
// Send sends the message to the loki server.
|
|
// If the client is disabled, it returns error.
|
|
// If the entries channel is full, the message is discarded and returns error.
|
|
func (client *Client) Send(msg string, timestamp time.Time) error {
|
|
if !client.IsEnabled() {
|
|
client.missedMessages++
|
|
return errors.New("client disabled")
|
|
}
|
|
|
|
if client.addToEntries(timestamp, msg) {
|
|
if client.missedMessages > 0 {
|
|
client.addToEntries(time.Now(), strconv.FormatInt(client.missedMessages, 10)+" messages missed")
|
|
client.missedMessages = 0
|
|
}
|
|
} else {
|
|
client.missedMessages++
|
|
return errors.New("channel is full")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// addToEntries attempts to add a log entry to the entries channel.
|
|
// It returns true if the entry was added successfully, and false otherwise.
|
|
func (client *Client) addToEntries(timestamp time.Time, msg string) bool {
|
|
select {
|
|
case client.entries <- logEntry{
|
|
Ts: timestamp,
|
|
Line: msg,
|
|
}:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
// run manages the sending of log batches to Loki.
|
|
// It collects log entries into batches and sends them either when a batch is full,
|
|
// or when the maximum wait time has elapsed.
|
|
func (client *Client) run() {
|
|
if !client.IsEnabled() {
|
|
client.drainEntries()
|
|
return
|
|
}
|
|
|
|
client.waitGroup.Add(1)
|
|
defer client.waitGroup.Done()
|
|
|
|
var batch []logEntry
|
|
maxWait := time.NewTimer(client.config.BatchWait)
|
|
|
|
for {
|
|
select {
|
|
case <-client.quit:
|
|
for len(client.entries) > 0 {
|
|
entry := <-client.entries
|
|
batch = append(batch, entry)
|
|
}
|
|
batch = client.processMissedMessages(batch)
|
|
client.processBatch(batch)
|
|
return
|
|
|
|
case entry := <-client.entries:
|
|
batch = append(batch, entry)
|
|
if len(batch) >= client.config.BatchEntriesNumber {
|
|
batch = client.processBatch(batch)
|
|
maxWait.Reset(client.config.BatchWait)
|
|
}
|
|
|
|
case <-maxWait.C:
|
|
batch = client.processBatch(batch)
|
|
maxWait.Reset(client.config.BatchWait)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (client *Client) drainEntries() {
|
|
for len(client.entries) > 0 {
|
|
<-client.entries
|
|
}
|
|
}
|
|
|
|
func (client *Client) processBatch(batch []logEntry) []logEntry {
|
|
if len(batch) > 0 {
|
|
client.sendLogs(batch)
|
|
batch = batch[:0]
|
|
}
|
|
return batch
|
|
}
|
|
|
|
func (client *Client) processMissedMessages(batch []logEntry) []logEntry {
|
|
if client.missedMessages > 0 {
|
|
batch = append(batch, logEntry{
|
|
Ts: time.Now(),
|
|
Line: strconv.FormatInt(client.missedMessages, 10) + " messages missed",
|
|
})
|
|
client.missedMessages = 0
|
|
}
|
|
return batch
|
|
}
|