diff --git a/pytest_tests/helpers/sbercloud_helper.py b/pytest_tests/helpers/sbercloud_helper.py index 51161aa..cb3b4ca 100644 --- a/pytest_tests/helpers/sbercloud_helper.py +++ b/pytest_tests/helpers/sbercloud_helper.py @@ -1,7 +1,12 @@ +import binascii +import hashlib +import hmac import json import os from dataclasses import dataclass +from datetime import datetime from typing import Optional +from urllib.parse import quote, unquote import requests import yaml @@ -9,11 +14,10 @@ import yaml @dataclass class SberCloudConfig: - login: Optional[str] = None - password: Optional[str] = None - domain: Optional[str] = None + access_key_id: Optional[str] = None + secret_key: Optional[str] = None + ecs_endpoint: Optional[str] = None project_id: Optional[str] = None - iam_url: Optional[str] = None @staticmethod def from_dict(config_dict: dict) -> 'SberCloudConfig': @@ -28,15 +32,151 @@ class SberCloudConfig: @staticmethod def from_env() -> 'SberCloudConfig': config_dict = { - "domain": os.getenv("SBERCLOUD_DOMAIN"), - "login": os.getenv("SBERCLOUD_LOGIN"), - "password": os.getenv("SBERCLOUD_PASSWORD"), + "access_key_id": os.getenv("SBERCLOUD_ACCESS_KEY_ID"), + "secret_key": os.getenv("SBERCLOUD_SECRET_KEY"), + "ecs_endpoint": os.getenv("SBERCLOUD_ECS_ENDPOINT"), "project_id": os.getenv("SBERCLOUD_PROJECT_ID"), - "iam_url": os.getenv("SBERCLOUD_IAM_URL"), } return SberCloudConfig.from_dict(config_dict) +class SberCloudAuthRequests: + """ + Implements authentication mechanism with access key+secret key in accordance with: + https://support.hc.sbercloud.ru/devg/apisign/api-sign-algorithm.html + + endpoint - represents endpoint of a specific service (listed at https://support.hc.sbercloud.ru/en-us/endpoint/index.html) + base_path - is prefix for all request path's that will be sent via this instance. + """ + + ENCODING = "utf-8" + ALGORITHM = "SDK-HMAC-SHA256" + TIMESTAMP_FORMAT = "%Y%m%dT%H%M%SZ" + + def __init__(self, endpoint: str, access_key_id: str, secret_key: str, base_path: str = "") -> None: + self.endpoint = endpoint + self.base_path = base_path + self.access_key_id = access_key_id + self.secret_key = secret_key + + def get(self, path: str, query: Optional[dict] = None) -> requests.Response: + return self._send_request("GET", path, query, content="") + + def post(self, path: str, query: Optional[dict] = None, + data: Optional[dict] = None) -> requests.Response: + content = json.dumps(data) if data else "" + return self._send_request("POST", path, query, content) + + def _send_request(self, method: str, path: str, query: Optional[dict], + content: str) -> requests.Response: + body = content.encode(self.ENCODING) + if self.base_path: + path = self.base_path + path + + timestamp = datetime.strftime(datetime.utcnow(), self.TIMESTAMP_FORMAT) + headers = self._build_original_headers(timestamp, body) + + signed_headers = self._build_signed_headers(headers) + canonical_request = self._build_canonical_request(method, path, query, body, headers, + signed_headers) + signature = self._build_signature(timestamp, canonical_request) + headers["Authorization"] = self._build_authorization_header(signature, signed_headers) + + query_string = "?" + self._build_canonical_query_string(query) if query else "" + url = f"https://{self.endpoint}{path}{query_string}" + + response = requests.request(method, url, headers=headers, data=body) + if response.status_code < 200 or response.status_code >= 300: + raise AssertionError(f"Request to url={url} failed: status={response.status_code} " + f"response={response.text})") + return response + + def _build_original_headers(self, timestamp: str, body: bytes) -> dict[str, str]: + headers = {} + headers["X-Sdk-Date"] = timestamp + headers["host"] = self.endpoint + + if body: + headers["Content-Type"] = "application/json" + headers["content-length"] = str(len(body)) + + return headers + + def _build_signed_headers(self, headers: dict[str, str]) -> list[str]: + return sorted(header_name.lower() for header_name in headers) + + def _build_canonical_request(self, method: str, path: str, query: Optional[dict], body: bytes, + headers: dict[str, str], signed_headers: list[str]) -> str: + canonical_headers = self._build_canonical_headers(headers, signed_headers) + body_hash = self._calc_sha256_hash(body) + canonical_url = self._build_canonical_url(path) + canonical_query_string = self._build_canonical_query_string(query) + + return "\n".join([ + method.upper(), + canonical_url, + canonical_query_string, + canonical_headers, + ";".join(signed_headers), + body_hash + ]) + + def _build_canonical_headers(self, headers: dict[str, str], signed_headers: list[str]) -> str: + normalized_headers = {} + for key, value in headers.items(): + normalized_key = key.lower() + normalized_value = value.strip() + normalized_headers[normalized_key] = normalized_value + # Re-encode header in request itself + headers[key] = normalized_value.encode(self.ENCODING).decode("iso-8859-1") + + # Join headers in the same order as they are sorted in signed_headers list + joined_headers = "\n".join(f"{key}:{normalized_headers[key]}" for key in signed_headers) + return joined_headers + "\n" + + def _calc_sha256_hash(self, value: bytes) -> str: + sha256 = hashlib.sha256() + sha256.update(value) + return sha256.hexdigest() + + def _build_canonical_url(self, path: str) -> str: + path_parts = unquote(path).split("/") + canonical_url = "/".join(quote(path_part) for path_part in path_parts) + + if not canonical_url.endswith("/"): + canonical_url += "/" + return canonical_url + + def _build_canonical_query_string(self, query: Optional[dict]) -> str: + if not query: + return "" + + key_value_pairs = [] + for key in sorted(query.keys()): + # NOTE: we do not support list values, as they are not used in API at the moment + encoded_key = quote(key) + encoded_value = quote(str(query[key])) + key_value_pairs.append(f"{encoded_key}={encoded_value}") + return "&".join(key_value_pairs) + + def _build_signature(self, timestamp: str, canonical_request: str) -> str: + canonical_request_hash = self._calc_sha256_hash(canonical_request.encode(self.ENCODING)) + string_to_sign = f"{self.ALGORITHM}\n{timestamp}\n{canonical_request_hash}" + + hmac_digest = hmac.new( + key=self.secret_key.encode(self.ENCODING), + msg=string_to_sign.encode(self.ENCODING), + digestmod=hashlib.sha256 + ).digest() + signature = binascii.hexlify(hmac_digest).decode() + + return signature + + def _build_authorization_header(self, signature: str, signed_headers: list[str]) -> str: + joined_signed_headers = ";".join(signed_headers) + return f"{self.ALGORITHM} Access={self.access_key_id}, SignedHeaders={joined_signed_headers}, Signature={signature}" + + class SberCloud: """ Manages resources in Sbercloud via API. @@ -46,46 +186,13 @@ class SberCloud: https://support.hc.sbercloud.ru/en-us/api/ecs/en-us_topic_0020212668.html """ def __init__(self, config: SberCloudConfig) -> None: - self.config = config - self.ecs_url = None - self.project_id = None - self.token = None - self._initialize() - self.ecs_nodes = self.get_ecs_nodes() - - def _initialize(self) -> None: - data = { - 'auth': { - 'identity': { - 'methods': ['password'], - 'password': { - 'user': { - 'domain': { - 'name': self.config.domain - }, - 'name': self.config.login, - 'password': self.config.password - } - } - }, - 'scope': { - 'project': { - 'id': self.config.project_id - } - } - } - } - response = requests.post( - f'{self.config.iam_url}/v3/auth/tokens', - data=json.dumps(data), - headers={'Content-Type': 'application/json'} + self.ecs_requests = SberCloudAuthRequests( + endpoint=config.ecs_endpoint, + base_path=f"/v1/{config.project_id}/cloudservers", + access_key_id=config.access_key_id, + secret_key=config.secret_key, ) - self.ecs_url = [ - catalog['endpoints'][0]['url'] - for catalog in response.json()['token']['catalog'] if catalog['type'] == 'ecs' - ][0] - self.project_id = self.ecs_url.split('/')[-1] - self.token = response.headers['X-Subject-Token'] + self.ecs_nodes = [] # Cached list of ecs servers def find_ecs_node_by_ip(self, ip: str, no_cache: bool = False) -> str: if not self.ecs_nodes or no_cache: @@ -102,9 +209,8 @@ class SberCloud: return nodes_by_ip[0]['id'] def get_ecs_nodes(self) -> list[dict]: - response = requests.get(f'{self.ecs_url}/cloudservers/detail', - headers={'X-Auth-Token': self.token}).json() - return response['servers'] + response = self.ecs_requests.get("/detail").json() + return response["servers"] def start_node(self, node_id: Optional[str] = None, node_ip: Optional[str] = None) -> None: data = { @@ -116,13 +222,7 @@ class SberCloud: ] } } - response = requests.post( - f'{self.ecs_url}/cloudservers/action', - data=json.dumps(data), - headers={'Content-Type': 'application/json', 'X-Auth-Token': self.token} - ) - assert response.status_code < 300, \ - f'Status:{response.status_code}. Server not started: {response.json()}' + self.ecs_requests.post("/action", data=data) def stop_node(self, node_id: Optional[str] = None, node_ip: Optional[str] = None, hard: bool = False) -> None: @@ -136,10 +236,4 @@ class SberCloud: ] } } - response = requests.post( - f'{self.ecs_url}/cloudservers/action', - data=json.dumps(data), - headers={'Content-Type': 'application/json', 'X-Auth-Token': self.token} - ) - assert response.status_code < 300, \ - f'Status:{response.status_code}. Server not stopped: {response.json()}' + self.ecs_requests.post("/action", data=data)