import hashlib
import logging
import os
import uuid
from typing import Any, Optional

from frostfs_testlib import reporter
from frostfs_testlib.resources.common import ASSETS_DIR
from frostfs_testlib.utils import string_utils

logger = logging.getLogger("NeoLogger")


class TestFile(os.PathLike):
    """Path-like wrapper around a generated test file.

    The backing file is removed automatically when the wrapper is
    garbage-collected, keeping the assets directory clean between tests.
    """

    def __init__(self, path: str):
        self.path = path

    def __del__(self):
        logger.debug(f"Removing file {self.path}")
        # Best-effort cleanup: __del__ may run during interpreter shutdown
        # or race with another deletion of the same file, so it must never
        # raise (the exists/remove pair alone is not atomic).
        try:
            if os.path.exists(self.path):
                os.remove(self.path)
        except OSError:
            pass

    def __str__(self):
        return self.path

    def __repr__(self):
        return self.path

    def __fspath__(self):
        return self.path


def ensure_directory(path):
    """Create the parent directory of `path` if it does not exist yet.

    Args:
        path: File path whose directory component should be created.
    """
    directory = os.path.dirname(path)
    # exist_ok avoids the check-then-create race when several workers
    # prepare the same directory; skip when there is no directory component
    # (os.makedirs("") would raise).
    if directory:
        os.makedirs(directory, exist_ok=True)


def ensure_directory_opener(path, flags):
    """`opener=` hook for open() that creates missing parent directories."""
    ensure_directory(path)
    return os.open(path, flags)


# TODO: Do not add {size} to title yet, since it produces dynamic info in top level steps
# Use object_size dt in future as argument
@reporter.step("Generate file")
def generate_file(size: int, file_name: Optional[str] = None) -> TestFile:
    """Generates a binary file with the specified size in bytes.

    Args:
        size: Size in bytes, can be declared as 6e+6 for example.
        file_name: Name for the generated file. If not specified, a unique
            name is generated.

    Returns:
        The path to the generated file.
    """
    if file_name is None:
        file_name = string_utils.unique_name("object-")

    test_file = TestFile(os.path.join(ASSETS_DIR, file_name))
    with open(test_file, "wb", opener=ensure_directory_opener) as file:
        file.write(os.urandom(size))
    logger.info(f"File with size {size} bytes has been generated: {test_file}")

    return test_file


# TODO: Do not add {size} to title yet, since it produces dynamic info in top level steps
# Use object_size dt in future as argument
@reporter.step("Generate file with content")
def generate_file_with_content(
    size: int,
    file_path: Optional[str | TestFile] = None,
    content: Optional[str] = None,
) -> TestFile:
    """Creates a new file with specified content.

    Args:
        size: Size in bytes of the random content to generate. Ignored when
            `content` is provided.
        file_path: Path to the file that should be created. If not specified,
            then random file path will be generated.
        content: Content that should be stored in the file. If not specified,
            then random binary content will be generated.

    Returns:
        Path to the generated file.
    """
    mode = "w+"
    if content is None:
        content = os.urandom(size)
        mode = "wb"

    if not file_path:
        test_file = TestFile(os.path.join(os.getcwd(), ASSETS_DIR, str(uuid.uuid4())))
    elif isinstance(file_path, TestFile):
        test_file = file_path
    else:
        test_file = TestFile(file_path)

    with open(test_file, mode, opener=ensure_directory_opener) as file:
        file.write(content)

    return test_file


@reporter.step("Get File Hash")
def get_file_hash(file_path: str | TestFile, len: Optional[int] = None, offset: Optional[int] = None) -> str:
    """Generates hash for the specified file.

    Args:
        file_path: Path to the file to generate hash for.
        len: How many bytes to read. NOTE: the name shadows the builtin,
            but is kept for backward compatibility with keyword callers.
        offset: Position to start reading from.

    Returns:
        Hash of the file as hex-encoded string.
    """
    file_hash = hashlib.sha256()
    with open(file_path, "rb") as out:
        if offset:
            out.seek(offset, 0)
        # len == 0 or None both mean "read the whole file" (truthiness check,
        # matching the historical behavior of this helper).
        file_hash.update(out.read(len) if len else out.read())
    return file_hash.hexdigest()


@reporter.step("Concatenation set of files to one file")
def concat_files(file_paths: list[str | TestFile], resulting_file_path: Optional[str | TestFile] = None) -> TestFile:
    """Concatenates several files into a single file.

    Args:
        file_paths: Paths to the files to concatenate.
        resulting_file_path: Path to the file where concatenated content should be stored.

    Returns:
        Path to the resulting file.
    """
    if not resulting_file_path:
        test_file = TestFile(os.path.join(os.getcwd(), ASSETS_DIR, str(uuid.uuid4())))
    elif isinstance(resulting_file_path, TestFile):
        test_file = resulting_file_path
    else:
        test_file = TestFile(resulting_file_path)

    with open(test_file, "wb", opener=ensure_directory_opener) as f:
        for file in file_paths:
            with open(file, "rb") as part_file:
                f.write(part_file.read())
    return test_file


@reporter.step("Split file to {parts} parts")
def split_file(file_path: str | TestFile, parts: int) -> list[TestFile]:
    """Splits specified file into several specified number of parts.

    Each part is saved under name `{original_file}_part_{i}`.

    Args:
        file_path: Path to the file that should be split.
        parts: Number of parts the file should be split into.

    Returns:
        Paths to the part files.
    """
    with open(file_path, "rb") as file:
        content = file.read()

    content_size = len(content)
    # Slightly-larger-than-ceiling chunk so the file is covered in at most
    # `parts` chunks (the last one may be shorter).
    chunk_size = int((content_size + parts) / parts)

    part_id = 1
    part_file_paths = []
    for content_offset in range(0, content_size + 1, chunk_size):
        part_file_name = f"{file_path}_part_{part_id}"
        part_file_paths.append(TestFile(part_file_name))
        with open(part_file_name, "wb") as out_file:
            out_file.write(content[content_offset : content_offset + chunk_size])
        part_id += 1

    return part_file_paths


@reporter.step("Get file content")
def get_file_content(file_path: str | TestFile, content_len: Optional[int] = None, mode: str = "r", offset: Optional[int] = None) -> Any:
    """Returns content of specified file.

    Args:
        file_path: Path to the file.
        content_len: Limit of content length. If None, then entire file content is returned;
            otherwise only the first content_len bytes of the content are returned.
        mode: Mode of opening the file.
        offset: Position to start reading from.

    Returns:
        Content of the specified file.
    """
    with open(file_path, mode) as file:
        if offset:
            file.seek(offset, 0)
        # content_len == 0 or None both mean "read to EOF" (truthiness check,
        # matching the historical behavior of this helper).
        content = file.read(content_len) if content_len else file.read()
    return content