194 lines
4.8 KiB
Go
194 lines
4.8 KiB
Go
package modules
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"context"
|
|
"crypto/md5"
|
|
"crypto/sha256"
|
|
"crypto/tls"
|
|
"encoding/base64"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"os"
|
|
"strconv"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/cmd/s3-playback/request"
|
|
"github.com/spf13/cobra"
|
|
"github.com/spf13/viper"
|
|
)
|
|
|
|
const (
	// Command-line flag names of the run command; the same strings are
	// used as viper keys when the flags are bound.
	logPathFlag  = "log"
	endpointFlag = "endpoint"

	// Viper configuration keys holding the credentials passed to
	// request.SetCredentials.
	awsAccessKey = "credentials.access_key"
	awsSecretKey = "credentials.secret_key"
)
|
|
|
|
var runCmd = &cobra.Command{
|
|
Use: "run",
|
|
Short: "Send requests from log file",
|
|
Long: "Reads the network log file and sends each request to the specified URL",
|
|
Example: "frostfs-s3-playback --config <config_path> run [--endpoint=<endpoint>] [--log=<log_path>]",
|
|
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
|
|
if rootCmd.PersistentPreRunE != nil {
|
|
if err := rootCmd.PersistentPreRunE(cmd, args); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
_ = viper.BindPFlag(logPathFlag, cmd.Flags().Lookup(logPathFlag))
|
|
_ = viper.BindPFlag(endpointFlag, cmd.Flags().Lookup(endpointFlag))
|
|
|
|
return nil
|
|
},
|
|
RunE: run,
|
|
}
|
|
|
|
func init() {
|
|
runCmd.Flags().String(logPathFlag, "./request.log", "log file path")
|
|
runCmd.Flags().String(endpointFlag, "", "endpoint URL")
|
|
}
|
|
|
|
func logResponse(cmd *cobra.Command, id int, resp *http.Response, logReq request.LoggedRequest) {
|
|
cmd.Println(strconv.Itoa(id)+")", logReq.Method, logReq.URI)
|
|
cmd.Println(resp.Status)
|
|
if resp.ContentLength != 0 {
|
|
body, err := io.ReadAll(io.LimitReader(resp.Body, resp.ContentLength))
|
|
if err != nil {
|
|
cmd.Println(id, err)
|
|
}
|
|
cmd.Println(string(body))
|
|
}
|
|
cmd.Println()
|
|
}
|
|
|
|
func run(cmd *cobra.Command, _ []string) error {
|
|
ctx := request.SetCredentials(
|
|
cmd.Context(),
|
|
viper.GetString(awsAccessKey),
|
|
viper.GetString(awsSecretKey),
|
|
)
|
|
ctx = request.WithMultiparts(ctx)
|
|
|
|
file, err := os.Open(viper.GetString(logPathFlag))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer file.Close()
|
|
|
|
reader := bufio.NewReader(file)
|
|
|
|
client := &http.Client{
|
|
Transport: http.DefaultTransport,
|
|
Timeout: viper.GetDuration(httpTimeout),
|
|
}
|
|
|
|
if viper.GetBool(skipVerifyTLS) {
|
|
client.Transport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
|
|
}
|
|
|
|
id := 1
|
|
for {
|
|
logReq, err := getRequestFromLog(reader)
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
cmd.PrintErrln(strconv.Itoa(id)+")", "failed to parse request", err)
|
|
id++
|
|
continue
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return fmt.Errorf("interrupted: %w", ctx.Err())
|
|
default:
|
|
resp, err := playback(ctx, logReq, client)
|
|
if err != nil {
|
|
cmd.PrintErrln(strconv.Itoa(id)+")", "failed to playback request:", err)
|
|
id++
|
|
continue
|
|
}
|
|
logResponse(cmd, id, resp, logReq)
|
|
id++
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func getRequestFromLog(reader *bufio.Reader) (request.LoggedRequest, error) {
|
|
var logReq request.LoggedRequest
|
|
req, err := reader.ReadString('\n')
|
|
if err != nil {
|
|
return logReq, err
|
|
}
|
|
|
|
err = json.Unmarshal([]byte(req), &logReq)
|
|
if err != nil {
|
|
return logReq, err
|
|
}
|
|
|
|
return logReq, nil
|
|
}
|
|
|
|
// playback creates http.Request from LoggedRequest and sends it to specified endpoint.
|
|
func playback(ctx context.Context, logReq request.LoggedRequest, client *http.Client) (*http.Response, error) {
|
|
r, err := prepareRequest(ctx, logReq)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to prepare request: %w", err)
|
|
}
|
|
resp, err := client.Do(r)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to send request: %w", err)
|
|
}
|
|
|
|
respBody, err := io.ReadAll(resp.Body)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to read response body: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if err = request.HandleResponse(ctx, r, respBody, logReq.Response); err != nil {
|
|
return nil, fmt.Errorf("failed to register multipart upload: %w", err)
|
|
}
|
|
resp.Body = io.NopCloser(bytes.NewBuffer(respBody))
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
// prepareRequest creates request from logs and modifies its signature and uploadId (if presents).
|
|
func prepareRequest(ctx context.Context, logReq request.LoggedRequest) (*http.Request, error) {
|
|
r, err := http.NewRequestWithContext(ctx, logReq.Method, viper.GetString(endpointFlag)+logReq.URI,
|
|
bytes.NewReader(logReq.Body))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
r.Header = logReq.Header
|
|
|
|
err = request.SwapUploadID(ctx, r)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
sha256hash := sha256.New()
|
|
sha256hash.Write(logReq.Body)
|
|
r.Header.Set(auth.AmzContentSHA256, hex.EncodeToString(sha256hash.Sum(nil)))
|
|
if r.Header.Get(api.ContentMD5) != "" {
|
|
sha256hash.Reset()
|
|
md5hash := md5.New()
|
|
md5hash.Write(logReq.Body)
|
|
r.Header.Set(api.ContentMD5, base64.StdEncoding.EncodeToString(md5hash.Sum(nil)))
|
|
}
|
|
err = request.Sign(ctx, r)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return r, nil
|
|
}
|