diff --git a/src/frostfs_testlib/http/__init__.py b/src/frostfs_testlib/http/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/frostfs_testlib/http/http_client.py b/src/frostfs_testlib/http/http_client.py new file mode 100644 index 0000000..261b2a6 --- /dev/null +++ b/src/frostfs_testlib/http/http_client.py @@ -0,0 +1,95 @@ +import json +import logging +import logging.config + +import httpx + +from frostfs_testlib import reporter + +timeout = httpx.Timeout(60, read=150) +LOGGING_CONFIG = { + "disable_existing_loggers": False, + "version": 1, + "handlers": {"default": {"class": "logging.StreamHandler", "formatter": "http", "stream": "ext://sys.stderr"}}, + "formatters": { + "http": { + "format": "%(levelname)s [%(asctime)s] %(name)s - %(message)s", + "datefmt": "%Y-%m-%d %H:%M:%S", + } + }, + "loggers": { + "httpx": { + "handlers": ["default"], + "level": "DEBUG", + }, + "httpcore": { + "handlers": ["default"], + "level": "ERROR", + }, + }, +} + +logging.config.dictConfig(LOGGING_CONFIG) +logger = logging.getLogger("NeoLogger") + + +class HttpClient: + @reporter.step("Send {method} request to {url}") + def send(self, method: str, url: str, expected_status_code: int = None, **kwargs: dict) -> httpx.Response: + transport = httpx.HTTPTransport(verify=False, retries=5) + client = httpx.Client(timeout=timeout, transport=transport) + response = client.request(method, url, **kwargs) + + self._attach_response(response) + logger.info(f"Response: {response.status_code} => {response.text}") + + if expected_status_code: + assert response.status_code == expected_status_code, ( + f"Got {response.status_code} response code" f" while {expected_status_code} expected" + ) + + return response + + def _attach_response(self, response: httpx.Response): + request = response.request + + try: + request_headers = json.dumps(dict(request.headers), indent=4) + except json.JSONDecodeError: + request_headers = str(request.headers) + + try: + request_body = request.read() + try: + request_body = request_body.decode("utf-8") + except UnicodeDecodeError as e: + request_body = f"Unable to decode binary data to text using UTF-8 encoding: {str(e)}" + except Exception as e: + request_body = f"Error reading request body: {str(e)}" + + request_body = "" if request_body is None else request_body + + try: + response_headers = json.dumps(dict(response.headers), indent=4) + except json.JSONDecodeError: + response_headers = str(response.headers) + + report = ( + f"Method: {request.method}\n\n" + f"URL: {request.url}\n\n" + f"Request Headers: {request_headers}\n\n" + f"Request Body: {request_body}\n\n" + f"Response Status Code: {response.status_code}\n\n" + f"Response Headers: {response_headers}\n\n" + f"Response Body: {response.text}\n\n" + ) + curl_request = self._create_curl_request(request.url, request.method, request.headers, request_body) + + reporter.attach(report, "Requests Info") + reporter.attach(curl_request, "CURL") + + def _create_curl_request(self, url: str, method: str, headers: httpx.Headers, data: str) -> str: + headers = " ".join(f'-H "{name.title()}: {value}"' for name, value in headers.items()) + data = f" -d '{data}'" if data else "" + # Option -k means no verify SSL + return f"curl {url} -X {method} {headers}{data} -k"