import io import json import logging import logging.config from typing import Mapping, Sequence import httpx from frostfs_testlib import reporter timeout = httpx.Timeout(60, read=150) LOGGING_CONFIG = { "disable_existing_loggers": False, "version": 1, "handlers": {"default": {"class": "logging.StreamHandler", "formatter": "http", "stream": "ext://sys.stderr"}}, "formatters": { "http": { "format": "%(levelname)s [%(asctime)s] %(name)s - %(message)s", "datefmt": "%Y-%m-%d %H:%M:%S", } }, "loggers": { "httpx": { "handlers": ["default"], "level": "DEBUG", }, "httpcore": { "handlers": ["default"], "level": "ERROR", }, }, } logging.config.dictConfig(LOGGING_CONFIG) logger = logging.getLogger("NeoLogger") class HttpClient: @reporter.step("Send {method} request to {url}") def send(self, method: str, url: str, expected_status_code: int = None, **kwargs: dict) -> httpx.Response: transport = httpx.HTTPTransport(verify=False, retries=5) client = httpx.Client(timeout=timeout, transport=transport) response = client.request(method, url, **kwargs) self._attach_response(response, **kwargs) logger.info(f"Response: {response.status_code} => {response.text}") if expected_status_code: assert ( response.status_code == expected_status_code ), f"Got {response.status_code} response code while {expected_status_code} expected" return response @classmethod def _parse_body(cls, readable: httpx.Request | httpx.Response) -> str | None: try: content = readable.read() except Exception as e: logger.warning(f"Unable to read file: {str(e)}") return None if not content: return None request_body = None try: request_body = json.loads(content) except (json.JSONDecodeError, UnicodeDecodeError) as e: logger.warning(f"Unable to convert body to json: {str(e)}") if request_body is not None: return json.dumps(request_body, default=str, indent=4) try: request_body = content.decode() except UnicodeDecodeError as e: logger.warning(f"Unable to decode binary data to text using UTF-8 encoding: {str(e)}") request_body = content if request_body is None else request_body request_body = "" if len(request_body) > 1000 else request_body return request_body @classmethod def _parse_files(cls, files: Mapping | Sequence | None) -> dict: filepaths = {} if not files: return filepaths if isinstance(files, Sequence): items = files elif isinstance(files, Mapping): items = files.items() else: raise TypeError(f"'files' must be either Sequence or Mapping, got: {type(files).__name__}") for name, file in items: if isinstance(file, io.IOBase): filepaths[name] = file.name elif isinstance(file, Sequence): filepaths[name] = file[1].name return filepaths @classmethod def _attach_response(cls, response: httpx.Response, **kwargs): request = response.request request_headers = json.dumps(dict(request.headers), default=str, indent=4) request_body = cls._parse_body(request) files = kwargs.get("files") request_files = cls._parse_files(files) response_headers = json.dumps(dict(response.headers), default=str, indent=4) response_body = cls._parse_body(response) report = ( f"Method: {request.method}\n\n" + f"URL: {request.url}\n\n" + f"Request Headers: {request_headers}\n\n" + (f"Request Body: {request_body}\n\n" if request_body else "") + (f"Request Files: {request_files}\n\n" if request_files else "") + f"Response Status Code: {response.status_code}\n\n" + f"Response Headers: {response_headers}\n\n" + (f"Response Body: {response_body}\n\n" if response_body else "") ) curl_request = cls._create_curl_request(request.url, request.method, request.headers, request_body, request_files) reporter.attach(report, "Requests Info") reporter.attach(curl_request, "CURL") @classmethod def _create_curl_request(cls, url: str, method: str, headers: httpx.Headers, data: str, files: dict) -> str: excluded_headers = {"Accept-Encoding", "Connection", "User-Agent", "Content-Length"} headers = " ".join(f"-H '{header.title()}: {value}'" for header, value in headers.items() if header.title() not in excluded_headers) data = f" -d '{data}'" if data else "" for name, path in files.items(): data += f' -F "{name}=@{path}"' # Option -k means no verify SSL return f"curl {url} -X {method} {headers}{data} -k"