[#369] S3-playback http-body decoding
Signed-off-by: Nikita Zinkevich <n.zinkevich@yadro.com>
This commit is contained in:
parent
213eb1662f
commit
490a9ed95e
9 changed files with 193 additions and 124 deletions
|
@ -15,19 +15,29 @@ import (
|
|||
"gopkg.in/natefinch/lumberjack.v2"
|
||||
)
|
||||
|
||||
type LogHTTPConfig struct {
|
||||
type (
|
||||
LogHTTPConfig struct {
|
||||
Enabled bool
|
||||
MaxBody int64
|
||||
MaxLogSize int
|
||||
OutputPath string
|
||||
UseGzip bool
|
||||
LogBefore bool
|
||||
}
|
||||
|
||||
type fileLogger struct {
|
||||
}
|
||||
fileLogger struct {
|
||||
*zap.Logger
|
||||
logRoller *lumberjack.Logger
|
||||
}
|
||||
}
|
||||
// Implementation of zap.Sink for using lumberjack.
|
||||
lumberjackSink struct {
|
||||
*lumberjack.Logger
|
||||
}
|
||||
// responseReadWriter helps read http response body.
|
||||
responseReadWriter struct {
|
||||
http.ResponseWriter
|
||||
response *bytes.Buffer
|
||||
statusCode int
|
||||
}
|
||||
)
|
||||
|
||||
const (
|
||||
payloadLabel = "payload"
|
||||
|
@ -57,6 +67,13 @@ func newFileLogger(conf *LogHTTPConfig) (fileLogger, error) {
|
|||
return flog, nil
|
||||
}
|
||||
|
||||
func newResponseReadWriter(w http.ResponseWriter) *responseReadWriter {
|
||||
return &responseReadWriter{
|
||||
ResponseWriter: w,
|
||||
response: &bytes.Buffer{},
|
||||
}
|
||||
}
|
||||
|
||||
// registerOutputSink creates and registers sink for logger file output.
|
||||
func (f *fileLogger) registerOutputSink(conf *LogHTTPConfig) error {
|
||||
f.logRoller = &lumberjack.Logger{
|
||||
|
@ -76,11 +93,6 @@ func (f *fileLogger) registerOutputSink(conf *LogHTTPConfig) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Implementation of zap.Sink for using lumberjack.
|
||||
type lumberjackSink struct {
|
||||
*lumberjack.Logger
|
||||
}
|
||||
|
||||
func (lumberjackSink) Sync() error {
|
||||
return nil
|
||||
}
|
||||
|
@ -102,17 +114,16 @@ func ReloadFileLogger(conf *LogHTTPConfig) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// responseReadWriter helps read http response body.
|
||||
type responseReadWriter struct {
|
||||
http.ResponseWriter
|
||||
response *bytes.Buffer
|
||||
}
|
||||
|
||||
func (ww *responseReadWriter) Write(data []byte) (int, error) {
|
||||
ww.response.Write(data)
|
||||
return ww.ResponseWriter.Write(data)
|
||||
}
|
||||
|
||||
func (ww *responseReadWriter) WriteHeader(code int) {
|
||||
ww.statusCode = code
|
||||
ww.ResponseWriter.WriteHeader(code)
|
||||
}
|
||||
|
||||
func (ww *responseReadWriter) Flush() {
|
||||
if f, ok := ww.ResponseWriter.(http.Flusher); ok {
|
||||
f.Flush()
|
||||
|
@ -138,43 +149,35 @@ func LogHTTP(l *zap.Logger, config *LogHTTPConfig) Func {
|
|||
return
|
||||
}
|
||||
|
||||
httplog := filelog.getHTTPLogger(r)
|
||||
httplog.withFieldIfExist("query", r.URL.Query())
|
||||
httplog.withFieldIfExist("headers", r.Header)
|
||||
httplog := filelog.getHTTPLogger(r).
|
||||
withFieldIfExist("query", r.URL.Query()).
|
||||
withFieldIfExist("headers", r.Header)
|
||||
|
||||
payload, err := getBody(r.Body)
|
||||
if err != nil {
|
||||
l.Error(logs.FailedToGetHTTPBody,
|
||||
zap.Error(err),
|
||||
zap.String("body type", payloadLabel))
|
||||
return
|
||||
}
|
||||
payload := getBody(r.Body, l)
|
||||
r.Body = io.NopCloser(bytes.NewReader(payload))
|
||||
|
||||
respBuf := &bytes.Buffer{}
|
||||
wr := &responseReadWriter{ResponseWriter: w, response: respBuf}
|
||||
payloadReader := io.LimitReader(bytes.NewReader(payload), config.MaxBody)
|
||||
httplog = httplog.withProcessedBody(payloadLabel, payloadReader, l)
|
||||
|
||||
wr := newResponseReadWriter(w)
|
||||
h.ServeHTTP(wr, r)
|
||||
|
||||
payloadReader := io.LimitReader(bytes.NewReader(payload), config.MaxBody)
|
||||
httplog, err = httplog.withProcessedBody(payloadLabel, payloadReader)
|
||||
if err != nil {
|
||||
l.Error(logs.FailedToProcessHTTPBody,
|
||||
zap.Error(err),
|
||||
zap.String("body type", payloadLabel))
|
||||
}
|
||||
respReader := io.LimitReader(respBuf, config.MaxBody)
|
||||
httplog, err = httplog.withProcessedBody(responseLabel, respReader)
|
||||
if err != nil {
|
||||
l.Error(logs.FailedToProcessHTTPBody,
|
||||
zap.Error(err),
|
||||
zap.String("body type", responseLabel))
|
||||
}
|
||||
respReader := io.LimitReader(wr.response, config.MaxBody)
|
||||
httplog = httplog.withProcessedBody(responseLabel, respReader, l)
|
||||
httplog = httplog.with(zap.Int("status", wr.statusCode))
|
||||
|
||||
httplog.Info(logs.LogHTTP)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (f *fileLogger) with(fields ...zap.Field) *fileLogger {
|
||||
return &fileLogger{
|
||||
Logger: f.Logger.With(fields...),
|
||||
logRoller: f.logRoller,
|
||||
}
|
||||
}
|
||||
|
||||
func (f *fileLogger) getHTTPLogger(r *http.Request) *fileLogger {
|
||||
return &fileLogger{
|
||||
Logger: f.Logger.With(
|
||||
|
@ -187,14 +190,16 @@ func (f *fileLogger) getHTTPLogger(r *http.Request) *fileLogger {
|
|||
}
|
||||
}
|
||||
|
||||
func (f *fileLogger) withProcessedBody(label string, bodyReader io.Reader) (*fileLogger, error) {
|
||||
resp, err := ProcessBody(bodyReader)
|
||||
func (f *fileLogger) withProcessedBody(label string, bodyReader io.Reader, l *zap.Logger) *fileLogger {
|
||||
resp, err := processBody(bodyReader)
|
||||
if err != nil {
|
||||
return f, err
|
||||
l.Error(logs.FailedToProcessHTTPBody,
|
||||
zap.Error(err),
|
||||
zap.String("body type", payloadLabel))
|
||||
return f
|
||||
}
|
||||
f.Logger = f.Logger.With(zap.ByteString(label, resp))
|
||||
|
||||
return f, nil
|
||||
return f.with(zap.ByteString(label, resp))
|
||||
}
|
||||
|
||||
// newLoggerConfig creates new zap.Config with disabled base fields.
|
||||
|
@ -210,29 +215,37 @@ func (f *fileLogger) newLoggerConfig() zap.Config {
|
|||
return c
|
||||
}
|
||||
|
||||
// getBody reads http.Body and returns its io.Reader.
|
||||
func getBody(httpBody io.ReadCloser) ([]byte, error) {
|
||||
func getBody(httpBody io.ReadCloser, l *zap.Logger) []byte {
|
||||
defer httpBody.Close()
|
||||
|
||||
body, err := io.ReadAll(httpBody)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
l.Error(logs.FailedToGetHTTPBody,
|
||||
zap.Error(err),
|
||||
zap.String("body type", payloadLabel))
|
||||
return nil
|
||||
}
|
||||
return body, nil
|
||||
return body
|
||||
}
|
||||
|
||||
// ProcessBody reads body and base64 encode it if it's not XML.
|
||||
func ProcessBody(bodyReader io.Reader) ([]byte, error) {
|
||||
// processBody reads body and base64 encode it if it's not XML.
|
||||
func processBody(bodyReader io.Reader) ([]byte, error) {
|
||||
resultBody := &bytes.Buffer{}
|
||||
isXML, checkedPart, err := utils.DetectXML(bodyReader)
|
||||
isXML, checkedBytes, err := utils.DetectXML(bodyReader)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
writer := utils.ChooseWriter(isXML, resultBody)
|
||||
writer.Write(checkedPart)
|
||||
_, err = writer.Write(checkedBytes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if _, err = io.Copy(writer, bodyReader); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = writer.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resultBody.Bytes(), nil
|
||||
}
|
||||
|
@ -240,7 +253,7 @@ func ProcessBody(bodyReader io.Reader) ([]byte, error) {
|
|||
// withFieldIfExist checks whether data is not empty and attach it to log output.
|
||||
func (f *fileLogger) withFieldIfExist(label string, data map[string][]string) *fileLogger {
|
||||
if len(data) != 0 {
|
||||
f.Logger = f.Logger.With(zap.Any(label, data))
|
||||
return f.with(zap.Any(label, data))
|
||||
}
|
||||
return f
|
||||
}
|
||||
|
|
|
@ -597,7 +597,6 @@ func (s *appSettings) updateHTTPLoggingSettings(cfg *viper.Viper, log *zap.Logge
|
|||
s.httpLogging.MaxLogSize = cfg.GetInt(cfgHTTPLoggingMaxLogSize)
|
||||
s.httpLogging.OutputPath = cfg.GetString(cfgHTTPLoggingDestination)
|
||||
s.httpLogging.UseGzip = cfg.GetBool(cfgHTTPLoggingGzip)
|
||||
s.httpLogging.LogBefore = cfg.GetBool(cfgHTTPLoggingLogResponse)
|
||||
if err := s3middleware.ReloadFileLogger(s.httpLogging); err != nil {
|
||||
log.Error(logs.FailedToReloadHTTPFileLogger, zap.Error(err))
|
||||
}
|
||||
|
|
|
@ -10,12 +10,14 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
defaultPrintResponseLimit = 1024
|
||||
configPath = "config"
|
||||
configPathFlag = "config"
|
||||
httpTimeout = "http_timeout"
|
||||
httpTimeoutFlag = "http-timeout"
|
||||
skipVerifyTLS = "skip_verify_tls"
|
||||
skipVerifyTLSFlag = "skip-verify-tls"
|
||||
printResponseLimit = "print_response_limit"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -36,6 +38,8 @@ var (
|
|||
|
||||
_ = viper.BindPFlag(skipVerifyTLS, cmd.PersistentFlags().Lookup(skipVerifyTLSFlag))
|
||||
|
||||
viper.SetDefault(printResponseLimit, defaultPrintResponseLimit)
|
||||
|
||||
return nil
|
||||
},
|
||||
RunE: func(cmd *cobra.Command, _ []string) error {
|
||||
|
|
|
@ -18,7 +18,8 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/cmd/s3-playback/request"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/playback"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/playback/utils"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
)
|
||||
|
@ -55,26 +56,44 @@ func init() {
|
|||
runCmd.Flags().String(endpointFlag, "", "endpoint URL")
|
||||
}
|
||||
|
||||
func logResponse(cmd *cobra.Command, id int, resp *http.Response, logReq request.LoggedRequest) {
|
||||
cmd.Println(strconv.Itoa(id)+")", logReq.Method, logReq.URI)
|
||||
func logResponse(cmd *cobra.Command, r *http.Request, resp *http.Response) {
|
||||
cmd.Println(r.Method, r.URL.RequestURI())
|
||||
cmd.Println(resp.Status)
|
||||
if resp.ContentLength != 0 {
|
||||
body, err := io.ReadAll(io.LimitReader(resp.Body, resp.ContentLength))
|
||||
if resp.ContentLength == 0 {
|
||||
return
|
||||
}
|
||||
isXML, checkBuf, err := utils.DetectXML(resp.Body)
|
||||
if err != nil {
|
||||
cmd.Println(id, err)
|
||||
cmd.Println(err.Error())
|
||||
return
|
||||
}
|
||||
cmd.Println(string(body))
|
||||
body := &bytes.Buffer{}
|
||||
resultWriter := utils.ChooseWriter(isXML, body)
|
||||
if _, err = resultWriter.Write(checkBuf); err != nil {
|
||||
cmd.Println(err)
|
||||
return
|
||||
}
|
||||
_, err = io.Copy(resultWriter, io.LimitReader(resp.Body, viper.GetInt64(printResponseLimit)))
|
||||
if err != nil {
|
||||
cmd.Println(err)
|
||||
return
|
||||
}
|
||||
if err = resultWriter.Close(); err != nil {
|
||||
cmd.Printf("could not close response body: %s\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
cmd.Println(body.String())
|
||||
cmd.Println()
|
||||
}
|
||||
|
||||
func run(cmd *cobra.Command, _ []string) error {
|
||||
ctx := request.SetCredentials(
|
||||
ctx := playback.SetCredentials(
|
||||
cmd.Context(),
|
||||
viper.GetString(awsAccessKey),
|
||||
viper.GetString(awsSecretKey),
|
||||
)
|
||||
ctx = request.WithMultiparts(ctx)
|
||||
ctx = playback.WithMultiparts(ctx)
|
||||
|
||||
file, err := os.Open(viper.GetString(logPathFlag))
|
||||
if err != nil {
|
||||
|
@ -109,13 +128,14 @@ func run(cmd *cobra.Command, _ []string) error {
|
|||
case <-ctx.Done():
|
||||
return fmt.Errorf("interrupted: %w", ctx.Err())
|
||||
default:
|
||||
resp, err := playback(ctx, logReq, client)
|
||||
r, resp, err := playbackRequest(ctx, logReq, client)
|
||||
if err != nil {
|
||||
cmd.PrintErrln(strconv.Itoa(id)+")", "failed to playback request:", err)
|
||||
id++
|
||||
continue
|
||||
}
|
||||
logResponse(cmd, id, resp, logReq)
|
||||
cmd.Print(strconv.Itoa(id) + ") ")
|
||||
logResponse(cmd, r, resp)
|
||||
id++
|
||||
}
|
||||
}
|
||||
|
@ -123,8 +143,8 @@ func run(cmd *cobra.Command, _ []string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func getRequestFromLog(reader *bufio.Reader) (request.LoggedRequest, error) {
|
||||
var logReq request.LoggedRequest
|
||||
func getRequestFromLog(reader *bufio.Reader) (playback.LoggedRequest, error) {
|
||||
var logReq playback.LoggedRequest
|
||||
req, err := reader.ReadString('\n')
|
||||
if err != nil {
|
||||
return logReq, err
|
||||
|
@ -138,55 +158,55 @@ func getRequestFromLog(reader *bufio.Reader) (request.LoggedRequest, error) {
|
|||
return logReq, nil
|
||||
}
|
||||
|
||||
// playback creates http.Request from LoggedRequest and sends it to specified endpoint.
|
||||
func playback(ctx context.Context, logReq request.LoggedRequest, client *http.Client) (*http.Response, error) {
|
||||
// playbackRequest creates http.Request from LoggedRequest and sends it to specified endpoint.
|
||||
func playbackRequest(ctx context.Context, logReq playback.LoggedRequest, client *http.Client) (*http.Request, *http.Response, error) {
|
||||
r, err := prepareRequest(ctx, logReq)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to prepare request: %w", err)
|
||||
return nil, nil, fmt.Errorf("failed to prepare request: %w", err)
|
||||
}
|
||||
resp, err := client.Do(r)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to send request: %w", err)
|
||||
return nil, nil, fmt.Errorf("failed to send request: %w", err)
|
||||
}
|
||||
|
||||
respBody, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read response body: %w", err)
|
||||
return nil, nil, fmt.Errorf("failed to read response body: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if err = request.HandleResponse(ctx, r, respBody, logReq.Response); err != nil {
|
||||
return nil, fmt.Errorf("failed to register multipart upload: %w", err)
|
||||
if err = playback.HandleResponse(ctx, r, respBody, logReq.Response); err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to register multipart upload: %w", err)
|
||||
}
|
||||
resp.Body = io.NopCloser(bytes.NewBuffer(respBody))
|
||||
|
||||
return resp, nil
|
||||
return r, resp, nil
|
||||
}
|
||||
|
||||
// prepareRequest creates request from logs and modifies its signature and uploadId (if presents).
|
||||
func prepareRequest(ctx context.Context, logReq request.LoggedRequest) (*http.Request, error) {
|
||||
func prepareRequest(ctx context.Context, logReq playback.LoggedRequest) (*http.Request, error) {
|
||||
r, err := http.NewRequestWithContext(ctx, logReq.Method, viper.GetString(endpointFlag)+logReq.URI,
|
||||
bytes.NewReader(logReq.Body))
|
||||
bytes.NewReader(logReq.Payload))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r.Header = logReq.Header
|
||||
|
||||
err = request.SwapUploadID(ctx, r)
|
||||
err = playback.SwapUploadID(ctx, r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sha256hash := sha256.New()
|
||||
sha256hash.Write(logReq.Body)
|
||||
sha256hash.Write(logReq.Payload)
|
||||
r.Header.Set(auth.AmzContentSHA256, hex.EncodeToString(sha256hash.Sum(nil)))
|
||||
if r.Header.Get(api.ContentMD5) != "" {
|
||||
sha256hash.Reset()
|
||||
md5hash := md5.New()
|
||||
md5hash.Write(logReq.Body)
|
||||
md5hash.Write(logReq.Payload)
|
||||
r.Header.Set(api.ContentMD5, base64.StdEncoding.EncodeToString(md5hash.Sum(nil)))
|
||||
}
|
||||
err = request.Sign(ctx, r)
|
||||
err = playback.Sign(ctx, r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
endpoint: http://localhost:8084
|
||||
log: ./log/multipart5.log
|
||||
log: ./log/request.log
|
||||
credentials:
|
||||
access_key: CAtUxDSSFtuVyVCjHTMhwx3eP3YSPo5ffwbPcnKfcdrD06WwUSn72T5EBNe3jcgjL54rmxFM6u3nUAoNBps8qJ1PD
|
||||
secret_key: 560027d81c277de7378f71cbf12a32e4f7f541de724be59bcfdbfdc925425f30
|
||||
http_timeout: 60s
|
||||
skip_verify_tls: false
|
||||
print_response_limit: 1024
|
||||
|
|
|
@ -392,7 +392,6 @@ http_logging:
|
|||
max_log_size: 20
|
||||
gzip: true
|
||||
destination: stdout
|
||||
log_response: true
|
||||
```
|
||||
|
||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||
|
@ -402,7 +401,6 @@ http_logging:
|
|||
| `max_log_size` | int | yes | 50 | Log file size threshold (in megabytes) to be moved in backup file. After reaching threshold, initial filename is appended with timestamp. And new empty file with initial name is created. |
|
||||
| `gzip` | bool | yes | false | Whether to enable Gzip compression to backup log files. |
|
||||
| `destination` | string | yes | stdout | Specify path for log output. Accepts log file path, or "stdout" and "stderr" reserved words to print in output streams. File and folders are created if necessary. |
|
||||
| `log_response` | bool | yes | true | Whether to attach response body to the log. |
|
||||
|
||||
|
||||
### `cache` section
|
||||
|
|
|
@ -31,14 +31,15 @@ If corresponding flag is set, it overrides parameter from config.
|
|||
|
||||
### Configuration parameters
|
||||
|
||||
#### Global params
|
||||
#### Global parameters
|
||||
| # | Config parameter name | Flag name | Type | Default value | Description |
|
||||
|---|-----------------------|-----------------|----------|---------------|--------------------------------------------------------|
|
||||
|---|-----------------------|-----------------|----------|---------------|-------------------------------------------------------------------------------|
|
||||
| 1 | - | config | string | - | config file path (e.g. `./config/playback.yaml`) |
|
||||
| 2 | http_timeout | http-timeout | duration | 60s | http request timeout |
|
||||
| 3 | skip_verify_tls | skip-verify-tls | bool | false | skips tls certificate verification for https endpoints |
|
||||
| 4 | credentials.accessKey | - | string | - | AWS access key id |
|
||||
| 5 | credentials.secretKey | - | string | - | AWS secret key |
|
||||
| 6 | print_response_limit | - | int | 1024 | max response length to be printed in stdout, the rest of body will be omitted |
|
||||
|
||||
#### `run` command parameters
|
||||
| # | Config parameter name | Flag name | Type | Default value | Description |
|
||||
|
|
|
@ -3,14 +3,18 @@ package playback
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/playback/utils"
|
||||
v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
|
||||
"github.com/aws/aws-sdk-go-v2/credentials"
|
||||
)
|
||||
|
@ -22,14 +26,15 @@ var (
|
|||
)
|
||||
|
||||
type (
|
||||
httpBody []byte
|
||||
LoggedRequest struct {
|
||||
From string `json:"from"`
|
||||
URI string `json:"URI"`
|
||||
Method string `json:"method"`
|
||||
Payload httpBody `json:"payload,omitempty"`
|
||||
Response httpBody `json:"response,omitempty"`
|
||||
Query url.Values `json:"query"`
|
||||
Body []byte `json:"body"`
|
||||
Header http.Header `json:"headers"`
|
||||
Response []byte `json:"response"`
|
||||
}
|
||||
Credentials struct {
|
||||
AccessKey string
|
||||
|
@ -39,6 +44,24 @@ type (
|
|||
multipartKey struct{}
|
||||
)
|
||||
|
||||
func (h *httpBody) UnmarshalJSON(data []byte) error {
|
||||
unquoted, err := strconv.Unquote(string(data))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to unquote data: %w", err)
|
||||
}
|
||||
isXML, _, err := utils.DetectXML(strings.NewReader(unquoted))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to detect httpbody type: %w", err)
|
||||
}
|
||||
reader := utils.ChooseReader(isXML, strings.NewReader(unquoted))
|
||||
*h, err = io.ReadAll(reader)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to unmarshal httpbody: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func SetCredentials(ctx context.Context, accessKey, secretKey string) context.Context {
|
||||
return context.WithValue(ctx, contextKey{},
|
||||
Credentials{
|
||||
|
|
|
@ -8,35 +8,45 @@ import (
|
|||
"io"
|
||||
)
|
||||
|
||||
type nopCloseWriter struct {
|
||||
io.Writer
|
||||
}
|
||||
|
||||
func (b nopCloseWriter) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
const BodyRecognizeLimit int64 = 128
|
||||
|
||||
func DetectXML(reader io.Reader) (bool, []byte, error) {
|
||||
checkBuf := bytes.NewBuffer(nil)
|
||||
token, err := xml.NewDecoder(io.TeeReader(io.LimitReader(reader, BodyRecognizeLimit), checkBuf)).RawToken()
|
||||
detectBuf := bytes.NewBuffer(nil)
|
||||
detectReader := io.TeeReader(io.LimitReader(reader, BodyRecognizeLimit), detectBuf)
|
||||
token, err := xml.NewDecoder(detectReader).RawToken()
|
||||
if err != nil {
|
||||
var xmlErr *xml.SyntaxError
|
||||
if errors.Is(err, io.EOF) || errors.As(err, &xmlErr) {
|
||||
return false, checkBuf.Bytes(), nil
|
||||
return false, detectBuf.Bytes(), nil
|
||||
}
|
||||
return false, checkBuf.Bytes(), err
|
||||
return false, detectBuf.Bytes(), err
|
||||
}
|
||||
|
||||
switch token.(type) {
|
||||
case xml.StartElement, xml.ProcInst:
|
||||
return true, checkBuf.Bytes(), nil
|
||||
return true, detectBuf.Bytes(), nil
|
||||
}
|
||||
return false, checkBuf.Bytes(), nil
|
||||
return false, detectBuf.Bytes(), nil
|
||||
}
|
||||
|
||||
func ChooseWriter(isXML bool, body *bytes.Buffer) io.Writer {
|
||||
func ChooseWriter(isXML bool, bodyWriter io.Writer) io.WriteCloser {
|
||||
writeCloser := nopCloseWriter{bodyWriter}
|
||||
if !isXML {
|
||||
return base64.NewEncoder(base64.StdEncoding, body)
|
||||
return base64.NewEncoder(base64.StdEncoding, bodyWriter)
|
||||
}
|
||||
return body
|
||||
return writeCloser
|
||||
}
|
||||
|
||||
func ChooseReader(isXml bool, bodyReader io.Reader) io.Reader {
|
||||
if !isXml {
|
||||
func ChooseReader(isXML bool, bodyReader io.Reader) io.Reader {
|
||||
if !isXML {
|
||||
return base64.NewDecoder(base64.StdEncoding, bodyReader)
|
||||
}
|
||||
return bodyReader
|
||||
|
|
Loading…
Reference in a new issue