package loki import ( "errors" "strconv" "time" ) // Send sends the message to the loki server. // If the client is disabled, it returns error. // If the entries channel is full, the message is discarded and returns error. func (client *Client) Send(msg string, timestamp time.Time) error { if !client.IsEnabled() { client.missedMessages++ return errors.New("client disabled") } if client.addToEntries(timestamp, msg) { if client.missedMessages > 0 { client.addToEntries(time.Now(), strconv.FormatInt(client.missedMessages, 10)+" messages missed") client.missedMessages = 0 } } else { client.missedMessages++ return errors.New("channel is full") } return nil } // addToEntries attempts to add a log entry to the entries channel. // It returns true if the entry was added successfully, and false otherwise. func (client *Client) addToEntries(timestamp time.Time, msg string) bool { select { case client.entries <- logEntry{ Ts: timestamp, Line: msg, }: return true default: return false } } // run manages the sending of log batches to Loki. // It collects log entries into batches and sends them either when a batch is full, // or when the maximum wait time has elapsed. func (client *Client) run() { if !client.IsEnabled() { client.drainEntries() return } client.waitGroup.Add(1) defer client.waitGroup.Done() var batch []logEntry maxWait := time.NewTimer(client.config.BatchWait) for { select { case <-client.quit: for len(client.entries) > 0 { entry := <-client.entries batch = append(batch, entry) } batch = client.processMissedMessages(batch) client.processBatch(batch) return case entry := <-client.entries: batch = append(batch, entry) if len(batch) >= client.config.BatchEntriesNumber { batch = client.processBatch(batch) maxWait.Reset(client.config.BatchWait) } case <-maxWait.C: batch = client.processBatch(batch) maxWait.Reset(client.config.BatchWait) } } } func (client *Client) drainEntries() { for len(client.entries) > 0 { <-client.entries } } func (client *Client) processBatch(batch []logEntry) []logEntry { if len(batch) > 0 { client.sendLogs(batch) batch = batch[:0] } return batch } func (client *Client) processMissedMessages(batch []logEntry) []logEntry { if client.missedMessages > 0 { batch = append(batch, logEntry{ Ts: time.Now(), Line: strconv.FormatInt(client.missedMessages, 10) + " messages missed", }) client.missedMessages = 0 } return batch }