frostfs-s3-gw/cmd/s3-playback/modules/run.go
Nikita Zinkevich e2e80152b5 [#369] Request reproducer
Signed-off-by: Nikita Zinkevich <n.zinkevich@yadro.com>
2024-08-01 14:28:07 +03:00

194 lines
4.8 KiB
Go

package modules
import (
"bufio"
"bytes"
"context"
"crypto/md5"
"crypto/sha256"
"crypto/tls"
"encoding/base64"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"strconv"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/cmd/s3-playback/request"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
const (
logPathFlag = "log"
endpointFlag = "endpoint"
awsAccessKey = "credentials.access_key"
awsSecretKey = "credentials.secret_key"
)
var runCmd = &cobra.Command{
Use: "run",
Short: "Send requests from log file",
Long: "Reads the network log file and sends each request to the specified URL",
Example: "frostfs-s3-playback --config <config_path> run [--endpoint=<endpoint>] [--log=<log_path>]",
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
if rootCmd.PersistentPreRunE != nil {
if err := rootCmd.PersistentPreRunE(cmd, args); err != nil {
return err
}
}
_ = viper.BindPFlag(logPathFlag, cmd.Flags().Lookup(logPathFlag))
_ = viper.BindPFlag(endpointFlag, cmd.Flags().Lookup(endpointFlag))
return nil
},
RunE: run,
}
func init() {
runCmd.Flags().String(logPathFlag, "./request.log", "log file path")
runCmd.Flags().String(endpointFlag, "", "endpoint URL")
}
func logResponse(cmd *cobra.Command, id int, resp *http.Response, logReq request.LoggedRequest) {
cmd.Println(strconv.Itoa(id)+")", logReq.Method, logReq.URI)
cmd.Println(resp.Status)
if resp.ContentLength != 0 {
body, err := io.ReadAll(io.LimitReader(resp.Body, resp.ContentLength))
if err != nil {
cmd.Println(id, err)
}
cmd.Println(string(body))
}
cmd.Println()
}
func run(cmd *cobra.Command, _ []string) error {
ctx := request.SetCredentials(
cmd.Context(),
viper.GetString(awsAccessKey),
viper.GetString(awsSecretKey),
)
ctx = request.WithMultiparts(ctx)
file, err := os.Open(viper.GetString(logPathFlag))
if err != nil {
return err
}
defer file.Close()
reader := bufio.NewReader(file)
client := &http.Client{
Transport: http.DefaultTransport,
Timeout: viper.GetDuration(httpTimeout),
}
if viper.GetBool(skipVerifyTLS) {
client.Transport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
}
id := 1
for {
logReq, err := getRequestFromLog(reader)
if err != nil {
if err == io.EOF {
break
}
cmd.PrintErrln(strconv.Itoa(id)+")", "failed to parse request", err)
id++
continue
}
select {
case <-ctx.Done():
return fmt.Errorf("interrupted: %w", ctx.Err())
default:
resp, err := playback(ctx, logReq, client)
if err != nil {
cmd.PrintErrln(strconv.Itoa(id)+")", "failed to playback request:", err)
id++
continue
}
logResponse(cmd, id, resp, logReq)
id++
}
}
return nil
}
func getRequestFromLog(reader *bufio.Reader) (request.LoggedRequest, error) {
var logReq request.LoggedRequest
req, err := reader.ReadString('\n')
if err != nil {
return logReq, err
}
err = json.Unmarshal([]byte(req), &logReq)
if err != nil {
return logReq, err
}
return logReq, nil
}
// playback creates http.Request from LoggedRequest and sends it to specified endpoint.
func playback(ctx context.Context, logReq request.LoggedRequest, client *http.Client) (*http.Response, error) {
r, err := prepareRequest(ctx, logReq)
if err != nil {
return nil, fmt.Errorf("failed to prepare request: %w", err)
}
resp, err := client.Do(r)
if err != nil {
return nil, fmt.Errorf("failed to send request: %w", err)
}
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read response body: %w", err)
}
defer resp.Body.Close()
if err = request.HandleResponse(ctx, r, respBody, logReq.Response); err != nil {
return nil, fmt.Errorf("failed to register multipart upload: %w", err)
}
resp.Body = io.NopCloser(bytes.NewBuffer(respBody))
return resp, nil
}
// prepareRequest creates request from logs and modifies its signature and uploadId (if presents).
func prepareRequest(ctx context.Context, logReq request.LoggedRequest) (*http.Request, error) {
r, err := http.NewRequestWithContext(ctx, logReq.Method, viper.GetString(endpointFlag)+logReq.URI,
bytes.NewReader(logReq.Body))
if err != nil {
return nil, err
}
r.Header = logReq.Header
err = request.SwapUploadID(ctx, r)
if err != nil {
return nil, err
}
sha256hash := sha256.New()
sha256hash.Write(logReq.Body)
r.Header.Set(auth.AmzContentSHA256, hex.EncodeToString(sha256hash.Sum(nil)))
if r.Header.Get(api.ContentMD5) != "" {
sha256hash.Reset()
md5hash := md5.New()
md5hash.Write(logReq.Body)
r.Header.Set(api.ContentMD5, base64.StdEncoding.EncodeToString(md5hash.Sum(nil)))
}
err = request.Sign(ctx, r)
if err != nil {
return nil, err
}
return r, nil
}