[#247] Use TestFiles which automatically deletes itself

Signed-off-by: a.berezin <a.berezin@yadro.com>
This commit is contained in:
Andrey Berezin 2024-06-18 13:37:07 +03:00
parent 7a482152a8
commit cb31d41f15
6 changed files with 209 additions and 346 deletions

View file

@ -10,7 +10,39 @@ from frostfs_testlib.resources.common import ASSETS_DIR
logger = logging.getLogger("NeoLogger")
def generate_file(size: int) -> str:
class TestFile(os.PathLike):
def __init__(self, path: str):
self.path = path
def __del__(self):
logger.debug(f"Removing file {self.path}")
if os.path.exists(self.path):
os.remove(self.path)
def __str__(self):
return self.path
def __repr__(self):
return self.path
def __fspath__(self):
return self.path
def ensure_directory(path):
directory = os.path.dirname(path)
if not os.path.exists(directory):
os.makedirs(directory)
def ensure_directory_opener(path, flags):
ensure_directory(path)
return os.open(path, flags)
@reporter.step("Generate file with size {size}")
def generate_file(size: int) -> TestFile:
"""Generates a binary file with the specified size in bytes.
Args:
@ -19,19 +51,20 @@ def generate_file(size: int) -> str:
Returns:
The path to the generated file.
"""
file_path = os.path.join(ASSETS_DIR, str(uuid.uuid4()))
with open(file_path, "wb") as file:
test_file = TestFile(os.path.join(ASSETS_DIR, str(uuid.uuid4())))
with open(test_file, "wb", opener=ensure_directory_opener) as file:
file.write(os.urandom(size))
logger.info(f"File with size {size} bytes has been generated: {file_path}")
logger.info(f"File with size {size} bytes has been generated: {test_file}")
return file_path
return test_file
@reporter.step("Generate file with content of size {size}")
def generate_file_with_content(
size: int,
file_path: Optional[str] = None,
file_path: Optional[str | TestFile] = None,
content: Optional[str] = None,
) -> str:
) -> TestFile:
"""Creates a new file with specified content.
Args:
@ -48,20 +81,22 @@ def generate_file_with_content(
content = os.urandom(size)
mode = "wb"
test_file = None
if not file_path:
file_path = os.path.join(os.getcwd(), ASSETS_DIR, str(uuid.uuid4()))
test_file = TestFile(os.path.join(os.getcwd(), ASSETS_DIR, str(uuid.uuid4())))
elif isinstance(file_path, TestFile):
test_file = file_path
else:
if not os.path.exists(os.path.dirname(file_path)):
os.makedirs(os.path.dirname(file_path))
test_file = TestFile(file_path)
with open(file_path, mode) as file:
with open(test_file, mode, opener=ensure_directory_opener) as file:
file.write(content)
return file_path
return test_file
@reporter.step("Get File Hash")
def get_file_hash(file_path: str, len: Optional[int] = None, offset: Optional[int] = None) -> str:
def get_file_hash(file_path: str | TestFile, len: Optional[int] = None, offset: Optional[int] = None) -> str:
"""Generates hash for the specified file.
Args:
@ -88,7 +123,7 @@ def get_file_hash(file_path: str, len: Optional[int] = None, offset: Optional[in
@reporter.step("Concatenation set of files to one file")
def concat_files(file_paths: list, resulting_file_path: Optional[str] = None) -> str:
def concat_files(file_paths: list[str | TestFile], resulting_file_path: Optional[str | TestFile] = None) -> TestFile:
"""Concatenates several files into a single file.
Args:
@ -98,16 +133,24 @@ def concat_files(file_paths: list, resulting_file_path: Optional[str] = None) ->
Returns:
Path to the resulting file.
"""
test_file = None
if not resulting_file_path:
resulting_file_path = os.path.join(os.getcwd(), ASSETS_DIR, str(uuid.uuid4()))
with open(resulting_file_path, "wb") as f:
test_file = TestFile(os.path.join(os.getcwd(), ASSETS_DIR, str(uuid.uuid4())))
elif isinstance(resulting_file_path, TestFile):
test_file = resulting_file_path
else:
test_file = TestFile(resulting_file_path)
with open(test_file, "wb", opener=ensure_directory_opener) as f:
for file in file_paths:
with open(file, "rb") as part_file:
f.write(part_file.read())
return resulting_file_path
return test_file
def split_file(file_path: str, parts: int) -> list[str]:
@reporter.step("Split file to {parts} parts")
def split_file(file_path: str | TestFile, parts: int) -> list[TestFile]:
"""Splits specified file into several specified number of parts.
Each part is saved under name `{original_file}_part_{i}`.
@ -129,7 +172,7 @@ def split_file(file_path: str, parts: int) -> list[str]:
part_file_paths = []
for content_offset in range(0, content_size + 1, chunk_size):
part_file_name = f"{file_path}_part_{part_id}"
part_file_paths.append(part_file_name)
part_file_paths.append(TestFile(part_file_name))
with open(part_file_name, "wb") as out_file:
out_file.write(content[content_offset : content_offset + chunk_size])
part_id += 1
@ -137,9 +180,8 @@ def split_file(file_path: str, parts: int) -> list[str]:
return part_file_paths
def get_file_content(
file_path: str, content_len: Optional[int] = None, mode: str = "r", offset: Optional[int] = None
) -> Any:
@reporter.step("Get file content")
def get_file_content(file_path: str | TestFile, content_len: Optional[int] = None, mode: str = "r", offset: Optional[int] = None) -> Any:
"""Returns content of specified file.
Args: