forked from TrueCloudLab/frostfs-s3-gw
[#369] Enhanced http requests logging
Signed-off-by: Nikita Zinkevich <n.zinkevich@yadro.com>
(cherry picked from commit 575ab4d294
)
Signed-off-by: Nikita Zinkevich <n.zinkevich@yadro.com>
This commit is contained in:
parent
377fa127b5
commit
14f39dfb1e
19 changed files with 639 additions and 13 deletions
|
@ -3,11 +3,12 @@ FROM golang:1.21 as builder
|
|||
ARG BUILD=now
|
||||
ARG REPO=git.frostfs.info/TrueCloudLab/frostfs-s3-gw
|
||||
ARG VERSION=dev
|
||||
ARG GOFLAGS=""
|
||||
|
||||
WORKDIR /src
|
||||
COPY . /src
|
||||
|
||||
RUN make
|
||||
RUN make GOFLAGS=${GOFLAGS}
|
||||
|
||||
# Executable image
|
||||
FROM alpine AS frostfs-s3-gw
|
||||
|
|
6
Makefile
6
Makefile
|
@ -14,6 +14,8 @@ METRICS_DUMP_OUT ?= ./metrics-dump.json
|
|||
CMDS = $(addprefix frostfs-, $(notdir $(wildcard cmd/*)))
|
||||
BINS = $(addprefix $(BINDIR)/, $(CMDS))
|
||||
|
||||
GOFLAGS ?=
|
||||
|
||||
# Variables for docker
|
||||
REPO_BASENAME = $(shell basename `go list -m`)
|
||||
HUB_IMAGE ?= "truecloudlab/$(REPO_BASENAME)"
|
||||
|
@ -38,6 +40,7 @@ all: $(BINS)
|
|||
$(BINS): $(BINDIR) dep
|
||||
@echo "⇒ Build $@"
|
||||
CGO_ENABLED=0 \
|
||||
GOFLAGS=$(GOFLAGS) \
|
||||
go build -v -trimpath \
|
||||
-ldflags "-X $(REPO)/internal/version.Version=$(VERSION)" \
|
||||
-o $@ ./cmd/$(subst frostfs-,,$(notdir $@))
|
||||
|
@ -64,7 +67,7 @@ docker/%:
|
|||
-w /src \
|
||||
-u `stat -c "%u:%g" .` \
|
||||
--env HOME=/src \
|
||||
golang:$(GO_VERSION) make $*,\
|
||||
golang:$(GO_VERSION) make GOFLAGS=$(GOFLAGS) $*,\
|
||||
@echo "supported docker targets: all $(BINS) lint")
|
||||
|
||||
# Run tests
|
||||
|
@ -87,6 +90,7 @@ image:
|
|||
@docker build \
|
||||
--build-arg REPO=$(REPO) \
|
||||
--build-arg VERSION=$(VERSION) \
|
||||
--build-arg GOFLAGS=$(GOFLAGS) \
|
||||
--rm \
|
||||
-f .docker/Dockerfile \
|
||||
-t $(HUB_IMAGE):$(HUB_TAG) .
|
||||
|
|
|
@ -22,8 +22,8 @@ import (
|
|||
"github.com/aws/aws-sdk-go/aws/credentials"
|
||||
)
|
||||
|
||||
// authorizationFieldRegexp -- is regexp for credentials with Base58 encoded cid and oid and '0' (zero) as delimiter.
|
||||
var authorizationFieldRegexp = regexp.MustCompile(`AWS4-HMAC-SHA256 Credential=(?P<access_key_id>[^/]+)/(?P<date>[^/]+)/(?P<region>[^/]*)/(?P<service>[^/]+)/aws4_request,\s*SignedHeaders=(?P<signed_header_fields>.+),\s*Signature=(?P<v4_signature>.+)`)
|
||||
// AuthorizationFieldRegexp -- is regexp for credentials with Base58 encoded cid and oid and '0' (zero) as delimiter.
|
||||
var AuthorizationFieldRegexp = regexp.MustCompile(`AWS4-HMAC-SHA256 Credential=(?P<access_key_id>[^/]+)/(?P<date>[^/]+)/(?P<region>[^/]*)/(?P<service>[^/]+)/aws4_request,\s*SignedHeaders=(?P<signed_header_fields>.+),\s*Signature=(?P<v4_signature>.+)`)
|
||||
|
||||
// postPolicyCredentialRegexp -- is regexp for credentials when uploading file using POST with policy.
|
||||
var postPolicyCredentialRegexp = regexp.MustCompile(`(?P<access_key_id>[^/]+)/(?P<date>[^/]+)/(?P<region>[^/]*)/(?P<service>[^/]+)/aws4_request`)
|
||||
|
@ -85,7 +85,7 @@ var ContentSHA256HeaderStandardValue = map[string]struct{}{
|
|||
func New(creds tokens.Credentials, prefixes []string) *Center {
|
||||
return &Center{
|
||||
cli: creds,
|
||||
reg: NewRegexpMatcher(authorizationFieldRegexp),
|
||||
reg: NewRegexpMatcher(AuthorizationFieldRegexp),
|
||||
postReg: NewRegexpMatcher(postPolicyCredentialRegexp),
|
||||
allowedAccessKeyIDPrefixes: prefixes,
|
||||
}
|
||||
|
|
|
@ -13,7 +13,7 @@ func TestAuthHeaderParse(t *testing.T) {
|
|||
defaultHeader := "AWS4-HMAC-SHA256 Credential=oid0cid/20210809/us-east-1/s3/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature=2811ccb9e242f41426738fb1f"
|
||||
|
||||
center := &Center{
|
||||
reg: NewRegexpMatcher(authorizationFieldRegexp),
|
||||
reg: NewRegexpMatcher(AuthorizationFieldRegexp),
|
||||
}
|
||||
|
||||
for _, tc := range []struct {
|
||||
|
|
|
@ -84,7 +84,7 @@ func TestCheckSign(t *testing.T) {
|
|||
|
||||
c := &Center{
|
||||
cli: mock,
|
||||
reg: NewRegexpMatcher(authorizationFieldRegexp),
|
||||
reg: NewRegexpMatcher(AuthorizationFieldRegexp),
|
||||
postReg: NewRegexpMatcher(postPolicyCredentialRegexp),
|
||||
}
|
||||
box, err := c.Authenticate(req)
|
||||
|
|
|
@ -103,3 +103,9 @@ const (
|
|||
PartNumberQuery = "partNumber"
|
||||
LegalHoldQuery = "legal-hold"
|
||||
)
|
||||
|
||||
const (
|
||||
StdoutPath = "stdout"
|
||||
StderrPath = "stderr"
|
||||
SinkName = "lumberjack"
|
||||
)
|
||||
|
|
245
api/middleware/log_http.go
Normal file
245
api/middleware/log_http.go
Normal file
|
@ -0,0 +1,245 @@
|
|||
//go:build loghttp
|
||||
|
||||
package middleware
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/detector"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/xmlutils"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
"gopkg.in/natefinch/lumberjack.v2"
|
||||
)
|
||||
|
||||
type (
|
||||
LogHTTPSettings interface {
|
||||
LogHTTPConfig() LogHTTPConfig
|
||||
}
|
||||
LogHTTPConfig struct {
|
||||
Enabled bool
|
||||
MaxBody int64
|
||||
MaxLogSize int
|
||||
OutputPath string
|
||||
UseGzip bool
|
||||
log *httpLogger
|
||||
}
|
||||
httpLogger struct {
|
||||
*zap.Logger
|
||||
logRoller *lumberjack.Logger
|
||||
}
|
||||
// Implementation of zap.Sink for using lumberjack.
|
||||
lumberjackSink struct {
|
||||
*lumberjack.Logger
|
||||
}
|
||||
// responseReadWriter helps read http response body.
|
||||
responseReadWriter struct {
|
||||
http.ResponseWriter
|
||||
response *bytes.Buffer
|
||||
statusCode int
|
||||
}
|
||||
)
|
||||
|
||||
const (
|
||||
payloadLabel = "payload"
|
||||
responseLabel = "response"
|
||||
)
|
||||
|
||||
func (lumberjackSink) Sync() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (lc *LogHTTPConfig) InitHTTPLogger(log *zap.Logger) {
|
||||
if err := lc.initHTTPLogger(); err != nil {
|
||||
log.Error(logs.FailedToInitializeHTTPLogger, zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
// initHTTPLogger returns registers zap sink and returns new httpLogger.
|
||||
func (lc *LogHTTPConfig) initHTTPLogger() (err error) {
|
||||
lc.log = &httpLogger{
|
||||
Logger: zap.NewNop(),
|
||||
logRoller: &lumberjack.Logger{},
|
||||
}
|
||||
c := newLoggerConfig()
|
||||
lc.log.Logger, err = c.Build()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
lc.setLogOutput()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// newLoggerConfig creates new zap.Config with disabled base fields.
|
||||
func newLoggerConfig() zap.Config {
|
||||
c := zap.NewProductionConfig()
|
||||
c.DisableCaller = true
|
||||
c.DisableStacktrace = true
|
||||
c.EncoderConfig = newEncoderConfig()
|
||||
c.Sampling = nil
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
func (lc *LogHTTPConfig) setLogOutput() {
|
||||
var output zapcore.WriteSyncer
|
||||
switch lc.OutputPath {
|
||||
case "", StdoutPath:
|
||||
output = zapcore.AddSync(os.Stdout)
|
||||
case StderrPath:
|
||||
output = zapcore.AddSync(os.Stderr)
|
||||
default:
|
||||
output = zapcore.AddSync(&lumberjack.Logger{
|
||||
Filename: lc.OutputPath,
|
||||
MaxSize: lc.MaxLogSize,
|
||||
Compress: lc.UseGzip,
|
||||
})
|
||||
}
|
||||
|
||||
// create logger with new sync
|
||||
lc.log.Logger = lc.log.Logger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
|
||||
return zapcore.NewCore(zapcore.NewJSONEncoder(newEncoderConfig()), output, zapcore.InfoLevel)
|
||||
}))
|
||||
}
|
||||
|
||||
func newEncoderConfig() zapcore.EncoderConfig {
|
||||
c := zap.NewProductionEncoderConfig()
|
||||
c.MessageKey = zapcore.OmitKey
|
||||
c.LevelKey = zapcore.OmitKey
|
||||
c.TimeKey = zapcore.OmitKey
|
||||
c.FunctionKey = zapcore.OmitKey
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
func (ww *responseReadWriter) Write(data []byte) (int, error) {
|
||||
ww.response.Write(data)
|
||||
return ww.ResponseWriter.Write(data)
|
||||
}
|
||||
|
||||
func (ww *responseReadWriter) WriteHeader(code int) {
|
||||
ww.statusCode = code
|
||||
ww.ResponseWriter.WriteHeader(code)
|
||||
}
|
||||
|
||||
func (ww *responseReadWriter) Flush() {
|
||||
if f, ok := ww.ResponseWriter.(http.Flusher); ok {
|
||||
f.Flush()
|
||||
}
|
||||
}
|
||||
|
||||
// LogHTTP logs http parameters from s3 request.
|
||||
func LogHTTP(l *zap.Logger, settings LogHTTPSettings) Func {
|
||||
return func(h http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
config := settings.LogHTTPConfig()
|
||||
if !config.Enabled || config.log == nil {
|
||||
h.ServeHTTP(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
httplog := config.log.getHTTPLogger(r).
|
||||
withFieldIfExist("query", r.URL.Query()).
|
||||
withFieldIfExist("headers", r.Header)
|
||||
|
||||
payload := getBody(r.Body, l)
|
||||
r.Body = io.NopCloser(bytes.NewReader(payload))
|
||||
|
||||
payloadReader := io.LimitReader(bytes.NewReader(payload), config.MaxBody)
|
||||
httplog = httplog.withProcessedBody(payloadLabel, payloadReader, l)
|
||||
|
||||
wr := newResponseReadWriter(w)
|
||||
h.ServeHTTP(wr, r)
|
||||
|
||||
respReader := io.LimitReader(wr.response, config.MaxBody)
|
||||
httplog = httplog.withProcessedBody(responseLabel, respReader, l)
|
||||
httplog = httplog.with(zap.Int("status", wr.statusCode))
|
||||
|
||||
httplog.Info(logs.LogHTTP)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// withFieldIfExist checks whether data is not empty and attach it to log output.
|
||||
func (lg *httpLogger) withFieldIfExist(label string, data map[string][]string) *httpLogger {
|
||||
if len(data) != 0 {
|
||||
return lg.with(zap.Any(label, data))
|
||||
}
|
||||
return lg
|
||||
}
|
||||
|
||||
func (lg *httpLogger) with(fields ...zap.Field) *httpLogger {
|
||||
return &httpLogger{
|
||||
Logger: lg.Logger.With(fields...),
|
||||
logRoller: lg.logRoller,
|
||||
}
|
||||
}
|
||||
|
||||
func (lg *httpLogger) getHTTPLogger(r *http.Request) *httpLogger {
|
||||
return lg.with(
|
||||
zap.String("from", r.RemoteAddr),
|
||||
zap.String("URI", r.RequestURI),
|
||||
zap.String("method", r.Method),
|
||||
zap.String("protocol", r.Proto),
|
||||
)
|
||||
}
|
||||
|
||||
func (lg *httpLogger) withProcessedBody(label string, bodyReader io.Reader, l *zap.Logger) *httpLogger {
|
||||
resp, err := processBody(bodyReader)
|
||||
if err != nil {
|
||||
l.Error(logs.FailedToProcessHTTPBody,
|
||||
zap.Error(err),
|
||||
zap.String("body type", payloadLabel))
|
||||
return lg
|
||||
}
|
||||
|
||||
return lg.with(zap.ByteString(label, resp))
|
||||
}
|
||||
|
||||
func newResponseReadWriter(w http.ResponseWriter) *responseReadWriter {
|
||||
return &responseReadWriter{
|
||||
ResponseWriter: w,
|
||||
response: &bytes.Buffer{},
|
||||
}
|
||||
}
|
||||
|
||||
func getBody(httpBody io.ReadCloser, l *zap.Logger) []byte {
|
||||
defer func(httpBody io.ReadCloser) {
|
||||
if err := httpBody.Close(); err != nil {
|
||||
l.Error(logs.FailedToCloseHTTPBody, zap.Error(err))
|
||||
}
|
||||
}(httpBody)
|
||||
|
||||
body, err := io.ReadAll(httpBody)
|
||||
if err != nil {
|
||||
l.Error(logs.FailedToReadHTTPBody,
|
||||
zap.Error(err),
|
||||
zap.String("body type", payloadLabel))
|
||||
return nil
|
||||
}
|
||||
return body
|
||||
}
|
||||
|
||||
// processBody reads body and base64 encode it if it's not XML.
|
||||
func processBody(bodyReader io.Reader) ([]byte, error) {
|
||||
resultBody := &bytes.Buffer{}
|
||||
detect := detector.NewDetector(bodyReader, xmlutils.DetectXML)
|
||||
dataType, err := detect.Detect()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
writer := xmlutils.ChooseWriter(dataType, resultBody)
|
||||
if _, err = io.Copy(writer, detect.RestoredReader()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = writer.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resultBody.Bytes(), nil
|
||||
}
|
36
api/middleware/log_http_stub.go
Normal file
36
api/middleware/log_http_stub.go
Normal file
|
@ -0,0 +1,36 @@
|
|||
//go:build !loghttp
|
||||
|
||||
package middleware
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type (
|
||||
LogHTTPSettings interface {
|
||||
LogHTTPConfig() LogHTTPConfig
|
||||
}
|
||||
LogHTTPConfig struct {
|
||||
Enabled bool
|
||||
MaxBody int64
|
||||
MaxLogSize int
|
||||
OutputPath string
|
||||
UseGzip bool
|
||||
}
|
||||
)
|
||||
|
||||
func LogHTTP(l *zap.Logger, _ LogHTTPSettings) Func {
|
||||
l.Warn(logs.LogHTTPDisabledInThisBuild)
|
||||
return func(h http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
h.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (*LogHTTPConfig) InitHTTPLogger(*zap.Logger) {
|
||||
// ignore
|
||||
}
|
|
@ -96,6 +96,7 @@ type Settings interface {
|
|||
s3middleware.RequestSettings
|
||||
s3middleware.PolicySettings
|
||||
s3middleware.MetricsSettings
|
||||
s3middleware.LogHTTPSettings
|
||||
}
|
||||
|
||||
type FrostFSID interface {
|
||||
|
@ -124,7 +125,9 @@ type Config struct {
|
|||
|
||||
func NewRouter(cfg Config) *chi.Mux {
|
||||
api := chi.NewRouter()
|
||||
|
||||
api.Use(
|
||||
s3middleware.LogHTTP(cfg.Log, cfg.MiddlewareSettings),
|
||||
s3middleware.Request(cfg.Log, cfg.MiddlewareSettings),
|
||||
middleware.ThrottleWithOpts(cfg.Throttle),
|
||||
middleware.Recoverer,
|
||||
|
|
|
@ -57,6 +57,7 @@ func (c *centerMock) Authenticate(*http.Request) (*middleware.Box, error) {
|
|||
type middlewareSettingsMock struct {
|
||||
denyByDefault bool
|
||||
aclEnabled bool
|
||||
logHTTP middleware.LogHTTPConfig
|
||||
}
|
||||
|
||||
func (r *middlewareSettingsMock) NamespaceHeader() string {
|
||||
|
@ -74,6 +75,9 @@ func (r *middlewareSettingsMock) PolicyDenyByDefault() bool {
|
|||
func (r *middlewareSettingsMock) ACLEnabled() bool {
|
||||
return r.aclEnabled
|
||||
}
|
||||
func (r *middlewareSettingsMock) LogHTTPConfig() middleware.LogHTTPConfig {
|
||||
return r.logHTTP
|
||||
}
|
||||
|
||||
type frostFSIDMock struct {
|
||||
}
|
||||
|
|
|
@ -86,6 +86,7 @@ type (
|
|||
|
||||
appSettings struct {
|
||||
logLevel zap.AtomicLevel
|
||||
httpLogging s3middleware.LogHTTPConfig
|
||||
maxClient maxClientsConfig
|
||||
defaultMaxAge int
|
||||
notificatorEnabled bool
|
||||
|
@ -203,6 +204,7 @@ func (a *App) initLayer(ctx context.Context) {
|
|||
func newAppSettings(log *Logger, v *viper.Viper, key *keys.PrivateKey) *appSettings {
|
||||
settings := &appSettings{
|
||||
logLevel: log.lvl,
|
||||
httpLogging: s3middleware.LogHTTPConfig{},
|
||||
maxClient: newMaxClients(v),
|
||||
defaultMaxAge: fetchDefaultMaxAge(v, log.logger),
|
||||
notificatorEnabled: v.GetBool(cfgEnableNATS),
|
||||
|
@ -236,10 +238,23 @@ func (s *appSettings) updateNamespacesSettings(v *viper.Viper, log *zap.Logger)
|
|||
nsHeader := v.GetString(cfgResolveNamespaceHeader)
|
||||
nsConfig, defaultNamespaces := fetchNamespacesConfig(log, v)
|
||||
|
||||
httpLoggingEnabled := v.GetBool(cfgHTTPLoggingEnabled)
|
||||
httpLoggingMaxBody := v.GetInt64(cfgHTTPLoggingMaxBody)
|
||||
httpLoggingMaxLogSize := v.GetInt(cfgHTTPLoggingMaxLogSize)
|
||||
httpLoggingOutputPath := v.GetString(cfgHTTPLoggingDestination)
|
||||
httpLoggingUseGzip := v.GetBool(cfgHTTPLoggingGzip)
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
s.namespaceHeader = nsHeader
|
||||
s.httpLogging.Enabled = httpLoggingEnabled
|
||||
s.httpLogging.MaxBody = httpLoggingMaxBody
|
||||
s.httpLogging.MaxLogSize = httpLoggingMaxLogSize
|
||||
s.httpLogging.OutputPath = httpLoggingOutputPath
|
||||
s.httpLogging.UseGzip = httpLoggingUseGzip
|
||||
s.httpLogging.InitHTTPLogger(log)
|
||||
|
||||
s.namespaceHeader = namespaceHeader
|
||||
s.defaultNamespaces = defaultNamespaces
|
||||
s.namespaces = nsConfig.Namespaces
|
||||
}
|
||||
|
@ -308,6 +323,13 @@ func (s *appSettings) DefaultCopiesNumbers(namespace string) []uint32 {
|
|||
return s.namespaces[namespace].CopiesNumbers[defaultConstraintName]
|
||||
}
|
||||
|
||||
func (s *appSettings) LogHTTPConfig() s3middleware.LogHTTPConfig {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
return s.httpLogging
|
||||
}
|
||||
|
||||
func (s *appSettings) NewXMLDecoder(r io.Reader) *xml.Decoder {
|
||||
dec := xml.NewDecoder(r)
|
||||
|
||||
|
|
|
@ -71,6 +71,14 @@ const ( // Settings.
|
|||
cfgLoggerLevel = "logger.level"
|
||||
cfgLoggerDestination = "logger.destination"
|
||||
|
||||
// HttpLogging.
|
||||
cfgHTTPLoggingEnabled = "http_logging.enabled"
|
||||
cfgHTTPLoggingMaxBody = "http_logging.max_body"
|
||||
cfgHTTPLoggingMaxLogSize = "http_logging.max_log_size"
|
||||
cfgHTTPLoggingDestination = "http_logging.destination"
|
||||
cfgHTTPLoggingGzip = "http_logging.gzip"
|
||||
cfgHTTPLoggingLogResponse = "http_logging.log_response"
|
||||
|
||||
// Wallet.
|
||||
cfgWalletPath = "wallet.path"
|
||||
cfgWalletAddress = "wallet.address"
|
||||
|
@ -701,6 +709,14 @@ func newSettings() *viper.Viper {
|
|||
v.SetDefault(cfgLoggerLevel, "debug")
|
||||
v.SetDefault(cfgLoggerDestination, "stdout")
|
||||
|
||||
// http logger
|
||||
v.SetDefault(cfgHTTPLoggingEnabled, false)
|
||||
v.SetDefault(cfgHTTPLoggingMaxBody, 1024)
|
||||
v.SetDefault(cfgHTTPLoggingMaxLogSize, 50)
|
||||
v.SetDefault(cfgHTTPLoggingDestination, "stdout")
|
||||
v.SetDefault(cfgHTTPLoggingGzip, false)
|
||||
v.SetDefault(cfgHTTPLoggingLogResponse, true)
|
||||
|
||||
// pool:
|
||||
v.SetDefault(cfgPoolErrorThreshold, defaultPoolErrorThreshold)
|
||||
v.SetDefault(cfgStreamTimeout, defaultStreamTimeout)
|
||||
|
|
208
cmd/s3-playback/modules/run.go
Normal file
208
cmd/s3-playback/modules/run.go
Normal file
|
@ -0,0 +1,208 @@
|
|||
package modules
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/md5"
|
||||
"crypto/sha256"
|
||||
"crypto/tls"
|
||||
"encoding/base64"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/cmd/s3-playback/internal/playback"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/detector"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/xmlutils"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
)
|
||||
|
||||
const (
|
||||
cfgPrintResponseLimit = "print-response-limit"
|
||||
cfgLogPath = "log"
|
||||
cfgEndpoint = "endpoint"
|
||||
awsAccessKey = "credentials.access_key"
|
||||
awsSecretKey = "credentials.secret_key"
|
||||
)
|
||||
|
||||
var runCmd = &cobra.Command{
|
||||
Use: "run",
|
||||
Short: "Send requests from log file",
|
||||
Long: "Reads the network log file and sends each request to the specified URL",
|
||||
Example: "frostfs-s3-playback --config <config_path> run [--endpoint=<endpoint>] [--log=<log_path>]",
|
||||
PersistentPreRunE: func(cmd *cobra.Command, _ []string) (err error) {
|
||||
viper.SetDefault(cfgPrintResponseLimit, defaultPrintResponseLimit)
|
||||
return viper.BindPFlags(cmd.Flags())
|
||||
},
|
||||
RunE: run,
|
||||
}
|
||||
|
||||
func init() {
|
||||
runCmd.Flags().String(cfgLogPath, "./request.log", "log file path")
|
||||
runCmd.Flags().String(cfgEndpoint, "", "endpoint URL")
|
||||
runCmd.Flags().Int(cfgPrintResponseLimit, defaultPrintResponseLimit, "print limit for http response body")
|
||||
}
|
||||
|
||||
func logResponse(cmd *cobra.Command, r *http.Request, resp *http.Response) {
|
||||
cmd.Println(r.Method, r.URL.RequestURI())
|
||||
cmd.Println(resp.Status)
|
||||
if resp.ContentLength == 0 {
|
||||
return
|
||||
}
|
||||
detect := detector.NewDetector(resp.Body, xmlutils.DetectXML)
|
||||
dataType, err := detect.Detect()
|
||||
if err != nil {
|
||||
cmd.PrintErrln("type detection error:", err.Error())
|
||||
return
|
||||
}
|
||||
body := &bytes.Buffer{}
|
||||
resultWriter := xmlutils.ChooseWriter(dataType, body)
|
||||
_, err = io.Copy(resultWriter, io.LimitReader(detect.RestoredReader(), viper.GetInt64(cfgPrintResponseLimit)))
|
||||
if err != nil {
|
||||
cmd.Println(err)
|
||||
return
|
||||
}
|
||||
if err = resultWriter.Close(); err != nil {
|
||||
cmd.Printf("could not close response body: %s\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
cmd.Println(body.String())
|
||||
cmd.Println()
|
||||
}
|
||||
|
||||
func run(cmd *cobra.Command, _ []string) error {
|
||||
ctx := cmd.Context()
|
||||
settings := &playback.Settings{
|
||||
Endpoint: viper.GetString(cfgEndpoint),
|
||||
Creds: playback.Credentials{
|
||||
AccessKey: viper.GetString(awsAccessKey),
|
||||
SecretKey: viper.GetString(awsSecretKey),
|
||||
},
|
||||
Multiparts: make(map[string]playback.MultipartUpload),
|
||||
Client: &http.Client{
|
||||
Transport: http.DefaultTransport,
|
||||
Timeout: viper.GetDuration(cfgHTTPTimeoutFlag),
|
||||
},
|
||||
}
|
||||
|
||||
file, err := os.Open(viper.GetString(cfgLogPath))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
reader := bufio.NewReader(file)
|
||||
|
||||
if viper.GetBool(cfgSkipVerifyTLS) {
|
||||
settings.Client.Transport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
|
||||
}
|
||||
|
||||
id := 1
|
||||
for {
|
||||
logReq, err := getRequestFromLog(reader)
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
cmd.PrintErrln(strconv.Itoa(id)+")", "failed to parse request", err)
|
||||
id++
|
||||
continue
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return fmt.Errorf("interrupted: %w", ctx.Err())
|
||||
default:
|
||||
r, resp, err := playbackRequest(ctx, logReq, settings)
|
||||
if err != nil {
|
||||
cmd.PrintErrln(strconv.Itoa(id)+")", "failed to playback request:", err)
|
||||
id++
|
||||
continue
|
||||
}
|
||||
cmd.Print(strconv.Itoa(id) + ") ")
|
||||
logResponse(cmd, r, resp)
|
||||
id++
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func getRequestFromLog(reader *bufio.Reader) (playback.LoggedRequest, error) {
|
||||
var logReq playback.LoggedRequest
|
||||
req, err := reader.ReadString('\n')
|
||||
if err != nil {
|
||||
return logReq, err
|
||||
}
|
||||
|
||||
err = json.Unmarshal([]byte(req), &logReq)
|
||||
if err != nil {
|
||||
return logReq, err
|
||||
}
|
||||
|
||||
return logReq, nil
|
||||
}
|
||||
|
||||
// playbackRequest creates http.Request from LoggedRequest and sends it to specified endpoint.
|
||||
func playbackRequest(ctx context.Context, logReq playback.LoggedRequest, settings *playback.Settings) (*http.Request, *http.Response, error) {
|
||||
r, err := prepareRequest(ctx, logReq, settings)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to prepare request: %w", err)
|
||||
}
|
||||
resp, err := settings.Client.Do(r)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to send request: %w", err)
|
||||
}
|
||||
|
||||
respBody, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to read response body: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if err = playback.HandleResponse(r, settings.Multiparts, respBody, logReq.Response); err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to register multipart upload: %w", err)
|
||||
}
|
||||
resp.Body = io.NopCloser(bytes.NewBuffer(respBody))
|
||||
|
||||
return r, resp, nil
|
||||
}
|
||||
|
||||
// prepareRequest creates request from logs and modifies its signature and uploadId (if presents).
|
||||
func prepareRequest(ctx context.Context, logReq playback.LoggedRequest, settings *playback.Settings) (*http.Request, error) {
|
||||
r, err := http.NewRequestWithContext(ctx, logReq.Method, settings.Endpoint+logReq.URI, bytes.NewReader(logReq.Payload))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r.Header = logReq.Header
|
||||
sha256hash := sha256.New()
|
||||
sha256hash.Write(logReq.Payload)
|
||||
r.Header.Set(auth.AmzContentSHA256, hex.EncodeToString(sha256hash.Sum(nil)))
|
||||
if r.Header.Get(api.ContentMD5) != "" {
|
||||
sha256hash.Reset()
|
||||
md5hash := md5.New()
|
||||
md5hash.Write(logReq.Payload)
|
||||
r.Header.Set(api.ContentMD5, base64.StdEncoding.EncodeToString(md5hash.Sum(nil)))
|
||||
}
|
||||
if r.URL.Query().Has("uploadId") {
|
||||
if err = playback.SwapUploadID(r, settings); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if r.Header.Get(auth.AuthorizationHdr) != "" {
|
||||
if err = playback.Sign(ctx, r, settings.Creds); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return r, nil
|
||||
}
|
|
@ -48,6 +48,17 @@ S3_GW_CONFIG=/path/to/config/yaml
|
|||
# Logger
|
||||
S3_GW_LOGGER_LEVEL=debug
|
||||
|
||||
# HTTP logger
|
||||
S3_GW_HTTP_LOGGING_ENABLED=false
|
||||
# max body size to log
|
||||
S3_GW_HTTP_LOGGING_MAX_BODY=1024
|
||||
# max log size in Mb
|
||||
S3_GW_HTTP_LOGGING_MAX_LOG_SIZE: 20
|
||||
# use log compression
|
||||
S3_GW_HTTP_LOGGING_GZIP=true
|
||||
# possible destination output values: filesystem path, url, "stdout", "stderr"
|
||||
S3_GW_HTTP_LOGGING_DESTINATION=stdout
|
||||
|
||||
# RPC endpoint and order of resolving of bucket names
|
||||
S3_GW_RPC_ENDPOINT=http://morph-chain.frostfs.devenv:30333/
|
||||
S3_GW_RESOLVE_ORDER="nns dns"
|
||||
|
|
|
@ -54,6 +54,18 @@ logger:
|
|||
level: debug
|
||||
destination: stdout
|
||||
|
||||
# log http request data (URI, headers, query, etc)
|
||||
http_logging:
|
||||
enabled: false
|
||||
# max body size to log
|
||||
max_body: 1024
|
||||
# max log size in Mb
|
||||
max_log_size: 20
|
||||
# use log compression
|
||||
gzip: true
|
||||
# possible output values: filesystem path, url, "stdout", "stderr"
|
||||
destination: stdout
|
||||
|
||||
# RPC endpoint and order of resolving of bucket names
|
||||
rpc_endpoint: http://morph-chain.frostfs.devenv:30333
|
||||
resolve_order:
|
||||
|
|
5
go.mod
5
go.mod
|
@ -19,7 +19,7 @@ require (
|
|||
github.com/panjf2000/ants/v2 v2.5.0
|
||||
github.com/prometheus/client_golang v1.15.1
|
||||
github.com/prometheus/client_model v0.3.0
|
||||
github.com/spf13/cobra v1.7.0
|
||||
github.com/spf13/cobra v1.8.1
|
||||
github.com/spf13/pflag v1.0.5
|
||||
github.com/spf13/viper v1.15.0
|
||||
github.com/ssgreg/journald v1.0.0
|
||||
|
@ -32,6 +32,7 @@ require (
|
|||
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63
|
||||
google.golang.org/grpc v1.59.0
|
||||
google.golang.org/protobuf v1.33.0
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.2.1
|
||||
)
|
||||
|
||||
require (
|
||||
|
@ -43,7 +44,7 @@ require (
|
|||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.2.0 // indirect
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
|
||||
github.com/fsnotify/fsnotify v1.6.0 // indirect
|
||||
|
|
10
go.sum
10
go.sum
|
@ -91,8 +91,8 @@ github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWH
|
|||
github.com/consensys/bavard v0.1.13 h1:oLhMLOFGTLdlda/kma4VOJazblc7IM5y5QPd2A/YjhQ=
|
||||
github.com/consensys/gnark-crypto v0.12.2-0.20231013160410-1f65e75b6dfb h1:f0BMgIjhZy4lSRHCXFbQst85f5agZAjtDMixQqBWNpc=
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w=
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.4 h1:wfIWP927BUkWJb2NmU/kNDYIBTh/ziUX91+lVfRxZq4=
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
|
@ -294,8 +294,8 @@ github.com/spf13/afero v1.9.3 h1:41FoI0fD7OR7mGcKE/aOiLkGreyf8ifIOQmJANWogMk=
|
|||
github.com/spf13/afero v1.9.3/go.mod h1:iUV7ddyEEZPO5gA3zD4fJt6iStLlL+Lg4m2cihcDf8Y=
|
||||
github.com/spf13/cast v1.5.0 h1:rj3WzYc11XZaIZMPKmwP96zkFEnnAmV8s6XbB2aY32w=
|
||||
github.com/spf13/cast v1.5.0/go.mod h1:SpXXQ5YoyJw6s3/6cMTQuxvgRl3PCJiyaX9p6b155UU=
|
||||
github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I=
|
||||
github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0=
|
||||
github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM=
|
||||
github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y=
|
||||
github.com/spf13/jwalterweatherman v1.1.0 h1:ue6voC5bR5F8YxI5S67j9i582FU4Qvo2bmqnqMYADFk=
|
||||
github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0EXowPYD95IqWIGo=
|
||||
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
|
||||
|
@ -704,6 +704,8 @@ gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
|
|||
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
|
||||
gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA=
|
||||
gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
|
||||
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
|
||||
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
|
||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
|
|
|
@ -104,6 +104,13 @@ const (
|
|||
FailedToPassAuthentication = "failed to pass authentication" // Error in ../../api/middleware/auth.go
|
||||
FailedToResolveCID = "failed to resolve CID" // Debug in ../../api/middleware/metrics.go
|
||||
RequestStart = "request start" // Info in ../../api/middleware/reqinfo.go
|
||||
LogHTTP = "http log" // Info in ../../api/middleware/log_http.go
|
||||
FailedToCloseHTTPBody = "failed to close http body" // Error in ../../api/middleware/log_http.go
|
||||
FailedToInitializeHTTPLogger = "failed to initialize http logger" // Error in ../../api/middleware/log_http.go
|
||||
FailedToReloadHTTPFileLogger = "failed to reload http file logger" // Error in ../../api/middleware/log_http.go
|
||||
FailedToReadHTTPBody = "failed to read http body" // Error in ../../api/middleware/log_http.go
|
||||
FailedToProcessHTTPBody = "failed to process http body" // Error in ../../api/middleware/log_http.go
|
||||
LogHTTPDisabledInThisBuild = "http logging disabled in this build" // Warn in ../../api/middleware/log_http_stub.go
|
||||
FailedToUnescapeObjectName = "failed to unescape object name" // Warn in ../../api/middleware/reqinfo.go
|
||||
CouldNotHandleMessage = "could not handle message" // Error in ../../api/notifications/controller.go
|
||||
CouldNotACKMessage = "could not ACK message" // Error in ../../api/notifications/controller.go
|
||||
|
|
48
pkg/xmlutils/xmlutils.go
Normal file
48
pkg/xmlutils/xmlutils.go
Normal file
|
@ -0,0 +1,48 @@
|
|||
package xmlutils
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/base64"
|
||||
"encoding/xml"
|
||||
"io"
|
||||
)
|
||||
|
||||
type nopCloseWriter struct {
|
||||
io.Writer
|
||||
}
|
||||
|
||||
func (b nopCloseWriter) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
const (
|
||||
nonXML = "nonXML"
|
||||
typeXML = "application/xml"
|
||||
)
|
||||
|
||||
func DetectXML(data []byte) string {
|
||||
token, err := xml.NewDecoder(bytes.NewReader(data)).RawToken()
|
||||
if err != nil {
|
||||
return nonXML
|
||||
}
|
||||
|
||||
switch token.(type) {
|
||||
case xml.StartElement, xml.ProcInst:
|
||||
return typeXML
|
||||
}
|
||||
return nonXML
|
||||
}
|
||||
|
||||
func ChooseWriter(dataType string, bodyWriter io.Writer) io.WriteCloser {
|
||||
if dataType == typeXML {
|
||||
return nopCloseWriter{bodyWriter}
|
||||
}
|
||||
return base64.NewEncoder(base64.StdEncoding, bodyWriter)
|
||||
}
|
||||
|
||||
func ChooseReader(dataType string, bodyReader io.Reader) io.Reader {
|
||||
if dataType == typeXML {
|
||||
return bodyReader
|
||||
}
|
||||
return base64.NewDecoder(base64.StdEncoding, bodyReader)
|
||||
}
|
Loading…
Reference in a new issue