Optimize encoder #12

Merged
fyrchik merged 7 commits from fyrchik/zapjournald:optimize-encoder into master 2024-09-04 19:51:22 +00:00
4 changed files with 201 additions and 42 deletions

65
benchmark_test.go Normal file
View file

@ -0,0 +1,65 @@
package zapjournald
import (
"testing"
"github.com/ssgreg/journald"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
type nopSync struct{}
func (nopSync) Write([]byte) (int, error) { return 0, nil }
func (nopSync) Sync() error { return nil }
func BenchmarkLogger(b *testing.B) {
zc := zap.NewProductionConfig()
zc.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
zc.Level = zap.NewAtomicLevelAt(zap.InfoLevel)
b.Run("standard", func(b *testing.B) {
encoder := zapcore.NewJSONEncoder(zc.EncoderConfig)
core := zapcore.NewCore(encoder, nopSync{}, zc.Level)
coreWithContext := core.With([]zapcore.Field{
SyslogFacility(LogDaemon),
SyslogIdentifier(),
SyslogPid()})
l := zap.New(coreWithContext)
benchmarkLog(b, l)
})
b.Run("journald", func(b *testing.B) {
encoder := zapcore.NewJSONEncoder(zc.EncoderConfig)
core := NewCore(zc.Level, encoder, &journald.Journal{}, SyslogFields)
core.j.TestModeEnabled = true // Disable actual writing to the journal.
coreWithContext := core.With([]zapcore.Field{
SyslogFacility(LogDaemon),
SyslogIdentifier(),
SyslogPid(),
})
l := zap.New(coreWithContext)
benchmarkLog(b, l)
})
}
func benchmarkLog(b *testing.B, l *zap.Logger) {
b.Run("no fields", func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
l.Info("Simple log message")
}
})
b.Run("application fields", func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
l.Info("Simple log message", zap.Uint32("count", 123), zap.String("details", "nothing"))
}
})
b.Run("journald fields", func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
l.Info("Simple log message", SyslogIdentifier())
}
})
}

70
encoder.go Normal file
View file

@ -0,0 +1,70 @@
package zapjournald
import (
"bytes"
"fmt"
"strings"
"go.uber.org/zap/buffer"
)
var pool = buffer.NewPool()
func encodeJournaldField(buf *buffer.Buffer, key string, value any) {
switch v := value.(type) {
case string:
writeField(buf, key, v)
case []byte:
writeFieldBytes(buf, key, v)
default:
writeField(buf, key, fmt.Sprint(v))
}
}
func writeFieldBytes(buf *buffer.Buffer, name string, value []byte) {
buf.Write([]byte(name))
if bytes.ContainsRune(value, '\n') {
// According to the format, if the value includes a newline
// need to write the field name, plus a newline, then the
// size (64bit LE), the field data and a final newline.
buf.Write([]byte{'\n'})
appendUint64Binary(buf, uint64(len(value)))
} else {
buf.Write([]byte{'='})
}
fyrchik marked this conversation as resolved Outdated

acidentally -> accidentally

acidentally -> accidentally

removed this

removed this
buf.Write(value)
buf.Write([]byte{'\n'})
}
func writeField(buf *buffer.Buffer, name string, value string) {
buf.Write([]byte(name))
if strings.ContainsRune(value, '\n') {
// According to the format, if the value includes a newline
// need to write the field name, plus a newline, then the
// size (64bit LE), the field data and a final newline.
buf.Write([]byte{'\n'})
// 1 allocation here.
fyrchik marked this conversation as resolved Outdated

Please explain why only the encoder is cloned? Why aren't the other fields (fields and buf) cloned?

Please explain why only the encoder is cloned? Why aren't the other fields (fields and buf) cloned?

Because I wrote this like before adding buffers, will fix

Because I wrote this like before adding buffers, will fix

removed this

removed this
// binary.Write(w, binary.LittleEndian, uint64(len(value)))
appendUint64Binary(buf, uint64(len(value)))
} else {
buf.Write([]byte{'='})
}
buf.WriteString(value)
buf.Write([]byte{'\n'})
}
func appendUint64Binary(buf *buffer.Buffer, v uint64) {
// Copied from https://github.com/golang/go/blob/go1.21.3/src/encoding/binary/binary.go#L119
buf.Write([]byte{
byte(v),
byte(v >> 8),
byte(v >> 16),
byte(v >> 24),
byte(v >> 32),
byte(v >> 40),
byte(v >> 48),
byte(v >> 56),
})
}

View file

@ -2,6 +2,9 @@ package zapjournald
import ( import (
"fmt" "fmt"
"math"
"strconv"
"time"
"github.com/ssgreg/journald" "github.com/ssgreg/journald"
"go.uber.org/zap/zapcore" "go.uber.org/zap/zapcore"
@ -65,47 +68,38 @@ func (core *Core) Check(entry zapcore.Entry, checked *zapcore.CheckedEntry) *zap
// If called, Write should always log the Entry and Fields; it should not // If called, Write should always log the Entry and Fields; it should not
// replicate the logic of Check. // replicate the logic of Check.
func (core *Core) Write(entry zapcore.Entry, fields []zapcore.Field) error { func (core *Core) Write(entry zapcore.Entry, fields []zapcore.Field) error {
prio, err := zapLevelToJournald(entry.Level)
if err != nil {
return err
}
b := pool.Get()
defer b.Free()
writeField(b, "PRIORITY", strconv.Itoa(int(prio)))
if len(core.contextStructuredFields) != 0 {
for k, v := range core.contextStructuredFields {
encodeJournaldField(b, k, v)
}
for _, field := range fields {
if _, isJournalField := core.storedFieldNames[field.Key]; isJournalField {
encodeJournaldField(b, field.Key, getFieldValue(field))
}
}
}
// Generate the message. // Generate the message.
buffer, err := core.encoder.EncodeEntry(entry, fields) buffer, err := core.encoder.EncodeEntry(entry, fields)
if err != nil { if err != nil {
return fmt.Errorf("failed to encode log entry: %w", err) return fmt.Errorf("failed to encode log entry: %w", err)
} }
defer buffer.Free()
message := buffer.String() writeFieldBytes(b, "MESSAGE", buffer.Bytes())
structuredFields := maps.Clone(core.contextStructuredFields)
for _, field := range fields {
if _, isJournalField := core.storedFieldNames[field.Key]; isJournalField {
structuredFields[field.Key] = getFieldValue(field)
}
}
// Write the message. // Write the message.
switch entry.Level { return core.j.WriteMsg(b.Bytes())
case zapcore.DebugLevel:
return core.j.Send(message, journald.PriorityDebug, structuredFields)
case zapcore.InfoLevel:
return core.j.Send(message, journald.PriorityInfo, structuredFields)
case zapcore.WarnLevel:
return core.j.Send(message, journald.PriorityWarning, structuredFields)
case zapcore.ErrorLevel:
return core.j.Send(message, journald.PriorityErr, structuredFields)
case zapcore.DPanicLevel:
return core.j.Send(message, journald.PriorityCrit, structuredFields)
case zapcore.PanicLevel:
return core.j.Send(message, journald.PriorityCrit, structuredFields)
case zapcore.FatalLevel:
return core.j.Send(message, journald.PriorityCrit, structuredFields)
default:
return fmt.Errorf("unknown log level: %v", entry.Level)
}
} }
// Sync flushes buffered logs (not used). // Sync flushes buffered logs (not used).
@ -141,20 +135,29 @@ func getFieldValue(f zapcore.Field) interface{} {
zapcore.ErrorType, zapcore.ErrorType,
zapcore.SkipType: zapcore.SkipType:
return f.Interface return f.Interface
case zapcore.DurationType, case zapcore.DurationType:
zapcore.Float64Type, return time.Duration(f.Integer).String()
zapcore.Float32Type, case zapcore.Float64Type:
zapcore.Int64Type, // See https://github.com/uber-go/zap/blob/v1.26.0/buffer/buffer.go#L79
f := math.Float64frombits(uint64(f.Integer))
return strconv.FormatFloat(f, 'f', -1, 64)
case zapcore.Float32Type:
f := math.Float32frombits(uint32(f.Integer))
return strconv.FormatFloat(float64(f), 'f', -1, 32)
case zapcore.Int64Type,
zapcore.Int32Type, zapcore.Int32Type,
zapcore.Int16Type, zapcore.Int16Type,
zapcore.Int8Type, zapcore.Int8Type:
return strconv.FormatInt(f.Integer, 10)
case
zapcore.Uint64Type, zapcore.Uint64Type,
zapcore.Uint32Type, zapcore.Uint32Type,
zapcore.Uint16Type, zapcore.Uint16Type,
zapcore.Uint8Type, zapcore.Uint8Type,
zapcore.UintptrType, zapcore.UintptrType:
zapcore.BoolType: return strconv.FormatUint(uint64(f.Integer), 10)
return f.Integer case zapcore.BoolType:
return strconv.FormatBool(f.Integer == 1)
case zapcore.StringType: case zapcore.StringType:
return f.String return f.String
case zapcore.TimeType: case zapcore.TimeType:
@ -162,8 +165,29 @@ func getFieldValue(f zapcore.Field) interface{} {
// for example: zap.Time("k", time.Unix(100900, 0).In(time.UTC)) - will produce: "100900000000000 UTC" (result in nanoseconds) // for example: zap.Time("k", time.Unix(100900, 0).In(time.UTC)) - will produce: "100900000000000 UTC" (result in nanoseconds)
return fmt.Sprintf("%d %v", f.Integer, f.Interface) return fmt.Sprintf("%d %v", f.Integer, f.Interface)
} }
return f.Integer return strconv.FormatUint(uint64(f.Integer), 10)
default: default:
panic(fmt.Sprintf("unknown field type: %v", f)) panic(fmt.Sprintf("unknown field type: %v", f))
} }
} }
func zapLevelToJournald(l zapcore.Level) (journald.Priority, error) {
switch l {
case zapcore.DebugLevel:
return journald.PriorityDebug, nil
case zapcore.InfoLevel:
return journald.PriorityInfo, nil
case zapcore.WarnLevel:
return journald.PriorityWarning, nil
case zapcore.ErrorLevel:
return journald.PriorityErr, nil
case zapcore.DPanicLevel:
return journald.PriorityCrit, nil
case zapcore.PanicLevel:
return journald.PriorityCrit, nil
case zapcore.FatalLevel:
return journald.PriorityCrit, nil
default:
return 0, fmt.Errorf("unknown log level: %v", l)
}
}