Add Loki log sending #5

Merged
fyrchik merged 1 commit from achuprov/frostfs-observability:loki into master 2023-11-02 07:44:24 +00:00
8 changed files with 428 additions and 4 deletions

8
go.mod
View file

@ -16,6 +16,11 @@ require (
google.golang.org/protobuf v1.30.0
)
require (
github.com/kr/text v0.2.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
)
require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
@ -35,8 +40,9 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.16.0 // indirect
go.opentelemetry.io/otel/metric v1.16.0 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/zap v1.26.0
golang.org/x/net v0.8.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.8.0 // indirect
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect

12
go.sum
View file

@ -55,6 +55,7 @@ github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWH
github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@ -147,8 +148,9 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@ -203,6 +205,10 @@ go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqe
go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw=
go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
@ -312,8 +318,8 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

67
logging/lokicore/core.go Normal file
View file

@ -0,0 +1,67 @@
package lokicore
import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-observability/logging/lokicore/loki"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// Zap Core for loki.
// Expands the zapcore.Core interface with calls to export logs to Loki.
type LokiCore struct {
original zapcore.Core
encoder zapcore.Encoder
loki *loki.Client
}
func New(original zapcore.Core, lokiCfg loki.Config) *LokiCore {

What is original core here? We can unite anything with 87577d85d5/zapcore/tee.go (L37C18-L37C18) , so 2 cores are possible

What is original core here? We can unite anything with https://github.com/uber-go/zap/blob/87577d85d58b6d92d0158967df29303d04d30e36/zapcore/tee.go#L37C18-L37C18 , so 2 cores are possible

This is any core that implements the Core interface. If you want to pass two cores, you can merge them using zapcore.NewTee. In this case, the lowest logging level of the two cores will be used.

This is any core that implements the `Core` interface. If you want to pass two cores, you can merge them using `zapcore.NewTee`. In this case, the lowest logging level of the two cores will be used.
encoderConfig := zap.NewProductionEncoderConfig()
encoder := zapcore.NewJSONEncoder(encoderConfig)
encoderConfig.EncodeTime = zapcore.TimeEncoderOfLayout(time.RFC3339Nano)
return &LokiCore{
original: original,
encoder: encoder,
loki: loki.Setup(lokiCfg),
}
}
func (c *LokiCore) With(fields []zapcore.Field) zapcore.Core {
return &LokiCore{
original: c.original.With(fields),
encoder: c.encoder,
loki: c.loki,
}
}
func (c *LokiCore) Write(entry zapcore.Entry, fields []zapcore.Field) error {
if err := c.original.Write(entry, fields); err != nil {
return err
}
buffer, err := c.encoder.EncodeEntry(entry, fields)
defer buffer.Free()
if err != nil {
return err
}
return c.loki.Send(buffer.String(), entry.Time)
}
func (c *LokiCore) Check(entry zapcore.Entry, checked *zapcore.CheckedEntry) *zapcore.CheckedEntry {
if c.Enabled(entry.Level) {
return checked.AddCore(entry, c)
}
return checked
}
func (c *LokiCore) Sync() error {
return c.original.Sync()
}
func (c *LokiCore) Enabled(level zapcore.Level) bool {
return c.original.Enabled(level)
}

View file

@ -0,0 +1,32 @@
# git.frostfs.info/TrueCloudLab/frostfs-observability/loki"
A simple asynchronous client in Go for sending logs to Loki.
## Usage
```go
package main
import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-observability/logging/lokicore/loki"
)
func main() {
loki := loki.Setup(loki.Config{
Address: "localhost:3100/api/prom/push",
Labels: map[string]string{
"label": "test",
},
BatchWait: 1000,
BatchEntriesNumber: 200,
Enabled: true,
})
defer loki.Shutdown()
loki.Send("log message", time.Now())
}
```

View file

@ -0,0 +1,43 @@
package main
import (
"strconv"
"sync"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-observability/logging/lokicore/loki"
)
var wg sync.WaitGroup
const countMsgGroup = 100
const countMsg = 500000
func send(loki *loki.Client) {
wg.Add(1)
defer wg.Done()
for j := 0; j < countMsg/countMsgGroup; j++ {
for i := 0; i < countMsgGroup; i++ {
loki.Send(strconv.Itoa(j)+" "+strconv.Itoa(i)+" test log message", time.Now())
}
time.Sleep(20 * time.Millisecond)
}
}
func main() {
loki := loki.Setup(loki.Config{
Endpoint: "localhost:3100/api/prom/push",
Labels: map[string]string{
"label": "test",
},
BatchWait: 1000,
BatchEntriesNumber: 200,
Enabled: true,
})
go send(loki)
send(loki)
wg.Wait()
loki.Shutdown()
}

View file

@ -0,0 +1,107 @@
package loki
import (
"errors"
"strconv"
"time"
)
fyrchik marked this conversation as resolved Outdated

It makes no sense to create a client which is disabled, right? Why are there IsEnabled checks?

It makes no sense to create a client which is disabled, right? Why are there `IsEnabled` checks?

fixed

fixed
// If the client is disabled, it returns error.
// If the entries channel is full, the message is discarded and returns error
func (client *Client) Send(msg string, timestamp time.Time) error {
if !client.IsEnabled() {
client.missedMessages++
return errors.New("client disabled")
}
if client.addToEntries(timestamp, msg) {
if client.missedMessages > 0 {
client.addToEntries(time.Now(), strconv.FormatInt(client.missedMessages, 10)+" messages missed")
client.missedMessages = 0
}
} else {
client.missedMessages++
return errors.New("channel is full")
}
return nil
}
// addToEntries attempts to add a log entry to the entries channel.
// It returns true if the entry was added successfully, and false otherwise.
func (client *Client) addToEntries(timestamp time.Time, msg string) bool {
select {
case client.entries <- logEntry{
Ts: timestamp,
Line: msg,
}:
return true
default:
return false
}
}
// run manages the sending of log batches to Loki.
// It collects log entries into batches and sends them either when a batch is full,
// or when the maximum wait time has elapsed.
func (client *Client) run() {
if !client.IsEnabled() {
client.drainEntries()
return
}
client.waitGroup.Add(1)
defer client.waitGroup.Done()
var batch []logEntry
maxWait := time.NewTimer(client.config.BatchWait)
for {
select {
case <-client.quit:
for len(client.entries) > 0 {
entry := <-client.entries
batch = append(batch, entry)
}
batch = client.processMissedMessages(batch)
client.processBatch(batch)
return
case entry := <-client.entries:
batch = append(batch, entry)
if len(batch) >= client.config.BatchEntriesNumber {
batch = client.processBatch(batch)
maxWait.Reset(client.config.BatchWait)
}
case <-maxWait.C:
batch = client.processBatch(batch)
maxWait.Reset(client.config.BatchWait)
}
}
}
func (client *Client) drainEntries() {
for len(client.entries) > 0 {
<-client.entries
}
}
func (client *Client) processBatch(batch []logEntry) []logEntry {
if len(batch) > 0 {
client.sendLogs(batch)
batch = batch[:0]
}
return batch
}
func (client *Client) processMissedMessages(batch []logEntry) []logEntry {
if client.missedMessages > 0 {
batch = append(batch, logEntry{
Ts: time.Now(),
Line: strconv.FormatInt(client.missedMessages, 10) + " messages missed",
})
client.missedMessages = 0
}
return batch
}

View file

@ -0,0 +1,67 @@
package loki
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"strings"
)
func (client *Client) sendLogs(entries []logEntry) {
if len(entries) == 0 {
return
}
var streams []stream
stream := stream{
Labels: client.config.Labels,
Entries: entries,
}
streams = append(streams, stream)
msg := promtailMsg{Streams: streams}
jsonMsg, err := json.Marshal(msg)
if err != nil {
return
}
client.mutex.RLock()
endpoint := client.config.Endpoint
client.mutex.RUnlock()
client.sendRequest("POST", endpoint, "application/json", jsonMsg)
}
func (client *Client) sendRequest(method, url string, ctype string, reqBody []byte) {
req, err := http.NewRequest(method, url, bytes.NewBuffer(reqBody))
if err != nil {
return
}
req.Header.Set("Content-Type", ctype)
resp, _ := client.client.Do(req)
if resp != nil {
defer resp.Body.Close()
}
}
func (p *stream) MarshalJSON() ([]byte, error) {
return json.Marshal(&struct {
Labels string `json:"labels"`
Entries []logEntry `json:"entries"`
}{
Labels: p.Labels.String(),
Entries: p.Entries,
})
}
func (l labels) String() string {
var labelPairs []string
for key, value := range l {
labelPairs = append(labelPairs, fmt.Sprintf(`%s="%s"`, key, value))
}
return fmt.Sprintf("{%s}", strings.Join(labelPairs, ","))
}

View file

@ -0,0 +1,96 @@
package loki
import (
"net/http"
"regexp"
"sync"
"time"
)
const logEntriesChanSize = 5000
// Represents a single log entry.
type logEntry struct {
Ts time.Time `json:"ts"`
Line string `json:"line"`
}
type labels map[string]string
// Stream represents a stream of log entries with associated labels.
type stream struct {
Labels labels `json:"-"`
Entries []logEntry `json:"entries"`
}
type promtailMsg struct {
Streams []stream `json:"streams"`
}
// Client is a client for sending log entries.
type Client struct {
config Config
quit chan struct{}
entries chan logEntry
waitGroup sync.WaitGroup
client http.Client
missedMessages int64
mutex sync.RWMutex
}
type Config struct {
Enabled bool
// E.g. localhost:3100/api/prom/push.
Endpoint string
Labels map[string]string
//Maximum message buffering time.

Space after the //.

Space after the `//`.
BatchWait time.Duration
//Maximum number of messages in the queue.
BatchEntriesNumber int
}
// Setup initializes the client with the given configuration and starts the processing goroutine.
// It is the caller's responsibility to call Shutdown() to free resources.
func Setup(conf Config) *Client {
client := newClient()
client.config = conf
client.config.Endpoint = normalizeURL(client.config.Endpoint)
go client.run()
return client
}
// Shutdown stops the client and waits for all logs to be sent.
func (client *Client) Shutdown() {
client.mutex.Lock()
client.config.Enabled = false
client.mutex.Unlock()
close(client.quit)
client.waitGroup.Wait()
}
// IsEnabled checks whether the client is enabled.
func (client *Client) IsEnabled() bool {
client.mutex.RLock()
defer client.mutex.RUnlock()
return client.config.Enabled
}
func newClient() *Client {
return &Client{
quit: make(chan struct{}),
entries: make(chan logEntry, logEntriesChanSize),
client: http.Client{},
config: Config{Enabled: false},
mutex: sync.RWMutex{},
waitGroup: sync.WaitGroup{},
}
}
func normalizeURL(host string) string {
if !regexp.MustCompile(`^https?:\/\/`).MatchString(host) {
host = "http://" + host
}
return host
}