diff --git a/src/frostfs_testlib/http/http_client.py b/src/frostfs_testlib/http/http_client.py index 3106273..0d1e0bd 100644 --- a/src/frostfs_testlib/http/http_client.py +++ b/src/frostfs_testlib/http/http_client.py @@ -1,6 +1,8 @@ +import io import json import logging import logging.config +from typing import IO import httpx @@ -40,7 +42,7 @@ class HttpClient: client = httpx.Client(timeout=timeout, transport=transport) response = client.request(method, url, **kwargs) - self._attach_response(response) + self._attach_response(response, **kwargs) logger.info(f"Response: {response.status_code} => {response.text}") if expected_status_code: @@ -51,47 +53,91 @@ class HttpClient: return response @classmethod - def _attach_response(cls, response: httpx.Response): - request = response.request - + def _parse_body(cls, readable: httpx.Request | httpx.Response) -> str | None: try: - request_headers = json.dumps(dict(request.headers), indent=4) - except json.JSONDecodeError: - request_headers = str(request.headers) - - try: - request_body = request.read() - try: - request_body = request_body.decode("utf-8") - except UnicodeDecodeError as e: - request_body = f"Unable to decode binary data to text using UTF-8 encoding: {str(e)}" + content = readable.read() except Exception as e: - request_body = f"Error reading request body: {str(e)}" + logger.warning(f"Unable to read file: {str(e)}") + return None - request_body = "" if request_body is None else request_body + if not content: + return None + + request_body = None try: - response_headers = json.dumps(dict(response.headers), indent=4) - except json.JSONDecodeError: - response_headers = str(response.headers) + request_body = json.loads(content) + except (json.JSONDecodeError, UnicodeDecodeError) as e: + logger.warning(f"Unable to convert body to json: {str(e)}") + + if request_body is not None: + return json.dumps(request_body, default=str, indent=4) + + try: + request_body = content.decode() + except UnicodeDecodeError as e: + logger.warning(f"Unable to decode binary data to text using UTF-8 encoding: {str(e)}") + + request_body = content if request_body is None else request_body + request_body = "" if len(request_body) > 1000 else request_body + + return request_body + + @classmethod + def _parse_files(cls, files: dict | None) -> str | None: + if not files: + return None + + filepaths = {} + + for name, file in files.items(): + if isinstance(file, io.IOBase): + filepaths[name] = file.name + + if isinstance(file, tuple): + filepaths[name] = file[1].name + + return json.dumps(filepaths, default=str, indent=4) + + @classmethod + def _attach_response(cls, response: httpx.Response, **kwargs): + request = response.request + request_headers = json.dumps(dict(request.headers), default=str, indent=4) + request_body = cls._parse_body(request) + + files = kwargs.get("files") + request_files = cls._parse_files(files) + + response_headers = json.dumps(dict(response.headers), default=str, indent=4) + response_body = cls._parse_body(response) report = ( f"Method: {request.method}\n\n" - f"URL: {request.url}\n\n" - f"Request Headers: {request_headers}\n\n" - f"Request Body: {request_body}\n\n" - f"Response Status Code: {response.status_code}\n\n" - f"Response Headers: {response_headers}\n\n" - f"Response Body: {response.text}\n\n" + + f"URL: {request.url}\n\n" + + f"Request Headers: {request_headers}\n\n" + + (f"Request Body: {request_body}\n\n" if request_body else "") + + (f"Request Files: {request_files}\n\n" if request_files else "") + + f"Response Status Code: {response.status_code}\n\n" + + f"Response Headers: {response_headers}\n\n" + + (f"Response Body: {response_body}\n\n" if response_body else "") ) - curl_request = cls._create_curl_request(request.url, request.method, request.headers, request_body) + curl_request = cls._create_curl_request(request.url, request.method, request.headers, request_body, files) reporter.attach(report, "Requests Info") reporter.attach(curl_request, "CURL") @classmethod - def _create_curl_request(cls, url: str, method: str, headers: httpx.Headers, data: str) -> str: + def _create_curl_request(cls, url: str, method: str, headers: httpx.Headers, data: str, files: dict = None) -> str: headers = " ".join(f'-H "{name.title()}: {value}"' for name, value in headers.items()) data = f" -d '{data}'" if data else "" + + if files: + for name, file in files.items(): + if isinstance(file, io.IOBase): + data += f' -F "{name}=@{file.name}"' + + if isinstance(file, tuple): + data += f' -F "{name}=@{file[1].name}"' + # Option -k means no verify SSL return f"curl {url} -X {method} {headers}{data} -k" diff --git a/src/frostfs_testlib/utils/cli_utils.py b/src/frostfs_testlib/utils/cli_utils.py index 32e4346..0f9fef2 100644 --- a/src/frostfs_testlib/utils/cli_utils.py +++ b/src/frostfs_testlib/utils/cli_utils.py @@ -80,6 +80,9 @@ def log_command_execution(cmd: str, output: Union[str, dict], params: Optional[d if not params: params = {} + if params.get("Body") and len(params.get("Body")) > 1000: + params["Body"] = "" + output_params = params try: