[#314] Format all files with black and isort

Signed-off-by: Vladimir Domnich <v.domnich@yadro.com>
This commit is contained in:
Vladimir Domnich 2022-09-28 16:07:16 +04:00 committed by Vladimir
parent 26032a67ec
commit 147cac0ebc
46 changed files with 1506 additions and 1100 deletions

View file

@ -20,17 +20,17 @@ class SberCloudConfig:
project_id: Optional[str] = None
@staticmethod
def from_dict(config_dict: dict) -> 'SberCloudConfig':
def from_dict(config_dict: dict) -> "SberCloudConfig":
return SberCloudConfig(**config_dict)
@staticmethod
def from_yaml(config_path: str) -> 'SberCloudConfig':
def from_yaml(config_path: str) -> "SberCloudConfig":
with open(config_path) as file:
config_dict = yaml.load(file, Loader=yaml.FullLoader)
return SberCloudConfig.from_dict(config_dict["sbercloud"])
@staticmethod
def from_env() -> 'SberCloudConfig':
def from_env() -> "SberCloudConfig":
config_dict = {
"access_key_id": os.getenv("SBERCLOUD_ACCESS_KEY_ID"),
"secret_key": os.getenv("SBERCLOUD_SECRET_KEY"),
@ -53,7 +53,9 @@ class SberCloudAuthRequests:
ALGORITHM = "SDK-HMAC-SHA256"
TIMESTAMP_FORMAT = "%Y%m%dT%H%M%SZ"
def __init__(self, endpoint: str, access_key_id: str, secret_key: str, base_path: str = "") -> None:
def __init__(
self, endpoint: str, access_key_id: str, secret_key: str, base_path: str = ""
) -> None:
self.endpoint = endpoint
self.base_path = base_path
self.access_key_id = access_key_id
@ -62,12 +64,14 @@ class SberCloudAuthRequests:
def get(self, path: str, query: Optional[dict] = None) -> requests.Response:
return self._send_request("GET", path, query, data=None)
def post(self, path: str, query: Optional[dict] = None,
data: Optional[dict] = None) -> requests.Response:
def post(
self, path: str, query: Optional[dict] = None, data: Optional[dict] = None
) -> requests.Response:
return self._send_request("POST", path, query, data)
def _send_request(self, method: str, path: str, query: Optional[dict],
data: Optional[dict]) -> requests.Response:
def _send_request(
self, method: str, path: str, query: Optional[dict], data: Optional[dict]
) -> requests.Response:
if self.base_path:
path = self.base_path + path
@ -82,8 +86,9 @@ class SberCloudAuthRequests:
body = content.encode(self.ENCODING)
signed_headers = self._build_signed_headers(headers)
canonical_request = self._build_canonical_request(method, path, query, body, headers,
signed_headers)
canonical_request = self._build_canonical_request(
method, path, query, body, headers, signed_headers
)
signature = self._build_signature(timestamp, canonical_request)
headers["Authorization"] = self._build_authorization_header(signature, signed_headers)
@ -92,8 +97,10 @@ class SberCloudAuthRequests:
response = requests.request(method, url, headers=headers, data=body)
if response.status_code < 200 or response.status_code >= 300:
raise AssertionError(f"Request to url={url} failed: status={response.status_code} "
f"response={response.text})")
raise AssertionError(
f"Request to url={url} failed: status={response.status_code} "
f"response={response.text})"
)
return response
def _build_original_headers(self, timestamp: str) -> dict[str, str]:
@ -105,21 +112,30 @@ class SberCloudAuthRequests:
def _build_signed_headers(self, headers: dict[str, str]) -> list[str]:
return sorted(header_name.lower() for header_name in headers)
def _build_canonical_request(self, method: str, path: str, query: Optional[dict], body: bytes,
headers: dict[str, str], signed_headers: list[str]) -> str:
def _build_canonical_request(
self,
method: str,
path: str,
query: Optional[dict],
body: bytes,
headers: dict[str, str],
signed_headers: list[str],
) -> str:
canonical_headers = self._build_canonical_headers(headers, signed_headers)
body_hash = self._calc_sha256_hash(body)
canonical_url = self._build_canonical_url(path)
canonical_query_string = self._build_canonical_query_string(query)
return "\n".join([
method.upper(),
canonical_url,
canonical_query_string,
canonical_headers,
";".join(signed_headers),
body_hash
])
return "\n".join(
[
method.upper(),
canonical_url,
canonical_query_string,
canonical_headers,
";".join(signed_headers),
body_hash,
]
)
def _build_canonical_headers(self, headers: dict[str, str], signed_headers: list[str]) -> str:
normalized_headers = {}
@ -166,7 +182,7 @@ class SberCloudAuthRequests:
hmac_digest = hmac.new(
key=self.secret_key.encode(self.ENCODING),
msg=string_to_sign.encode(self.ENCODING),
digestmod=hashlib.sha256
digestmod=hashlib.sha256,
).digest()
signature = binascii.hexlify(hmac_digest).decode()
@ -185,6 +201,7 @@ class SberCloud:
https://docs.sbercloud.ru/terraform/ug/topics/quickstart.html
https://support.hc.sbercloud.ru/en-us/api/ecs/en-us_topic_0020212668.html
"""
def __init__(self, config: SberCloudConfig) -> None:
self.ecs_requests = SberCloudAuthRequests(
endpoint=config.ecs_endpoint,
@ -198,42 +215,29 @@ class SberCloud:
if not self.ecs_nodes or no_cache:
self.ecs_nodes = self.get_ecs_nodes()
nodes_by_ip = [
node for node in self.ecs_nodes
if ip in [
node_ip['addr']
for node_ips in node['addresses'].values()
for node_ip in node_ips
]
node
for node in self.ecs_nodes
if ip
in [node_ip["addr"] for node_ips in node["addresses"].values() for node_ip in node_ips]
]
assert len(nodes_by_ip) == 1
return nodes_by_ip[0]['id']
return nodes_by_ip[0]["id"]
def get_ecs_nodes(self) -> list[dict]:
response = self.ecs_requests.get("/detail", {"limit": "1000"}).json()
return response["servers"]
def start_node(self, node_id: Optional[str] = None, node_ip: Optional[str] = None) -> None:
data = {
'os-start': {
'servers': [
{
'id': node_id or self.find_ecs_node_by_ip(node_ip)
}
]
}
}
data = {"os-start": {"servers": [{"id": node_id or self.find_ecs_node_by_ip(node_ip)}]}}
self.ecs_requests.post("/action", data=data)
def stop_node(self, node_id: Optional[str] = None, node_ip: Optional[str] = None,
hard: bool = False) -> None:
def stop_node(
self, node_id: Optional[str] = None, node_ip: Optional[str] = None, hard: bool = False
) -> None:
data = {
'os-stop': {
'type': 'HARD' if hard else 'SOFT',
'servers': [
{
'id': node_id or self.find_ecs_node_by_ip(node_ip)
}
]
"os-stop": {
"type": "HARD" if hard else "SOFT",
"servers": [{"id": node_id or self.find_ecs_node_by_ip(node_ip)}],
}
}
self.ecs_requests.post("/action", data=data)