frostfs-observability/logging/lokicore/loki/setup.go

97 lines
2.1 KiB
Go
Raw Permalink Normal View History

package loki
import (
"net/http"
"regexp"
"sync"
"time"
)
const logEntriesChanSize = 5000
// Represents a single log entry.
type logEntry struct {
Ts time.Time `json:"ts"`
Line string `json:"line"`
}
type labels map[string]string
// Stream represents a stream of log entries with associated labels.
type stream struct {
Labels labels `json:"-"`
Entries []logEntry `json:"entries"`
}
type promtailMsg struct {
Streams []stream `json:"streams"`
}
// Client is a client for sending log entries.
type Client struct {
config Config
quit chan struct{}
entries chan logEntry
waitGroup sync.WaitGroup
client http.Client
missedMessages int64
mutex sync.RWMutex
}
type Config struct {
Enabled bool
// E.g. localhost:3100/api/prom/push.
Endpoint string
Labels map[string]string
// Maximum message buffering time.
BatchWait time.Duration
// Maximum number of messages in the queue.
BatchEntriesNumber int
}
// Setup initializes the client with the given configuration and starts the processing goroutine.
// It is the caller's responsibility to call Shutdown() to free resources.
func Setup(conf Config) *Client {
client := newClient()
client.config = conf
client.config.Endpoint = normalizeURL(client.config.Endpoint)
go client.run()
return client
}
// Shutdown stops the client and waits for all logs to be sent.
func (client *Client) Shutdown() {
client.mutex.Lock()
client.config.Enabled = false
client.mutex.Unlock()
close(client.quit)
client.waitGroup.Wait()
}
// IsEnabled checks whether the client is enabled.
func (client *Client) IsEnabled() bool {
client.mutex.RLock()
defer client.mutex.RUnlock()
return client.config.Enabled
}
func newClient() *Client {
return &Client{
quit: make(chan struct{}),
entries: make(chan logEntry, logEntriesChanSize),
client: http.Client{},
config: Config{Enabled: false},
mutex: sync.RWMutex{},
waitGroup: sync.WaitGroup{},
}
}
func normalizeURL(host string) string {
if !regexp.MustCompile(`^https?:\/\/`).MatchString(host) {
host = "http://" + host
}
return host
}