[#5] logs: Add Loki

Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
This commit is contained in:
Alexander Chuprov 2023-11-01 14:17:34 +03:00
parent 7960099809
commit b3ad3335ff
8 changed files with 428 additions and 4 deletions

8
go.mod
View file

@ -16,6 +16,11 @@ require (
google.golang.org/protobuf v1.30.0
)
require (
github.com/kr/text v0.2.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
)
require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
@ -35,8 +40,9 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.16.0 // indirect
go.opentelemetry.io/otel/metric v1.16.0 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/zap v1.26.0
golang.org/x/net v0.8.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.8.0 // indirect
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect

12
go.sum
View file

@ -55,6 +55,7 @@ github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWH
github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@ -147,8 +148,9 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@ -203,6 +205,10 @@ go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqe
go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw=
go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
@ -312,8 +318,8 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

67
logging/lokicore/core.go Normal file
View file

@ -0,0 +1,67 @@
package lokicore
import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-observability/logging/lokicore/loki"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// Zap Core for loki.
// Expands the zapcore.Core interface with calls to export logs to Loki.
type LokiCore struct {
original zapcore.Core
encoder zapcore.Encoder
loki *loki.Client
}
func New(original zapcore.Core, lokiCfg loki.Config) *LokiCore {
encoderConfig := zap.NewProductionEncoderConfig()
encoder := zapcore.NewJSONEncoder(encoderConfig)
encoderConfig.EncodeTime = zapcore.TimeEncoderOfLayout(time.RFC3339Nano)
return &LokiCore{
original: original,
encoder: encoder,
loki: loki.Setup(lokiCfg),
}
}
func (c *LokiCore) With(fields []zapcore.Field) zapcore.Core {
return &LokiCore{
original: c.original.With(fields),
encoder: c.encoder,
loki: c.loki,
}
}
func (c *LokiCore) Write(entry zapcore.Entry, fields []zapcore.Field) error {
if err := c.original.Write(entry, fields); err != nil {
return err
}
buffer, err := c.encoder.EncodeEntry(entry, fields)
defer buffer.Free()
if err != nil {
return err
}
return c.loki.Send(buffer.String(), entry.Time)
}
func (c *LokiCore) Check(entry zapcore.Entry, checked *zapcore.CheckedEntry) *zapcore.CheckedEntry {
if c.Enabled(entry.Level) {
return checked.AddCore(entry, c)
}
return checked
}
func (c *LokiCore) Sync() error {
return c.original.Sync()
}
func (c *LokiCore) Enabled(level zapcore.Level) bool {
return c.original.Enabled(level)
}

View file

@ -0,0 +1,32 @@
# git.frostfs.info/TrueCloudLab/frostfs-observability/loki"
A simple asynchronous client in Go for sending logs to Loki.
## Usage
```go
package main
import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-observability/logging/lokicore/loki"
)
func main() {
loki := loki.Setup(loki.Config{
Address: "localhost:3100/api/prom/push",
Labels: map[string]string{
"label": "test",
},
BatchWait: 1000,
BatchEntriesNumber: 200,
Enabled: true,
})
defer loki.Shutdown()
loki.Send("log message", time.Now())
}
```

View file

@ -0,0 +1,43 @@
package main
import (
"strconv"
"sync"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-observability/logging/lokicore/loki"
)
var wg sync.WaitGroup
const countMsgGroup = 100
const countMsg = 500000
func send(loki *loki.Client) {
wg.Add(1)
defer wg.Done()
for j := 0; j < countMsg/countMsgGroup; j++ {
for i := 0; i < countMsgGroup; i++ {
loki.Send(strconv.Itoa(j)+" "+strconv.Itoa(i)+" test log message", time.Now())
}
time.Sleep(20 * time.Millisecond)
}
}
func main() {
loki := loki.Setup(loki.Config{
Endpoint: "localhost:3100/api/prom/push",
Labels: map[string]string{
"label": "test",
},
BatchWait: 1000,
BatchEntriesNumber: 200,
Enabled: true,
})
go send(loki)
send(loki)
wg.Wait()
loki.Shutdown()
}

View file

@ -0,0 +1,107 @@
package loki
import (
"errors"
"strconv"
"time"
)
// If the client is disabled, it returns error.
// If the entries channel is full, the message is discarded and returns error
func (client *Client) Send(msg string, timestamp time.Time) error {
if !client.IsEnabled() {
client.missedMessages++
return errors.New("client disabled")
}
if client.addToEntries(timestamp, msg) {
if client.missedMessages > 0 {
client.addToEntries(time.Now(), strconv.FormatInt(client.missedMessages, 10)+" messages missed")
client.missedMessages = 0
}
} else {
client.missedMessages++
return errors.New("channel is full")
}
return nil
}
// addToEntries attempts to add a log entry to the entries channel.
// It returns true if the entry was added successfully, and false otherwise.
func (client *Client) addToEntries(timestamp time.Time, msg string) bool {
select {
case client.entries <- logEntry{
Ts: timestamp,
Line: msg,
}:
return true
default:
return false
}
}
// run manages the sending of log batches to Loki.
// It collects log entries into batches and sends them either when a batch is full,
// or when the maximum wait time has elapsed.
func (client *Client) run() {
if !client.IsEnabled() {
client.drainEntries()
return
}
client.waitGroup.Add(1)
defer client.waitGroup.Done()
var batch []logEntry
maxWait := time.NewTimer(client.config.BatchWait)
for {
select {
case <-client.quit:
for len(client.entries) > 0 {
entry := <-client.entries
batch = append(batch, entry)
}
batch = client.processMissedMessages(batch)
client.processBatch(batch)
return
case entry := <-client.entries:
batch = append(batch, entry)
if len(batch) >= client.config.BatchEntriesNumber {
batch = client.processBatch(batch)
maxWait.Reset(client.config.BatchWait)
}
case <-maxWait.C:
batch = client.processBatch(batch)
maxWait.Reset(client.config.BatchWait)
}
}
}
func (client *Client) drainEntries() {
for len(client.entries) > 0 {
<-client.entries
}
}
func (client *Client) processBatch(batch []logEntry) []logEntry {
if len(batch) > 0 {
client.sendLogs(batch)
batch = batch[:0]
}
return batch
}
func (client *Client) processMissedMessages(batch []logEntry) []logEntry {
if client.missedMessages > 0 {
batch = append(batch, logEntry{
Ts: time.Now(),
Line: strconv.FormatInt(client.missedMessages, 10) + " messages missed",
})
client.missedMessages = 0
}
return batch
}

View file

@ -0,0 +1,67 @@
package loki
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"strings"
)
func (client *Client) sendLogs(entries []logEntry) {
if len(entries) == 0 {
return
}
var streams []stream
stream := stream{
Labels: client.config.Labels,
Entries: entries,
}
streams = append(streams, stream)
msg := promtailMsg{Streams: streams}
jsonMsg, err := json.Marshal(msg)
if err != nil {
return
}
client.mutex.RLock()
endpoint := client.config.Endpoint
client.mutex.RUnlock()
client.sendRequest("POST", endpoint, "application/json", jsonMsg)
}
func (client *Client) sendRequest(method, url string, ctype string, reqBody []byte) {
req, err := http.NewRequest(method, url, bytes.NewBuffer(reqBody))
if err != nil {
return
}
req.Header.Set("Content-Type", ctype)
resp, _ := client.client.Do(req)
if resp != nil {
defer resp.Body.Close()
}
}
func (p *stream) MarshalJSON() ([]byte, error) {
return json.Marshal(&struct {
Labels string `json:"labels"`
Entries []logEntry `json:"entries"`
}{
Labels: p.Labels.String(),
Entries: p.Entries,
})
}
func (l labels) String() string {
var labelPairs []string
for key, value := range l {
labelPairs = append(labelPairs, fmt.Sprintf(`%s="%s"`, key, value))
}
return fmt.Sprintf("{%s}", strings.Join(labelPairs, ","))
}

View file

@ -0,0 +1,96 @@
package loki
import (
"net/http"
"regexp"
"sync"
"time"
)
const logEntriesChanSize = 5000
// Represents a single log entry.
type logEntry struct {
Ts time.Time `json:"ts"`
Line string `json:"line"`
}
type labels map[string]string
// Stream represents a stream of log entries with associated labels.
type stream struct {
Labels labels `json:"-"`
Entries []logEntry `json:"entries"`
}
type promtailMsg struct {
Streams []stream `json:"streams"`
}
// Client is a client for sending log entries.
type Client struct {
config Config
quit chan struct{}
entries chan logEntry
waitGroup sync.WaitGroup
client http.Client
missedMessages int64
mutex sync.RWMutex
}
type Config struct {
Enabled bool
// E.g. localhost:3100/api/prom/push.
Endpoint string
Labels map[string]string
//Maximum message buffering time.
BatchWait time.Duration
//Maximum number of messages in the queue.
BatchEntriesNumber int
}
// Setup initializes the client with the given configuration and starts the processing goroutine.
// It is the caller's responsibility to call Shutdown() to free resources.
func Setup(conf Config) *Client {
client := newClient()
client.config = conf
client.config.Endpoint = normalizeURL(client.config.Endpoint)
go client.run()
return client
}
// Shutdown stops the client and waits for all logs to be sent.
func (client *Client) Shutdown() {
client.mutex.Lock()
client.config.Enabled = false
client.mutex.Unlock()
close(client.quit)
client.waitGroup.Wait()
}
// IsEnabled checks whether the client is enabled.
func (client *Client) IsEnabled() bool {
client.mutex.RLock()
defer client.mutex.RUnlock()
return client.config.Enabled
}
func newClient() *Client {
return &Client{
quit: make(chan struct{}),
entries: make(chan logEntry, logEntriesChanSize),
client: http.Client{},
config: Config{Enabled: false},
mutex: sync.RWMutex{},
waitGroup: sync.WaitGroup{},
}
}
func normalizeURL(host string) string {
if !regexp.MustCompile(`^https?:\/\/`).MatchString(host) {
host = "http://" + host
}
return host
}