[#369] Conditional HTTP-body encode & decode #453
14 changed files with 290 additions and 132 deletions
|
@ -12,6 +12,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"mime"
|
"mime"
|
||||||
|
"net/http"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -21,6 +22,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||||
apiErrors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
apiErrors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/detector"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
@ -237,11 +239,11 @@ func (n *Layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
|
||||||
if r != nil {
|
if r != nil {
|
||||||
if len(p.Header[api.ContentType]) == 0 {
|
if len(p.Header[api.ContentType]) == 0 {
|
||||||
if contentType := MimeByFilePath(p.Object); len(contentType) == 0 {
|
if contentType := MimeByFilePath(p.Object); len(contentType) == 0 {
|
||||||
d := newDetector(r)
|
d := detector.NewDetector(r, http.DetectContentType)
|
||||||
if contentType, err := d.Detect(); err == nil {
|
if contentType, err := d.Detect(); err == nil {
|
||||||
p.Header[api.ContentType] = contentType
|
p.Header[api.ContentType] = contentType
|
||||||
}
|
}
|
||||||
r = d.MultiReader()
|
r = d.RestoredReader()
|
||||||
} else {
|
} else {
|
||||||
p.Header[api.ContentType] = contentType
|
p.Header[api.ContentType] = contentType
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,31 +4,46 @@ package middleware
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/base64"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/detector"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/playback/utils"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"go.uber.org/zap/zapcore"
|
"go.uber.org/zap/zapcore"
|
||||||
"gopkg.in/natefinch/lumberjack.v2"
|
"gopkg.in/natefinch/lumberjack.v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
type LogHTTPConfig struct {
|
type (
|
||||||
|
LogHTTPConfig struct {
|
||||||
Enabled bool
|
Enabled bool
|
||||||
MaxBody int64
|
MaxBody int64
|
||||||
MaxLogSize int
|
MaxLogSize int
|
||||||
OutputPath string
|
OutputPath string
|
||||||
UseGzip bool
|
UseGzip bool
|
||||||
LogResponse bool
|
|
||||||
}
|
}
|
||||||
|
fileLogger struct {
|
||||||
type fileLogger struct {
|
|
||||||
*zap.Logger
|
*zap.Logger
|
||||||
logRoller *lumberjack.Logger
|
logRoller *lumberjack.Logger
|
||||||
}
|
}
|
||||||
|
// Implementation of zap.Sink for using lumberjack.
|
||||||
|
lumberjackSink struct {
|
||||||
|
*lumberjack.Logger
|
||||||
|
}
|
||||||
|
// responseReadWriter helps read http response body.
|
||||||
|
responseReadWriter struct {
|
||||||
|
http.ResponseWriter
|
||||||
|
response *bytes.Buffer
|
||||||
|
statusCode int
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
payloadLabel = "payload"
|
||||||
|
responseLabel = "response"
|
||||||
|
)
|
||||||
|
|
||||||
var filelog = fileLogger{}
|
var filelog = fileLogger{}
|
||||||
|
|
||||||
|
@ -53,6 +68,13 @@ func newFileLogger(conf *LogHTTPConfig) (fileLogger, error) {
|
||||||
return flog, nil
|
return flog, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newResponseReadWriter(w http.ResponseWriter) *responseReadWriter {
|
||||||
|
return &responseReadWriter{
|
||||||
|
ResponseWriter: w,
|
||||||
|
response: &bytes.Buffer{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// registerOutputSink creates and registers sink for logger file output.
|
// registerOutputSink creates and registers sink for logger file output.
|
||||||
func (f *fileLogger) registerOutputSink(conf *LogHTTPConfig) error {
|
func (f *fileLogger) registerOutputSink(conf *LogHTTPConfig) error {
|
||||||
f.logRoller = &lumberjack.Logger{
|
f.logRoller = &lumberjack.Logger{
|
||||||
|
@ -72,11 +94,6 @@ func (f *fileLogger) registerOutputSink(conf *LogHTTPConfig) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implementation of zap.Sink for using lumberjack.
|
|
||||||
type lumberjackSink struct {
|
|
||||||
*lumberjack.Logger
|
|
||||||
}
|
|
||||||
|
|
||||||
func (lumberjackSink) Sync() error {
|
func (lumberjackSink) Sync() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -98,17 +115,16 @@ func ReloadFileLogger(conf *LogHTTPConfig) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// responseReadWriter helps read http response body.
|
|
||||||
type responseReadWriter struct {
|
|
||||||
http.ResponseWriter
|
|
||||||
response *bytes.Buffer
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ww *responseReadWriter) Write(data []byte) (int, error) {
|
func (ww *responseReadWriter) Write(data []byte) (int, error) {
|
||||||
ww.response.Write(data)
|
ww.response.Write(data)
|
||||||
return ww.ResponseWriter.Write(data)
|
return ww.ResponseWriter.Write(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ww *responseReadWriter) WriteHeader(code int) {
|
||||||
|
ww.statusCode = code
|
||||||
|
ww.ResponseWriter.WriteHeader(code)
|
||||||
|
}
|
||||||
|
|
||||||
func (ww *responseReadWriter) Flush() {
|
func (ww *responseReadWriter) Flush() {
|
||||||
if f, ok := ww.ResponseWriter.(http.Flusher); ok {
|
if f, ok := ww.ResponseWriter.(http.Flusher); ok {
|
||||||
f.Flush()
|
f.Flush()
|
||||||
|
@ -120,7 +136,7 @@ func LogHTTP(l *zap.Logger, config *LogHTTPConfig) Func {
|
||||||
var err error
|
var err error
|
||||||
filelog, err = newFileLogger(config)
|
filelog, err = newFileLogger(config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
l.Warn(logs.FailedToInitializeHTTPLogger)
|
l.Error(logs.FailedToInitializeHTTPLogger)
|
||||||
return func(h http.Handler) http.Handler {
|
return func(h http.Handler) http.Handler {
|
||||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
h.ServeHTTP(w, r)
|
h.ServeHTTP(w, r)
|
||||||
|
@ -133,38 +149,62 @@ func LogHTTP(l *zap.Logger, config *LogHTTPConfig) Func {
|
||||||
h.ServeHTTP(w, r)
|
h.ServeHTTP(w, r)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
var httplog = filelog.With(
|
|
||||||
zap.String("from", r.RemoteAddr),
|
|
||||||
zap.String("URI", r.RequestURI),
|
|
||||||
zap.String("method", r.Method),
|
|
||||||
)
|
|
||||||
|
|
||||||
httplog = withFieldIfExist(httplog, "query", r.URL.Query())
|
httplog := filelog.getHTTPLogger(r).
|
||||||
httplog = withFieldIfExist(httplog, "headers", r.Header)
|
withFieldIfExist("query", r.URL.Query()).
|
||||||
if r.ContentLength > 0 && r.ContentLength <= config.MaxBody {
|
withFieldIfExist("headers", r.Header)
|
||||||
httplog, err = withBody(httplog, r)
|
|
||||||
if err != nil {
|
payload := getBody(r.Body, l)
|
||||||
l.Warn(logs.FailedToGetRequestBody, zap.Error(err))
|
r.Body = io.NopCloser(bytes.NewReader(payload))
|
||||||
}
|
|
||||||
}
|
payloadReader := io.LimitReader(bytes.NewReader(payload), config.MaxBody)
|
||||||
if !config.LogResponse {
|
httplog = httplog.withProcessedBody(payloadLabel, payloadReader, l)
|
||||||
|
|
||||||
|
wr := newResponseReadWriter(w)
|
||||||
|
h.ServeHTTP(wr, r)
|
||||||
|
|
||||||
|
respReader := io.LimitReader(wr.response, config.MaxBody)
|
||||||
|
httplog = httplog.withProcessedBody(responseLabel, respReader, l)
|
||||||
|
httplog = httplog.with(zap.Int("status", wr.statusCode))
|
||||||
|
|
||||||
httplog.Info(logs.LogHTTP)
|
httplog.Info(logs.LogHTTP)
|
||||||
h.ServeHTTP(w, r)
|
|
||||||
} else {
|
|
||||||
var resp = bytes.Buffer{}
|
|
||||||
ww := &responseReadWriter{ResponseWriter: w, response: &resp}
|
|
||||||
h.ServeHTTP(ww, r)
|
|
||||||
if int64(resp.Len()) <= config.MaxBody {
|
|
||||||
httplog.With(zap.String("response", base64.StdEncoding.EncodeToString(resp.Bytes()))).
|
|
||||||
Info(logs.LogHTTP)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *fileLogger) with(fields ...zap.Field) *fileLogger {
|
||||||
|
return &fileLogger{
|
||||||
|
Logger: f.Logger.With(fields...),
|
||||||
|
logRoller: f.logRoller,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fileLogger) getHTTPLogger(r *http.Request) *fileLogger {
|
||||||
|
return &fileLogger{
|
||||||
|
Logger: f.Logger.With(
|
||||||
|
zap.String("from", r.RemoteAddr),
|
||||||
|
zap.String("URI", r.RequestURI),
|
||||||
|
zap.String("method", r.Method),
|
||||||
|
zap.String("protocol", r.Proto),
|
||||||
|
),
|
||||||
|
logRoller: f.logRoller,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fileLogger) withProcessedBody(label string, bodyReader io.Reader, l *zap.Logger) *fileLogger {
|
||||||
|
resp, err := processBody(bodyReader)
|
||||||
|
if err != nil {
|
||||||
|
l.Error(logs.FailedToProcessHTTPBody,
|
||||||
|
zap.Error(err),
|
||||||
|
zap.String("body type", payloadLabel))
|
||||||
|
return f
|
||||||
|
}
|
||||||
|
|
||||||
|
return f.with(zap.ByteString(label, resp))
|
||||||
|
}
|
||||||
|
|
||||||
// newLoggerConfig creates new zap.Config with disabled base fields.
|
// newLoggerConfig creates new zap.Config with disabled base fields.
|
||||||
func (*fileLogger) newLoggerConfig() zap.Config {
|
func (f *fileLogger) newLoggerConfig() zap.Config {
|
||||||
c := zap.NewProductionConfig()
|
c := zap.NewProductionConfig()
|
||||||
c.DisableCaller = true
|
c.DisableCaller = true
|
||||||
c.DisableStacktrace = true
|
c.DisableStacktrace = true
|
||||||
|
@ -172,28 +212,47 @@ func (*fileLogger) newLoggerConfig() zap.Config {
|
||||||
c.EncoderConfig.LevelKey = zapcore.OmitKey
|
c.EncoderConfig.LevelKey = zapcore.OmitKey
|
||||||
c.EncoderConfig.TimeKey = zapcore.OmitKey
|
c.EncoderConfig.TimeKey = zapcore.OmitKey
|
||||||
c.EncoderConfig.FunctionKey = zapcore.OmitKey
|
c.EncoderConfig.FunctionKey = zapcore.OmitKey
|
||||||
|
c.Sampling = nil
|
||||||
|
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
// withBody reads body and attach it to log output.
|
func getBody(httpBody io.ReadCloser, l *zap.Logger) []byte {
|
||||||
func withBody(httplog *zap.Logger, r *http.Request) (*zap.Logger, error) {
|
defer httpBody.Close()
|
||||||
body, err := io.ReadAll(r.Body)
|
|
||||||
|
body, err := io.ReadAll(httpBody)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("read body error: %w", err)
|
l.Error(logs.FailedToGetHTTPBody,
|
||||||
|
zap.Error(err),
|
||||||
|
zap.String("body type", payloadLabel))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return body
|
||||||
}
|
}
|
||||||
defer r.Body.Close()
|
|
||||||
r.Body = io.NopCloser(bytes.NewBuffer(body))
|
|
||||||
|
|
||||||
httplog = httplog.With(zap.String("body", base64.StdEncoding.EncodeToString(body)))
|
// processBody reads body and base64 encode it if it's not XML.
|
||||||
|
func processBody(bodyReader io.Reader) ([]byte, error) {
|
||||||
alexvanin marked this conversation as resolved
|
|||||||
|
resultBody := &bytes.Buffer{}
|
||||||
|
detect := detector.NewDetector(bodyReader, utils.DetectXML)
|
||||||
|
dataType, err := detect.Detect()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
writer := utils.ChooseWriter(dataType, resultBody)
|
||||||
|
if _, err = io.Copy(writer, detect.RestoredReader()); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err = writer.Close(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
return httplog, nil
|
return resultBody.Bytes(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// withFieldIfExist checks whether data is not empty and attach it to log output.
|
// withFieldIfExist checks whether data is not empty and attach it to log output.
|
||||||
func withFieldIfExist(log *zap.Logger, label string, data map[string][]string) *zap.Logger {
|
func (f *fileLogger) withFieldIfExist(label string, data map[string][]string) *fileLogger {
|
||||||
if len(data) != 0 {
|
if len(data) != 0 {
|
||||||
log = log.With(zap.Any(label, data))
|
return f.with(zap.Any(label, data))
|
||||||
}
|
}
|
||||||
return log
|
return f
|
||||||
}
|
}
|
||||||
|
|
|
@ -573,7 +573,6 @@ func (s *appSettings) updateHTTPLoggingSettings(cfg *viper.Viper, log *zap.Logge
|
||||||
s.httpLogging.MaxLogSize = cfg.GetInt(cfgHTTPLoggingMaxLogSize)
|
s.httpLogging.MaxLogSize = cfg.GetInt(cfgHTTPLoggingMaxLogSize)
|
||||||
s.httpLogging.OutputPath = cfg.GetString(cfgHTTPLoggingDestination)
|
s.httpLogging.OutputPath = cfg.GetString(cfgHTTPLoggingDestination)
|
||||||
s.httpLogging.UseGzip = cfg.GetBool(cfgHTTPLoggingGzip)
|
s.httpLogging.UseGzip = cfg.GetBool(cfgHTTPLoggingGzip)
|
||||||
s.httpLogging.LogResponse = cfg.GetBool(cfgHTTPLoggingLogResponse)
|
|
||||||
if err := s3middleware.ReloadFileLogger(s.httpLogging); err != nil {
|
if err := s3middleware.ReloadFileLogger(s.httpLogging); err != nil {
|
||||||
log.Error(logs.FailedToReloadHTTPFileLogger, zap.Error(err))
|
log.Error(logs.FailedToReloadHTTPFileLogger, zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,12 +10,14 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
defaultPrintResponseLimit = 1024
|
||||||
configPath = "config"
|
configPath = "config"
|
||||||
configPathFlag = "config"
|
configPathFlag = "config"
|
||||||
httpTimeout = "http_timeout"
|
httpTimeout = "http_timeout"
|
||||||
httpTimeoutFlag = "http-timeout"
|
httpTimeoutFlag = "http-timeout"
|
||||||
skipVerifyTLS = "skip_verify_tls"
|
skipVerifyTLS = "skip_verify_tls"
|
||||||
skipVerifyTLSFlag = "skip-verify-tls"
|
skipVerifyTLSFlag = "skip-verify-tls"
|
||||||
|
printResponseLimit = "print_response_limit"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -36,6 +38,8 @@ var (
|
||||||
|
|
||||||
_ = viper.BindPFlag(skipVerifyTLS, cmd.PersistentFlags().Lookup(skipVerifyTLSFlag))
|
_ = viper.BindPFlag(skipVerifyTLS, cmd.PersistentFlags().Lookup(skipVerifyTLSFlag))
|
||||||
|
|
||||||
|
viper.SetDefault(printResponseLimit, defaultPrintResponseLimit)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
RunE: func(cmd *cobra.Command, _ []string) error {
|
RunE: func(cmd *cobra.Command, _ []string) error {
|
||||||
|
|
|
@ -18,7 +18,9 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/cmd/s3-playback/request"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/detector"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/playback"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/playback/utils"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
)
|
)
|
||||||
|
@ -55,26 +57,41 @@ func init() {
|
||||||
runCmd.Flags().String(endpointFlag, "", "endpoint URL")
|
runCmd.Flags().String(endpointFlag, "", "endpoint URL")
|
||||||
}
|
}
|
||||||
|
|
||||||
func logResponse(cmd *cobra.Command, id int, resp *http.Response, logReq request.LoggedRequest) {
|
func logResponse(cmd *cobra.Command, r *http.Request, resp *http.Response) {
|
||||||
cmd.Println(strconv.Itoa(id)+")", logReq.Method, logReq.URI)
|
cmd.Println(r.Method, r.URL.RequestURI())
|
||||||
cmd.Println(resp.Status)
|
cmd.Println(resp.Status)
|
||||||
if resp.ContentLength != 0 {
|
if resp.ContentLength == 0 {
|
||||||
body, err := io.ReadAll(io.LimitReader(resp.Body, resp.ContentLength))
|
return
|
||||||
|
}
|
||||||
|
detect := detector.NewDetector(resp.Body, utils.DetectXML)
|
||||||
|
dataType, err := detect.Detect()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cmd.Println(id, err)
|
cmd.Println(err.Error())
|
||||||
|
return
|
||||||
}
|
}
|
||||||
cmd.Println(string(body))
|
body := &bytes.Buffer{}
|
||||||
|
resultWriter := utils.ChooseWriter(dataType, body)
|
||||||
|
_, err = io.Copy(resultWriter, io.LimitReader(detect.RestoredReader(), viper.GetInt64(printResponseLimit)))
|
||||||
|
if err != nil {
|
||||||
|
cmd.Println(err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
if err = resultWriter.Close(); err != nil {
|
||||||
|
cmd.Printf("could not close response body: %s\n", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
cmd.Println(body.String())
|
||||||
cmd.Println()
|
cmd.Println()
|
||||||
}
|
}
|
||||||
|
|
||||||
func run(cmd *cobra.Command, _ []string) error {
|
func run(cmd *cobra.Command, _ []string) error {
|
||||||
ctx := request.SetCredentials(
|
ctx := playback.SetCredentials(
|
||||||
cmd.Context(),
|
cmd.Context(),
|
||||||
viper.GetString(awsAccessKey),
|
viper.GetString(awsAccessKey),
|
||||||
viper.GetString(awsSecretKey),
|
viper.GetString(awsSecretKey),
|
||||||
)
|
)
|
||||||
ctx = request.WithMultiparts(ctx)
|
ctx = playback.WithMultiparts(ctx)
|
||||||
|
|
||||||
file, err := os.Open(viper.GetString(logPathFlag))
|
file, err := os.Open(viper.GetString(logPathFlag))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -109,13 +126,14 @@ func run(cmd *cobra.Command, _ []string) error {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return fmt.Errorf("interrupted: %w", ctx.Err())
|
return fmt.Errorf("interrupted: %w", ctx.Err())
|
||||||
default:
|
default:
|
||||||
resp, err := playback(ctx, logReq, client)
|
r, resp, err := playbackRequest(ctx, logReq, client)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cmd.PrintErrln(strconv.Itoa(id)+")", "failed to playback request:", err)
|
cmd.PrintErrln(strconv.Itoa(id)+")", "failed to playback request:", err)
|
||||||
id++
|
id++
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
logResponse(cmd, id, resp, logReq)
|
cmd.Print(strconv.Itoa(id) + ") ")
|
||||||
|
logResponse(cmd, r, resp)
|
||||||
id++
|
id++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -123,8 +141,8 @@ func run(cmd *cobra.Command, _ []string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getRequestFromLog(reader *bufio.Reader) (request.LoggedRequest, error) {
|
func getRequestFromLog(reader *bufio.Reader) (playback.LoggedRequest, error) {
|
||||||
var logReq request.LoggedRequest
|
var logReq playback.LoggedRequest
|
||||||
req, err := reader.ReadString('\n')
|
req, err := reader.ReadString('\n')
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return logReq, err
|
return logReq, err
|
||||||
|
@ -138,55 +156,55 @@ func getRequestFromLog(reader *bufio.Reader) (request.LoggedRequest, error) {
|
||||||
return logReq, nil
|
return logReq, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// playback creates http.Request from LoggedRequest and sends it to specified endpoint.
|
// playbackRequest creates http.Request from LoggedRequest and sends it to specified endpoint.
|
||||||
func playback(ctx context.Context, logReq request.LoggedRequest, client *http.Client) (*http.Response, error) {
|
func playbackRequest(ctx context.Context, logReq playback.LoggedRequest, client *http.Client) (*http.Request, *http.Response, error) {
|
||||||
r, err := prepareRequest(ctx, logReq)
|
r, err := prepareRequest(ctx, logReq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to prepare request: %w", err)
|
return nil, nil, fmt.Errorf("failed to prepare request: %w", err)
|
||||||
}
|
}
|
||||||
resp, err := client.Do(r)
|
resp, err := client.Do(r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to send request: %w", err)
|
return nil, nil, fmt.Errorf("failed to send request: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
respBody, err := io.ReadAll(resp.Body)
|
respBody, err := io.ReadAll(resp.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to read response body: %w", err)
|
return nil, nil, fmt.Errorf("failed to read response body: %w", err)
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
if err = request.HandleResponse(ctx, r, respBody, logReq.Response); err != nil {
|
if err = playback.HandleResponse(ctx, r, respBody, logReq.Response); err != nil {
|
||||||
return nil, fmt.Errorf("failed to register multipart upload: %w", err)
|
return nil, nil, fmt.Errorf("failed to register multipart upload: %w", err)
|
||||||
}
|
}
|
||||||
resp.Body = io.NopCloser(bytes.NewBuffer(respBody))
|
resp.Body = io.NopCloser(bytes.NewBuffer(respBody))
|
||||||
|
|
||||||
return resp, nil
|
return r, resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// prepareRequest creates request from logs and modifies its signature and uploadId (if presents).
|
// prepareRequest creates request from logs and modifies its signature and uploadId (if presents).
|
||||||
func prepareRequest(ctx context.Context, logReq request.LoggedRequest) (*http.Request, error) {
|
func prepareRequest(ctx context.Context, logReq playback.LoggedRequest) (*http.Request, error) {
|
||||||
r, err := http.NewRequestWithContext(ctx, logReq.Method, viper.GetString(endpointFlag)+logReq.URI,
|
r, err := http.NewRequestWithContext(ctx, logReq.Method, viper.GetString(endpointFlag)+logReq.URI,
|
||||||
bytes.NewReader(logReq.Body))
|
bytes.NewReader(logReq.Payload))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
r.Header = logReq.Header
|
r.Header = logReq.Header
|
||||||
|
|
||||||
err = request.SwapUploadID(ctx, r)
|
err = playback.SwapUploadID(ctx, r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
sha256hash := sha256.New()
|
sha256hash := sha256.New()
|
||||||
sha256hash.Write(logReq.Body)
|
sha256hash.Write(logReq.Payload)
|
||||||
r.Header.Set(auth.AmzContentSHA256, hex.EncodeToString(sha256hash.Sum(nil)))
|
r.Header.Set(auth.AmzContentSHA256, hex.EncodeToString(sha256hash.Sum(nil)))
|
||||||
if r.Header.Get(api.ContentMD5) != "" {
|
if r.Header.Get(api.ContentMD5) != "" {
|
||||||
sha256hash.Reset()
|
sha256hash.Reset()
|
||||||
md5hash := md5.New()
|
md5hash := md5.New()
|
||||||
md5hash.Write(logReq.Body)
|
md5hash.Write(logReq.Payload)
|
||||||
r.Header.Set(api.ContentMD5, base64.StdEncoding.EncodeToString(md5hash.Sum(nil)))
|
r.Header.Set(api.ContentMD5, base64.StdEncoding.EncodeToString(md5hash.Sum(nil)))
|
||||||
}
|
}
|
||||||
err = request.Sign(ctx, r)
|
err = playback.Sign(ctx, r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,8 @@
|
||||||
endpoint: http://localhost:8084
|
endpoint: http://localhost:8084
|
||||||
log: ./log/multipart5.log
|
log: ./log/request.log
|
||||||
credentials:
|
credentials:
|
||||||
access_key: CAtUxDSSFtuVyVCjHTMhwx3eP3YSPo5ffwbPcnKfcdrD06WwUSn72T5EBNe3jcgjL54rmxFM6u3nUAoNBps8qJ1PD
|
access_key: CAtUxDSSFtuVyVCjHTMhwx3eP3YSPo5ffwbPcnKfcdrD06WwUSn72T5EBNe3jcgjL54rmxFM6u3nUAoNBps8qJ1PD
|
||||||
secret_key: 560027d81c277de7378f71cbf12a32e4f7f541de724be59bcfdbfdc925425f30
|
secret_key: 560027d81c277de7378f71cbf12a32e4f7f541de724be59bcfdbfdc925425f30
|
||||||
http_timeout: 60s
|
http_timeout: 60s
|
||||||
skip_verify_tls: false
|
skip_verify_tls: false
|
||||||
|
print_response_limit: 1024
|
|
@ -389,7 +389,6 @@ http_logging:
|
||||||
max_log_size: 20
|
max_log_size: 20
|
||||||
gzip: true
|
gzip: true
|
||||||
destination: stdout
|
destination: stdout
|
||||||
log_response: true
|
|
||||||
```
|
```
|
||||||
|
|
||||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||||
|
@ -399,7 +398,6 @@ http_logging:
|
||||||
| `max_log_size` | int | yes | 50 | Log file size threshold (in megabytes) to be moved in backup file. After reaching threshold, initial filename is appended with timestamp. And new empty file with initial name is created. |
|
| `max_log_size` | int | yes | 50 | Log file size threshold (in megabytes) to be moved in backup file. After reaching threshold, initial filename is appended with timestamp. And new empty file with initial name is created. |
|
||||||
| `gzip` | bool | yes | false | Whether to enable Gzip compression to backup log files. |
|
| `gzip` | bool | yes | false | Whether to enable Gzip compression to backup log files. |
|
||||||
| `destination` | string | yes | stdout | Specify path for log output. Accepts log file path, or "stdout" and "stderr" reserved words to print in output streams. File and folders are created if necessary. |
|
| `destination` | string | yes | stdout | Specify path for log output. Accepts log file path, or "stdout" and "stderr" reserved words to print in output streams. File and folders are created if necessary. |
|
||||||
| `log_response` | bool | yes | true | Whether to attach response body to the log. |
|
|
||||||
|
|
||||||
|
|
||||||
### `cache` section
|
### `cache` section
|
||||||
|
|
|
@ -31,14 +31,15 @@ If corresponding flag is set, it overrides parameter from config.
|
||||||
|
|
||||||
### Configuration parameters
|
### Configuration parameters
|
||||||
|
|
||||||
#### Global params
|
#### Global parameters
|
||||||
| # | Config parameter name | Flag name | Type | Default value | Description |
|
| # | Config parameter name | Flag name | Type | Default value | Description |
|
||||||
|---|-----------------------|-----------------|----------|---------------|--------------------------------------------------------|
|
|---|-----------------------|-----------------|----------|---------------|-------------------------------------------------------------------------------|
|
||||||
| 1 | - | config | string | - | config file path (e.g. `./config/playback.yaml`) |
|
| 1 | - | config | string | - | config file path (e.g. `./config/playback.yaml`) |
|
||||||
| 2 | http_timeout | http-timeout | duration | 60s | http request timeout |
|
| 2 | http_timeout | http-timeout | duration | 60s | http request timeout |
|
||||||
| 3 | skip_verify_tls | skip-verify-tls | bool | false | skips tls certificate verification for https endpoints |
|
| 3 | skip_verify_tls | skip-verify-tls | bool | false | skips tls certificate verification for https endpoints |
|
||||||
| 4 | credentials.accessKey | - | string | - | AWS access key id |
|
| 4 | credentials.accessKey | - | string | - | AWS access key id |
|
||||||
| 5 | credentials.secretKey | - | string | - | AWS secret key |
|
| 5 | credentials.secretKey | - | string | - | AWS secret key |
|
||||||
|
| 6 | print_response_limit | - | int | 1024 | max response length to be printed in stdout, the rest of body will be omitted |
|
||||||
|
|
||||||
#### `run` command parameters
|
#### `run` command parameters
|
||||||
| # | Config parameter name | Flag name | Type | Default value | Description |
|
| # | Config parameter name | Flag name | Type | Default value | Description |
|
||||||
|
|
|
@ -101,9 +101,10 @@ const (
|
||||||
FailedToResolveCID = "failed to resolve CID" // Debug in ../../api/middleware/metrics.go
|
FailedToResolveCID = "failed to resolve CID" // Debug in ../../api/middleware/metrics.go
|
||||||
RequestStart = "request start" // Info in ../../api/middleware/reqinfo.go
|
RequestStart = "request start" // Info in ../../api/middleware/reqinfo.go
|
||||||
LogHTTP = "http log" // Info in ../../api/middleware/log_http.go
|
LogHTTP = "http log" // Info in ../../api/middleware/log_http.go
|
||||||
FailedToInitializeHTTPLogger = "failed to initialize http logger" // Warn in ../../api/middleware/log_http.go
|
FailedToInitializeHTTPLogger = "failed to initialize http logger" // Error in ../../api/middleware/log_http.go
|
||||||
FailedToReloadHTTPFileLogger = "failed to reload http file logger" // Warn in ../../api/middleware/log_http.go
|
FailedToReloadHTTPFileLogger = "failed to reload http file logger" // Error in ../../api/middleware/log_http.go
|
||||||
FailedToGetRequestBody = "failed to get request body" // Warn in ../../api/middleware/log_http.go
|
FailedToGetHTTPBody = "failed to get http body" // Error in ../../api/middleware/log_http.go
|
||||||
|
FailedToProcessHTTPBody = "failed to process http body" // Error in ../../api/middleware/log_http.go
|
||||||
LogHTTPDisabledInThisBuild = "http logging disabled in this build" // Warn in ../../api/middleware/log_http_stub.go
|
LogHTTPDisabledInThisBuild = "http logging disabled in this build" // Warn in ../../api/middleware/log_http_stub.go
|
||||||
FailedToUnescapeObjectName = "failed to unescape object name" // Warn in ../../api/middleware/reqinfo.go
|
FailedToUnescapeObjectName = "failed to unescape object name" // Warn in ../../api/middleware/reqinfo.go
|
||||||
InvalidDefaultMaxAge = "invalid defaultMaxAge" // Fatal in ../../cmd/s3-gw/app_settings.go
|
InvalidDefaultMaxAge = "invalid defaultMaxAge" // Fatal in ../../cmd/s3-gw/app_settings.go
|
||||||
|
|
|
@ -1,15 +1,15 @@
|
||||||
package layer
|
package detector
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
detector struct {
|
Detector struct {
|
||||||
io.Reader
|
io.Reader
|
||||||
err error
|
err error
|
||||||
data []byte
|
data []byte
|
||||||
|
detectFunc func([]byte) string
|
||||||
}
|
}
|
||||||
errReader struct {
|
errReader struct {
|
||||||
data []byte
|
data []byte
|
||||||
|
@ -36,23 +36,24 @@ func (r *errReader) Read(b []byte) (int, error) {
|
||||||
return n, nil
|
return n, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func newDetector(reader io.Reader) *detector {
|
func NewDetector(reader io.Reader, detectFunc func([]byte) string) *Detector {
|
||||||
return &detector{
|
return &Detector{
|
||||||
data: make([]byte, contentTypeDetectSize),
|
data: make([]byte, contentTypeDetectSize),
|
||||||
Reader: reader,
|
Reader: reader,
|
||||||
|
detectFunc: detectFunc,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *detector) Detect() (string, error) {
|
func (d *Detector) Detect() (string, error) {
|
||||||
n, err := d.Reader.Read(d.data)
|
n, err := d.Reader.Read(d.data)
|
||||||
if err != nil && err != io.EOF {
|
if err != nil && err != io.EOF {
|
||||||
d.err = err
|
d.err = err
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
d.data = d.data[:n]
|
d.data = d.data[:n]
|
||||||
return http.DetectContentType(d.data), nil
|
return d.detectFunc(d.data), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *detector) MultiReader() io.Reader {
|
func (d *Detector) RestoredReader() io.Reader {
|
||||||
return io.MultiReader(newReader(d.data, d.err), d.Reader)
|
return io.MultiReader(newReader(d.data, d.err), d.Reader)
|
||||||
}
|
}
|
|
@ -1,4 +1,4 @@
|
||||||
package request
|
package playback
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
|
@ -1,16 +1,21 @@
|
||||||
package request
|
package playback
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"regexp"
|
"regexp"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/detector"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/playback/utils"
|
||||||
v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
|
v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
|
||||||
"github.com/aws/aws-sdk-go-v2/credentials"
|
"github.com/aws/aws-sdk-go-v2/credentials"
|
||||||
)
|
)
|
||||||
|
@ -22,14 +27,15 @@ var (
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
|
httpBody []byte
|
||||||
LoggedRequest struct {
|
LoggedRequest struct {
|
||||||
From string `json:"from"`
|
From string `json:"from"`
|
||||||
URI string `json:"URI"`
|
URI string `json:"URI"`
|
||||||
Method string `json:"method"`
|
Method string `json:"method"`
|
||||||
|
Payload httpBody `json:"payload,omitempty"`
|
||||||
|
Response httpBody `json:"response,omitempty"`
|
||||||
Query url.Values `json:"query"`
|
Query url.Values `json:"query"`
|
||||||
Body []byte `json:"body"`
|
|
||||||
Header http.Header `json:"headers"`
|
Header http.Header `json:"headers"`
|
||||||
Response []byte `json:"response"`
|
|
||||||
}
|
}
|
||||||
Credentials struct {
|
Credentials struct {
|
||||||
AccessKey string
|
AccessKey string
|
||||||
|
@ -39,6 +45,25 @@ type (
|
||||||
multipartKey struct{}
|
multipartKey struct{}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func (h *httpBody) UnmarshalJSON(data []byte) error {
|
||||||
|
unquoted, err := strconv.Unquote(string(data))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to unquote data: %w", err)
|
||||||
|
}
|
||||||
|
detect := detector.NewDetector(strings.NewReader(unquoted), utils.DetectXML)
|
||||||
|
dataType, err := detect.Detect()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to detect data: %w", err)
|
||||||
|
}
|
||||||
|
reader := utils.ChooseReader(dataType, detect.RestoredReader())
|
||||||
|
*h, err = io.ReadAll(reader)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to unmarshal httpbody: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func SetCredentials(ctx context.Context, accessKey, secretKey string) context.Context {
|
func SetCredentials(ctx context.Context, accessKey, secretKey string) context.Context {
|
||||||
return context.WithValue(ctx, contextKey{},
|
return context.WithValue(ctx, contextKey{},
|
||||||
Credentials{
|
Credentials{
|
|
@ -1,4 +1,4 @@
|
||||||
package request
|
package playback
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"strings"
|
"strings"
|
49
playback/utils/utils.go
Normal file
49
playback/utils/utils.go
Normal file
|
@ -0,0 +1,49 @@
|
||||||
|
package utils
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/base64"
|
||||||
|
"encoding/xml"
|
||||||
|
"io"
|
||||||
|
)
|
||||||
|
|
||||||
|
type nopCloseWriter struct {
|
||||||
|
io.Writer
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b nopCloseWriter) Close() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
nonXML = "nonXML"
|
||||||
|
typeXML = "application/XML"
|
||||||
|
)
|
||||||
|
|
||||||
|
func DetectXML(data []byte) string {
|
||||||
|
token, err := xml.NewDecoder(bytes.NewReader(data)).RawToken()
|
||||||
|
if err != nil {
|
||||||
|
return nonXML
|
||||||
|
}
|
||||||
|
|
||||||
|
switch token.(type) {
|
||||||
|
case xml.StartElement, xml.ProcInst:
|
||||||
|
return typeXML
|
||||||
|
}
|
||||||
|
return nonXML
|
||||||
|
}
|
||||||
|
|
||||||
|
func ChooseWriter(dataType string, bodyWriter io.Writer) io.WriteCloser {
|
||||||
|
writeCloser := nopCloseWriter{bodyWriter}
|
||||||
|
if dataType == typeXML {
|
||||||
|
return writeCloser
|
||||||
|
}
|
||||||
|
return base64.NewEncoder(base64.StdEncoding, bodyWriter)
|
||||||
|
}
|
||||||
|
|
||||||
|
func ChooseReader(dataType string, bodyReader io.Reader) io.Reader {
|
||||||
|
if dataType == typeXML {
|
||||||
|
return bodyReader
|
||||||
|
}
|
||||||
|
return base64.NewDecoder(base64.StdEncoding, bodyReader)
|
||||||
|
}
|
Loading…
Add table
Reference in a new issue
Have you looked at
api/layer/detector.go
? We can reuse the idea from there, maybe it will look a bit better.In detector we wrap reader and provide
Detect()
method to read a part of it to determine payload type.We can:
DetectXML
similar by having a wrapper and calling this function on a wrapper.This way we won't need to store
checkedBytes
and callwriter.Write
on it.Oh I see, it might not detect XML.
Then we can just simply reorganize it as a wrapper similar to
detector.go
.