forked from TrueCloudLab/frostfs-testcases
[#350] Move file-related functions to file_helper
Signed-off-by: Vladimir Domnich <v.domnich@yadro.com>
This commit is contained in:
parent
ce41104d3a
commit
5eeb8b4058
6 changed files with 153 additions and 93 deletions
|
@ -2,7 +2,7 @@ import hashlib
|
|||
import logging
|
||||
import os
|
||||
import uuid
|
||||
from typing import Optional
|
||||
from typing import Any, Optional
|
||||
|
||||
import allure
|
||||
from common import ASSETS_DIR, SIMPLE_OBJ_SIZE
|
||||
|
@ -27,6 +27,38 @@ def generate_file(size: int = SIMPLE_OBJ_SIZE) -> str:
|
|||
return file_path
|
||||
|
||||
|
||||
def generate_file_with_content(
|
||||
file_path: Optional[str] = None,
|
||||
content: Optional[str] = None,
|
||||
) -> str:
|
||||
"""Creates a new file with specified content.
|
||||
|
||||
Args:
|
||||
file_path: Path to the file that should be created. If not specified, then random file
|
||||
path will be generated.
|
||||
content: Content that should be stored in the file. If not specified, then random binary
|
||||
content will be generated.
|
||||
|
||||
Returns:
|
||||
Path to the generated file.
|
||||
"""
|
||||
mode = "w+"
|
||||
if content is None:
|
||||
content = os.urandom(SIMPLE_OBJ_SIZE)
|
||||
mode = "wb"
|
||||
|
||||
if not file_path:
|
||||
file_path = f"{os.getcwd()}/{ASSETS_DIR}/{str(uuid.uuid4())}"
|
||||
else:
|
||||
if not os.path.exists(os.path.dirname(file_path)):
|
||||
os.makedirs(os.path.dirname(file_path))
|
||||
|
||||
with open(file_path, mode) as file:
|
||||
file.write(content)
|
||||
|
||||
return file_path
|
||||
|
||||
|
||||
@allure.step("Get File Hash")
|
||||
def get_file_hash(file_path: str, len: Optional[int] = None) -> str:
|
||||
"""Generates hash for the specified file.
|
||||
|
@ -65,3 +97,54 @@ def concat_files(file_paths: list, resulting_file_path: Optional[str] = None) ->
|
|||
with open(file, "rb") as part_file:
|
||||
f.write(part_file.read())
|
||||
return resulting_file_path
|
||||
|
||||
|
||||
def split_file(file_path: str, parts: int) -> list[str]:
|
||||
"""Splits specified file into several specified number of parts.
|
||||
|
||||
Each part is saved under name `{original_file}_part_{i}`.
|
||||
|
||||
Args:
|
||||
file_path: Path to the file that should be split.
|
||||
parts: Number of parts the file should be split into.
|
||||
|
||||
Returns:
|
||||
Paths to the part files.
|
||||
"""
|
||||
with open(file_path, "rb") as file:
|
||||
content = file.read()
|
||||
|
||||
content_size = len(content)
|
||||
chunk_size = int((content_size + parts) / parts)
|
||||
|
||||
part_id = 1
|
||||
part_file_paths = []
|
||||
for content_offset in range(0, content_size + 1, chunk_size):
|
||||
part_file_name = f"{file_path}_part_{part_id}"
|
||||
part_file_paths.append(part_file_name)
|
||||
with open(part_file_name, "wb") as out_file:
|
||||
out_file.write(content[content_offset : content_offset + chunk_size])
|
||||
part_id += 1
|
||||
|
||||
return part_file_paths
|
||||
|
||||
|
||||
def get_file_content(file_path: str, content_len: Optional[int] = None, mode: str = "r") -> Any:
|
||||
"""Returns content of specified file.
|
||||
|
||||
Args:
|
||||
file_path: Path to the file.
|
||||
content_len: Limit of content length. If None, then entire file content is returned;
|
||||
otherwise only the first content_len bytes of the content are returned.
|
||||
mode: Mode of opening the file.
|
||||
|
||||
Returns:
|
||||
Content of the specified file.
|
||||
"""
|
||||
with open(file_path, mode) as file:
|
||||
if content_len:
|
||||
content = file.read(content_len)
|
||||
else:
|
||||
content = file.read()
|
||||
|
||||
return content
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue