90 lines
3.4 KiB
Python
90 lines
3.4 KiB
Python
import logging
|
|
import re
|
|
from functools import lru_cache
|
|
|
|
from frostfs_testlib import reporter
|
|
from frostfs_testlib.cli import FrostfsAdm, FrostfsCli
|
|
from frostfs_testlib.hosting import Host, Hosting
|
|
from frostfs_testlib.resources.cli import FROSTFS_ADM_EXEC, FROSTFS_AUTHMATE_EXEC, FROSTFS_CLI_EXEC, NEOGO_EXECUTABLE
|
|
from frostfs_testlib.shell import Shell
|
|
from frostfs_testlib.testing.parallel import parallel
|
|
|
|
logger = logging.getLogger("NeoLogger")
|
|
|
|
|
|
@reporter.step("Get local binaries versions")
def get_local_binaries_versions(shell: Shell) -> dict[str, str]:
    """Collect the versions of the binaries reachable through ``shell``.

    Args:
        shell: Shell used to run every version command.

    Returns:
        Mapping of binary name to the version reported for it. The AWS CLI is
        stored under the ``"AWS"`` key as the first non-empty line printed by
        ``aws --version``, or ``"Unknown"`` when nothing is printed to stdout.
    """
    versions = {}

    for binary in [NEOGO_EXECUTABLE, FROSTFS_AUTHMATE_EXEC]:
        out = shell.exec(f"{binary} --version").stdout
        versions[binary] = parse_version(out)

    frostfs_cli = FrostfsCli(shell, FROSTFS_CLI_EXEC)
    versions[FROSTFS_CLI_EXEC] = parse_version(frostfs_cli.version.get().stdout)

    # A RuntimeError while querying the admin CLI is reported as
    # "not installed" and the entry is simply left out of the result.
    try:
        frostfs_adm = FrostfsAdm(shell, FROSTFS_ADM_EXEC)
        versions[FROSTFS_ADM_EXEC] = parse_version(frostfs_adm.version.get().stdout)
    except RuntimeError:
        logger.info(f"{FROSTFS_ADM_EXEC} not installed")

    aws_output = shell.exec("aws --version").stdout
    # str.split("\n") always yields at least one element, so an emptiness check
    # on its result can never select the fallback. Pick the first non-blank
    # line explicitly and fall back only when there is none.
    versions["AWS"] = next(
        (line.strip() for line in aws_output.splitlines() if line.strip()),
        "Unknown",
    )

    # Log everything that was collected, not just the AWS line.
    logger.info(f"Local binaries version: {versions}")

    return versions
|
|
|
|
|
|
@reporter.step("Collect binaries versions from host")
def parallel_binary_verions(host: Host) -> dict[str, dict[str, str]]:
    """Query one host for the versions of its configured services and CLIs.

    A service is checked when it has a truthy ``exec_path`` attribute; a CLI
    is always a candidate. Either kind is skipped when its
    ``requires_version_check`` attribute is present and not equal to
    ``"true"``. The version flag defaults to ``--version`` and can be
    overridden per binary with the ``custom_version_parameter`` attribute.

    Args:
        host: Host whose configuration lists the services and CLIs and which
            provides the shell the version commands are run in.

    Returns:
        A single-entry mapping ``{host.config.address: {binary name: version}}``.
        A binary whose command fails, or whose output is empty on both stdout
        and stderr, is reported as ``"Unknown"``.
    """
    binary_path_by_name = {
        **{
            # The last three characters of the service name are dropped —
            # presumably a per-instance suffix; TODO confirm against the
            # hosting configuration.
            svc.name[:-3]: {
                "exec_path": svc.attributes.get("exec_path"),
                "param": svc.attributes.get("custom_version_parameter", "--version"),
            }
            for svc in host.config.services
            if svc.attributes.get("exec_path") and svc.attributes.get("requires_version_check", "true") == "true"
        },
        # CLI entries are merged last, so they win over a service that ends up
        # with the same name.
        **{
            cli.name: {"exec_path": cli.exec_path, "param": cli.attributes.get("custom_version_parameter", "--version")}
            for cli in host.config.clis
            if cli.attributes.get("requires_version_check", "true") == "true"
        },
    }

    shell = host.get_shell()
    versions_at_host = {}
    for binary_name, binary in binary_path_by_name.items():
        binary_path = binary["exec_path"]
        try:
            result = shell.exec(f"{binary_path} {binary['param']}")
            # Some binaries print their version to stderr, so try both streams.
            version = parse_version(result.stdout) or parse_version(result.stderr) or "Unknown"
            versions_at_host[binary_name] = version.strip()
        except Exception as exc:
            # Best effort: one failing binary must not abort the whole host.
            logger.error(f"Cannot get version for {binary_path} because of\n{exc}")
            versions_at_host[binary_name] = "Unknown"

    return {host.config.address: versions_at_host}
|
|
|
|
|
|
@lru_cache
def get_remote_binaries_versions(hosting: Hosting) -> dict[str, dict[str, str]]:
    """Gather binary versions from every host of ``hosting``.

    ``parallel_binary_verions`` is dispatched over ``hosting.hosts`` via
    ``parallel`` and the per-host results are merged into one mapping.

    The function is wrapped in ``lru_cache``: ``hosting`` must be hashable,
    and repeated calls with the same argument return the same dict object
    without querying the hosts again.

    Args:
        hosting: Object exposing the hosts to query through ``hosts``.

    Returns:
        The union of the mappings produced for each host.
    """
    with reporter.step("Get remote binaries versions"):
        pending = parallel(parallel_binary_verions, parallel_items=hosting.hosts)

    merged: dict[str, dict[str, str]] = {}
    for task in pending:
        host_versions = task.result()
        merged.update(host_versions)
    return merged
|
|
|
|
|
|
def parse_version(version_output: str) -> str:
    """Extract a version string from the output of a ``--version`` style command.

    The first occurrence of ``version:`` or ``version=`` (case-insensitive) is
    located; an optional whitespace character, an optional opening quote and an
    optional leading ``v`` are skipped, and the rest of that line is taken as
    the version.

    Args:
        version_output: Raw text printed by the binary.

    Returns:
        The captured version with surrounding quotes and whitespace (including
        carriage returns) removed, or ``version_output`` unchanged when no
        ``version:`` / ``version=`` marker is found.
    """
    version = re.search(r"(?<=version[:=])\s?[\"\']?v?(.+)", version_output, re.IGNORECASE)
    if not version:
        return version_output
    # "." matches "\r", so output with CRLF line endings leaves a trailing
    # carriage return in the group; it must be in the strip set, otherwise it
    # also shields a closing quote from being removed.
    return version.group(1).strip("\"'\r\n\t ")
|