package modules import ( "bufio" "bytes" "context" "crypto/md5" "crypto/sha256" "crypto/tls" "encoding/base64" "encoding/hex" "encoding/json" "fmt" "io" "net/http" "os" "strconv" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/cmd/s3-playback/internal/playback" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/detector" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/xmlutils" "github.com/spf13/cobra" "github.com/spf13/viper" ) const ( cfgPrintResponseLimit = "print-response-limit" cfgLogPath = "log" cfgEndpoint = "endpoint" awsAccessKey = "credentials.access_key" awsSecretKey = "credentials.secret_key" ) var runCmd = &cobra.Command{ Use: "run", Short: "Send requests from log file", Long: "Reads the network log file and sends each request to the specified URL", Example: "frostfs-s3-playback --config run [--endpoint=] [--log=]", PersistentPreRunE: func(cmd *cobra.Command, _ []string) (err error) { viper.SetDefault(cfgPrintResponseLimit, defaultPrintResponseLimit) return viper.BindPFlags(cmd.Flags()) }, RunE: run, } func initRunCmd() { runCmd.Flags().String(cfgLogPath, "./request.log", "log file path") runCmd.Flags().String(cfgEndpoint, "", "endpoint URL") runCmd.Flags().Int(cfgPrintResponseLimit, defaultPrintResponseLimit, "print limit for http response body") } func logResponse(cmd *cobra.Command, r *http.Request, resp *http.Response) { cmd.Println(r.Method, r.URL.RequestURI()) cmd.Println(resp.Status) if resp.ContentLength == 0 { return } detect := detector.NewDetector(resp.Body, xmlutils.DetectXML) dataType, err := detect.Detect() if err != nil { cmd.PrintErrln("type detection error:", err.Error()) return } body := &bytes.Buffer{} resultWriter := xmlutils.ChooseWriter(dataType, body) _, err = io.Copy(resultWriter, io.LimitReader(detect.RestoredReader(), viper.GetInt64(cfgPrintResponseLimit))) if err != nil { cmd.Println(err) return } if err = resultWriter.Close(); err != nil { cmd.Printf("could not close response body: %s\n", err) return } cmd.Println(body.String()) cmd.Println() } func run(cmd *cobra.Command, _ []string) error { ctx := cmd.Context() settings := &playback.Settings{ Endpoint: viper.GetString(cfgEndpoint), Creds: playback.Credentials{ AccessKey: viper.GetString(awsAccessKey), SecretKey: viper.GetString(awsSecretKey), }, Multiparts: make(map[string]playback.MultipartUpload), Client: &http.Client{ Transport: http.DefaultTransport, Timeout: viper.GetDuration(cfgHTTPTimeoutFlag), }, } file, err := os.Open(viper.GetString(cfgLogPath)) if err != nil { return err } defer file.Close() reader := bufio.NewReader(file) if viper.GetBool(cfgSkipVerifyTLS) { settings.Client.Transport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true} } id := 1 for { logReq, err := getRequestFromLog(reader) if err != nil { if err == io.EOF { break } cmd.PrintErrln(strconv.Itoa(id)+")", "failed to parse request", err) id++ continue } select { case <-ctx.Done(): return fmt.Errorf("interrupted: %w", ctx.Err()) default: r, resp, err := playbackRequest(ctx, logReq, settings) if err != nil { cmd.PrintErrln(strconv.Itoa(id)+")", "failed to playback request:", err) id++ continue } cmd.Print(strconv.Itoa(id) + ") ") logResponse(cmd, r, resp) id++ } } return nil } func getRequestFromLog(reader *bufio.Reader) (playback.LoggedRequest, error) { var logReq playback.LoggedRequest req, err := reader.ReadString('\n') if err != nil { return logReq, err } err = json.Unmarshal([]byte(req), &logReq) if err != nil { return logReq, err } return logReq, nil } // playbackRequest creates http.Request from LoggedRequest and sends it to specified endpoint. func playbackRequest(ctx context.Context, logReq playback.LoggedRequest, settings *playback.Settings) (*http.Request, *http.Response, error) { r, err := prepareRequest(ctx, logReq, settings) if err != nil { return nil, nil, fmt.Errorf("failed to prepare request: %w", err) } resp, err := settings.Client.Do(r) if err != nil { return nil, nil, fmt.Errorf("failed to send request: %w", err) } respBody, err := io.ReadAll(resp.Body) if err != nil { return nil, nil, fmt.Errorf("failed to read response body: %w", err) } defer resp.Body.Close() if err = playback.HandleResponse(r, settings.Multiparts, respBody, logReq.Response); err != nil { return nil, nil, fmt.Errorf("failed to register multipart upload: %w", err) } resp.Body = io.NopCloser(bytes.NewBuffer(respBody)) return r, resp, nil } // prepareRequest creates request from logs and modifies its signature and uploadId (if presents). func prepareRequest(ctx context.Context, logReq playback.LoggedRequest, settings *playback.Settings) (*http.Request, error) { r, err := http.NewRequestWithContext(ctx, logReq.Method, settings.Endpoint+logReq.URI, bytes.NewReader(logReq.Payload)) if err != nil { return nil, err } r.Header = logReq.Header sha256hash := sha256.New() sha256hash.Write(logReq.Payload) r.Header.Set(auth.AmzContentSHA256, hex.EncodeToString(sha256hash.Sum(nil))) if r.Header.Get(api.ContentMD5) != "" { sha256hash.Reset() md5hash := md5.New() md5hash.Write(logReq.Payload) r.Header.Set(api.ContentMD5, base64.StdEncoding.EncodeToString(md5hash.Sum(nil))) } if r.URL.Query().Has("uploadId") { if err = playback.SwapUploadID(r, settings); err != nil { return nil, err } } if r.Header.Get(auth.AuthorizationHdr) != "" { if err = playback.Sign(ctx, r, settings.Creds); err != nil { return nil, err } } return r, nil }