import os
import shutil
import time
from datetime import datetime, timezone
from typing import List, Optional

import allure
import pytest
from frostfs_testlib import reporter
from frostfs_testlib.hosting import Host
from frostfs_testlib.testing.cluster_test_base import Cluster
from frostfs_testlib.testing.parallel import parallel


def pytest_generate_tests(metafunc: pytest.Metafunc):
    """Parametrize every collected test with a single ``(repo, markers)`` pair.

    ``repo`` is always the literal ``"frostfs-testcases"`` and ``markers`` is the
    mark expression the session was started with (``-m``), taken from
    ``metafunc.config.option.markexpr``.
    """
    # The names are appended to fixturenames first so that parametrize accepts them
    # even though the test functions do not declare them as arguments.
    metafunc.fixturenames.append("repo")
    metafunc.fixturenames.append("markers")
    metafunc.parametrize(
        "repo, markers",
        [("frostfs-testcases", metafunc.config.option.markexpr)],
    )


@pytest.mark.session_logs
class TestLogs:
    """Checks that scan the logs of every cluster host for the session's time window."""

    @pytest.mark.logs_after_session
    @pytest.mark.order(1000)
    @allure.title("Check logs from frostfs-testcases with marks '{request.config.option.markexpr}' - search errors")
    def test_logs_search_errors(
        self, temp_directory: str, cluster: Cluster, session_start_time: datetime, request: pytest.FixtureRequest
    ):
        """Fail if any host logged a line matching the known critical-error patterns.

        ``request`` is not used in the body; it is referenced by the Allure title template.
        """
        end_time = datetime.now(timezone.utc)

        issues_regex = r"\bpanic\b|\boom\b|too many|insufficient funds|insufficient amount of gas|cannot assign requested address|\bunable to process\b"
        exclude_filter = r"too many requests"
        log_level_priority = "3"  # will include 0-3 priority logs (0: emergency 1: alerts 2: critical 3: errors)

        hosts_with_problems = self._find_hosts_with_issues(
            temp_directory,
            cluster,
            issues_regex,
            session_start_time,
            end_time,
            exclude_filter,
            priority=log_level_priority,
        )

        assert (
            not hosts_with_problems
        ), f"The following hosts contains critical errors in system logs: {', '.join(hosts_with_problems)}"

    @pytest.mark.order(1001)
    @allure.title(
        "Check logs from frostfs-testcases with marks '{request.config.option.markexpr}' - identify sensitive data"
    )
    def test_logs_identify_sensitive_data(
        self, temp_directory: str, cluster: Cluster, session_start_time: datetime, request: pytest.FixtureRequest
    ):
        """Fail if any host logged a line that looks like a credential, token or private key.

        ``request`` is not used in the body; it is referenced by the Allure title template.
        """
        end_time = datetime.now(timezone.utc)

        # One pattern per kind of secret; they are OR-ed into a single filter below.
        _regex = {
            "authorization_basic": r"basic [a-zA-Z0-9=:_\+\/-]{16,100}",
            "authorization_bearer": r"bearer [a-zA-Z0-9_\-\.=:_\+\/]{16,100}",
            "access_token": r"\"access_token\":\"[0-9a-z]{16}\$[0-9a-f]{32}\"",
            "api_token": r"\"api_token\":\"(xox[a-zA-Z]-[a-zA-Z0-9-]+)\"",
            "yadro_access_token": r"[a-zA-Z0-9_-]*:[a-zA-Z0-9_\-]+@yadro\.com*",
            "SSH_privKey": r"([-]+BEGIN [^\s]+ PRIVATE KEY[-]+[\s]*[^-]*[-]+END [^\s]+ PRIVATE KEY[-]+)",
            "possible_Creds": r"(?i)("
            r"password\s*[`=:]+\s*[^\s]+|"
            r"password is\s*[`=:]+\s*[^\s]+|"
            r"passwd\s*[`=:]+\s*[^\s]+)",
        }

        issues_regex = "|".join(_regex.values())
        exclude_filter = r"COMMAND=\|--\sBoot\s"

        hosts_with_problems = self._find_hosts_with_issues(
            temp_directory,
            cluster,
            issues_regex,
            session_start_time,
            end_time,
            exclude_filter,
        )

        assert (
            not hosts_with_problems
        ), f"The following hosts contains sensitive data in system logs: {', '.join(hosts_with_problems)}"

    def _find_hosts_with_issues(
        self,
        temp_directory: str,
        cluster: Cluster,
        issues_regex: str,
        since: datetime,
        until: datetime,
        exclude_filter: str,
        priority: Optional[str] = None,
    ) -> List[str]:
        """Collect filtered logs from all cluster hosts and return the hosts that had matches.

        Logs are written to ``<temp_directory>/logs``; when at least one host has
        matching lines the directory is zipped and attached to the report.

        Args:
            temp_directory: Base directory under which the ``logs`` directory is created.
            cluster: Cluster whose ``hosts`` are scanned in parallel.
            issues_regex: Pattern selecting the log lines of interest.
            since: Start of the time window.
            until: End of the time window.
            exclude_filter: Pattern for lines to drop from the result.
            priority: Optional log priority passed through to the host log query.

        Returns:
            Addresses of the hosts for which a non-empty filtered log was saved.
        """
        logs_dir = os.path.join(temp_directory, "logs")
        # exist_ok avoids the check-then-create race of a separate os.path.exists() test.
        os.makedirs(logs_dir, exist_ok=True)
        # NOTE(review): both tests use the same <temp_directory>/logs directory; if
        # temp_directory is shared between them, the archive attached by the later test
        # can also contain files written by the earlier one - confirm this is intended.

        # NOTE(review): presumably gives the hosts time to flush their latest log
        # records before they are queried - confirm.
        time.sleep(2)

        futures = parallel(
            self._collect_logs_on_host,
            cluster.hosts,
            logs_dir,
            issues_regex,
            since,
            until,
            exclude_filter,
            priority=priority,
        )

        hosts_with_problems = []
        for future in futures:
            # Futures that ended with an exception are skipped, not re-raised.
            if future.exception():
                continue
            address = future.result()
            if address is not None:
                hosts_with_problems.append(address)

        if hosts_with_problems:
            self._attach_logs(logs_dir)

        return hosts_with_problems

    def _collect_logs_on_host(
        self,
        host: Host,
        logs_dir: str,
        regex: str,
        since: datetime,
        until: datetime,
        exclude_filter: str,
        priority: Optional[str] = None,
    ):
        """Fetch the filtered logs of one host and save them if anything matched.

        Args:
            host: Host to query.
            logs_dir: Directory that receives ``<host address>.log``.
            regex: Pattern selecting the log lines of interest.
            since: Start of the time window.
            until: End of the time window.
            exclude_filter: Pattern for lines to drop from the result.
            priority: Optional log priority passed through to ``host.get_filtered_logs``.

        Returns:
            The host address when a log file was written, otherwise ``None``.
        """
        with reporter.step(f"Get logs from {host.config.address}"):
            logs = host.get_filtered_logs(
                filter_regex=regex, since=since, until=until, exclude_filter=exclude_filter, priority=priority
            )

            if not logs:
                return None

            # Explicit encoding so that non-ASCII log content does not depend on the
            # locale of the machine running the tests.
            with open(os.path.join(logs_dir, f"{host.config.address}.log"), "w", encoding="utf-8") as file:
                file.write(logs)

            return host.config.address

    def _attach_logs(self, logs_dir: str) -> None:
        """Zip the contents of ``logs_dir`` and attach the archive to the report as ``logs.zip``."""
        # Zip all files and attach to Allure because it is more convenient to download a single
        # zip with all logs rather than mess with individual logs files per service or node
        logs_zip_file_path = shutil.make_archive(logs_dir, "zip", logs_dir)
        reporter.attach(logs_zip_file_path, "logs.zip")