forked from TrueCloudLab/frostfs-testlib
Compare commits
19 commits
master
...
support/v0
Author | SHA1 | Date | |
---|---|---|---|
c40b637768 | |||
1880f96277 | |||
c1e5dd1007 | |||
f4d71b664d | |||
da1a4d0099 | |||
3e36defb90 | |||
6810765d46 | |||
2cffff3ffe | |||
d9f4e88f94 | |||
deb2f12bec | |||
f236c1b083 | |||
cc13a43bec | |||
a74d1bff4f | |||
547f6106ec | |||
c2aa41e5dc | |||
8e446ccb96 | |||
9c9fb7878a | |||
3a799afdcf | |||
b610e04a7b |
19 changed files with 134 additions and 236 deletions
|
@ -362,7 +362,6 @@ class FrostfsCliObject(CliCommand):
|
||||||
trace: bool = False,
|
trace: bool = False,
|
||||||
root: bool = False,
|
root: bool = False,
|
||||||
verify_presence_all: bool = False,
|
verify_presence_all: bool = False,
|
||||||
json: bool = False,
|
|
||||||
ttl: Optional[int] = None,
|
ttl: Optional[int] = None,
|
||||||
xhdr: Optional[dict] = None,
|
xhdr: Optional[dict] = None,
|
||||||
timeout: Optional[str] = None,
|
timeout: Optional[str] = None,
|
||||||
|
|
|
@ -15,8 +15,6 @@ class NetmapParser:
|
||||||
"epoch_duration": r"Epoch duration: (?P<epoch_duration>\d+)",
|
"epoch_duration": r"Epoch duration: (?P<epoch_duration>\d+)",
|
||||||
"inner_ring_candidate_fee": r"Inner Ring candidate fee: (?P<inner_ring_candidate_fee>\d+)",
|
"inner_ring_candidate_fee": r"Inner Ring candidate fee: (?P<inner_ring_candidate_fee>\d+)",
|
||||||
"maximum_object_size": r"Maximum object size: (?P<maximum_object_size>\d+)",
|
"maximum_object_size": r"Maximum object size: (?P<maximum_object_size>\d+)",
|
||||||
"maximum_count_of_data_shards": r"Maximum count of data shards: (?P<maximum_count_of_data_shards>\d+)",
|
|
||||||
"maximum_count_of_parity_shards": r"Maximum count of parity shards: (?P<maximum_count_of_parity_shards>\d+)",
|
|
||||||
"withdrawal_fee": r"Withdrawal fee: (?P<withdrawal_fee>\d+)",
|
"withdrawal_fee": r"Withdrawal fee: (?P<withdrawal_fee>\d+)",
|
||||||
"homomorphic_hashing_disabled": r"Homomorphic hashing disabled: (?P<homomorphic_hashing_disabled>true|false)",
|
"homomorphic_hashing_disabled": r"Homomorphic hashing disabled: (?P<homomorphic_hashing_disabled>true|false)",
|
||||||
"maintenance_mode_allowed": r"Maintenance mode allowed: (?P<maintenance_mode_allowed>true|false)",
|
"maintenance_mode_allowed": r"Maintenance mode allowed: (?P<maintenance_mode_allowed>true|false)",
|
||||||
|
|
|
@ -25,16 +25,6 @@ def convert_time_to_seconds(time: int | str | None) -> int:
|
||||||
return seconds
|
return seconds
|
||||||
|
|
||||||
|
|
||||||
def force_list(input: str | list[str]):
|
|
||||||
if input is None:
|
|
||||||
return None
|
|
||||||
|
|
||||||
if isinstance(input, list):
|
|
||||||
return list(map(str.strip, input))
|
|
||||||
|
|
||||||
return [input.strip()]
|
|
||||||
|
|
||||||
|
|
||||||
class LoadType(Enum):
|
class LoadType(Enum):
|
||||||
gRPC = "grpc"
|
gRPC = "grpc"
|
||||||
S3 = "s3"
|
S3 = "s3"
|
||||||
|
@ -152,29 +142,8 @@ class K6ProcessAllocationStrategy(Enum):
|
||||||
PER_ENDPOINT = "PER_ENDPOINT"
|
PER_ENDPOINT = "PER_ENDPOINT"
|
||||||
|
|
||||||
|
|
||||||
class MetaConfig:
|
|
||||||
def _get_field_formatter(self, field_name: str) -> Callable | None:
|
|
||||||
data_fields = fields(self)
|
|
||||||
formatters = [
|
|
||||||
field.metadata["formatter"]
|
|
||||||
for field in data_fields
|
|
||||||
if field.name == field_name and "formatter" in field.metadata and field.metadata["formatter"] != None
|
|
||||||
]
|
|
||||||
if formatters:
|
|
||||||
return formatters[0]
|
|
||||||
|
|
||||||
return None
|
|
||||||
|
|
||||||
def __setattr__(self, field_name, value):
|
|
||||||
formatter = self._get_field_formatter(field_name)
|
|
||||||
if formatter:
|
|
||||||
value = formatter(value)
|
|
||||||
|
|
||||||
super().__setattr__(field_name, value)
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class Preset(MetaConfig):
|
class Preset:
|
||||||
# ------ COMMON ------
|
# ------ COMMON ------
|
||||||
# Amount of objects which should be created
|
# Amount of objects which should be created
|
||||||
objects_count: Optional[int] = metadata_field(all_load_scenarios, "preload_obj", None, False)
|
objects_count: Optional[int] = metadata_field(all_load_scenarios, "preload_obj", None, False)
|
||||||
|
@ -189,13 +158,13 @@ class Preset(MetaConfig):
|
||||||
# Amount of containers which should be created
|
# Amount of containers which should be created
|
||||||
containers_count: Optional[int] = metadata_field(grpc_preset_scenarios, "containers", None, False)
|
containers_count: Optional[int] = metadata_field(grpc_preset_scenarios, "containers", None, False)
|
||||||
# Container placement policy for containers for gRPC
|
# Container placement policy for containers for gRPC
|
||||||
container_placement_policy: Optional[list[str]] = metadata_field(grpc_preset_scenarios, "policy", None, False, formatter=force_list)
|
container_placement_policy: Optional[str] = metadata_field(grpc_preset_scenarios, "policy", None, False)
|
||||||
|
|
||||||
# ------ S3 ------
|
# ------ S3 ------
|
||||||
# Amount of buckets which should be created
|
# Amount of buckets which should be created
|
||||||
buckets_count: Optional[int] = metadata_field(s3_preset_scenarios, "buckets", None, False)
|
buckets_count: Optional[int] = metadata_field(s3_preset_scenarios, "buckets", None, False)
|
||||||
# S3 region (AKA placement policy for S3 buckets)
|
# S3 region (AKA placement policy for S3 buckets)
|
||||||
s3_location: Optional[list[str]] = metadata_field(s3_preset_scenarios, "location", None, False, formatter=force_list)
|
s3_location: Optional[str] = metadata_field(s3_preset_scenarios, "location", None, False)
|
||||||
|
|
||||||
# Delay between containers creation and object upload for preset
|
# Delay between containers creation and object upload for preset
|
||||||
object_upload_delay: Optional[int] = metadata_field(all_load_scenarios, "sleep", None, False)
|
object_upload_delay: Optional[int] = metadata_field(all_load_scenarios, "sleep", None, False)
|
||||||
|
@ -208,7 +177,7 @@ class Preset(MetaConfig):
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class PrometheusParams(MetaConfig):
|
class PrometheusParams:
|
||||||
# Prometheus server URL
|
# Prometheus server URL
|
||||||
server_url: Optional[str] = metadata_field(all_load_scenarios, env_variable="K6_PROMETHEUS_RW_SERVER_URL", string_repr=False)
|
server_url: Optional[str] = metadata_field(all_load_scenarios, env_variable="K6_PROMETHEUS_RW_SERVER_URL", string_repr=False)
|
||||||
# Prometheus trend stats
|
# Prometheus trend stats
|
||||||
|
@ -218,7 +187,7 @@ class PrometheusParams(MetaConfig):
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class LoadParams(MetaConfig):
|
class LoadParams:
|
||||||
# ------- CONTROL PARAMS -------
|
# ------- CONTROL PARAMS -------
|
||||||
# Load type can be gRPC, HTTP, S3.
|
# Load type can be gRPC, HTTP, S3.
|
||||||
load_type: LoadType
|
load_type: LoadType
|
||||||
|
@ -443,11 +412,6 @@ class LoadParams(MetaConfig):
|
||||||
# For preset calls, bool values are passed with just --<argument_name> if the value is True
|
# For preset calls, bool values are passed with just --<argument_name> if the value is True
|
||||||
return f"--{meta_field.metadata['preset_argument']}" if meta_field.value else ""
|
return f"--{meta_field.metadata['preset_argument']}" if meta_field.value else ""
|
||||||
|
|
||||||
if isinstance(meta_field.value, list):
|
|
||||||
return (
|
|
||||||
" ".join(f"--{meta_field.metadata['preset_argument']} '{value}'" for value in meta_field.value) if meta_field.value else ""
|
|
||||||
)
|
|
||||||
|
|
||||||
return f"--{meta_field.metadata['preset_argument']} '{meta_field.value}'"
|
return f"--{meta_field.metadata['preset_argument']} '{meta_field.value}'"
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
@ -467,6 +431,25 @@ class LoadParams(MetaConfig):
|
||||||
|
|
||||||
return fields_with_data or []
|
return fields_with_data or []
|
||||||
|
|
||||||
|
def _get_field_formatter(self, field_name: str) -> Callable | None:
|
||||||
|
data_fields = fields(self)
|
||||||
|
formatters = [
|
||||||
|
field.metadata["formatter"]
|
||||||
|
for field in data_fields
|
||||||
|
if field.name == field_name and "formatter" in field.metadata and field.metadata["formatter"] != None
|
||||||
|
]
|
||||||
|
if formatters:
|
||||||
|
return formatters[0]
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
def __setattr__(self, field_name, value):
|
||||||
|
formatter = self._get_field_formatter(field_name)
|
||||||
|
if formatter:
|
||||||
|
value = formatter(value)
|
||||||
|
|
||||||
|
super().__setattr__(field_name, value)
|
||||||
|
|
||||||
def __str__(self) -> str:
|
def __str__(self) -> str:
|
||||||
load_type_str = self.scenario.value if self.scenario else self.load_type.value
|
load_type_str = self.scenario.value if self.scenario else self.load_type.value
|
||||||
# TODO: migrate load_params defaults to testlib
|
# TODO: migrate load_params defaults to testlib
|
||||||
|
|
|
@ -4,6 +4,6 @@ ALL_USERS_GROUP_READ_GRANT = {"Grantee": {"Type": "Group", "URI": ALL_USERS_GROU
|
||||||
CANONICAL_USER_FULL_CONTROL_GRANT = {"Grantee": {"Type": "CanonicalUser"}, "Permission": "FULL_CONTROL"}
|
CANONICAL_USER_FULL_CONTROL_GRANT = {"Grantee": {"Type": "CanonicalUser"}, "Permission": "FULL_CONTROL"}
|
||||||
|
|
||||||
# https://docs.aws.amazon.com/AmazonS3/latest/userguide/acl-overview.html#canned-acl
|
# https://docs.aws.amazon.com/AmazonS3/latest/userguide/acl-overview.html#canned-acl
|
||||||
PRIVATE_GRANTS = []
|
PRIVATE_GRANTS = [CANONICAL_USER_FULL_CONTROL_GRANT]
|
||||||
PUBLIC_READ_GRANTS = [ALL_USERS_GROUP_READ_GRANT]
|
PUBLIC_READ_GRANTS = [CANONICAL_USER_FULL_CONTROL_GRANT, ALL_USERS_GROUP_READ_GRANT]
|
||||||
PUBLIC_READ_WRITE_GRANTS = [ALL_USERS_GROUP_WRITE_GRANT, ALL_USERS_GROUP_READ_GRANT]
|
PUBLIC_READ_WRITE_GRANTS = [CANONICAL_USER_FULL_CONTROL_GRANT, ALL_USERS_GROUP_WRITE_GRANT, ALL_USERS_GROUP_READ_GRANT]
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
|
import uuid
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from time import sleep
|
from time import sleep
|
||||||
from typing import Literal, Optional, Union
|
from typing import Literal, Optional, Union
|
||||||
|
@ -10,7 +11,6 @@ from frostfs_testlib.resources.common import ASSETS_DIR, MAX_REQUEST_ATTEMPTS, R
|
||||||
from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus, _make_objs_dict
|
from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus, _make_objs_dict
|
||||||
from frostfs_testlib.shell import CommandOptions
|
from frostfs_testlib.shell import CommandOptions
|
||||||
from frostfs_testlib.shell.local_shell import LocalShell
|
from frostfs_testlib.shell.local_shell import LocalShell
|
||||||
from frostfs_testlib.utils import string_utils
|
|
||||||
|
|
||||||
# TODO: Refactor this code to use shell instead of _cmd_run
|
# TODO: Refactor this code to use shell instead of _cmd_run
|
||||||
from frostfs_testlib.utils.cli_utils import _configure_aws_cli
|
from frostfs_testlib.utils.cli_utils import _configure_aws_cli
|
||||||
|
@ -68,7 +68,7 @@ class AwsCliClient(S3ClientWrapper):
|
||||||
location_constraint: Optional[str] = None,
|
location_constraint: Optional[str] = None,
|
||||||
) -> str:
|
) -> str:
|
||||||
if bucket is None:
|
if bucket is None:
|
||||||
bucket = string_utils.unique_name("bucket-")
|
bucket = str(uuid.uuid4())
|
||||||
|
|
||||||
if object_lock_enabled_for_bucket is None:
|
if object_lock_enabled_for_bucket is None:
|
||||||
object_lock = ""
|
object_lock = ""
|
||||||
|
@ -91,6 +91,7 @@ class AwsCliClient(S3ClientWrapper):
|
||||||
if location_constraint:
|
if location_constraint:
|
||||||
cmd += f" --create-bucket-configuration LocationConstraint={location_constraint}"
|
cmd += f" --create-bucket-configuration LocationConstraint={location_constraint}"
|
||||||
self.local_shell.exec(cmd)
|
self.local_shell.exec(cmd)
|
||||||
|
sleep(S3_SYNC_WAIT_TIME)
|
||||||
|
|
||||||
return bucket
|
return bucket
|
||||||
|
|
||||||
|
@ -105,6 +106,7 @@ class AwsCliClient(S3ClientWrapper):
|
||||||
def delete_bucket(self, bucket: str) -> None:
|
def delete_bucket(self, bucket: str) -> None:
|
||||||
cmd = f"aws {self.common_flags} s3api delete-bucket --bucket {bucket} --endpoint {self.s3gate_endpoint} --profile {self.profile}"
|
cmd = f"aws {self.common_flags} s3api delete-bucket --bucket {bucket} --endpoint {self.s3gate_endpoint} --profile {self.profile}"
|
||||||
self.local_shell.exec(cmd, command_options)
|
self.local_shell.exec(cmd, command_options)
|
||||||
|
sleep(S3_SYNC_WAIT_TIME)
|
||||||
|
|
||||||
@reporter.step("Head bucket S3")
|
@reporter.step("Head bucket S3")
|
||||||
def head_bucket(self, bucket: str) -> None:
|
def head_bucket(self, bucket: str) -> None:
|
||||||
|
@ -229,7 +231,7 @@ class AwsCliClient(S3ClientWrapper):
|
||||||
if bucket is None:
|
if bucket is None:
|
||||||
bucket = source_bucket
|
bucket = source_bucket
|
||||||
if key is None:
|
if key is None:
|
||||||
key = string_utils.unique_name("copy-object-")
|
key = os.path.join(os.getcwd(), str(uuid.uuid4()))
|
||||||
copy_source = f"{source_bucket}/{source_key}"
|
copy_source = f"{source_bucket}/{source_key}"
|
||||||
|
|
||||||
cmd = (
|
cmd = (
|
||||||
|
@ -315,7 +317,7 @@ class AwsCliClient(S3ClientWrapper):
|
||||||
object_range: Optional[tuple[int, int]] = None,
|
object_range: Optional[tuple[int, int]] = None,
|
||||||
full_output: bool = False,
|
full_output: bool = False,
|
||||||
) -> dict | TestFile:
|
) -> dict | TestFile:
|
||||||
test_file = TestFile(os.path.join(os.getcwd(), ASSETS_DIR, string_utils.unique_name("dl-object-")))
|
test_file = TestFile(os.path.join(os.getcwd(), ASSETS_DIR, str(uuid.uuid4())))
|
||||||
version = f" --version-id {version_id}" if version_id else ""
|
version = f" --version-id {version_id}" if version_id else ""
|
||||||
cmd = (
|
cmd = (
|
||||||
f"aws {self.common_flags} s3api get-object --bucket {bucket} --key {key} "
|
f"aws {self.common_flags} s3api get-object --bucket {bucket} --key {key} "
|
||||||
|
@ -395,6 +397,7 @@ class AwsCliClient(S3ClientWrapper):
|
||||||
)
|
)
|
||||||
output = self.local_shell.exec(cmd, command_options).stdout
|
output = self.local_shell.exec(cmd, command_options).stdout
|
||||||
response = self._to_json(output)
|
response = self._to_json(output)
|
||||||
|
sleep(S3_SYNC_WAIT_TIME)
|
||||||
return response
|
return response
|
||||||
|
|
||||||
@reporter.step("Delete object S3")
|
@reporter.step("Delete object S3")
|
||||||
|
@ -405,6 +408,7 @@ class AwsCliClient(S3ClientWrapper):
|
||||||
f"--key {key} {version} --endpoint {self.s3gate_endpoint} --profile {self.profile}"
|
f"--key {key} {version} --endpoint {self.s3gate_endpoint} --profile {self.profile}"
|
||||||
)
|
)
|
||||||
output = self.local_shell.exec(cmd, command_options).stdout
|
output = self.local_shell.exec(cmd, command_options).stdout
|
||||||
|
sleep(S3_SYNC_WAIT_TIME)
|
||||||
return self._to_json(output)
|
return self._to_json(output)
|
||||||
|
|
||||||
@reporter.step("Delete object versions S3")
|
@reporter.step("Delete object versions S3")
|
||||||
|
@ -431,6 +435,7 @@ class AwsCliClient(S3ClientWrapper):
|
||||||
f"--delete file://{file_path} --endpoint {self.s3gate_endpoint} --profile {self.profile}"
|
f"--delete file://{file_path} --endpoint {self.s3gate_endpoint} --profile {self.profile}"
|
||||||
)
|
)
|
||||||
output = self.local_shell.exec(cmd, command_options).stdout
|
output = self.local_shell.exec(cmd, command_options).stdout
|
||||||
|
sleep(S3_SYNC_WAIT_TIME)
|
||||||
return self._to_json(output)
|
return self._to_json(output)
|
||||||
|
|
||||||
@reporter.step("Delete object versions S3 without delete markers")
|
@reporter.step("Delete object versions S3 without delete markers")
|
||||||
|
@ -1219,40 +1224,3 @@ class AwsCliClient(S3ClientWrapper):
|
||||||
response = self._to_json(output)
|
response = self._to_json(output)
|
||||||
|
|
||||||
return response
|
return response
|
||||||
|
|
||||||
@reporter.step("Adds one or more tags to an IAM user")
|
|
||||||
def iam_tag_user(self, user_name: str, tags: list) -> dict:
|
|
||||||
tags_json = [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags]
|
|
||||||
cmd = (
|
|
||||||
f"aws {self.common_flags} iam tag-user --user-name {user_name} --tags '{json.dumps(tags_json)}' --endpoint {self.iam_endpoint}"
|
|
||||||
)
|
|
||||||
if self.profile:
|
|
||||||
cmd += f" --profile {self.profile}"
|
|
||||||
|
|
||||||
output = self.local_shell.exec(cmd).stdout
|
|
||||||
response = self._to_json(output)
|
|
||||||
|
|
||||||
return response
|
|
||||||
|
|
||||||
@reporter.step("List tags of IAM user")
|
|
||||||
def iam_list_user_tags(self, user_name: str) -> dict:
|
|
||||||
cmd = f"aws {self.common_flags} iam list-user-tags --user-name {user_name} --endpoint {self.iam_endpoint}"
|
|
||||||
if self.profile:
|
|
||||||
cmd += f" --profile {self.profile}"
|
|
||||||
|
|
||||||
output = self.local_shell.exec(cmd).stdout
|
|
||||||
response = self._to_json(output)
|
|
||||||
|
|
||||||
return response
|
|
||||||
|
|
||||||
@reporter.step("Removes the specified tags from the user")
|
|
||||||
def iam_untag_user(self, user_name: str, tag_keys: list) -> dict:
|
|
||||||
tag_keys_joined = " ".join(tag_keys)
|
|
||||||
cmd = f"aws {self.common_flags} iam untag-user --user-name {user_name} --tag-keys {tag_keys_joined} --endpoint {self.iam_endpoint}"
|
|
||||||
if self.profile:
|
|
||||||
cmd += f" --profile {self.profile}"
|
|
||||||
|
|
||||||
output = self.local_shell.exec(cmd).stdout
|
|
||||||
response = self._to_json(output)
|
|
||||||
|
|
||||||
return response
|
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
|
import uuid
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
from time import sleep
|
from time import sleep
|
||||||
|
@ -15,7 +16,6 @@ from mypy_boto3_s3 import S3Client
|
||||||
from frostfs_testlib import reporter
|
from frostfs_testlib import reporter
|
||||||
from frostfs_testlib.resources.common import ASSETS_DIR, MAX_REQUEST_ATTEMPTS, RETRY_MODE, S3_SYNC_WAIT_TIME
|
from frostfs_testlib.resources.common import ASSETS_DIR, MAX_REQUEST_ATTEMPTS, RETRY_MODE, S3_SYNC_WAIT_TIME
|
||||||
from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus, _make_objs_dict
|
from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus, _make_objs_dict
|
||||||
from frostfs_testlib.utils import string_utils
|
|
||||||
|
|
||||||
# TODO: Refactor this code to use shell instead of _cmd_run
|
# TODO: Refactor this code to use shell instead of _cmd_run
|
||||||
from frostfs_testlib.utils.cli_utils import _configure_aws_cli, log_command_execution
|
from frostfs_testlib.utils.cli_utils import _configure_aws_cli, log_command_execution
|
||||||
|
@ -115,7 +115,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
|
||||||
location_constraint: Optional[str] = None,
|
location_constraint: Optional[str] = None,
|
||||||
) -> str:
|
) -> str:
|
||||||
if bucket is None:
|
if bucket is None:
|
||||||
bucket = string_utils.unique_name("bucket-")
|
bucket = str(uuid.uuid4())
|
||||||
|
|
||||||
params = {"Bucket": bucket}
|
params = {"Bucket": bucket}
|
||||||
if object_lock_enabled_for_bucket is not None:
|
if object_lock_enabled_for_bucket is not None:
|
||||||
|
@ -134,6 +134,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
|
||||||
|
|
||||||
s3_bucket = self.boto3_client.create_bucket(**params)
|
s3_bucket = self.boto3_client.create_bucket(**params)
|
||||||
log_command_execution(f"Created S3 bucket {bucket}", s3_bucket)
|
log_command_execution(f"Created S3 bucket {bucket}", s3_bucket)
|
||||||
|
sleep(S3_SYNC_WAIT_TIME)
|
||||||
return bucket
|
return bucket
|
||||||
|
|
||||||
@reporter.step("List buckets S3")
|
@reporter.step("List buckets S3")
|
||||||
|
@ -154,6 +155,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
|
||||||
def delete_bucket(self, bucket: str) -> None:
|
def delete_bucket(self, bucket: str) -> None:
|
||||||
response = self.boto3_client.delete_bucket(Bucket=bucket)
|
response = self.boto3_client.delete_bucket(Bucket=bucket)
|
||||||
log_command_execution("S3 Delete bucket result", response)
|
log_command_execution("S3 Delete bucket result", response)
|
||||||
|
sleep(S3_SYNC_WAIT_TIME)
|
||||||
|
|
||||||
@reporter.step("Head bucket S3")
|
@reporter.step("Head bucket S3")
|
||||||
@report_error
|
@report_error
|
||||||
|
@ -362,6 +364,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
|
||||||
params = {self._to_s3_param(param): value for param, value in locals().items() if param not in ["self"] and value is not None}
|
params = {self._to_s3_param(param): value for param, value in locals().items() if param not in ["self"] and value is not None}
|
||||||
response = self.boto3_client.delete_object(**params)
|
response = self.boto3_client.delete_object(**params)
|
||||||
log_command_execution("S3 Delete object result", response)
|
log_command_execution("S3 Delete object result", response)
|
||||||
|
sleep(S3_SYNC_WAIT_TIME)
|
||||||
return response
|
return response
|
||||||
|
|
||||||
@reporter.step("Delete objects S3")
|
@reporter.step("Delete objects S3")
|
||||||
|
@ -372,6 +375,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
|
||||||
assert (
|
assert (
|
||||||
"Errors" not in response
|
"Errors" not in response
|
||||||
), f'The following objects have not been deleted: {[err_info["Key"] for err_info in response["Errors"]]}.\nError Message: {response["Errors"]["Message"]}'
|
), f'The following objects have not been deleted: {[err_info["Key"] for err_info in response["Errors"]]}.\nError Message: {response["Errors"]["Message"]}'
|
||||||
|
sleep(S3_SYNC_WAIT_TIME)
|
||||||
return response
|
return response
|
||||||
|
|
||||||
@reporter.step("Delete object versions S3")
|
@reporter.step("Delete object versions S3")
|
||||||
|
@ -409,10 +413,8 @@ class Boto3ClientWrapper(S3ClientWrapper):
|
||||||
grant_write: Optional[str] = None,
|
grant_write: Optional[str] = None,
|
||||||
grant_read: Optional[str] = None,
|
grant_read: Optional[str] = None,
|
||||||
) -> list:
|
) -> list:
|
||||||
params = {self._to_s3_param(param): value for param, value in locals().items() if param not in ["self"] and value is not None}
|
# pytest.skip("Method put_object_acl is not supported by boto3 client")
|
||||||
response = self.boto3_client.put_object_acl(**params)
|
raise NotImplementedError("Unsupported for boto3 client")
|
||||||
log_command_execution("S3 put object ACL", response)
|
|
||||||
return response.get("Grants")
|
|
||||||
|
|
||||||
@reporter.step("Get object ACL")
|
@reporter.step("Get object ACL")
|
||||||
@report_error
|
@report_error
|
||||||
|
@ -439,7 +441,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
|
||||||
if bucket is None:
|
if bucket is None:
|
||||||
bucket = source_bucket
|
bucket = source_bucket
|
||||||
if key is None:
|
if key is None:
|
||||||
key = string_utils.unique_name("copy-object-")
|
key = os.path.join(os.getcwd(), str(uuid.uuid4()))
|
||||||
copy_source = f"{source_bucket}/{source_key}"
|
copy_source = f"{source_bucket}/{source_key}"
|
||||||
|
|
||||||
params = {
|
params = {
|
||||||
|
@ -476,7 +478,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
|
||||||
if full_output:
|
if full_output:
|
||||||
return response
|
return response
|
||||||
|
|
||||||
test_file = TestFile(os.path.join(os.getcwd(), ASSETS_DIR, string_utils.unique_name("dl-object-")))
|
test_file = TestFile(os.path.join(os.getcwd(), ASSETS_DIR, str(uuid.uuid4())))
|
||||||
with open(test_file, "wb") as file:
|
with open(test_file, "wb") as file:
|
||||||
chunk = response["Body"].read(1024)
|
chunk = response["Body"].read(1024)
|
||||||
while chunk:
|
while chunk:
|
||||||
|
@ -901,19 +903,3 @@ class Boto3ClientWrapper(S3ClientWrapper):
|
||||||
def iam_update_user(self, user_name: str, new_name: str, new_path: Optional[str] = None) -> dict:
|
def iam_update_user(self, user_name: str, new_name: str, new_path: Optional[str] = None) -> dict:
|
||||||
response = self.boto3_iam_client.update_user(UserName=user_name, NewUserName=new_name, NewPath="/")
|
response = self.boto3_iam_client.update_user(UserName=user_name, NewUserName=new_name, NewPath="/")
|
||||||
return response
|
return response
|
||||||
|
|
||||||
@reporter.step("Adds one or more tags to an IAM user")
|
|
||||||
def iam_tag_user(self, user_name: str, tags: list) -> dict:
|
|
||||||
tags_json = [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags]
|
|
||||||
response = self.boto3_iam_client.tag_user(UserName=user_name, Tags=tags_json)
|
|
||||||
return response
|
|
||||||
|
|
||||||
@reporter.step("List tags of IAM user")
|
|
||||||
def iam_list_user_tags(self, user_name: str) -> dict:
|
|
||||||
response = self.boto3_iam_client.list_user_tags(UserName=user_name)
|
|
||||||
return response
|
|
||||||
|
|
||||||
@reporter.step("Removes the specified tags from the user")
|
|
||||||
def iam_untag_user(self, user_name: str, tag_keys: list) -> dict:
|
|
||||||
response = self.boto3_iam_client.untag_user(UserName=user_name, TagKeys=tag_keys)
|
|
||||||
return response
|
|
||||||
|
|
|
@ -550,15 +550,3 @@ class S3ClientWrapper(HumanReadableABC):
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def iam_update_user(self, user_name: str, new_name: Optional[str] = None, new_path: Optional[str] = None) -> dict:
|
def iam_update_user(self, user_name: str, new_name: Optional[str] = None, new_path: Optional[str] = None) -> dict:
|
||||||
"""Updates the name and/or the path of the specified IAM user"""
|
"""Updates the name and/or the path of the specified IAM user"""
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
def iam_tag_user(self, user_name: str, tags: list) -> dict:
|
|
||||||
"""Adds one or more tags to an IAM user"""
|
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
def iam_list_user_tags(self, user_name: str) -> dict:
|
|
||||||
"""List tags of IAM user"""
|
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
def iam_untag_user(self, user_name: str, tag_keys: list) -> dict:
|
|
||||||
"""Removes the specified tags from the user"""
|
|
||||||
|
|
|
@ -616,11 +616,6 @@ def head_object(
|
||||||
fst_line_idx = result.stdout.find("\n")
|
fst_line_idx = result.stdout.find("\n")
|
||||||
decoded = json.loads(result.stdout[fst_line_idx:])
|
decoded = json.loads(result.stdout[fst_line_idx:])
|
||||||
|
|
||||||
# if response
|
|
||||||
if "chunks" in decoded.keys():
|
|
||||||
logger.info("decoding ec chunks")
|
|
||||||
return decoded["chunks"]
|
|
||||||
|
|
||||||
# If response is Complex Object header, it has `splitId` key
|
# If response is Complex Object header, it has `splitId` key
|
||||||
if "splitId" in decoded.keys():
|
if "splitId" in decoded.keys():
|
||||||
logger.info("decoding split header")
|
logger.info("decoding split header")
|
||||||
|
@ -722,27 +717,21 @@ def get_object_nodes(
|
||||||
|
|
||||||
cli = FrostfsCli(shell, FROSTFS_CLI_EXEC, wallet_config)
|
cli = FrostfsCli(shell, FROSTFS_CLI_EXEC, wallet_config)
|
||||||
|
|
||||||
response = cli.object.nodes(
|
result_object_nodes = cli.object.nodes(
|
||||||
rpc_endpoint=endpoint,
|
rpc_endpoint=endpoint,
|
||||||
cid=cid,
|
cid=cid,
|
||||||
oid=oid,
|
oid=oid,
|
||||||
bearer=bearer,
|
bearer=bearer,
|
||||||
ttl=1 if is_direct else None,
|
ttl=1 if is_direct else None,
|
||||||
json=True,
|
|
||||||
xhdr=xhdr,
|
xhdr=xhdr,
|
||||||
timeout=timeout,
|
timeout=timeout,
|
||||||
verify_presence_all=verify_presence_all,
|
verify_presence_all=verify_presence_all,
|
||||||
)
|
)
|
||||||
|
|
||||||
response_json = json.loads(response.stdout)
|
parsing_output = parse_cmd_table(result_object_nodes.stdout, "|")
|
||||||
# Currently, the command will show expected and confirmed nodes.
|
list_object_nodes = [
|
||||||
# And we (currently) count only nodes which are both expected and confirmed
|
node for node in parsing_output if node["should_contain_object"] == "true" and node["actually_contains_object"] == "true"
|
||||||
object_nodes_id = {
|
]
|
||||||
required_node
|
|
||||||
for data_object in response_json["data_objects"]
|
|
||||||
for required_node in data_object["required_nodes"]
|
|
||||||
if required_node in data_object["confirmed_nodes"]
|
|
||||||
}
|
|
||||||
|
|
||||||
netmap_nodes_list = parse_netmap_output(
|
netmap_nodes_list = parse_netmap_output(
|
||||||
cli.netmap.snapshot(
|
cli.netmap.snapshot(
|
||||||
|
@ -751,11 +740,14 @@ def get_object_nodes(
|
||||||
).stdout
|
).stdout
|
||||||
)
|
)
|
||||||
netmap_nodes = [
|
netmap_nodes = [
|
||||||
netmap_node for object_node in object_nodes_id for netmap_node in netmap_nodes_list if object_node == netmap_node.node_id
|
netmap_node
|
||||||
|
for object_node in list_object_nodes
|
||||||
|
for netmap_node in netmap_nodes_list
|
||||||
|
if object_node["node_id"] == netmap_node.node_id
|
||||||
]
|
]
|
||||||
|
|
||||||
object_nodes = [
|
result = [
|
||||||
cluster_node for netmap_node in netmap_nodes for cluster_node in cluster.cluster_nodes if netmap_node.node == cluster_node.host_ip
|
cluster_node for netmap_node in netmap_nodes for cluster_node in cluster.cluster_nodes if netmap_node.node == cluster_node.host_ip
|
||||||
]
|
]
|
||||||
|
|
||||||
return object_nodes
|
return result
|
||||||
|
|
|
@ -49,7 +49,7 @@ def get_via_http_gate(
|
||||||
else:
|
else:
|
||||||
request = f"{node.http_gate.get_endpoint()}{request_path}"
|
request = f"{node.http_gate.get_endpoint()}{request_path}"
|
||||||
|
|
||||||
resp = requests.get(request, stream=True, timeout=timeout, verify=False)
|
resp = requests.get(request, headers={"Host": node.storage_node.get_http_hostname()[0]}, stream=True, timeout=timeout, verify=False)
|
||||||
|
|
||||||
if not resp.ok:
|
if not resp.ok:
|
||||||
raise Exception(
|
raise Exception(
|
||||||
|
@ -115,6 +115,7 @@ def get_via_http_gate_by_attribute(
|
||||||
cid: CID to get object from
|
cid: CID to get object from
|
||||||
attribute: attribute {name: attribute} value pair
|
attribute: attribute {name: attribute} value pair
|
||||||
endpoint: http gate endpoint
|
endpoint: http gate endpoint
|
||||||
|
http_hostname: http host name on the node
|
||||||
request_path: (optional) http request path, if ommited - use default [{endpoint}/get_by_attribute/{Key}/{Value}]
|
request_path: (optional) http request path, if ommited - use default [{endpoint}/get_by_attribute/{Key}/{Value}]
|
||||||
"""
|
"""
|
||||||
attr_name = list(attribute.keys())[0]
|
attr_name = list(attribute.keys())[0]
|
||||||
|
@ -125,7 +126,7 @@ def get_via_http_gate_by_attribute(
|
||||||
else:
|
else:
|
||||||
request = f"{node.http_gate.get_endpoint()}{request_path}"
|
request = f"{node.http_gate.get_endpoint()}{request_path}"
|
||||||
|
|
||||||
resp = requests.get(request, stream=True, timeout=timeout, verify=False)
|
resp = requests.get(request, stream=True, timeout=timeout, verify=False, headers={"Host": node.storage_node.get_http_hostname()[0]})
|
||||||
|
|
||||||
if not resp.ok:
|
if not resp.ok:
|
||||||
raise Exception(
|
raise Exception(
|
||||||
|
@ -145,6 +146,7 @@ def get_via_http_gate_by_attribute(
|
||||||
return test_file
|
return test_file
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: pass http_hostname as a header
|
||||||
@reporter.step("Upload via HTTP Gate")
|
@reporter.step("Upload via HTTP Gate")
|
||||||
def upload_via_http_gate(cid: str, path: str, endpoint: str, headers: Optional[dict] = None, timeout: Optional[int] = 300) -> str:
|
def upload_via_http_gate(cid: str, path: str, endpoint: str, headers: Optional[dict] = None, timeout: Optional[int] = 300) -> str:
|
||||||
"""
|
"""
|
||||||
|
@ -189,6 +191,7 @@ def is_object_large(filepath: str) -> bool:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: pass http_hostname as a header
|
||||||
@reporter.step("Upload via HTTP Gate using Curl")
|
@reporter.step("Upload via HTTP Gate using Curl")
|
||||||
def upload_via_http_gate_curl(
|
def upload_via_http_gate_curl(
|
||||||
cid: str,
|
cid: str,
|
||||||
|
@ -249,7 +252,7 @@ def get_via_http_curl(cid: str, oid: str, node: ClusterNode) -> TestFile:
|
||||||
test_file = TestFile(os.path.join(os.getcwd(), ASSETS_DIR, f"{cid}_{oid}_{str(uuid.uuid4())}"))
|
test_file = TestFile(os.path.join(os.getcwd(), ASSETS_DIR, f"{cid}_{oid}_{str(uuid.uuid4())}"))
|
||||||
|
|
||||||
curl = GenericCli("curl", node.host)
|
curl = GenericCli("curl", node.host)
|
||||||
curl(f"-k ", f"{request} > {test_file}", shell=local_shell)
|
curl(f'-k -H "Host: {node.storage_node.get_http_hostname()[0]}"', f"{request} > {test_file}", shell=local_shell)
|
||||||
|
|
||||||
return test_file
|
return test_file
|
||||||
|
|
||||||
|
|
|
@ -47,6 +47,7 @@ def set_bucket_versioning(s3_client: S3ClientWrapper, bucket: str, status: Versi
|
||||||
if status == VersioningStatus.UNDEFINED:
|
if status == VersioningStatus.UNDEFINED:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
s3_client.get_bucket_versioning_status(bucket)
|
||||||
s3_client.put_bucket_versioning(bucket, status=status)
|
s3_client.put_bucket_versioning(bucket, status=status)
|
||||||
bucket_status = s3_client.get_bucket_versioning_status(bucket)
|
bucket_status = s3_client.get_bucket_versioning_status(bucket)
|
||||||
assert bucket_status == status.value, f"Expected {bucket_status} status. Got {status.value}"
|
assert bucket_status == status.value, f"Expected {bucket_status} status. Got {status.value}"
|
||||||
|
|
|
@ -144,16 +144,30 @@ class ClusterNode:
|
||||||
return self.host.config.interfaces[interface.value]
|
return self.host.config.interfaces[interface.value]
|
||||||
|
|
||||||
def get_data_interfaces(self) -> list[str]:
|
def get_data_interfaces(self) -> list[str]:
|
||||||
return [ip_address for name_interface, ip_address in self.host.config.interfaces.items() if "data" in name_interface]
|
return [
|
||||||
|
ip_address for name_interface, ip_address in self.host.config.interfaces.items() if "data" in name_interface
|
||||||
|
]
|
||||||
|
|
||||||
def get_data_interface(self, search_interface: str) -> list[str]:
|
def get_data_interface(self, search_interface: str) -> list[str]:
|
||||||
return [self.host.config.interfaces[interface] for interface in self.host.config.interfaces.keys() if search_interface == interface]
|
return [
|
||||||
|
self.host.config.interfaces[interface]
|
||||||
|
for interface in self.host.config.interfaces.keys()
|
||||||
|
if search_interface == interface
|
||||||
|
]
|
||||||
|
|
||||||
def get_internal_interfaces(self) -> list[str]:
|
def get_internal_interfaces(self) -> list[str]:
|
||||||
return [ip_address for name_interface, ip_address in self.host.config.interfaces.items() if "internal" in name_interface]
|
return [
|
||||||
|
ip_address
|
||||||
|
for name_interface, ip_address in self.host.config.interfaces.items()
|
||||||
|
if "internal" in name_interface
|
||||||
|
]
|
||||||
|
|
||||||
def get_internal_interface(self, search_internal: str) -> list[str]:
|
def get_internal_interface(self, search_internal: str) -> list[str]:
|
||||||
return [self.host.config.interfaces[interface] for interface in self.host.config.interfaces.keys() if search_internal == interface]
|
return [
|
||||||
|
self.host.config.interfaces[interface]
|
||||||
|
for interface in self.host.config.interfaces.keys()
|
||||||
|
if search_internal == interface
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
class Cluster:
|
class Cluster:
|
||||||
|
@ -164,6 +178,8 @@ class Cluster:
|
||||||
default_rpc_endpoint: str
|
default_rpc_endpoint: str
|
||||||
default_s3_gate_endpoint: str
|
default_s3_gate_endpoint: str
|
||||||
default_http_gate_endpoint: str
|
default_http_gate_endpoint: str
|
||||||
|
default_http_hostname: str
|
||||||
|
default_s3_hostname: str
|
||||||
|
|
||||||
def __init__(self, hosting: Hosting) -> None:
|
def __init__(self, hosting: Hosting) -> None:
|
||||||
self._hosting = hosting
|
self._hosting = hosting
|
||||||
|
@ -172,6 +188,8 @@ class Cluster:
|
||||||
self.default_rpc_endpoint = self.services(StorageNode)[0].get_rpc_endpoint()
|
self.default_rpc_endpoint = self.services(StorageNode)[0].get_rpc_endpoint()
|
||||||
self.default_s3_gate_endpoint = self.services(S3Gate)[0].get_endpoint()
|
self.default_s3_gate_endpoint = self.services(S3Gate)[0].get_endpoint()
|
||||||
self.default_http_gate_endpoint = self.services(HTTPGate)[0].get_endpoint()
|
self.default_http_gate_endpoint = self.services(HTTPGate)[0].get_endpoint()
|
||||||
|
self.default_http_hostname = self.services(StorageNode)[0].get_http_hostname()
|
||||||
|
self.default_s3_hostname = self.services(StorageNode)[0].get_s3_hostname()
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def hosts(self) -> list[Host]:
|
def hosts(self) -> list[Host]:
|
||||||
|
|
|
@ -16,3 +16,5 @@ class ConfigAttributes:
|
||||||
ENDPOINT_PROMETHEUS = "endpoint_prometheus"
|
ENDPOINT_PROMETHEUS = "endpoint_prometheus"
|
||||||
CONTROL_ENDPOINT = "control_endpoint"
|
CONTROL_ENDPOINT = "control_endpoint"
|
||||||
UN_LOCODE = "un_locode"
|
UN_LOCODE = "un_locode"
|
||||||
|
HTTP_HOSTNAME = "http_hostname"
|
||||||
|
S3_HOSTNAME = "s3_hostname"
|
||||||
|
|
|
@ -440,11 +440,9 @@ class ClusterStateController:
|
||||||
self.await_node_status(status, wallet, cluster_node)
|
self.await_node_status(status, wallet, cluster_node)
|
||||||
|
|
||||||
@wait_for_success(80, 8, title="Wait for node status become {status}")
|
@wait_for_success(80, 8, title="Wait for node status become {status}")
|
||||||
def await_node_status(self, status: NodeStatus, wallet: WalletInfo, cluster_node: ClusterNode, checker_node: ClusterNode = None):
|
def await_node_status(self, status: NodeStatus, wallet: WalletInfo, cluster_node: ClusterNode):
|
||||||
frostfs_cli = FrostfsCli(self.shell, FROSTFS_CLI_EXEC, wallet.config_path)
|
frostfs_cli = FrostfsCli(self.shell, FROSTFS_CLI_EXEC, wallet.config_path)
|
||||||
if not checker_node:
|
netmap = NetmapParser.snapshot_all_nodes(frostfs_cli.netmap.snapshot(cluster_node.storage_node.get_rpc_endpoint()).stdout)
|
||||||
checker_node = cluster_node
|
|
||||||
netmap = NetmapParser.snapshot_all_nodes(frostfs_cli.netmap.snapshot(checker_node.storage_node.get_rpc_endpoint()).stdout)
|
|
||||||
netmap = [node for node in netmap if cluster_node.host_ip == node.node]
|
netmap = [node for node in netmap if cluster_node.host_ip == node.node]
|
||||||
if status == NodeStatus.OFFLINE:
|
if status == NodeStatus.OFFLINE:
|
||||||
assert cluster_node.host_ip not in netmap, f"{cluster_node.host_ip} not in Offline"
|
assert cluster_node.host_ip not in netmap, f"{cluster_node.host_ip} not in Offline"
|
||||||
|
|
|
@ -154,6 +154,15 @@ class StorageNode(NodeBase):
|
||||||
def get_data_directory(self) -> str:
|
def get_data_directory(self) -> str:
|
||||||
return self.host.get_data_directory(self.name)
|
return self.host.get_data_directory(self.name)
|
||||||
|
|
||||||
|
def get_storage_config(self) -> str:
|
||||||
|
return self.host.get_storage_config(self.name)
|
||||||
|
|
||||||
|
def get_http_hostname(self) -> list[str]:
|
||||||
|
return self._get_attribute(ConfigAttributes.HTTP_HOSTNAME)
|
||||||
|
|
||||||
|
def get_s3_hostname(self) -> list[str]:
|
||||||
|
return self._get_attribute(ConfigAttributes.S3_HOSTNAME)
|
||||||
|
|
||||||
def delete_blobovnicza(self):
|
def delete_blobovnicza(self):
|
||||||
self.host.delete_blobovnicza(self.name)
|
self.host.delete_blobovnicza(self.name)
|
||||||
|
|
||||||
|
|
|
@ -70,8 +70,6 @@ class NodeNetInfo:
|
||||||
epoch_duration: str = None
|
epoch_duration: str = None
|
||||||
inner_ring_candidate_fee: str = None
|
inner_ring_candidate_fee: str = None
|
||||||
maximum_object_size: str = None
|
maximum_object_size: str = None
|
||||||
maximum_count_of_data_shards: str = None
|
|
||||||
maximum_count_of_parity_shards: str = None
|
|
||||||
withdrawal_fee: str = None
|
withdrawal_fee: str = None
|
||||||
homomorphic_hashing_disabled: str = None
|
homomorphic_hashing_disabled: str = None
|
||||||
maintenance_mode_allowed: str = None
|
maintenance_mode_allowed: str = None
|
||||||
|
|
|
@ -2,8 +2,6 @@ import itertools
|
||||||
from concurrent.futures import Future, ThreadPoolExecutor
|
from concurrent.futures import Future, ThreadPoolExecutor
|
||||||
from typing import Callable, Collection, Optional, Union
|
from typing import Callable, Collection, Optional, Union
|
||||||
|
|
||||||
MAX_WORKERS = 50
|
|
||||||
|
|
||||||
|
|
||||||
def parallel(
|
def parallel(
|
||||||
fn: Union[Callable, list[Callable]],
|
fn: Union[Callable, list[Callable]],
|
||||||
|
@ -56,7 +54,7 @@ def _run_by_fn_list(fn_list: list[Callable], *args, **kwargs) -> list[Future]:
|
||||||
|
|
||||||
futures: list[Future] = []
|
futures: list[Future] = []
|
||||||
|
|
||||||
with ThreadPoolExecutor(max_workers=min(len(fn_list), MAX_WORKERS)) as executor:
|
with ThreadPoolExecutor(max_workers=len(fn_list)) as executor:
|
||||||
for fn in fn_list:
|
for fn in fn_list:
|
||||||
task_args = _get_args(*args)
|
task_args = _get_args(*args)
|
||||||
task_kwargs = _get_kwargs(**kwargs)
|
task_kwargs = _get_kwargs(**kwargs)
|
||||||
|
@ -69,7 +67,7 @@ def _run_by_fn_list(fn_list: list[Callable], *args, **kwargs) -> list[Future]:
|
||||||
def _run_by_items(fn: Callable, parallel_items: Collection, *args, **kwargs) -> list[Future]:
|
def _run_by_items(fn: Callable, parallel_items: Collection, *args, **kwargs) -> list[Future]:
|
||||||
futures: list[Future] = []
|
futures: list[Future] = []
|
||||||
|
|
||||||
with ThreadPoolExecutor(max_workers=min(len(parallel_items), MAX_WORKERS)) as executor:
|
with ThreadPoolExecutor(max_workers=len(parallel_items)) as executor:
|
||||||
for item in parallel_items:
|
for item in parallel_items:
|
||||||
task_args = _get_args(*args)
|
task_args = _get_args(*args)
|
||||||
task_kwargs = _get_kwargs(**kwargs)
|
task_kwargs = _get_kwargs(**kwargs)
|
||||||
|
|
|
@ -6,7 +6,6 @@ from typing import Any, Optional
|
||||||
|
|
||||||
from frostfs_testlib import reporter
|
from frostfs_testlib import reporter
|
||||||
from frostfs_testlib.resources.common import ASSETS_DIR
|
from frostfs_testlib.resources.common import ASSETS_DIR
|
||||||
from frostfs_testlib.utils import string_utils
|
|
||||||
|
|
||||||
logger = logging.getLogger("NeoLogger")
|
logger = logging.getLogger("NeoLogger")
|
||||||
|
|
||||||
|
@ -42,9 +41,7 @@ def ensure_directory_opener(path, flags):
|
||||||
return os.open(path, flags)
|
return os.open(path, flags)
|
||||||
|
|
||||||
|
|
||||||
# TODO: Do not add {size} to title yet, since it produces dynamic info in top level steps
|
@reporter.step("Generate file with size {size}")
|
||||||
# Use object_size dt in future as argument
|
|
||||||
@reporter.step("Generate file")
|
|
||||||
def generate_file(size: int) -> TestFile:
|
def generate_file(size: int) -> TestFile:
|
||||||
"""Generates a binary file with the specified size in bytes.
|
"""Generates a binary file with the specified size in bytes.
|
||||||
|
|
||||||
|
@ -54,7 +51,7 @@ def generate_file(size: int) -> TestFile:
|
||||||
Returns:
|
Returns:
|
||||||
The path to the generated file.
|
The path to the generated file.
|
||||||
"""
|
"""
|
||||||
test_file = TestFile(os.path.join(ASSETS_DIR, string_utils.unique_name("object-")))
|
test_file = TestFile(os.path.join(ASSETS_DIR, str(uuid.uuid4())))
|
||||||
with open(test_file, "wb", opener=ensure_directory_opener) as file:
|
with open(test_file, "wb", opener=ensure_directory_opener) as file:
|
||||||
file.write(os.urandom(size))
|
file.write(os.urandom(size))
|
||||||
logger.info(f"File with size {size} bytes has been generated: {test_file}")
|
logger.info(f"File with size {size} bytes has been generated: {test_file}")
|
||||||
|
@ -62,9 +59,7 @@ def generate_file(size: int) -> TestFile:
|
||||||
return test_file
|
return test_file
|
||||||
|
|
||||||
|
|
||||||
# TODO: Do not add {size} to title yet, since it produces dynamic info in top level steps
|
@reporter.step("Generate file with content of size {size}")
|
||||||
# Use object_size dt in future as argument
|
|
||||||
@reporter.step("Generate file with content")
|
|
||||||
def generate_file_with_content(
|
def generate_file_with_content(
|
||||||
size: int,
|
size: int,
|
||||||
file_path: Optional[str | TestFile] = None,
|
file_path: Optional[str | TestFile] = None,
|
||||||
|
|
|
@ -1,26 +1,12 @@
|
||||||
import random
|
import random
|
||||||
import re
|
import re
|
||||||
import string
|
import string
|
||||||
from datetime import datetime
|
|
||||||
|
|
||||||
ONLY_ASCII_LETTERS = string.ascii_letters
|
ONLY_ASCII_LETTERS = string.ascii_letters
|
||||||
DIGITS_AND_ASCII_LETTERS = string.ascii_letters + string.digits
|
DIGITS_AND_ASCII_LETTERS = string.ascii_letters + string.digits
|
||||||
NON_DIGITS_AND_LETTERS = string.punctuation
|
NON_DIGITS_AND_LETTERS = string.punctuation
|
||||||
|
|
||||||
|
|
||||||
def unique_name(prefix: str = ""):
|
|
||||||
"""
|
|
||||||
Generate unique short name of anything with prefix.
|
|
||||||
This should be unique in scope of multiple runs
|
|
||||||
|
|
||||||
Args:
|
|
||||||
prefix: prefix for unique name generation
|
|
||||||
Returns:
|
|
||||||
unique name string
|
|
||||||
"""
|
|
||||||
return f"{prefix}{hex(int(datetime.now().timestamp() * 1000000))}"
|
|
||||||
|
|
||||||
|
|
||||||
def random_string(length: int = 5, source: str = ONLY_ASCII_LETTERS):
|
def random_string(length: int = 5, source: str = ONLY_ASCII_LETTERS):
|
||||||
"""
|
"""
|
||||||
Generate random string from source letters list
|
Generate random string from source letters list
|
||||||
|
|
|
@ -3,7 +3,14 @@ from typing import Any, get_args
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from frostfs_testlib.load.load_config import EndpointSelectionStrategy, LoadParams, LoadScenario, LoadType, Preset, ReadFrom
|
from frostfs_testlib.load.load_config import (
|
||||||
|
EndpointSelectionStrategy,
|
||||||
|
LoadParams,
|
||||||
|
LoadScenario,
|
||||||
|
LoadType,
|
||||||
|
Preset,
|
||||||
|
ReadFrom,
|
||||||
|
)
|
||||||
from frostfs_testlib.load.runners import DefaultRunner
|
from frostfs_testlib.load.runners import DefaultRunner
|
||||||
from frostfs_testlib.resources.load_params import BACKGROUND_LOAD_DEFAULT_VU_INIT_TIME
|
from frostfs_testlib.resources.load_params import BACKGROUND_LOAD_DEFAULT_VU_INIT_TIME
|
||||||
from frostfs_testlib.storage.cluster import ClusterNode
|
from frostfs_testlib.storage.cluster import ClusterNode
|
||||||
|
@ -92,7 +99,9 @@ class TestLoadConfig:
|
||||||
def test_load_controller_string_representation(self, load_params: LoadParams):
|
def test_load_controller_string_representation(self, load_params: LoadParams):
|
||||||
load_params.endpoint_selection_strategy = EndpointSelectionStrategy.ALL
|
load_params.endpoint_selection_strategy = EndpointSelectionStrategy.ALL
|
||||||
load_params.object_size = 512
|
load_params.object_size = 512
|
||||||
background_load_controller = BackgroundLoadController("tmp", load_params, None, None, DefaultRunner(None))
|
background_load_controller = BackgroundLoadController(
|
||||||
|
"tmp", load_params, "wallet", None, None, DefaultRunner(None)
|
||||||
|
)
|
||||||
expected = "grpc 512 KiB, writers=7, readers=7, deleters=8"
|
expected = "grpc 512 KiB, writers=7, readers=7, deleters=8"
|
||||||
assert f"{background_load_controller}" == expected
|
assert f"{background_load_controller}" == expected
|
||||||
assert repr(background_load_controller) == expected
|
assert repr(background_load_controller) == expected
|
||||||
|
@ -132,7 +141,7 @@ class TestLoadConfig:
|
||||||
"--out 'pregen_json'",
|
"--out 'pregen_json'",
|
||||||
"--workers '7'",
|
"--workers '7'",
|
||||||
"--containers '16'",
|
"--containers '16'",
|
||||||
"--policy 'container_placement_policy' --policy 'container_placement_policy_2'",
|
"--policy 'container_placement_policy'",
|
||||||
"--ignore-errors",
|
"--ignore-errors",
|
||||||
"--sleep '19'",
|
"--sleep '19'",
|
||||||
"--local",
|
"--local",
|
||||||
|
@ -164,7 +173,7 @@ class TestLoadConfig:
|
||||||
"--out 'pregen_json'",
|
"--out 'pregen_json'",
|
||||||
"--workers '7'",
|
"--workers '7'",
|
||||||
"--containers '16'",
|
"--containers '16'",
|
||||||
"--policy 'container_placement_policy' --policy 'container_placement_policy_2'",
|
"--policy 'container_placement_policy'",
|
||||||
"--ignore-errors",
|
"--ignore-errors",
|
||||||
"--sleep '19'",
|
"--sleep '19'",
|
||||||
"--local",
|
"--local",
|
||||||
|
@ -205,7 +214,7 @@ class TestLoadConfig:
|
||||||
"--out 'pregen_json'",
|
"--out 'pregen_json'",
|
||||||
"--workers '7'",
|
"--workers '7'",
|
||||||
"--buckets '13'",
|
"--buckets '13'",
|
||||||
"--location 's3_location' --location 's3_location_2'",
|
"--location 's3_location'",
|
||||||
"--ignore-errors",
|
"--ignore-errors",
|
||||||
"--sleep '19'",
|
"--sleep '19'",
|
||||||
"--acl 'acl'",
|
"--acl 'acl'",
|
||||||
|
@ -239,7 +248,7 @@ class TestLoadConfig:
|
||||||
"--out 'pregen_json'",
|
"--out 'pregen_json'",
|
||||||
"--workers '7'",
|
"--workers '7'",
|
||||||
"--buckets '13'",
|
"--buckets '13'",
|
||||||
"--location 's3_location' --location 's3_location_2'",
|
"--location 's3_location'",
|
||||||
"--ignore-errors",
|
"--ignore-errors",
|
||||||
"--sleep '19'",
|
"--sleep '19'",
|
||||||
"--acl 'acl'",
|
"--acl 'acl'",
|
||||||
|
@ -279,7 +288,7 @@ class TestLoadConfig:
|
||||||
"--out 'pregen_json'",
|
"--out 'pregen_json'",
|
||||||
"--workers '7'",
|
"--workers '7'",
|
||||||
"--buckets '13'",
|
"--buckets '13'",
|
||||||
"--location 's3_location' --location 's3_location_2'",
|
"--location 's3_location'",
|
||||||
"--ignore-errors",
|
"--ignore-errors",
|
||||||
"--sleep '19'",
|
"--sleep '19'",
|
||||||
"--acl 'acl'",
|
"--acl 'acl'",
|
||||||
|
@ -320,7 +329,7 @@ class TestLoadConfig:
|
||||||
"--out 'pregen_json'",
|
"--out 'pregen_json'",
|
||||||
"--workers '7'",
|
"--workers '7'",
|
||||||
"--containers '16'",
|
"--containers '16'",
|
||||||
"--policy 'container_placement_policy' --policy 'container_placement_policy_2'",
|
"--policy 'container_placement_policy'",
|
||||||
"--ignore-errors",
|
"--ignore-errors",
|
||||||
"--sleep '19'",
|
"--sleep '19'",
|
||||||
"--acl 'acl'",
|
"--acl 'acl'",
|
||||||
|
@ -353,13 +362,12 @@ class TestLoadConfig:
|
||||||
"--out 'pregen_json'",
|
"--out 'pregen_json'",
|
||||||
"--workers '7'",
|
"--workers '7'",
|
||||||
"--containers '16'",
|
"--containers '16'",
|
||||||
"--policy 'container_placement_policy' --policy 'container_placement_policy_2'",
|
"--policy 'container_placement_policy'",
|
||||||
"--ignore-errors",
|
"--ignore-errors",
|
||||||
"--sleep '19'",
|
"--sleep '19'",
|
||||||
"--acl 'acl'",
|
"--acl 'acl'",
|
||||||
]
|
]
|
||||||
expected_env_vars = {
|
expected_env_vars = {
|
||||||
"CONFIG_DIR": "config_dir",
|
|
||||||
"CONFIG_FILE": "config_file",
|
"CONFIG_FILE": "config_file",
|
||||||
"DURATION": 9,
|
"DURATION": 9,
|
||||||
"WRITE_OBJ_SIZE": 11,
|
"WRITE_OBJ_SIZE": 11,
|
||||||
|
@ -372,49 +380,12 @@ class TestLoadConfig:
|
||||||
"DELETERS": 8,
|
"DELETERS": 8,
|
||||||
"READ_AGE": 8,
|
"READ_AGE": 8,
|
||||||
"STREAMING": 9,
|
"STREAMING": 9,
|
||||||
"MAX_TOTAL_SIZE_GB": 17,
|
|
||||||
"PREGEN_JSON": "pregen_json",
|
"PREGEN_JSON": "pregen_json",
|
||||||
}
|
}
|
||||||
|
|
||||||
self._check_preset_params(load_params, expected_preset_args)
|
self._check_preset_params(load_params, expected_preset_args)
|
||||||
self._check_env_vars(load_params, expected_env_vars)
|
self._check_env_vars(load_params, expected_env_vars)
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
|
||||||
"input, value, params",
|
|
||||||
[
|
|
||||||
(["A C ", " B"], ["A C", "B"], [f"--policy 'A C' --policy 'B'"]),
|
|
||||||
(" A ", ["A"], ["--policy 'A'"]),
|
|
||||||
(" A , B ", ["A , B"], ["--policy 'A , B'"]),
|
|
||||||
([" A", "B "], ["A", "B"], ["--policy 'A' --policy 'B'"]),
|
|
||||||
(None, None, []),
|
|
||||||
],
|
|
||||||
)
|
|
||||||
def test_grpc_list_parsing_formatter(self, input, value, params):
|
|
||||||
load_params = LoadParams(LoadType.gRPC)
|
|
||||||
load_params.preset = Preset()
|
|
||||||
load_params.preset.container_placement_policy = input
|
|
||||||
assert load_params.preset.container_placement_policy == value
|
|
||||||
|
|
||||||
self._check_preset_params(load_params, params)
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
|
||||||
"input, value, params",
|
|
||||||
[
|
|
||||||
(["A C ", " B"], ["A C", "B"], [f"--location 'A C' --location 'B'"]),
|
|
||||||
(" A ", ["A"], ["--location 'A'"]),
|
|
||||||
(" A , B ", ["A , B"], ["--location 'A , B'"]),
|
|
||||||
([" A", "B "], ["A", "B"], ["--location 'A' --location 'B'"]),
|
|
||||||
(None, None, []),
|
|
||||||
],
|
|
||||||
)
|
|
||||||
def test_s3_list_parsing_formatter(self, input, value, params):
|
|
||||||
load_params = LoadParams(LoadType.S3)
|
|
||||||
load_params.preset = Preset()
|
|
||||||
load_params.preset.s3_location = input
|
|
||||||
assert load_params.preset.s3_location == value
|
|
||||||
|
|
||||||
self._check_preset_params(load_params, params)
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("load_params, load_type", [(LoadScenario.VERIFY, LoadType.S3)], indirect=True)
|
@pytest.mark.parametrize("load_params, load_type", [(LoadScenario.VERIFY, LoadType.S3)], indirect=True)
|
||||||
def test_argument_parsing_for_s3_verify_scenario(self, load_params: LoadParams):
|
def test_argument_parsing_for_s3_verify_scenario(self, load_params: LoadParams):
|
||||||
expected_env_vars = {
|
expected_env_vars = {
|
||||||
|
@ -621,7 +592,6 @@ class TestLoadConfig:
|
||||||
"--acl ''",
|
"--acl ''",
|
||||||
]
|
]
|
||||||
expected_env_vars = {
|
expected_env_vars = {
|
||||||
"CONFIG_DIR": "",
|
|
||||||
"CONFIG_FILE": "",
|
"CONFIG_FILE": "",
|
||||||
"DURATION": 0,
|
"DURATION": 0,
|
||||||
"WRITE_OBJ_SIZE": 0,
|
"WRITE_OBJ_SIZE": 0,
|
||||||
|
@ -629,7 +599,6 @@ class TestLoadConfig:
|
||||||
"K6_OUT": "",
|
"K6_OUT": "",
|
||||||
"K6_MIN_ITERATION_DURATION": "",
|
"K6_MIN_ITERATION_DURATION": "",
|
||||||
"K6_SETUP_TIMEOUT": "",
|
"K6_SETUP_TIMEOUT": "",
|
||||||
"MAX_TOTAL_SIZE_GB": 0,
|
|
||||||
"WRITERS": 0,
|
"WRITERS": 0,
|
||||||
"READERS": 0,
|
"READERS": 0,
|
||||||
"DELETERS": 0,
|
"DELETERS": 0,
|
||||||
|
@ -720,7 +689,9 @@ class TestLoadConfig:
|
||||||
value = getattr(dataclass, field.name)
|
value = getattr(dataclass, field.name)
|
||||||
assert value is not None, f"{field.name} is not None"
|
assert value is not None, f"{field.name} is not None"
|
||||||
|
|
||||||
def _get_filled_load_params(self, load_type: LoadType, load_scenario: LoadScenario, set_emtpy: bool = False) -> LoadParams:
|
def _get_filled_load_params(
|
||||||
|
self, load_type: LoadType, load_scenario: LoadScenario, set_emtpy: bool = False
|
||||||
|
) -> LoadParams:
|
||||||
load_type_map = {
|
load_type_map = {
|
||||||
LoadScenario.S3: LoadType.S3,
|
LoadScenario.S3: LoadType.S3,
|
||||||
LoadScenario.S3_CAR: LoadType.S3,
|
LoadScenario.S3_CAR: LoadType.S3,
|
||||||
|
@ -737,12 +708,13 @@ class TestLoadConfig:
|
||||||
|
|
||||||
meta_fields = self._get_meta_fields(load_params)
|
meta_fields = self._get_meta_fields(load_params)
|
||||||
for field in meta_fields:
|
for field in meta_fields:
|
||||||
if getattr(field.instance, field.field.name) is None and load_params.scenario in field.field.metadata["applicable_scenarios"]:
|
if (
|
||||||
|
getattr(field.instance, field.field.name) is None
|
||||||
|
and load_params.scenario in field.field.metadata["applicable_scenarios"]
|
||||||
|
):
|
||||||
value_to_set_map = {
|
value_to_set_map = {
|
||||||
int: 0 if set_emtpy else len(field.field.name),
|
int: 0 if set_emtpy else len(field.field.name),
|
||||||
float: 0 if set_emtpy else len(field.field.name),
|
|
||||||
str: "" if set_emtpy else field.field.name,
|
str: "" if set_emtpy else field.field.name,
|
||||||
list[str]: "" if set_emtpy else [field.field.name, f"{field.field.name}_2"],
|
|
||||||
bool: False if set_emtpy else True,
|
bool: False if set_emtpy else True,
|
||||||
}
|
}
|
||||||
value_to_set = value_to_set_map[field.field_type]
|
value_to_set = value_to_set_map[field.field_type]
|
||||||
|
@ -755,7 +727,11 @@ class TestLoadConfig:
|
||||||
|
|
||||||
def _get_meta_fields(self, instance):
|
def _get_meta_fields(self, instance):
|
||||||
data_fields = fields(instance)
|
data_fields = fields(instance)
|
||||||
fields_with_data = [MetaTestField(field, self._get_actual_field_type(field), instance) for field in data_fields if field.metadata]
|
fields_with_data = [
|
||||||
|
MetaTestField(field, self._get_actual_field_type(field), instance)
|
||||||
|
for field in data_fields
|
||||||
|
if field.metadata
|
||||||
|
]
|
||||||
|
|
||||||
for field in data_fields:
|
for field in data_fields:
|
||||||
actual_field_type = self._get_actual_field_type(field)
|
actual_field_type = self._get_actual_field_type(field)
|
||||||
|
|
Loading…
Reference in a new issue