package loki import ( "net/http" "regexp" "sync" "time" ) const logEntriesChanSize = 5000 // Represents a single log entry. type logEntry struct { Ts time.Time `json:"ts"` Line string `json:"line"` } type labels map[string]string // Stream represents a stream of log entries with associated labels. type stream struct { Labels labels `json:"-"` Entries []logEntry `json:"entries"` } type promtailMsg struct { Streams []stream `json:"streams"` } // Client is a client for sending log entries. type Client struct { config Config quit chan struct{} entries chan logEntry waitGroup sync.WaitGroup client http.Client missedMessages int64 mutex sync.RWMutex } type Config struct { Enabled bool // E.g. localhost:3100/api/prom/push. Endpoint string Labels map[string]string // Maximum message buffering time. BatchWait time.Duration // Maximum number of messages in the queue. BatchEntriesNumber int } // Setup initializes the client with the given configuration and starts the processing goroutine. // It is the caller's responsibility to call Shutdown() to free resources. func Setup(conf Config) *Client { client := newClient() client.config = conf client.config.Endpoint = normalizeURL(client.config.Endpoint) go client.run() return client } // Shutdown stops the client and waits for all logs to be sent. func (client *Client) Shutdown() { client.mutex.Lock() client.config.Enabled = false client.mutex.Unlock() close(client.quit) client.waitGroup.Wait() } // IsEnabled checks whether the client is enabled. func (client *Client) IsEnabled() bool { client.mutex.RLock() defer client.mutex.RUnlock() return client.config.Enabled } func newClient() *Client { return &Client{ quit: make(chan struct{}), entries: make(chan logEntry, logEntriesChanSize), client: http.Client{}, config: Config{Enabled: false}, mutex: sync.RWMutex{}, waitGroup: sync.WaitGroup{}, } } func normalizeURL(host string) string { if !regexp.MustCompile(`^https?:\/\/`).MatchString(host) { host = "http://" + host } return host }