Compare commits

..

19 commits

Author SHA1 Message Date
c40b637768 [#255] add filter priority to get_filtered_logs method 2024-06-29 12:45:16 +03:00
1880f96277 [#249] add metrics methods 2024-06-21 14:14:59 +03:00
c1e5dd1007 [#246] Use TestFiles which automatically deletes itself
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-06-18 13:38:26 +03:00
f4d71b664d [#244] Update versions check
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-06-07 17:06:28 +03:00
da1a4d0099 [#240] write cache metrics 2024-06-06 14:23:56 +00:00
3e36defb90 [#242] New error patterns 2024-06-06 13:04:19 +03:00
6810765d46 [#237] Update S3 acl verify method
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-06-05 14:49:32 +03:00
2cffff3ffe [#235] grpc metrics 2024-05-31 09:42:15 +03:00
d9f4e88f94 [#232]Change provide methods 2024-05-30 14:54:45 +00:00
deb2f12bec [#228] metrics for object 2024-05-28 11:34:45 +03:00
f236c1b083 Added delete bucket policy method to s3 client 2024-05-22 11:12:20 +03:00
cc13a43bec [#227] Restore invalid_obj check
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-05-16 09:58:04 +00:00
a74d1bff4f [#220] add container metrics 2024-05-16 08:18:23 +00:00
547f6106ec [#222] Added new control command CLI 2024-05-14 16:16:42 +03:00
c2aa41e5dc [#217] Add parameter max_total_size_gb 2024-05-06 08:16:59 +00:00
8e446ccb96 [#219] Add ns attribute for container create
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2024-05-06 08:16:42 +00:00
9c9fb7878a [#215] Removed x10 wait in delete bucket function 2024-04-24 15:05:14 +03:00
3a799afdcf [#211] Return response in complete_multipart_upload function 2024-04-23 23:55:12 +03:00
b610e04a7b [#208] Add await for search func
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2024-04-17 11:06:32 +03:00
19 changed files with 134 additions and 236 deletions

View file

@ -362,7 +362,6 @@ class FrostfsCliObject(CliCommand):
trace: bool = False,
root: bool = False,
verify_presence_all: bool = False,
json: bool = False,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = None,

View file

@ -15,8 +15,6 @@ class NetmapParser:
"epoch_duration": r"Epoch duration: (?P<epoch_duration>\d+)",
"inner_ring_candidate_fee": r"Inner Ring candidate fee: (?P<inner_ring_candidate_fee>\d+)",
"maximum_object_size": r"Maximum object size: (?P<maximum_object_size>\d+)",
"maximum_count_of_data_shards": r"Maximum count of data shards: (?P<maximum_count_of_data_shards>\d+)",
"maximum_count_of_parity_shards": r"Maximum count of parity shards: (?P<maximum_count_of_parity_shards>\d+)",
"withdrawal_fee": r"Withdrawal fee: (?P<withdrawal_fee>\d+)",
"homomorphic_hashing_disabled": r"Homomorphic hashing disabled: (?P<homomorphic_hashing_disabled>true|false)",
"maintenance_mode_allowed": r"Maintenance mode allowed: (?P<maintenance_mode_allowed>true|false)",

View file

@ -25,16 +25,6 @@ def convert_time_to_seconds(time: int | str | None) -> int:
return seconds
def force_list(input: str | list[str]):
if input is None:
return None
if isinstance(input, list):
return list(map(str.strip, input))
return [input.strip()]
class LoadType(Enum):
gRPC = "grpc"
S3 = "s3"
@ -152,29 +142,8 @@ class K6ProcessAllocationStrategy(Enum):
PER_ENDPOINT = "PER_ENDPOINT"
class MetaConfig:
def _get_field_formatter(self, field_name: str) -> Callable | None:
data_fields = fields(self)
formatters = [
field.metadata["formatter"]
for field in data_fields
if field.name == field_name and "formatter" in field.metadata and field.metadata["formatter"] != None
]
if formatters:
return formatters[0]
return None
def __setattr__(self, field_name, value):
formatter = self._get_field_formatter(field_name)
if formatter:
value = formatter(value)
super().__setattr__(field_name, value)
@dataclass
class Preset(MetaConfig):
class Preset:
# ------ COMMON ------
# Amount of objects which should be created
objects_count: Optional[int] = metadata_field(all_load_scenarios, "preload_obj", None, False)
@ -189,13 +158,13 @@ class Preset(MetaConfig):
# Amount of containers which should be created
containers_count: Optional[int] = metadata_field(grpc_preset_scenarios, "containers", None, False)
# Container placement policy for containers for gRPC
container_placement_policy: Optional[list[str]] = metadata_field(grpc_preset_scenarios, "policy", None, False, formatter=force_list)
container_placement_policy: Optional[str] = metadata_field(grpc_preset_scenarios, "policy", None, False)
# ------ S3 ------
# Amount of buckets which should be created
buckets_count: Optional[int] = metadata_field(s3_preset_scenarios, "buckets", None, False)
# S3 region (AKA placement policy for S3 buckets)
s3_location: Optional[list[str]] = metadata_field(s3_preset_scenarios, "location", None, False, formatter=force_list)
s3_location: Optional[str] = metadata_field(s3_preset_scenarios, "location", None, False)
# Delay between containers creation and object upload for preset
object_upload_delay: Optional[int] = metadata_field(all_load_scenarios, "sleep", None, False)
@ -208,7 +177,7 @@ class Preset(MetaConfig):
@dataclass
class PrometheusParams(MetaConfig):
class PrometheusParams:
# Prometheus server URL
server_url: Optional[str] = metadata_field(all_load_scenarios, env_variable="K6_PROMETHEUS_RW_SERVER_URL", string_repr=False)
# Prometheus trend stats
@ -218,7 +187,7 @@ class PrometheusParams(MetaConfig):
@dataclass
class LoadParams(MetaConfig):
class LoadParams:
# ------- CONTROL PARAMS -------
# Load type can be gRPC, HTTP, S3.
load_type: LoadType
@ -443,11 +412,6 @@ class LoadParams(MetaConfig):
# For preset calls, bool values are passed with just --<argument_name> if the value is True
return f"--{meta_field.metadata['preset_argument']}" if meta_field.value else ""
if isinstance(meta_field.value, list):
return (
" ".join(f"--{meta_field.metadata['preset_argument']} '{value}'" for value in meta_field.value) if meta_field.value else ""
)
return f"--{meta_field.metadata['preset_argument']} '{meta_field.value}'"
@staticmethod
@ -467,6 +431,25 @@ class LoadParams(MetaConfig):
return fields_with_data or []
def _get_field_formatter(self, field_name: str) -> Callable | None:
data_fields = fields(self)
formatters = [
field.metadata["formatter"]
for field in data_fields
if field.name == field_name and "formatter" in field.metadata and field.metadata["formatter"] != None
]
if formatters:
return formatters[0]
return None
def __setattr__(self, field_name, value):
formatter = self._get_field_formatter(field_name)
if formatter:
value = formatter(value)
super().__setattr__(field_name, value)
def __str__(self) -> str:
load_type_str = self.scenario.value if self.scenario else self.load_type.value
# TODO: migrate load_params defaults to testlib

View file

@ -4,6 +4,6 @@ ALL_USERS_GROUP_READ_GRANT = {"Grantee": {"Type": "Group", "URI": ALL_USERS_GROU
CANONICAL_USER_FULL_CONTROL_GRANT = {"Grantee": {"Type": "CanonicalUser"}, "Permission": "FULL_CONTROL"}
# https://docs.aws.amazon.com/AmazonS3/latest/userguide/acl-overview.html#canned-acl
PRIVATE_GRANTS = []
PUBLIC_READ_GRANTS = [ALL_USERS_GROUP_READ_GRANT]
PUBLIC_READ_WRITE_GRANTS = [ALL_USERS_GROUP_WRITE_GRANT, ALL_USERS_GROUP_READ_GRANT]
PRIVATE_GRANTS = [CANONICAL_USER_FULL_CONTROL_GRANT]
PUBLIC_READ_GRANTS = [CANONICAL_USER_FULL_CONTROL_GRANT, ALL_USERS_GROUP_READ_GRANT]
PUBLIC_READ_WRITE_GRANTS = [CANONICAL_USER_FULL_CONTROL_GRANT, ALL_USERS_GROUP_WRITE_GRANT, ALL_USERS_GROUP_READ_GRANT]

View file

@ -1,6 +1,7 @@
import json
import logging
import os
import uuid
from datetime import datetime
from time import sleep
from typing import Literal, Optional, Union
@ -10,7 +11,6 @@ from frostfs_testlib.resources.common import ASSETS_DIR, MAX_REQUEST_ATTEMPTS, R
from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus, _make_objs_dict
from frostfs_testlib.shell import CommandOptions
from frostfs_testlib.shell.local_shell import LocalShell
from frostfs_testlib.utils import string_utils
# TODO: Refactor this code to use shell instead of _cmd_run
from frostfs_testlib.utils.cli_utils import _configure_aws_cli
@ -68,7 +68,7 @@ class AwsCliClient(S3ClientWrapper):
location_constraint: Optional[str] = None,
) -> str:
if bucket is None:
bucket = string_utils.unique_name("bucket-")
bucket = str(uuid.uuid4())
if object_lock_enabled_for_bucket is None:
object_lock = ""
@ -91,6 +91,7 @@ class AwsCliClient(S3ClientWrapper):
if location_constraint:
cmd += f" --create-bucket-configuration LocationConstraint={location_constraint}"
self.local_shell.exec(cmd)
sleep(S3_SYNC_WAIT_TIME)
return bucket
@ -105,6 +106,7 @@ class AwsCliClient(S3ClientWrapper):
def delete_bucket(self, bucket: str) -> None:
cmd = f"aws {self.common_flags} s3api delete-bucket --bucket {bucket} --endpoint {self.s3gate_endpoint} --profile {self.profile}"
self.local_shell.exec(cmd, command_options)
sleep(S3_SYNC_WAIT_TIME)
@reporter.step("Head bucket S3")
def head_bucket(self, bucket: str) -> None:
@ -229,7 +231,7 @@ class AwsCliClient(S3ClientWrapper):
if bucket is None:
bucket = source_bucket
if key is None:
key = string_utils.unique_name("copy-object-")
key = os.path.join(os.getcwd(), str(uuid.uuid4()))
copy_source = f"{source_bucket}/{source_key}"
cmd = (
@ -315,7 +317,7 @@ class AwsCliClient(S3ClientWrapper):
object_range: Optional[tuple[int, int]] = None,
full_output: bool = False,
) -> dict | TestFile:
test_file = TestFile(os.path.join(os.getcwd(), ASSETS_DIR, string_utils.unique_name("dl-object-")))
test_file = TestFile(os.path.join(os.getcwd(), ASSETS_DIR, str(uuid.uuid4())))
version = f" --version-id {version_id}" if version_id else ""
cmd = (
f"aws {self.common_flags} s3api get-object --bucket {bucket} --key {key} "
@ -395,6 +397,7 @@ class AwsCliClient(S3ClientWrapper):
)
output = self.local_shell.exec(cmd, command_options).stdout
response = self._to_json(output)
sleep(S3_SYNC_WAIT_TIME)
return response
@reporter.step("Delete object S3")
@ -405,6 +408,7 @@ class AwsCliClient(S3ClientWrapper):
f"--key {key} {version} --endpoint {self.s3gate_endpoint} --profile {self.profile}"
)
output = self.local_shell.exec(cmd, command_options).stdout
sleep(S3_SYNC_WAIT_TIME)
return self._to_json(output)
@reporter.step("Delete object versions S3")
@ -431,6 +435,7 @@ class AwsCliClient(S3ClientWrapper):
f"--delete file://{file_path} --endpoint {self.s3gate_endpoint} --profile {self.profile}"
)
output = self.local_shell.exec(cmd, command_options).stdout
sleep(S3_SYNC_WAIT_TIME)
return self._to_json(output)
@reporter.step("Delete object versions S3 without delete markers")
@ -1219,40 +1224,3 @@ class AwsCliClient(S3ClientWrapper):
response = self._to_json(output)
return response
@reporter.step("Adds one or more tags to an IAM user")
def iam_tag_user(self, user_name: str, tags: list) -> dict:
tags_json = [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags]
cmd = (
f"aws {self.common_flags} iam tag-user --user-name {user_name} --tags '{json.dumps(tags_json)}' --endpoint {self.iam_endpoint}"
)
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
return response
@reporter.step("List tags of IAM user")
def iam_list_user_tags(self, user_name: str) -> dict:
cmd = f"aws {self.common_flags} iam list-user-tags --user-name {user_name} --endpoint {self.iam_endpoint}"
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
return response
@reporter.step("Removes the specified tags from the user")
def iam_untag_user(self, user_name: str, tag_keys: list) -> dict:
tag_keys_joined = " ".join(tag_keys)
cmd = f"aws {self.common_flags} iam untag-user --user-name {user_name} --tag-keys {tag_keys_joined} --endpoint {self.iam_endpoint}"
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
return response

View file

@ -1,6 +1,7 @@
import json
import logging
import os
import uuid
from datetime import datetime
from functools import wraps
from time import sleep
@ -15,7 +16,6 @@ from mypy_boto3_s3 import S3Client
from frostfs_testlib import reporter
from frostfs_testlib.resources.common import ASSETS_DIR, MAX_REQUEST_ATTEMPTS, RETRY_MODE, S3_SYNC_WAIT_TIME
from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus, _make_objs_dict
from frostfs_testlib.utils import string_utils
# TODO: Refactor this code to use shell instead of _cmd_run
from frostfs_testlib.utils.cli_utils import _configure_aws_cli, log_command_execution
@ -115,7 +115,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
location_constraint: Optional[str] = None,
) -> str:
if bucket is None:
bucket = string_utils.unique_name("bucket-")
bucket = str(uuid.uuid4())
params = {"Bucket": bucket}
if object_lock_enabled_for_bucket is not None:
@ -134,6 +134,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
s3_bucket = self.boto3_client.create_bucket(**params)
log_command_execution(f"Created S3 bucket {bucket}", s3_bucket)
sleep(S3_SYNC_WAIT_TIME)
return bucket
@reporter.step("List buckets S3")
@ -154,6 +155,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
def delete_bucket(self, bucket: str) -> None:
response = self.boto3_client.delete_bucket(Bucket=bucket)
log_command_execution("S3 Delete bucket result", response)
sleep(S3_SYNC_WAIT_TIME)
@reporter.step("Head bucket S3")
@report_error
@ -362,6 +364,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
params = {self._to_s3_param(param): value for param, value in locals().items() if param not in ["self"] and value is not None}
response = self.boto3_client.delete_object(**params)
log_command_execution("S3 Delete object result", response)
sleep(S3_SYNC_WAIT_TIME)
return response
@reporter.step("Delete objects S3")
@ -372,6 +375,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
assert (
"Errors" not in response
), f'The following objects have not been deleted: {[err_info["Key"] for err_info in response["Errors"]]}.\nError Message: {response["Errors"]["Message"]}'
sleep(S3_SYNC_WAIT_TIME)
return response
@reporter.step("Delete object versions S3")
@ -409,10 +413,8 @@ class Boto3ClientWrapper(S3ClientWrapper):
grant_write: Optional[str] = None,
grant_read: Optional[str] = None,
) -> list:
params = {self._to_s3_param(param): value for param, value in locals().items() if param not in ["self"] and value is not None}
response = self.boto3_client.put_object_acl(**params)
log_command_execution("S3 put object ACL", response)
return response.get("Grants")
# pytest.skip("Method put_object_acl is not supported by boto3 client")
raise NotImplementedError("Unsupported for boto3 client")
@reporter.step("Get object ACL")
@report_error
@ -439,7 +441,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
if bucket is None:
bucket = source_bucket
if key is None:
key = string_utils.unique_name("copy-object-")
key = os.path.join(os.getcwd(), str(uuid.uuid4()))
copy_source = f"{source_bucket}/{source_key}"
params = {
@ -476,7 +478,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
if full_output:
return response
test_file = TestFile(os.path.join(os.getcwd(), ASSETS_DIR, string_utils.unique_name("dl-object-")))
test_file = TestFile(os.path.join(os.getcwd(), ASSETS_DIR, str(uuid.uuid4())))
with open(test_file, "wb") as file:
chunk = response["Body"].read(1024)
while chunk:
@ -901,19 +903,3 @@ class Boto3ClientWrapper(S3ClientWrapper):
def iam_update_user(self, user_name: str, new_name: str, new_path: Optional[str] = None) -> dict:
response = self.boto3_iam_client.update_user(UserName=user_name, NewUserName=new_name, NewPath="/")
return response
@reporter.step("Adds one or more tags to an IAM user")
def iam_tag_user(self, user_name: str, tags: list) -> dict:
tags_json = [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags]
response = self.boto3_iam_client.tag_user(UserName=user_name, Tags=tags_json)
return response
@reporter.step("List tags of IAM user")
def iam_list_user_tags(self, user_name: str) -> dict:
response = self.boto3_iam_client.list_user_tags(UserName=user_name)
return response
@reporter.step("Removes the specified tags from the user")
def iam_untag_user(self, user_name: str, tag_keys: list) -> dict:
response = self.boto3_iam_client.untag_user(UserName=user_name, TagKeys=tag_keys)
return response

View file

@ -550,15 +550,3 @@ class S3ClientWrapper(HumanReadableABC):
@abstractmethod
def iam_update_user(self, user_name: str, new_name: Optional[str] = None, new_path: Optional[str] = None) -> dict:
"""Updates the name and/or the path of the specified IAM user"""
@abstractmethod
def iam_tag_user(self, user_name: str, tags: list) -> dict:
"""Adds one or more tags to an IAM user"""
@abstractmethod
def iam_list_user_tags(self, user_name: str) -> dict:
"""List tags of IAM user"""
@abstractmethod
def iam_untag_user(self, user_name: str, tag_keys: list) -> dict:
"""Removes the specified tags from the user"""

View file

@ -616,11 +616,6 @@ def head_object(
fst_line_idx = result.stdout.find("\n")
decoded = json.loads(result.stdout[fst_line_idx:])
# if response
if "chunks" in decoded.keys():
logger.info("decoding ec chunks")
return decoded["chunks"]
# If response is Complex Object header, it has `splitId` key
if "splitId" in decoded.keys():
logger.info("decoding split header")
@ -722,27 +717,21 @@ def get_object_nodes(
cli = FrostfsCli(shell, FROSTFS_CLI_EXEC, wallet_config)
response = cli.object.nodes(
result_object_nodes = cli.object.nodes(
rpc_endpoint=endpoint,
cid=cid,
oid=oid,
bearer=bearer,
ttl=1 if is_direct else None,
json=True,
xhdr=xhdr,
timeout=timeout,
verify_presence_all=verify_presence_all,
)
response_json = json.loads(response.stdout)
# Currently, the command will show expected and confirmed nodes.
# And we (currently) count only nodes which are both expected and confirmed
object_nodes_id = {
required_node
for data_object in response_json["data_objects"]
for required_node in data_object["required_nodes"]
if required_node in data_object["confirmed_nodes"]
}
parsing_output = parse_cmd_table(result_object_nodes.stdout, "|")
list_object_nodes = [
node for node in parsing_output if node["should_contain_object"] == "true" and node["actually_contains_object"] == "true"
]
netmap_nodes_list = parse_netmap_output(
cli.netmap.snapshot(
@ -751,11 +740,14 @@ def get_object_nodes(
).stdout
)
netmap_nodes = [
netmap_node for object_node in object_nodes_id for netmap_node in netmap_nodes_list if object_node == netmap_node.node_id
netmap_node
for object_node in list_object_nodes
for netmap_node in netmap_nodes_list
if object_node["node_id"] == netmap_node.node_id
]
object_nodes = [
result = [
cluster_node for netmap_node in netmap_nodes for cluster_node in cluster.cluster_nodes if netmap_node.node == cluster_node.host_ip
]
return object_nodes
return result

View file

@ -49,7 +49,7 @@ def get_via_http_gate(
else:
request = f"{node.http_gate.get_endpoint()}{request_path}"
resp = requests.get(request, stream=True, timeout=timeout, verify=False)
resp = requests.get(request, headers={"Host": node.storage_node.get_http_hostname()[0]}, stream=True, timeout=timeout, verify=False)
if not resp.ok:
raise Exception(
@ -115,6 +115,7 @@ def get_via_http_gate_by_attribute(
cid: CID to get object from
attribute: attribute {name: attribute} value pair
endpoint: http gate endpoint
http_hostname: http host name on the node
request_path: (optional) http request path, if ommited - use default [{endpoint}/get_by_attribute/{Key}/{Value}]
"""
attr_name = list(attribute.keys())[0]
@ -125,7 +126,7 @@ def get_via_http_gate_by_attribute(
else:
request = f"{node.http_gate.get_endpoint()}{request_path}"
resp = requests.get(request, stream=True, timeout=timeout, verify=False)
resp = requests.get(request, stream=True, timeout=timeout, verify=False, headers={"Host": node.storage_node.get_http_hostname()[0]})
if not resp.ok:
raise Exception(
@ -145,6 +146,7 @@ def get_via_http_gate_by_attribute(
return test_file
# TODO: pass http_hostname as a header
@reporter.step("Upload via HTTP Gate")
def upload_via_http_gate(cid: str, path: str, endpoint: str, headers: Optional[dict] = None, timeout: Optional[int] = 300) -> str:
"""
@ -189,6 +191,7 @@ def is_object_large(filepath: str) -> bool:
return False
# TODO: pass http_hostname as a header
@reporter.step("Upload via HTTP Gate using Curl")
def upload_via_http_gate_curl(
cid: str,
@ -249,7 +252,7 @@ def get_via_http_curl(cid: str, oid: str, node: ClusterNode) -> TestFile:
test_file = TestFile(os.path.join(os.getcwd(), ASSETS_DIR, f"{cid}_{oid}_{str(uuid.uuid4())}"))
curl = GenericCli("curl", node.host)
curl(f"-k ", f"{request} > {test_file}", shell=local_shell)
curl(f'-k -H "Host: {node.storage_node.get_http_hostname()[0]}"', f"{request} > {test_file}", shell=local_shell)
return test_file

View file

@ -47,6 +47,7 @@ def set_bucket_versioning(s3_client: S3ClientWrapper, bucket: str, status: Versi
if status == VersioningStatus.UNDEFINED:
return
s3_client.get_bucket_versioning_status(bucket)
s3_client.put_bucket_versioning(bucket, status=status)
bucket_status = s3_client.get_bucket_versioning_status(bucket)
assert bucket_status == status.value, f"Expected {bucket_status} status. Got {status.value}"

View file

@ -144,16 +144,30 @@ class ClusterNode:
return self.host.config.interfaces[interface.value]
def get_data_interfaces(self) -> list[str]:
return [ip_address for name_interface, ip_address in self.host.config.interfaces.items() if "data" in name_interface]
return [
ip_address for name_interface, ip_address in self.host.config.interfaces.items() if "data" in name_interface
]
def get_data_interface(self, search_interface: str) -> list[str]:
return [self.host.config.interfaces[interface] for interface in self.host.config.interfaces.keys() if search_interface == interface]
return [
self.host.config.interfaces[interface]
for interface in self.host.config.interfaces.keys()
if search_interface == interface
]
def get_internal_interfaces(self) -> list[str]:
return [ip_address for name_interface, ip_address in self.host.config.interfaces.items() if "internal" in name_interface]
return [
ip_address
for name_interface, ip_address in self.host.config.interfaces.items()
if "internal" in name_interface
]
def get_internal_interface(self, search_internal: str) -> list[str]:
return [self.host.config.interfaces[interface] for interface in self.host.config.interfaces.keys() if search_internal == interface]
return [
self.host.config.interfaces[interface]
for interface in self.host.config.interfaces.keys()
if search_internal == interface
]
class Cluster:
@ -164,6 +178,8 @@ class Cluster:
default_rpc_endpoint: str
default_s3_gate_endpoint: str
default_http_gate_endpoint: str
default_http_hostname: str
default_s3_hostname: str
def __init__(self, hosting: Hosting) -> None:
self._hosting = hosting
@ -172,6 +188,8 @@ class Cluster:
self.default_rpc_endpoint = self.services(StorageNode)[0].get_rpc_endpoint()
self.default_s3_gate_endpoint = self.services(S3Gate)[0].get_endpoint()
self.default_http_gate_endpoint = self.services(HTTPGate)[0].get_endpoint()
self.default_http_hostname = self.services(StorageNode)[0].get_http_hostname()
self.default_s3_hostname = self.services(StorageNode)[0].get_s3_hostname()
@property
def hosts(self) -> list[Host]:

View file

@ -16,3 +16,5 @@ class ConfigAttributes:
ENDPOINT_PROMETHEUS = "endpoint_prometheus"
CONTROL_ENDPOINT = "control_endpoint"
UN_LOCODE = "un_locode"
HTTP_HOSTNAME = "http_hostname"
S3_HOSTNAME = "s3_hostname"

View file

@ -440,11 +440,9 @@ class ClusterStateController:
self.await_node_status(status, wallet, cluster_node)
@wait_for_success(80, 8, title="Wait for node status become {status}")
def await_node_status(self, status: NodeStatus, wallet: WalletInfo, cluster_node: ClusterNode, checker_node: ClusterNode = None):
def await_node_status(self, status: NodeStatus, wallet: WalletInfo, cluster_node: ClusterNode):
frostfs_cli = FrostfsCli(self.shell, FROSTFS_CLI_EXEC, wallet.config_path)
if not checker_node:
checker_node = cluster_node
netmap = NetmapParser.snapshot_all_nodes(frostfs_cli.netmap.snapshot(checker_node.storage_node.get_rpc_endpoint()).stdout)
netmap = NetmapParser.snapshot_all_nodes(frostfs_cli.netmap.snapshot(cluster_node.storage_node.get_rpc_endpoint()).stdout)
netmap = [node for node in netmap if cluster_node.host_ip == node.node]
if status == NodeStatus.OFFLINE:
assert cluster_node.host_ip not in netmap, f"{cluster_node.host_ip} not in Offline"

View file

@ -154,6 +154,15 @@ class StorageNode(NodeBase):
def get_data_directory(self) -> str:
return self.host.get_data_directory(self.name)
def get_storage_config(self) -> str:
return self.host.get_storage_config(self.name)
def get_http_hostname(self) -> list[str]:
return self._get_attribute(ConfigAttributes.HTTP_HOSTNAME)
def get_s3_hostname(self) -> list[str]:
return self._get_attribute(ConfigAttributes.S3_HOSTNAME)
def delete_blobovnicza(self):
self.host.delete_blobovnicza(self.name)

View file

@ -70,8 +70,6 @@ class NodeNetInfo:
epoch_duration: str = None
inner_ring_candidate_fee: str = None
maximum_object_size: str = None
maximum_count_of_data_shards: str = None
maximum_count_of_parity_shards: str = None
withdrawal_fee: str = None
homomorphic_hashing_disabled: str = None
maintenance_mode_allowed: str = None

View file

@ -2,8 +2,6 @@ import itertools
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, Collection, Optional, Union
MAX_WORKERS = 50
def parallel(
fn: Union[Callable, list[Callable]],
@ -56,7 +54,7 @@ def _run_by_fn_list(fn_list: list[Callable], *args, **kwargs) -> list[Future]:
futures: list[Future] = []
with ThreadPoolExecutor(max_workers=min(len(fn_list), MAX_WORKERS)) as executor:
with ThreadPoolExecutor(max_workers=len(fn_list)) as executor:
for fn in fn_list:
task_args = _get_args(*args)
task_kwargs = _get_kwargs(**kwargs)
@ -69,7 +67,7 @@ def _run_by_fn_list(fn_list: list[Callable], *args, **kwargs) -> list[Future]:
def _run_by_items(fn: Callable, parallel_items: Collection, *args, **kwargs) -> list[Future]:
futures: list[Future] = []
with ThreadPoolExecutor(max_workers=min(len(parallel_items), MAX_WORKERS)) as executor:
with ThreadPoolExecutor(max_workers=len(parallel_items)) as executor:
for item in parallel_items:
task_args = _get_args(*args)
task_kwargs = _get_kwargs(**kwargs)

View file

@ -6,7 +6,6 @@ from typing import Any, Optional
from frostfs_testlib import reporter
from frostfs_testlib.resources.common import ASSETS_DIR
from frostfs_testlib.utils import string_utils
logger = logging.getLogger("NeoLogger")
@ -42,9 +41,7 @@ def ensure_directory_opener(path, flags):
return os.open(path, flags)
# TODO: Do not add {size} to title yet, since it produces dynamic info in top level steps
# Use object_size dt in future as argument
@reporter.step("Generate file")
@reporter.step("Generate file with size {size}")
def generate_file(size: int) -> TestFile:
"""Generates a binary file with the specified size in bytes.
@ -54,7 +51,7 @@ def generate_file(size: int) -> TestFile:
Returns:
The path to the generated file.
"""
test_file = TestFile(os.path.join(ASSETS_DIR, string_utils.unique_name("object-")))
test_file = TestFile(os.path.join(ASSETS_DIR, str(uuid.uuid4())))
with open(test_file, "wb", opener=ensure_directory_opener) as file:
file.write(os.urandom(size))
logger.info(f"File with size {size} bytes has been generated: {test_file}")
@ -62,9 +59,7 @@ def generate_file(size: int) -> TestFile:
return test_file
# TODO: Do not add {size} to title yet, since it produces dynamic info in top level steps
# Use object_size dt in future as argument
@reporter.step("Generate file with content")
@reporter.step("Generate file with content of size {size}")
def generate_file_with_content(
size: int,
file_path: Optional[str | TestFile] = None,

View file

@ -1,26 +1,12 @@
import random
import re
import string
from datetime import datetime
ONLY_ASCII_LETTERS = string.ascii_letters
DIGITS_AND_ASCII_LETTERS = string.ascii_letters + string.digits
NON_DIGITS_AND_LETTERS = string.punctuation
def unique_name(prefix: str = ""):
"""
Generate unique short name of anything with prefix.
This should be unique in scope of multiple runs
Args:
prefix: prefix for unique name generation
Returns:
unique name string
"""
return f"{prefix}{hex(int(datetime.now().timestamp() * 1000000))}"
def random_string(length: int = 5, source: str = ONLY_ASCII_LETTERS):
"""
Generate random string from source letters list

View file

@ -3,7 +3,14 @@ from typing import Any, get_args
import pytest
from frostfs_testlib.load.load_config import EndpointSelectionStrategy, LoadParams, LoadScenario, LoadType, Preset, ReadFrom
from frostfs_testlib.load.load_config import (
EndpointSelectionStrategy,
LoadParams,
LoadScenario,
LoadType,
Preset,
ReadFrom,
)
from frostfs_testlib.load.runners import DefaultRunner
from frostfs_testlib.resources.load_params import BACKGROUND_LOAD_DEFAULT_VU_INIT_TIME
from frostfs_testlib.storage.cluster import ClusterNode
@ -92,7 +99,9 @@ class TestLoadConfig:
def test_load_controller_string_representation(self, load_params: LoadParams):
load_params.endpoint_selection_strategy = EndpointSelectionStrategy.ALL
load_params.object_size = 512
background_load_controller = BackgroundLoadController("tmp", load_params, None, None, DefaultRunner(None))
background_load_controller = BackgroundLoadController(
"tmp", load_params, "wallet", None, None, DefaultRunner(None)
)
expected = "grpc 512 KiB, writers=7, readers=7, deleters=8"
assert f"{background_load_controller}" == expected
assert repr(background_load_controller) == expected
@ -132,7 +141,7 @@ class TestLoadConfig:
"--out 'pregen_json'",
"--workers '7'",
"--containers '16'",
"--policy 'container_placement_policy' --policy 'container_placement_policy_2'",
"--policy 'container_placement_policy'",
"--ignore-errors",
"--sleep '19'",
"--local",
@ -164,7 +173,7 @@ class TestLoadConfig:
"--out 'pregen_json'",
"--workers '7'",
"--containers '16'",
"--policy 'container_placement_policy' --policy 'container_placement_policy_2'",
"--policy 'container_placement_policy'",
"--ignore-errors",
"--sleep '19'",
"--local",
@ -205,7 +214,7 @@ class TestLoadConfig:
"--out 'pregen_json'",
"--workers '7'",
"--buckets '13'",
"--location 's3_location' --location 's3_location_2'",
"--location 's3_location'",
"--ignore-errors",
"--sleep '19'",
"--acl 'acl'",
@ -239,7 +248,7 @@ class TestLoadConfig:
"--out 'pregen_json'",
"--workers '7'",
"--buckets '13'",
"--location 's3_location' --location 's3_location_2'",
"--location 's3_location'",
"--ignore-errors",
"--sleep '19'",
"--acl 'acl'",
@ -279,7 +288,7 @@ class TestLoadConfig:
"--out 'pregen_json'",
"--workers '7'",
"--buckets '13'",
"--location 's3_location' --location 's3_location_2'",
"--location 's3_location'",
"--ignore-errors",
"--sleep '19'",
"--acl 'acl'",
@ -320,7 +329,7 @@ class TestLoadConfig:
"--out 'pregen_json'",
"--workers '7'",
"--containers '16'",
"--policy 'container_placement_policy' --policy 'container_placement_policy_2'",
"--policy 'container_placement_policy'",
"--ignore-errors",
"--sleep '19'",
"--acl 'acl'",
@ -353,13 +362,12 @@ class TestLoadConfig:
"--out 'pregen_json'",
"--workers '7'",
"--containers '16'",
"--policy 'container_placement_policy' --policy 'container_placement_policy_2'",
"--policy 'container_placement_policy'",
"--ignore-errors",
"--sleep '19'",
"--acl 'acl'",
]
expected_env_vars = {
"CONFIG_DIR": "config_dir",
"CONFIG_FILE": "config_file",
"DURATION": 9,
"WRITE_OBJ_SIZE": 11,
@ -372,49 +380,12 @@ class TestLoadConfig:
"DELETERS": 8,
"READ_AGE": 8,
"STREAMING": 9,
"MAX_TOTAL_SIZE_GB": 17,
"PREGEN_JSON": "pregen_json",
}
self._check_preset_params(load_params, expected_preset_args)
self._check_env_vars(load_params, expected_env_vars)
@pytest.mark.parametrize(
"input, value, params",
[
(["A C ", " B"], ["A C", "B"], [f"--policy 'A C' --policy 'B'"]),
(" A ", ["A"], ["--policy 'A'"]),
(" A , B ", ["A , B"], ["--policy 'A , B'"]),
([" A", "B "], ["A", "B"], ["--policy 'A' --policy 'B'"]),
(None, None, []),
],
)
def test_grpc_list_parsing_formatter(self, input, value, params):
load_params = LoadParams(LoadType.gRPC)
load_params.preset = Preset()
load_params.preset.container_placement_policy = input
assert load_params.preset.container_placement_policy == value
self._check_preset_params(load_params, params)
@pytest.mark.parametrize(
"input, value, params",
[
(["A C ", " B"], ["A C", "B"], [f"--location 'A C' --location 'B'"]),
(" A ", ["A"], ["--location 'A'"]),
(" A , B ", ["A , B"], ["--location 'A , B'"]),
([" A", "B "], ["A", "B"], ["--location 'A' --location 'B'"]),
(None, None, []),
],
)
def test_s3_list_parsing_formatter(self, input, value, params):
load_params = LoadParams(LoadType.S3)
load_params.preset = Preset()
load_params.preset.s3_location = input
assert load_params.preset.s3_location == value
self._check_preset_params(load_params, params)
@pytest.mark.parametrize("load_params, load_type", [(LoadScenario.VERIFY, LoadType.S3)], indirect=True)
def test_argument_parsing_for_s3_verify_scenario(self, load_params: LoadParams):
expected_env_vars = {
@ -621,7 +592,6 @@ class TestLoadConfig:
"--acl ''",
]
expected_env_vars = {
"CONFIG_DIR": "",
"CONFIG_FILE": "",
"DURATION": 0,
"WRITE_OBJ_SIZE": 0,
@ -629,7 +599,6 @@ class TestLoadConfig:
"K6_OUT": "",
"K6_MIN_ITERATION_DURATION": "",
"K6_SETUP_TIMEOUT": "",
"MAX_TOTAL_SIZE_GB": 0,
"WRITERS": 0,
"READERS": 0,
"DELETERS": 0,
@ -720,7 +689,9 @@ class TestLoadConfig:
value = getattr(dataclass, field.name)
assert value is not None, f"{field.name} is not None"
def _get_filled_load_params(self, load_type: LoadType, load_scenario: LoadScenario, set_emtpy: bool = False) -> LoadParams:
def _get_filled_load_params(
self, load_type: LoadType, load_scenario: LoadScenario, set_emtpy: bool = False
) -> LoadParams:
load_type_map = {
LoadScenario.S3: LoadType.S3,
LoadScenario.S3_CAR: LoadType.S3,
@ -737,12 +708,13 @@ class TestLoadConfig:
meta_fields = self._get_meta_fields(load_params)
for field in meta_fields:
if getattr(field.instance, field.field.name) is None and load_params.scenario in field.field.metadata["applicable_scenarios"]:
if (
getattr(field.instance, field.field.name) is None
and load_params.scenario in field.field.metadata["applicable_scenarios"]
):
value_to_set_map = {
int: 0 if set_emtpy else len(field.field.name),
float: 0 if set_emtpy else len(field.field.name),
str: "" if set_emtpy else field.field.name,
list[str]: "" if set_emtpy else [field.field.name, f"{field.field.name}_2"],
bool: False if set_emtpy else True,
}
value_to_set = value_to_set_map[field.field_type]
@ -755,7 +727,11 @@ class TestLoadConfig:
def _get_meta_fields(self, instance):
data_fields = fields(instance)
fields_with_data = [MetaTestField(field, self._get_actual_field_type(field), instance) for field in data_fields if field.metadata]
fields_with_data = [
MetaTestField(field, self._get_actual_field_type(field), instance)
for field in data_fields
if field.metadata
]
for field in data_fields:
actual_field_type = self._get_actual_field_type(field)