from concurrent.futures import ThreadPoolExecutor

from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.storage.dataclasses.node_base import NodeBase

reporter = get_reporter()


class FileKeeper:
    """Make backup copies of modified files on nodes and restore them when required (mostly after the test)."""

    # Registry of backed-up files: node -> paths that have a "<path>.bak" copy on that node.
    # NOTE: this is a class attribute, so the same dict is shared by every FileKeeper instance.
    files_to_restore: dict[NodeBase, list[str]] = {}

    @reporter.step_deco("Adding {file_to_restore} from node {node} to restore list")
    def add(self, node: NodeBase, file_to_restore: str):
        """Back up a file on the node and remember it for a later restore.

        The file is copied to "<file_to_restore>.bak" via the node's host shell. Adding the
        same file for the same node again is a no-op, so the first backup is never overwritten.

        Args:
            node: Node whose host holds the file.
            file_to_restore: Path of the file on the node's host.
        """
        if file_to_restore in self.files_to_restore.get(node, []):
            # Already added
            return

        shell = node.host.get_shell()
        shell.exec(f"cp {file_to_restore} {file_to_restore}.bak")

        # Register only after the backup command has been issued without raising, so that
        # restore never tries to copy back from a ".bak" file that was not created.
        self.files_to_restore.setdefault(node, []).append(file_to_restore)

    @reporter.step_deco("Restore files")
    def restore_files(self):
        """Restore every registered file on every node, one worker thread per node.

        The registry is emptied afterwards. An exception raised while restoring on any node
        is re-raised here when the results are consumed.
        """
        nodes = self.files_to_restore.keys()
        if not nodes:
            return

        with ThreadPoolExecutor(max_workers=len(nodes)) as executor:
            results = executor.map(self._restore_files_on_node, nodes)

        # Leaving the "with" block waits for all submitted work to finish, so no worker is
        # still reading the registry when it is cleared.
        self.files_to_restore.clear()

        for _ in results:
            # Iterate through results for exception check if any
            pass

    @reporter.step_deco("Restore files on node {node}")
    def _restore_files_on_node(self, node: NodeBase):
        """Copy each registered "<file>.bak" back over "<file>" on the node and remove the backup.

        Args:
            node: Node whose registered files should be restored.
        """
        shell = node.host.get_shell()
        for file_to_restore in self.files_to_restore[node]:
            with reporter.step(f"Restore file {file_to_restore} on node {node}"):
                shell.exec(f"cp {file_to_restore}.bak {file_to_restore}")
                shell.exec(f"rm {file_to_restore}.bak")