[#330] Improve CURL generation and fix Boto3 logging
Some checks failed
DCO action / DCO (pull_request) Has been cancelled

Signed-off-by: Kirill Sosnovskikh <k.sosnovskikh@yadro.com>
This commit is contained in:
k.sosnovskikh 2024-12-02 14:18:17 +03:00
parent 7d6768c83f
commit 0e040d2722
2 changed files with 76 additions and 27 deletions

View file

@ -1,6 +1,8 @@
import io
import json import json
import logging import logging
import logging.config import logging.config
from typing import IO
import httpx import httpx
@ -40,7 +42,7 @@ class HttpClient:
client = httpx.Client(timeout=timeout, transport=transport) client = httpx.Client(timeout=timeout, transport=transport)
response = client.request(method, url, **kwargs) response = client.request(method, url, **kwargs)
self._attach_response(response) self._attach_response(response, **kwargs)
logger.info(f"Response: {response.status_code} => {response.text}") logger.info(f"Response: {response.status_code} => {response.text}")
if expected_status_code: if expected_status_code:
@ -51,47 +53,91 @@ class HttpClient:
return response return response
@classmethod @classmethod
def _attach_response(cls, response: httpx.Response): def _parse_body(cls, readable: httpx.Request | httpx.Response) -> str | None:
request = response.request
try: try:
request_headers = json.dumps(dict(request.headers), indent=4) content = readable.read()
except json.JSONDecodeError:
request_headers = str(request.headers)
try:
request_body = request.read()
try:
request_body = request_body.decode("utf-8")
except UnicodeDecodeError as e:
request_body = f"Unable to decode binary data to text using UTF-8 encoding: {str(e)}"
except Exception as e: except Exception as e:
request_body = f"Error reading request body: {str(e)}" logger.warning(f"Unable to read file: {str(e)}")
return None
request_body = "" if request_body is None else request_body if not content:
return None
request_body = None
try: try:
response_headers = json.dumps(dict(response.headers), indent=4) request_body = json.loads(content)
except json.JSONDecodeError: except (json.JSONDecodeError, UnicodeDecodeError) as e:
response_headers = str(response.headers) logger.warning(f"Unable to convert body to json: {str(e)}")
if request_body is not None:
return json.dumps(request_body, default=str, indent=4)
try:
request_body = content.decode()
except UnicodeDecodeError as e:
logger.warning(f"Unable to decode binary data to text using UTF-8 encoding: {str(e)}")
request_body = content if request_body is None else request_body
request_body = "<large text data>" if len(request_body) > 1000 else request_body
return request_body
@classmethod
def _parse_files(cls, files: dict | None) -> str | None:
if not files:
return None
filepaths = {}
for name, file in files.items():
if isinstance(file, io.IOBase):
filepaths[name] = file.name
if isinstance(file, tuple):
filepaths[name] = file[1].name
return json.dumps(filepaths, default=str, indent=4)
@classmethod
def _attach_response(cls, response: httpx.Response, **kwargs):
request = response.request
request_headers = json.dumps(dict(request.headers), default=str, indent=4)
request_body = cls._parse_body(request)
files = kwargs.get("files")
request_files = cls._parse_files(files)
response_headers = json.dumps(dict(response.headers), default=str, indent=4)
response_body = cls._parse_body(response)
report = ( report = (
f"Method: {request.method}\n\n" f"Method: {request.method}\n\n"
f"URL: {request.url}\n\n" + f"URL: {request.url}\n\n"
f"Request Headers: {request_headers}\n\n" + f"Request Headers: {request_headers}\n\n"
f"Request Body: {request_body}\n\n" + (f"Request Body: {request_body}\n\n" if request_body else "")
f"Response Status Code: {response.status_code}\n\n" + (f"Request Files: {request_files}\n\n" if request_files else "")
f"Response Headers: {response_headers}\n\n" + f"Response Status Code: {response.status_code}\n\n"
f"Response Body: {response.text}\n\n" + f"Response Headers: {response_headers}\n\n"
+ (f"Response Body: {response_body}\n\n" if response_body else "")
) )
curl_request = cls._create_curl_request(request.url, request.method, request.headers, request_body) curl_request = cls._create_curl_request(request.url, request.method, request.headers, request_body, files)
reporter.attach(report, "Requests Info") reporter.attach(report, "Requests Info")
reporter.attach(curl_request, "CURL") reporter.attach(curl_request, "CURL")
@classmethod @classmethod
def _create_curl_request(cls, url: str, method: str, headers: httpx.Headers, data: str) -> str: def _create_curl_request(cls, url: str, method: str, headers: httpx.Headers, data: str, files: dict = None) -> str:
headers = " ".join(f'-H "{name.title()}: {value}"' for name, value in headers.items()) headers = " ".join(f'-H "{name.title()}: {value}"' for name, value in headers.items())
data = f" -d '{data}'" if data else "" data = f" -d '{data}'" if data else ""
if files:
for name, file in files.items():
if isinstance(file, io.IOBase):
data += f' -F "{name}=@{file.name}"'
if isinstance(file, tuple):
data += f' -F "{name}=@{file[1].name}"'
# Option -k means no verify SSL # Option -k means no verify SSL
return f"curl {url} -X {method} {headers}{data} -k" return f"curl {url} -X {method} {headers}{data} -k"

View file

@ -80,6 +80,9 @@ def log_command_execution(cmd: str, output: Union[str, dict], params: Optional[d
if not params: if not params:
params = {} params = {}
if params.get("Body") and len(params.get("Body")) > 1000:
params["Body"] = "<large text data>"
output_params = params output_params = params
try: try: