diff --git a/go.mod b/go.mod index 2bf4cb8..24a966d 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,11 @@ require ( google.golang.org/protobuf v1.30.0 ) +require ( + github.com/kr/text v0.2.0 // indirect + go.uber.org/multierr v1.10.0 // indirect +) + require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v4 v4.2.1 // indirect @@ -35,8 +40,9 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.16.0 // indirect go.opentelemetry.io/otel/metric v1.16.0 // indirect go.opentelemetry.io/proto/otlp v0.19.0 // indirect + go.uber.org/zap v1.26.0 golang.org/x/net v0.8.0 // indirect - golang.org/x/sys v0.8.0 // indirect + golang.org/x/sys v0.13.0 // indirect golang.org/x/text v0.8.0 // indirect google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 97ac088..80ab4e2 100644 --- a/go.sum +++ b/go.sum @@ -55,6 +55,7 @@ github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWH github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -147,8 +148,9 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -203,6 +205,10 @@ go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqe go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw= go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U= go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= +go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= +go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -312,8 +318,8 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= -golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/logging/lokicore/core.go b/logging/lokicore/core.go new file mode 100644 index 0000000..50000c6 --- /dev/null +++ b/logging/lokicore/core.go @@ -0,0 +1,67 @@ +package lokicore + +import ( + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-observability/logging/lokicore/loki" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +// Zap Core for loki. +// Expands the zapcore.Core interface with calls to export logs to Loki. +type LokiCore struct { + original zapcore.Core + encoder zapcore.Encoder + loki *loki.Client +} + +func New(original zapcore.Core, lokiCfg loki.Config) *LokiCore { + encoderConfig := zap.NewProductionEncoderConfig() + encoder := zapcore.NewJSONEncoder(encoderConfig) + encoderConfig.EncodeTime = zapcore.TimeEncoderOfLayout(time.RFC3339Nano) + + return &LokiCore{ + original: original, + encoder: encoder, + loki: loki.Setup(lokiCfg), + } +} + +func (c *LokiCore) With(fields []zapcore.Field) zapcore.Core { + return &LokiCore{ + original: c.original.With(fields), + encoder: c.encoder, + loki: c.loki, + } +} + +func (c *LokiCore) Write(entry zapcore.Entry, fields []zapcore.Field) error { + if err := c.original.Write(entry, fields); err != nil { + return err + } + + buffer, err := c.encoder.EncodeEntry(entry, fields) + defer buffer.Free() + + if err != nil { + return err + } + + return c.loki.Send(buffer.String(), entry.Time) +} + +func (c *LokiCore) Check(entry zapcore.Entry, checked *zapcore.CheckedEntry) *zapcore.CheckedEntry { + if c.Enabled(entry.Level) { + return checked.AddCore(entry, c) + } + return checked +} + +func (c *LokiCore) Sync() error { + return c.original.Sync() +} + +func (c *LokiCore) Enabled(level zapcore.Level) bool { + return c.original.Enabled(level) +} diff --git a/logging/lokicore/loki/README.md b/logging/lokicore/loki/README.md new file mode 100644 index 0000000..e130b46 --- /dev/null +++ b/logging/lokicore/loki/README.md @@ -0,0 +1,32 @@ +# git.frostfs.info/TrueCloudLab/frostfs-observability/loki" + +A simple asynchronous client in Go for sending logs to Loki. + +## Usage + +```go +package main + +import ( + + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-observability/logging/lokicore/loki" +) + +func main() { + loki := loki.Setup(loki.Config{ + Address: "localhost:3100/api/prom/push", + Labels: map[string]string{ + "label": "test", + }, + BatchWait: 1000, + BatchEntriesNumber: 200, + Enabled: true, + }) + defer loki.Shutdown() + + loki.Send("log message", time.Now()) +} + +``` \ No newline at end of file diff --git a/logging/lokicore/loki/example/main.go b/logging/lokicore/loki/example/main.go new file mode 100644 index 0000000..ce671cc --- /dev/null +++ b/logging/lokicore/loki/example/main.go @@ -0,0 +1,43 @@ +package main + +import ( + "strconv" + "sync" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-observability/logging/lokicore/loki" +) + +var wg sync.WaitGroup + +const countMsgGroup = 100 +const countMsg = 500000 + +func send(loki *loki.Client) { + wg.Add(1) + defer wg.Done() + + for j := 0; j < countMsg/countMsgGroup; j++ { + for i := 0; i < countMsgGroup; i++ { + loki.Send(strconv.Itoa(j)+" "+strconv.Itoa(i)+" test log message", time.Now()) + } + time.Sleep(20 * time.Millisecond) + } +} + +func main() { + loki := loki.Setup(loki.Config{ + Endpoint: "localhost:3100/api/prom/push", + Labels: map[string]string{ + "label": "test", + }, + BatchWait: 1000, + BatchEntriesNumber: 200, + Enabled: true, + }) + go send(loki) + send(loki) + + wg.Wait() + loki.Shutdown() +} diff --git a/logging/lokicore/loki/log.go b/logging/lokicore/loki/log.go new file mode 100644 index 0000000..9cafd53 --- /dev/null +++ b/logging/lokicore/loki/log.go @@ -0,0 +1,107 @@ +package loki + +import ( + "errors" + "strconv" + "time" +) + +// If the client is disabled, it returns error. +// If the entries channel is full, the message is discarded and returns error +func (client *Client) Send(msg string, timestamp time.Time) error { + if !client.IsEnabled() { + client.missedMessages++ + return errors.New("client disabled") + } + + if client.addToEntries(timestamp, msg) { + if client.missedMessages > 0 { + client.addToEntries(time.Now(), strconv.FormatInt(client.missedMessages, 10)+" messages missed") + client.missedMessages = 0 + } + } else { + client.missedMessages++ + return errors.New("channel is full") + } + + return nil +} + +// addToEntries attempts to add a log entry to the entries channel. +// It returns true if the entry was added successfully, and false otherwise. +func (client *Client) addToEntries(timestamp time.Time, msg string) bool { + select { + case client.entries <- logEntry{ + Ts: timestamp, + Line: msg, + }: + return true + default: + return false + } +} + +// run manages the sending of log batches to Loki. +// It collects log entries into batches and sends them either when a batch is full, +// or when the maximum wait time has elapsed. +func (client *Client) run() { + if !client.IsEnabled() { + client.drainEntries() + return + } + + client.waitGroup.Add(1) + defer client.waitGroup.Done() + + var batch []logEntry + maxWait := time.NewTimer(client.config.BatchWait) + + for { + select { + case <-client.quit: + for len(client.entries) > 0 { + entry := <-client.entries + batch = append(batch, entry) + } + batch = client.processMissedMessages(batch) + client.processBatch(batch) + return + + case entry := <-client.entries: + batch = append(batch, entry) + if len(batch) >= client.config.BatchEntriesNumber { + batch = client.processBatch(batch) + maxWait.Reset(client.config.BatchWait) + } + + case <-maxWait.C: + batch = client.processBatch(batch) + maxWait.Reset(client.config.BatchWait) + } + } +} + +func (client *Client) drainEntries() { + for len(client.entries) > 0 { + <-client.entries + } +} + +func (client *Client) processBatch(batch []logEntry) []logEntry { + if len(batch) > 0 { + client.sendLogs(batch) + batch = batch[:0] + } + return batch +} + +func (client *Client) processMissedMessages(batch []logEntry) []logEntry { + if client.missedMessages > 0 { + batch = append(batch, logEntry{ + Ts: time.Now(), + Line: strconv.FormatInt(client.missedMessages, 10) + " messages missed", + }) + client.missedMessages = 0 + } + return batch +} diff --git a/logging/lokicore/loki/send.go b/logging/lokicore/loki/send.go new file mode 100644 index 0000000..de0e09c --- /dev/null +++ b/logging/lokicore/loki/send.go @@ -0,0 +1,67 @@ +package loki + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "strings" +) + +func (client *Client) sendLogs(entries []logEntry) { + if len(entries) == 0 { + return + } + + var streams []stream + stream := stream{ + Labels: client.config.Labels, + Entries: entries, + } + streams = append(streams, stream) + + msg := promtailMsg{Streams: streams} + jsonMsg, err := json.Marshal(msg) + if err != nil { + return + } + + client.mutex.RLock() + endpoint := client.config.Endpoint + client.mutex.RUnlock() + + client.sendRequest("POST", endpoint, "application/json", jsonMsg) +} + +func (client *Client) sendRequest(method, url string, ctype string, reqBody []byte) { + req, err := http.NewRequest(method, url, bytes.NewBuffer(reqBody)) + + if err != nil { + return + } + + req.Header.Set("Content-Type", ctype) + + resp, _ := client.client.Do(req) + if resp != nil { + defer resp.Body.Close() + } +} + +func (p *stream) MarshalJSON() ([]byte, error) { + return json.Marshal(&struct { + Labels string `json:"labels"` + Entries []logEntry `json:"entries"` + }{ + Labels: p.Labels.String(), + Entries: p.Entries, + }) +} + +func (l labels) String() string { + var labelPairs []string + for key, value := range l { + labelPairs = append(labelPairs, fmt.Sprintf(`%s="%s"`, key, value)) + } + return fmt.Sprintf("{%s}", strings.Join(labelPairs, ",")) +} diff --git a/logging/lokicore/loki/setup.go b/logging/lokicore/loki/setup.go new file mode 100644 index 0000000..5c7c77d --- /dev/null +++ b/logging/lokicore/loki/setup.go @@ -0,0 +1,96 @@ +package loki + +import ( + "net/http" + "regexp" + "sync" + "time" +) + +const logEntriesChanSize = 5000 + +// Represents a single log entry. +type logEntry struct { + Ts time.Time `json:"ts"` + Line string `json:"line"` +} + +type labels map[string]string + +// Stream represents a stream of log entries with associated labels. +type stream struct { + Labels labels `json:"-"` + Entries []logEntry `json:"entries"` +} + +type promtailMsg struct { + Streams []stream `json:"streams"` +} + +// Client is a client for sending log entries. +type Client struct { + config Config + quit chan struct{} + entries chan logEntry + waitGroup sync.WaitGroup + client http.Client + missedMessages int64 + mutex sync.RWMutex +} + +type Config struct { + Enabled bool + // E.g. localhost:3100/api/prom/push. + Endpoint string + Labels map[string]string + //Maximum message buffering time. + BatchWait time.Duration + //Maximum number of messages in the queue. + BatchEntriesNumber int +} + +// Setup initializes the client with the given configuration and starts the processing goroutine. +// It is the caller's responsibility to call Shutdown() to free resources. +func Setup(conf Config) *Client { + client := newClient() + client.config = conf + client.config.Endpoint = normalizeURL(client.config.Endpoint) + go client.run() + return client +} + +// Shutdown stops the client and waits for all logs to be sent. +func (client *Client) Shutdown() { + client.mutex.Lock() + client.config.Enabled = false + client.mutex.Unlock() + + close(client.quit) + client.waitGroup.Wait() +} + +// IsEnabled checks whether the client is enabled. +func (client *Client) IsEnabled() bool { + client.mutex.RLock() + defer client.mutex.RUnlock() + return client.config.Enabled +} + +func newClient() *Client { + return &Client{ + quit: make(chan struct{}), + entries: make(chan logEntry, logEntriesChanSize), + client: http.Client{}, + config: Config{Enabled: false}, + mutex: sync.RWMutex{}, + waitGroup: sync.WaitGroup{}, + } +} + +func normalizeURL(host string) string { + if !regexp.MustCompile(`^https?:\/\/`).MatchString(host) { + host = "http://" + host + } + + return host +}