frostfs-s3-gw/cmd/s3-playback/modules/run.go
Nikita Zinkevich 62615d7ab7 [#369] Request reproducer
Signed-off-by: Nikita Zinkevich <n.zinkevich@yadro.com>
2024-09-11 15:25:09 +03:00

208 lines
5.7 KiB
Go

package modules
import (
"bufio"
"bytes"
"context"
"crypto/md5"
"crypto/sha256"
"crypto/tls"
"encoding/base64"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"strconv"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/cmd/s3-playback/internal/playback"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/detector"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/xmlutils"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
const (
cfgPrintResponseLimit = "print-response-limit"
cfgLogPath = "log"
cfgEndpoint = "endpoint"
awsAccessKey = "credentials.access_key"
awsSecretKey = "credentials.secret_key"
)
var runCmd = &cobra.Command{
Use: "run",
Short: "Send requests from log file",
Long: "Reads the network log file and sends each request to the specified URL",
Example: "frostfs-s3-playback --config <config_path> run [--endpoint=<endpoint>] [--log=<log_path>]",
PersistentPreRunE: func(cmd *cobra.Command, _ []string) (err error) {
viper.SetDefault(cfgPrintResponseLimit, defaultPrintResponseLimit)
return viper.BindPFlags(cmd.Flags())
},
RunE: run,
}
func initRunCmd() {
runCmd.Flags().String(cfgLogPath, "./request.log", "log file path")
runCmd.Flags().String(cfgEndpoint, "", "endpoint URL")
runCmd.Flags().Int(cfgPrintResponseLimit, defaultPrintResponseLimit, "print limit for http response body")
}
func logResponse(cmd *cobra.Command, r *http.Request, resp *http.Response) {
cmd.Println(r.Method, r.URL.RequestURI())
cmd.Println(resp.Status)
if resp.ContentLength == 0 {
return
}
detect := detector.NewDetector(resp.Body, xmlutils.DetectXML)
dataType, err := detect.Detect()
if err != nil {
cmd.PrintErrln("type detection error:", err.Error())
return
}
body := &bytes.Buffer{}
resultWriter := xmlutils.ChooseWriter(dataType, body)
_, err = io.Copy(resultWriter, io.LimitReader(detect.RestoredReader(), viper.GetInt64(cfgPrintResponseLimit)))
if err != nil {
cmd.Println(err)
return
}
if err = resultWriter.Close(); err != nil {
cmd.Printf("could not close response body: %s\n", err)
return
}
cmd.Println(body.String())
cmd.Println()
}
func run(cmd *cobra.Command, _ []string) error {
ctx := cmd.Context()
settings := &playback.Settings{
Endpoint: viper.GetString(cfgEndpoint),
Creds: playback.Credentials{
AccessKey: viper.GetString(awsAccessKey),
SecretKey: viper.GetString(awsSecretKey),
},
Multiparts: make(map[string]playback.MultipartUpload),
Client: &http.Client{
Transport: http.DefaultTransport,
Timeout: viper.GetDuration(cfgHTTPTimeoutFlag),
},
}
file, err := os.Open(viper.GetString(cfgLogPath))
if err != nil {
return err
}
defer file.Close()
reader := bufio.NewReader(file)
if viper.GetBool(cfgSkipVerifyTLS) {
settings.Client.Transport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
}
id := 1
for {
logReq, err := getRequestFromLog(reader)
if err != nil {
if err == io.EOF {
break
}
cmd.PrintErrln(strconv.Itoa(id)+")", "failed to parse request", err)
id++
continue
}
select {
case <-ctx.Done():
return fmt.Errorf("interrupted: %w", ctx.Err())
default:
r, resp, err := playbackRequest(ctx, logReq, settings)
if err != nil {
cmd.PrintErrln(strconv.Itoa(id)+")", "failed to playback request:", err)
id++
continue
}
cmd.Print(strconv.Itoa(id) + ") ")
logResponse(cmd, r, resp)
id++
}
}
return nil
}
func getRequestFromLog(reader *bufio.Reader) (playback.LoggedRequest, error) {
var logReq playback.LoggedRequest
req, err := reader.ReadString('\n')
if err != nil {
return logReq, err
}
err = json.Unmarshal([]byte(req), &logReq)
if err != nil {
return logReq, err
}
return logReq, nil
}
// playbackRequest creates http.Request from LoggedRequest and sends it to specified endpoint.
func playbackRequest(ctx context.Context, logReq playback.LoggedRequest, settings *playback.Settings) (*http.Request, *http.Response, error) {
r, err := prepareRequest(ctx, logReq, settings)
if err != nil {
return nil, nil, fmt.Errorf("failed to prepare request: %w", err)
}
resp, err := settings.Client.Do(r)
if err != nil {
return nil, nil, fmt.Errorf("failed to send request: %w", err)
}
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, nil, fmt.Errorf("failed to read response body: %w", err)
}
defer resp.Body.Close()
if err = playback.HandleResponse(r, settings.Multiparts, respBody, logReq.Response); err != nil {
return nil, nil, fmt.Errorf("failed to register multipart upload: %w", err)
}
resp.Body = io.NopCloser(bytes.NewBuffer(respBody))
return r, resp, nil
}
// prepareRequest creates request from logs and modifies its signature and uploadId (if presents).
func prepareRequest(ctx context.Context, logReq playback.LoggedRequest, settings *playback.Settings) (*http.Request, error) {
r, err := http.NewRequestWithContext(ctx, logReq.Method, settings.Endpoint+logReq.URI, bytes.NewReader(logReq.Payload))
if err != nil {
return nil, err
}
r.Header = logReq.Header
sha256hash := sha256.New()
sha256hash.Write(logReq.Payload)
r.Header.Set(auth.AmzContentSHA256, hex.EncodeToString(sha256hash.Sum(nil)))
if r.Header.Get(api.ContentMD5) != "" {
sha256hash.Reset()
md5hash := md5.New()
md5hash.Write(logReq.Payload)
r.Header.Set(api.ContentMD5, base64.StdEncoding.EncodeToString(md5hash.Sum(nil)))
}
if r.URL.Query().Has("uploadId") {
if err = playback.SwapUploadID(r, settings); err != nil {
return nil, err
}
}
if r.Header.Get(auth.AuthorizationHdr) != "" {
if err = playback.Sign(ctx, r, settings.Creds); err != nil {
return nil, err
}
}
return r, nil
}