From 490a9ed95ee273ab74b72bd46c7f9e1f6fe55c19 Mon Sep 17 00:00:00 2001 From: Nikita Zinkevich Date: Fri, 2 Aug 2024 16:45:49 +0300 Subject: [PATCH] [#369] S3-playback http-body decoding Signed-off-by: Nikita Zinkevich --- api/middleware/log_http.go | 141 +++++++++++++++++--------------- cmd/s3-gw/app.go | 1 - cmd/s3-playback/modules/root.go | 16 ++-- cmd/s3-playback/modules/run.go | 78 +++++++++++------- config/playback/playback.yaml | 3 +- docs/configuration.md | 2 - docs/playback.md | 17 ++-- playback/request.go | 27 +++++- playback/utils/utils.go | 32 +++++--- 9 files changed, 193 insertions(+), 124 deletions(-) diff --git a/api/middleware/log_http.go b/api/middleware/log_http.go index 8d1512a..848e4c8 100644 --- a/api/middleware/log_http.go +++ b/api/middleware/log_http.go @@ -15,19 +15,29 @@ import ( "gopkg.in/natefinch/lumberjack.v2" ) -type LogHTTPConfig struct { - Enabled bool - MaxBody int64 - MaxLogSize int - OutputPath string - UseGzip bool - LogBefore bool -} - -type fileLogger struct { - *zap.Logger - logRoller *lumberjack.Logger -} +type ( + LogHTTPConfig struct { + Enabled bool + MaxBody int64 + MaxLogSize int + OutputPath string + UseGzip bool + } + fileLogger struct { + *zap.Logger + logRoller *lumberjack.Logger + } + // Implementation of zap.Sink for using lumberjack. + lumberjackSink struct { + *lumberjack.Logger + } + // responseReadWriter helps read http response body. + responseReadWriter struct { + http.ResponseWriter + response *bytes.Buffer + statusCode int + } +) const ( payloadLabel = "payload" @@ -57,6 +67,13 @@ func newFileLogger(conf *LogHTTPConfig) (fileLogger, error) { return flog, nil } +func newResponseReadWriter(w http.ResponseWriter) *responseReadWriter { + return &responseReadWriter{ + ResponseWriter: w, + response: &bytes.Buffer{}, + } +} + // registerOutputSink creates and registers sink for logger file output. func (f *fileLogger) registerOutputSink(conf *LogHTTPConfig) error { f.logRoller = &lumberjack.Logger{ @@ -76,11 +93,6 @@ func (f *fileLogger) registerOutputSink(conf *LogHTTPConfig) error { return nil } -// Implementation of zap.Sink for using lumberjack. -type lumberjackSink struct { - *lumberjack.Logger -} - func (lumberjackSink) Sync() error { return nil } @@ -102,17 +114,16 @@ func ReloadFileLogger(conf *LogHTTPConfig) error { return nil } -// responseReadWriter helps read http response body. -type responseReadWriter struct { - http.ResponseWriter - response *bytes.Buffer -} - func (ww *responseReadWriter) Write(data []byte) (int, error) { ww.response.Write(data) return ww.ResponseWriter.Write(data) } +func (ww *responseReadWriter) WriteHeader(code int) { + ww.statusCode = code + ww.ResponseWriter.WriteHeader(code) +} + func (ww *responseReadWriter) Flush() { if f, ok := ww.ResponseWriter.(http.Flusher); ok { f.Flush() @@ -138,43 +149,35 @@ func LogHTTP(l *zap.Logger, config *LogHTTPConfig) Func { return } - httplog := filelog.getHTTPLogger(r) - httplog.withFieldIfExist("query", r.URL.Query()) - httplog.withFieldIfExist("headers", r.Header) + httplog := filelog.getHTTPLogger(r). + withFieldIfExist("query", r.URL.Query()). + withFieldIfExist("headers", r.Header) - payload, err := getBody(r.Body) - if err != nil { - l.Error(logs.FailedToGetHTTPBody, - zap.Error(err), - zap.String("body type", payloadLabel)) - return - } + payload := getBody(r.Body, l) r.Body = io.NopCloser(bytes.NewReader(payload)) - respBuf := &bytes.Buffer{} - wr := &responseReadWriter{ResponseWriter: w, response: respBuf} + payloadReader := io.LimitReader(bytes.NewReader(payload), config.MaxBody) + httplog = httplog.withProcessedBody(payloadLabel, payloadReader, l) + + wr := newResponseReadWriter(w) h.ServeHTTP(wr, r) - payloadReader := io.LimitReader(bytes.NewReader(payload), config.MaxBody) - httplog, err = httplog.withProcessedBody(payloadLabel, payloadReader) - if err != nil { - l.Error(logs.FailedToProcessHTTPBody, - zap.Error(err), - zap.String("body type", payloadLabel)) - } - respReader := io.LimitReader(respBuf, config.MaxBody) - httplog, err = httplog.withProcessedBody(responseLabel, respReader) - if err != nil { - l.Error(logs.FailedToProcessHTTPBody, - zap.Error(err), - zap.String("body type", responseLabel)) - } + respReader := io.LimitReader(wr.response, config.MaxBody) + httplog = httplog.withProcessedBody(responseLabel, respReader, l) + httplog = httplog.with(zap.Int("status", wr.statusCode)) httplog.Info(logs.LogHTTP) }) } } +func (f *fileLogger) with(fields ...zap.Field) *fileLogger { + return &fileLogger{ + Logger: f.Logger.With(fields...), + logRoller: f.logRoller, + } +} + func (f *fileLogger) getHTTPLogger(r *http.Request) *fileLogger { return &fileLogger{ Logger: f.Logger.With( @@ -187,14 +190,16 @@ func (f *fileLogger) getHTTPLogger(r *http.Request) *fileLogger { } } -func (f *fileLogger) withProcessedBody(label string, bodyReader io.Reader) (*fileLogger, error) { - resp, err := ProcessBody(bodyReader) +func (f *fileLogger) withProcessedBody(label string, bodyReader io.Reader, l *zap.Logger) *fileLogger { + resp, err := processBody(bodyReader) if err != nil { - return f, err + l.Error(logs.FailedToProcessHTTPBody, + zap.Error(err), + zap.String("body type", payloadLabel)) + return f } - f.Logger = f.Logger.With(zap.ByteString(label, resp)) - return f, nil + return f.with(zap.ByteString(label, resp)) } // newLoggerConfig creates new zap.Config with disabled base fields. @@ -210,29 +215,37 @@ func (f *fileLogger) newLoggerConfig() zap.Config { return c } -// getBody reads http.Body and returns its io.Reader. -func getBody(httpBody io.ReadCloser) ([]byte, error) { +func getBody(httpBody io.ReadCloser, l *zap.Logger) []byte { defer httpBody.Close() body, err := io.ReadAll(httpBody) if err != nil { - return nil, err + l.Error(logs.FailedToGetHTTPBody, + zap.Error(err), + zap.String("body type", payloadLabel)) + return nil } - return body, nil + return body } -// ProcessBody reads body and base64 encode it if it's not XML. -func ProcessBody(bodyReader io.Reader) ([]byte, error) { +// processBody reads body and base64 encode it if it's not XML. +func processBody(bodyReader io.Reader) ([]byte, error) { resultBody := &bytes.Buffer{} - isXML, checkedPart, err := utils.DetectXML(bodyReader) + isXML, checkedBytes, err := utils.DetectXML(bodyReader) if err != nil { return nil, err } writer := utils.ChooseWriter(isXML, resultBody) - writer.Write(checkedPart) + _, err = writer.Write(checkedBytes) + if err != nil { + return nil, err + } if _, err = io.Copy(writer, bodyReader); err != nil { return nil, err } + if err = writer.Close(); err != nil { + return nil, err + } return resultBody.Bytes(), nil } @@ -240,7 +253,7 @@ func ProcessBody(bodyReader io.Reader) ([]byte, error) { // withFieldIfExist checks whether data is not empty and attach it to log output. func (f *fileLogger) withFieldIfExist(label string, data map[string][]string) *fileLogger { if len(data) != 0 { - f.Logger = f.Logger.With(zap.Any(label, data)) + return f.with(zap.Any(label, data)) } return f } diff --git a/cmd/s3-gw/app.go b/cmd/s3-gw/app.go index 07ea1cd..93e6dd2 100644 --- a/cmd/s3-gw/app.go +++ b/cmd/s3-gw/app.go @@ -597,7 +597,6 @@ func (s *appSettings) updateHTTPLoggingSettings(cfg *viper.Viper, log *zap.Logge s.httpLogging.MaxLogSize = cfg.GetInt(cfgHTTPLoggingMaxLogSize) s.httpLogging.OutputPath = cfg.GetString(cfgHTTPLoggingDestination) s.httpLogging.UseGzip = cfg.GetBool(cfgHTTPLoggingGzip) - s.httpLogging.LogBefore = cfg.GetBool(cfgHTTPLoggingLogResponse) if err := s3middleware.ReloadFileLogger(s.httpLogging); err != nil { log.Error(logs.FailedToReloadHTTPFileLogger, zap.Error(err)) } diff --git a/cmd/s3-playback/modules/root.go b/cmd/s3-playback/modules/root.go index ab8d6b2..f1d6f05 100644 --- a/cmd/s3-playback/modules/root.go +++ b/cmd/s3-playback/modules/root.go @@ -10,12 +10,14 @@ import ( ) const ( - configPath = "config" - configPathFlag = "config" - httpTimeout = "http_timeout" - httpTimeoutFlag = "http-timeout" - skipVerifyTLS = "skip_verify_tls" - skipVerifyTLSFlag = "skip-verify-tls" + defaultPrintResponseLimit = 1024 + configPath = "config" + configPathFlag = "config" + httpTimeout = "http_timeout" + httpTimeoutFlag = "http-timeout" + skipVerifyTLS = "skip_verify_tls" + skipVerifyTLSFlag = "skip-verify-tls" + printResponseLimit = "print_response_limit" ) var ( @@ -36,6 +38,8 @@ var ( _ = viper.BindPFlag(skipVerifyTLS, cmd.PersistentFlags().Lookup(skipVerifyTLSFlag)) + viper.SetDefault(printResponseLimit, defaultPrintResponseLimit) + return nil }, RunE: func(cmd *cobra.Command, _ []string) error { diff --git a/cmd/s3-playback/modules/run.go b/cmd/s3-playback/modules/run.go index 53e9bed..57dcc3d 100644 --- a/cmd/s3-playback/modules/run.go +++ b/cmd/s3-playback/modules/run.go @@ -18,7 +18,8 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth" - "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/cmd/s3-playback/request" + "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/playback" + "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/playback/utils" "github.com/spf13/cobra" "github.com/spf13/viper" ) @@ -55,26 +56,44 @@ func init() { runCmd.Flags().String(endpointFlag, "", "endpoint URL") } -func logResponse(cmd *cobra.Command, id int, resp *http.Response, logReq request.LoggedRequest) { - cmd.Println(strconv.Itoa(id)+")", logReq.Method, logReq.URI) +func logResponse(cmd *cobra.Command, r *http.Request, resp *http.Response) { + cmd.Println(r.Method, r.URL.RequestURI()) cmd.Println(resp.Status) - if resp.ContentLength != 0 { - body, err := io.ReadAll(io.LimitReader(resp.Body, resp.ContentLength)) - if err != nil { - cmd.Println(id, err) - } - cmd.Println(string(body)) + if resp.ContentLength == 0 { + return } + isXML, checkBuf, err := utils.DetectXML(resp.Body) + if err != nil { + cmd.Println(err.Error()) + return + } + body := &bytes.Buffer{} + resultWriter := utils.ChooseWriter(isXML, body) + if _, err = resultWriter.Write(checkBuf); err != nil { + cmd.Println(err) + return + } + _, err = io.Copy(resultWriter, io.LimitReader(resp.Body, viper.GetInt64(printResponseLimit))) + if err != nil { + cmd.Println(err) + return + } + if err = resultWriter.Close(); err != nil { + cmd.Printf("could not close response body: %s\n", err) + return + } + + cmd.Println(body.String()) cmd.Println() } func run(cmd *cobra.Command, _ []string) error { - ctx := request.SetCredentials( + ctx := playback.SetCredentials( cmd.Context(), viper.GetString(awsAccessKey), viper.GetString(awsSecretKey), ) - ctx = request.WithMultiparts(ctx) + ctx = playback.WithMultiparts(ctx) file, err := os.Open(viper.GetString(logPathFlag)) if err != nil { @@ -109,13 +128,14 @@ func run(cmd *cobra.Command, _ []string) error { case <-ctx.Done(): return fmt.Errorf("interrupted: %w", ctx.Err()) default: - resp, err := playback(ctx, logReq, client) + r, resp, err := playbackRequest(ctx, logReq, client) if err != nil { cmd.PrintErrln(strconv.Itoa(id)+")", "failed to playback request:", err) id++ continue } - logResponse(cmd, id, resp, logReq) + cmd.Print(strconv.Itoa(id) + ") ") + logResponse(cmd, r, resp) id++ } } @@ -123,8 +143,8 @@ func run(cmd *cobra.Command, _ []string) error { return nil } -func getRequestFromLog(reader *bufio.Reader) (request.LoggedRequest, error) { - var logReq request.LoggedRequest +func getRequestFromLog(reader *bufio.Reader) (playback.LoggedRequest, error) { + var logReq playback.LoggedRequest req, err := reader.ReadString('\n') if err != nil { return logReq, err @@ -138,55 +158,55 @@ func getRequestFromLog(reader *bufio.Reader) (request.LoggedRequest, error) { return logReq, nil } -// playback creates http.Request from LoggedRequest and sends it to specified endpoint. -func playback(ctx context.Context, logReq request.LoggedRequest, client *http.Client) (*http.Response, error) { +// playbackRequest creates http.Request from LoggedRequest and sends it to specified endpoint. +func playbackRequest(ctx context.Context, logReq playback.LoggedRequest, client *http.Client) (*http.Request, *http.Response, error) { r, err := prepareRequest(ctx, logReq) if err != nil { - return nil, fmt.Errorf("failed to prepare request: %w", err) + return nil, nil, fmt.Errorf("failed to prepare request: %w", err) } resp, err := client.Do(r) if err != nil { - return nil, fmt.Errorf("failed to send request: %w", err) + return nil, nil, fmt.Errorf("failed to send request: %w", err) } respBody, err := io.ReadAll(resp.Body) if err != nil { - return nil, fmt.Errorf("failed to read response body: %w", err) + return nil, nil, fmt.Errorf("failed to read response body: %w", err) } defer resp.Body.Close() - if err = request.HandleResponse(ctx, r, respBody, logReq.Response); err != nil { - return nil, fmt.Errorf("failed to register multipart upload: %w", err) + if err = playback.HandleResponse(ctx, r, respBody, logReq.Response); err != nil { + return nil, nil, fmt.Errorf("failed to register multipart upload: %w", err) } resp.Body = io.NopCloser(bytes.NewBuffer(respBody)) - return resp, nil + return r, resp, nil } // prepareRequest creates request from logs and modifies its signature and uploadId (if presents). -func prepareRequest(ctx context.Context, logReq request.LoggedRequest) (*http.Request, error) { +func prepareRequest(ctx context.Context, logReq playback.LoggedRequest) (*http.Request, error) { r, err := http.NewRequestWithContext(ctx, logReq.Method, viper.GetString(endpointFlag)+logReq.URI, - bytes.NewReader(logReq.Body)) + bytes.NewReader(logReq.Payload)) if err != nil { return nil, err } r.Header = logReq.Header - err = request.SwapUploadID(ctx, r) + err = playback.SwapUploadID(ctx, r) if err != nil { return nil, err } sha256hash := sha256.New() - sha256hash.Write(logReq.Body) + sha256hash.Write(logReq.Payload) r.Header.Set(auth.AmzContentSHA256, hex.EncodeToString(sha256hash.Sum(nil))) if r.Header.Get(api.ContentMD5) != "" { sha256hash.Reset() md5hash := md5.New() - md5hash.Write(logReq.Body) + md5hash.Write(logReq.Payload) r.Header.Set(api.ContentMD5, base64.StdEncoding.EncodeToString(md5hash.Sum(nil))) } - err = request.Sign(ctx, r) + err = playback.Sign(ctx, r) if err != nil { return nil, err } diff --git a/config/playback/playback.yaml b/config/playback/playback.yaml index b7d5b63..27ed4a4 100644 --- a/config/playback/playback.yaml +++ b/config/playback/playback.yaml @@ -1,7 +1,8 @@ endpoint: http://localhost:8084 -log: ./log/multipart5.log +log: ./log/request.log credentials: access_key: CAtUxDSSFtuVyVCjHTMhwx3eP3YSPo5ffwbPcnKfcdrD06WwUSn72T5EBNe3jcgjL54rmxFM6u3nUAoNBps8qJ1PD secret_key: 560027d81c277de7378f71cbf12a32e4f7f541de724be59bcfdbfdc925425f30 http_timeout: 60s skip_verify_tls: false +print_response_limit: 1024 diff --git a/docs/configuration.md b/docs/configuration.md index ffb193a..54e442d 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -392,7 +392,6 @@ http_logging: max_log_size: 20 gzip: true destination: stdout - log_response: true ``` | Parameter | Type | SIGHUP reload | Default value | Description | @@ -402,7 +401,6 @@ http_logging: | `max_log_size` | int | yes | 50 | Log file size threshold (in megabytes) to be moved in backup file. After reaching threshold, initial filename is appended with timestamp. And new empty file with initial name is created. | | `gzip` | bool | yes | false | Whether to enable Gzip compression to backup log files. | | `destination` | string | yes | stdout | Specify path for log output. Accepts log file path, or "stdout" and "stderr" reserved words to print in output streams. File and folders are created if necessary. | -| `log_response` | bool | yes | true | Whether to attach response body to the log. | ### `cache` section diff --git a/docs/playback.md b/docs/playback.md index 31d2d7d..0f62c7f 100644 --- a/docs/playback.md +++ b/docs/playback.md @@ -31,14 +31,15 @@ If corresponding flag is set, it overrides parameter from config. ### Configuration parameters -#### Global params -| # | Config parameter name | Flag name | Type | Default value | Description | -|---|-----------------------|-----------------|----------|---------------|--------------------------------------------------------| -| 1 | - | config | string | - | config file path (e.g. `./config/playback.yaml`) | -| 2 | http_timeout | http-timeout | duration | 60s | http request timeout | -| 3 | skip_verify_tls | skip-verify-tls | bool | false | skips tls certificate verification for https endpoints | -| 4 | credentials.accessKey | - | string | - | AWS access key id | -| 5 | credentials.secretKey | - | string | - | AWS secret key | +#### Global parameters +| # | Config parameter name | Flag name | Type | Default value | Description | +|---|-----------------------|-----------------|----------|---------------|-------------------------------------------------------------------------------| +| 1 | - | config | string | - | config file path (e.g. `./config/playback.yaml`) | +| 2 | http_timeout | http-timeout | duration | 60s | http request timeout | +| 3 | skip_verify_tls | skip-verify-tls | bool | false | skips tls certificate verification for https endpoints | +| 4 | credentials.accessKey | - | string | - | AWS access key id | +| 5 | credentials.secretKey | - | string | - | AWS secret key | +| 6 | print_response_limit | - | int | 1024 | max response length to be printed in stdout, the rest of body will be omitted | #### `run` command parameters | # | Config parameter name | Flag name | Type | Default value | Description | diff --git a/playback/request.go b/playback/request.go index d989632..ac9e577 100644 --- a/playback/request.go +++ b/playback/request.go @@ -3,14 +3,18 @@ package playback import ( "context" "errors" + "fmt" + "io" "net/http" "net/url" "regexp" + "strconv" "strings" "time" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth" + "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/playback/utils" v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4" "github.com/aws/aws-sdk-go-v2/credentials" ) @@ -22,14 +26,15 @@ var ( ) type ( + httpBody []byte LoggedRequest struct { From string `json:"from"` URI string `json:"URI"` Method string `json:"method"` + Payload httpBody `json:"payload,omitempty"` + Response httpBody `json:"response,omitempty"` Query url.Values `json:"query"` - Body []byte `json:"body"` Header http.Header `json:"headers"` - Response []byte `json:"response"` } Credentials struct { AccessKey string @@ -39,6 +44,24 @@ type ( multipartKey struct{} ) +func (h *httpBody) UnmarshalJSON(data []byte) error { + unquoted, err := strconv.Unquote(string(data)) + if err != nil { + return fmt.Errorf("failed to unquote data: %w", err) + } + isXML, _, err := utils.DetectXML(strings.NewReader(unquoted)) + if err != nil { + return fmt.Errorf("failed to detect httpbody type: %w", err) + } + reader := utils.ChooseReader(isXML, strings.NewReader(unquoted)) + *h, err = io.ReadAll(reader) + if err != nil { + return fmt.Errorf("failed to unmarshal httpbody: %w", err) + } + + return nil +} + func SetCredentials(ctx context.Context, accessKey, secretKey string) context.Context { return context.WithValue(ctx, contextKey{}, Credentials{ diff --git a/playback/utils/utils.go b/playback/utils/utils.go index 09fbfb5..08347d3 100644 --- a/playback/utils/utils.go +++ b/playback/utils/utils.go @@ -8,35 +8,45 @@ import ( "io" ) +type nopCloseWriter struct { + io.Writer +} + +func (b nopCloseWriter) Close() error { + return nil +} + const BodyRecognizeLimit int64 = 128 func DetectXML(reader io.Reader) (bool, []byte, error) { - checkBuf := bytes.NewBuffer(nil) - token, err := xml.NewDecoder(io.TeeReader(io.LimitReader(reader, BodyRecognizeLimit), checkBuf)).RawToken() + detectBuf := bytes.NewBuffer(nil) + detectReader := io.TeeReader(io.LimitReader(reader, BodyRecognizeLimit), detectBuf) + token, err := xml.NewDecoder(detectReader).RawToken() if err != nil { var xmlErr *xml.SyntaxError if errors.Is(err, io.EOF) || errors.As(err, &xmlErr) { - return false, checkBuf.Bytes(), nil + return false, detectBuf.Bytes(), nil } - return false, checkBuf.Bytes(), err + return false, detectBuf.Bytes(), err } switch token.(type) { case xml.StartElement, xml.ProcInst: - return true, checkBuf.Bytes(), nil + return true, detectBuf.Bytes(), nil } - return false, checkBuf.Bytes(), nil + return false, detectBuf.Bytes(), nil } -func ChooseWriter(isXML bool, body *bytes.Buffer) io.Writer { +func ChooseWriter(isXML bool, bodyWriter io.Writer) io.WriteCloser { + writeCloser := nopCloseWriter{bodyWriter} if !isXML { - return base64.NewEncoder(base64.StdEncoding, body) + return base64.NewEncoder(base64.StdEncoding, bodyWriter) } - return body + return writeCloser } -func ChooseReader(isXml bool, bodyReader io.Reader) io.Reader { - if !isXml { +func ChooseReader(isXML bool, bodyReader io.Reader) io.Reader { + if !isXML { return base64.NewDecoder(base64.StdEncoding, bodyReader) } return bodyReader