forked from TrueCloudLab/frostfs-testlib
Compare commits
2 commits
8206fbf72e
...
22b4a435e8
Author | SHA1 | Date | |
---|---|---|---|
22b4a435e8 | |||
7382f36389 |
4 changed files with 97 additions and 2 deletions
0
src/frostfs_testlib/http/__init__.py
Normal file
0
src/frostfs_testlib/http/__init__.py
Normal file
95
src/frostfs_testlib/http/http_client.py
Normal file
95
src/frostfs_testlib/http/http_client.py
Normal file
|
@ -0,0 +1,95 @@
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import logging.config
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
|
||||||
|
from frostfs_testlib import reporter
|
||||||
|
|
||||||
|
timeout = httpx.Timeout(60, read=150)
|
||||||
|
LOGGING_CONFIG = {
|
||||||
|
"disable_existing_loggers": False,
|
||||||
|
"version": 1,
|
||||||
|
"handlers": {"default": {"class": "logging.StreamHandler", "formatter": "http", "stream": "ext://sys.stderr"}},
|
||||||
|
"formatters": {
|
||||||
|
"http": {
|
||||||
|
"format": "%(levelname)s [%(asctime)s] %(name)s - %(message)s",
|
||||||
|
"datefmt": "%Y-%m-%d %H:%M:%S",
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"loggers": {
|
||||||
|
"httpx": {
|
||||||
|
"handlers": ["default"],
|
||||||
|
"level": "DEBUG",
|
||||||
|
},
|
||||||
|
"httpcore": {
|
||||||
|
"handlers": ["default"],
|
||||||
|
"level": "ERROR",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
logging.config.dictConfig(LOGGING_CONFIG)
|
||||||
|
logger = logging.getLogger("NeoLogger")
|
||||||
|
|
||||||
|
|
||||||
|
class HttpClient:
|
||||||
|
@reporter.step("Send {method} request to {url}")
|
||||||
|
def send(self, method: str, url: str, expected_status_code: int = None, **kwargs: dict) -> httpx.Response:
|
||||||
|
transport = httpx.HTTPTransport(verify=False, retries=5)
|
||||||
|
client = httpx.Client(timeout=timeout, transport=transport)
|
||||||
|
response = client.request(method, url, **kwargs)
|
||||||
|
|
||||||
|
self._attach_response(response)
|
||||||
|
logger.info(f"Response: {response.status_code} => {response.text}")
|
||||||
|
|
||||||
|
if expected_status_code:
|
||||||
|
assert response.status_code == expected_status_code, (
|
||||||
|
f"Got {response.status_code} response code" f" while {expected_status_code} expected"
|
||||||
|
)
|
||||||
|
|
||||||
|
return response
|
||||||
|
|
||||||
|
def _attach_response(self, response: httpx.Response):
|
||||||
|
request = response.request
|
||||||
|
|
||||||
|
try:
|
||||||
|
request_headers = json.dumps(dict(request.headers), indent=4)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
request_headers = str(request.headers)
|
||||||
|
|
||||||
|
try:
|
||||||
|
request_body = request.read()
|
||||||
|
try:
|
||||||
|
request_body = request_body.decode("utf-8")
|
||||||
|
except UnicodeDecodeError as e:
|
||||||
|
request_body = f"Unable to decode binary data to text using UTF-8 encoding: {str(e)}"
|
||||||
|
except Exception as e:
|
||||||
|
request_body = f"Error reading request body: {str(e)}"
|
||||||
|
|
||||||
|
request_body = "" if request_body is None else request_body
|
||||||
|
|
||||||
|
try:
|
||||||
|
response_headers = json.dumps(dict(response.headers), indent=4)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
response_headers = str(response.headers)
|
||||||
|
|
||||||
|
report = (
|
||||||
|
f"Method: {request.method}\n\n"
|
||||||
|
f"URL: {request.url}\n\n"
|
||||||
|
f"Request Headers: {request_headers}\n\n"
|
||||||
|
f"Request Body: {request_body}\n\n"
|
||||||
|
f"Response Status Code: {response.status_code}\n\n"
|
||||||
|
f"Response Headers: {response_headers}\n\n"
|
||||||
|
f"Response Body: {response.text}\n\n"
|
||||||
|
)
|
||||||
|
curl_request = self._create_curl_request(request.url, request.method, request.headers, request_body)
|
||||||
|
|
||||||
|
reporter.attach(report, "Requests Info")
|
||||||
|
reporter.attach(curl_request, "CURL")
|
||||||
|
|
||||||
|
def _create_curl_request(self, url: str, method: str, headers: httpx.Headers, data: str) -> str:
|
||||||
|
headers = " ".join(f'-H "{name.title()}: {value}"' for name, value in headers.items())
|
||||||
|
data = f" -d '{data}'" if data else ""
|
||||||
|
# Option -k means no verify SSL
|
||||||
|
return f"curl {url} -X {method} {headers}{data} -k"
|
|
@ -1443,7 +1443,7 @@ class AwsCliClient(S3ClientWrapper):
|
||||||
|
|
||||||
# MFA METHODS
|
# MFA METHODS
|
||||||
@reporter.step("Creates a new virtual MFA device")
|
@reporter.step("Creates a new virtual MFA device")
|
||||||
def iam_create_virtual_mfa_device(self, virtual_mfa_device_name: str, outfile: str, bootstrap_method: str) -> tuple[str, bool]:
|
def iam_create_virtual_mfa_device(self, virtual_mfa_device_name: str, outfile: str, bootstrap_method: str) -> tuple:
|
||||||
cmd = f"aws {self.common_flags} iam create-virtual-mfa-device --virtual-mfa-device-name {virtual_mfa_device_name}\
|
cmd = f"aws {self.common_flags} iam create-virtual-mfa-device --virtual-mfa-device-name {virtual_mfa_device_name}\
|
||||||
--outfile {outfile} --bootstrap-method {bootstrap_method} --endpoint {self.iam_endpoint}"
|
--outfile {outfile} --bootstrap-method {bootstrap_method} --endpoint {self.iam_endpoint}"
|
||||||
|
|
||||||
|
|
|
@ -1280,7 +1280,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
|
||||||
@reporter.step("Creates a new virtual MFA device")
|
@reporter.step("Creates a new virtual MFA device")
|
||||||
def iam_create_virtual_mfa_device(
|
def iam_create_virtual_mfa_device(
|
||||||
self, virtual_mfa_device_name: str, outfile: Optional[str] = None, bootstrap_method: Optional[str] = None
|
self, virtual_mfa_device_name: str, outfile: Optional[str] = None, bootstrap_method: Optional[str] = None
|
||||||
) -> dict:
|
) -> tuple:
|
||||||
response = self.boto3_iam_client.create_virtual_mfa_device(VirtualMFADeviceName=virtual_mfa_device_name)
|
response = self.boto3_iam_client.create_virtual_mfa_device(VirtualMFADeviceName=virtual_mfa_device_name)
|
||||||
|
|
||||||
serial_number = response.get("VirtualMFADevice", {}).get("SerialNumber")
|
serial_number = response.get("VirtualMFADevice", {}).get("SerialNumber")
|
||||||
|
|
Loading…
Reference in a new issue