forked from TrueCloudLab/frostfs-testcases
56 lines
2.3 KiB
Python
56 lines
2.3 KiB
Python
import json
|
|
import time
|
|
|
|
from frostfs_testlib import reporter
|
|
from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli
|
|
from frostfs_testlib.resources.common import MORPH_BLOCK_TIME
|
|
from frostfs_testlib.shell.interfaces import Shell
|
|
from frostfs_testlib.steps.cli.container import create_container, search_nodes_with_container
|
|
from frostfs_testlib.storage.cluster import Cluster
|
|
from frostfs_testlib.storage.dataclasses import ape
|
|
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
|
|
from frostfs_testlib.utils import datetime_utils
|
|
|
|
from .container_request import ContainerRequest
|
|
|
|
|
|
def create_container_with_ape(
|
|
frostfs_cli: FrostfsCli, wallet: WalletInfo, shell: Shell, cluster: Cluster, endpoint: str, container_request: ContainerRequest
|
|
):
|
|
with reporter.step("Create container"):
|
|
cid = _create_container_by_spec(wallet, shell, cluster, endpoint, container_request)
|
|
|
|
with reporter.step("Apply APE rules for container"):
|
|
if container_request.ape_rules:
|
|
_apply_ape_rules(frostfs_cli, endpoint, cid, container_request.ape_rules)
|
|
|
|
with reporter.step("Wait for one block"):
|
|
time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))
|
|
|
|
with reporter.step("Search nodes holding the container"):
|
|
container_holder_nodes = search_nodes_with_container(wallet, cid, shell, cluster.default_rpc_endpoint, cluster)
|
|
report_data = {node.id: node.host_ip for node in container_holder_nodes}
|
|
|
|
reporter.attach(json.dumps(report_data, indent=2), "container_nodes.json")
|
|
|
|
return cid
|
|
|
|
|
|
@reporter.step("Create container by spec {container_request}")
|
|
def _create_container_by_spec(
|
|
wallet: WalletInfo, shell: Shell, cluster: Cluster, endpoint: str, container_request: ContainerRequest
|
|
) -> str:
|
|
return create_container(wallet, shell, endpoint, container_request.parsed_rule(cluster), wait_for_creation=False)
|
|
|
|
|
|
def _apply_ape_rules(frostfs_cli: FrostfsCli, endpoint: str, cid: str, ape_rules: list[ape.Rule]):
|
|
for ape_rule in ape_rules:
|
|
rule_str = ape_rule.as_string()
|
|
with reporter.step(f"Apply APE rule '{rule_str}' for container {cid}"):
|
|
frostfs_cli.ape_manager.add(
|
|
endpoint,
|
|
ape_rule.chain_id,
|
|
target_name=cid,
|
|
target_type="container",
|
|
rule=rule_str,
|
|
)
|