"""Tests for node-local APE overrides that deny container operations in the root namespace."""

from collections.abc import Generator
from contextlib import contextmanager
from time import sleep

import allure
import pytest
from frostfs_testlib import reporter
from frostfs_testlib.cli import FrostfsCli
from frostfs_testlib.resources.cli import FROSTFS_CLI_EXEC
from frostfs_testlib.resources.common import MORPH_BLOCK_TIME
from frostfs_testlib.resources.error_patterns import RULE_ACCESS_DENIED_CONTAINER
from frostfs_testlib.shell.interfaces import Shell
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.dataclasses.ape import Operations
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.testing.parallel import parallel
from frostfs_testlib.testing.test_control import expect_not_raises
from frostfs_testlib.utils import datetime_utils

from ...helpers.container_request import APE_EVERYONE_ALLOW_ALL, ContainerRequest, MultipleContainersRequest

REP2 = ContainerRequest("REP 2", ape_rules=APE_EVERYONE_ALLOW_ALL, short_name="REP2_allow_all_ape")


def remove_local_overrides_on_node(node: ClusterNode) -> None:
    """Remove every local override chain listed for the ``root`` namespace on ``node``.

    The rules are listed through the node's control endpoint. If the listing
    contains no "Chain ID" entry the function returns immediately; otherwise
    each listed chain is removed and the function sleeps for MORPH_BLOCK_TIME.

    Args:
        node: Cluster node whose local overrides should be cleaned up.
    """
    chain_marker = "Chain ID"
    shell: Shell = node.host.get_shell()
    remote_config: str = node.storage_node.get_remote_wallet_config_path()
    cli = FrostfsCli(shell=shell, frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=remote_config)

    with reporter.step(f"Check local overrides on {node.storage_node.id} node"):
        control_endpoint = node.storage_node.get_control_endpoint()
        rules = cli.control.list_rules(endpoint=control_endpoint, target_name="root", target_type="namespace").stdout

        if chain_marker not in rules:
            return

        with reporter.step("Delete rules"):
            # The ID is taken as the third space-separated token of a marker line
            # (presumably "Chain ID: <id>" -- confirm against the CLI output format).
            chain_ids = [line.split(" ")[2].strip() for line in rules.split("\n") if chain_marker in line]
            for chain_id in chain_ids:
                cli.control.remove_rule(
                    endpoint=control_endpoint,
                    target_type="namespace",
                    target_name="root",
                    chain_id=chain_id,
                )

        with reporter.step("Wait for one block"):
            sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))


@pytest.fixture(scope="session")
def remove_local_ape_overrides(cluster: Cluster) -> Generator[None, None, None]:
    """Teardown-only fixture: clean local overrides on all cluster nodes.

    Nothing happens at setup. Because the fixture is session-scoped, the cleanup
    after ``yield`` runs once, when the session finishes, so it is only a last
    line of defence; the tests remove their own rules.
    """
    yield
    with reporter.step("Check local overrides on nodes."):
        parallel(remove_local_overrides_on_node, cluster.cluster_nodes)


@contextmanager
def _root_namespace_rule(cli: FrostfsCli, endpoint: str, chain_id: str, rule: str) -> Generator[None, None, None]:
    """Add a local override for the ``root`` namespace and always remove it on exit.

    The removal sits in ``finally`` so that a failed check inside the ``with``
    body cannot leave the rule installed on the node for the following tests.

    Args:
        cli: CLI wrapper used to call the control service.
        endpoint: Control endpoint of the node that receives the rule.
        chain_id: Chain ID under which the rule is added and later removed.
        rule: Rule text passed to ``add_rule``.
    """
    with reporter.step("Create a namespace rule for the first node"):
        cli.control.add_rule(
            endpoint=endpoint,
            target_type="namespace",
            target_name="root",
            chain_id=chain_id,
            rule=rule,
        )
    try:
        yield
    finally:
        with reporter.step("Delete a rule"):
            cli.control.remove_rule(
                endpoint=endpoint,
                target_type="namespace",
                target_name="root",
                chain_id=chain_id,
            )


@pytest.mark.ape
@pytest.mark.ape_local
@pytest.mark.ape_container
@pytest.mark.ape_namespace
class TestApeLocalOverrideContainer(ClusterTestBase):
    """Container operations against a deny rule installed locally on the first storage node.

    Every test follows the same scenario: add a deny rule on the first node,
    expect the operation to be denied there and allowed on the second node,
    remove the rule, then expect the operation to be allowed on the first node.
    """

    @allure.title("LocalOverride: Deny to GetContainer in root tenant")
    def test_local_override_deny_to_get_container_root(
        self,
        frostfs_cli_on_first_node: FrostfsCli,
        frostfs_cli: FrostfsCli,
        container: str,
        remove_local_ape_overrides: None,
    ):
        """Container.Get is denied only on the node that holds the local override."""
        nodes = self.cluster.storage_nodes

        with _root_namespace_rule(
            frostfs_cli_on_first_node,
            nodes[0].get_control_endpoint(),
            chain_id="denyContainerGet",
            rule="deny Container.Get *",
        ):
            with reporter.step("Check get the container property on the first node, expected denied error"):
                with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_CONTAINER.format(operation=Operations.GET_CONTAINER)):
                    frostfs_cli.container.get(nodes[0].get_rpc_endpoint(), container)

            with reporter.step("Check get the container property on the second node, expected allow"):
                with expect_not_raises():
                    frostfs_cli.container.get(nodes[1].get_rpc_endpoint(), container)

        with reporter.step("Check get the container property on the first node, expected allow"):
            with expect_not_raises():
                frostfs_cli.container.get(nodes[0].get_rpc_endpoint(), container)

    @allure.title("LocalOverride: Deny to PutContainer in root tenant")
    def test_local_override_deny_to_put_container_root(
        self,
        frostfs_cli_on_first_node: FrostfsCli,
        frostfs_cli: FrostfsCli,
        remove_local_ape_overrides: None,
    ):
        """Container.Put is denied only on the node that holds the local override."""
        nodes = self.cluster.storage_nodes

        with _root_namespace_rule(
            frostfs_cli_on_first_node,
            nodes[0].get_control_endpoint(),
            chain_id="denyContainerPut",
            rule="deny Container.Put *",
        ):
            with reporter.step("Check create container on the first node, expected access denied error"):
                with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_CONTAINER.format(operation=Operations.PUT_CONTAINER)):
                    frostfs_cli.container.create(
                        rpc_endpoint=nodes[0].get_rpc_endpoint(),
                        policy="REP 1",
                    )

            with reporter.step("Check create a container on the second node, expected allow"):
                with expect_not_raises():
                    frostfs_cli.container.create(
                        rpc_endpoint=nodes[1].get_rpc_endpoint(),
                        policy="REP 1",
                    )

        with reporter.step("Check create a container on the first node, expected allow"):
            with expect_not_raises():
                frostfs_cli.container.create(
                    rpc_endpoint=nodes[0].get_rpc_endpoint(),
                    policy="REP 1",
                )

    @allure.title("LocalOverride: Deny to ListContainer in root tenant")
    def test_local_override_deny_to_list_container_root(
        self,
        frostfs_cli_on_first_node: FrostfsCli,
        frostfs_cli: FrostfsCli,
        remove_local_ape_overrides: None,
    ):
        """Container.List is denied only on the node that holds the local override."""
        nodes = self.cluster.storage_nodes

        with _root_namespace_rule(
            frostfs_cli_on_first_node,
            nodes[0].get_control_endpoint(),
            chain_id="denyContainerList",
            rule="deny Container.List *",
        ):
            with reporter.step("Check list the container properties on the first node, expected access denied error"):
                with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_CONTAINER.format(operation=Operations.LIST_CONTAINER)):
                    frostfs_cli.container.list(rpc_endpoint=nodes[0].get_rpc_endpoint(), ttl=1)

            with reporter.step("Check list the container properties on the second node, expected allow"):
                with expect_not_raises():
                    frostfs_cli.container.list(rpc_endpoint=nodes[1].get_rpc_endpoint(), ttl=1)

        with reporter.step("Check display a list of containers on the first node, expected allow"):
            with expect_not_raises():
                frostfs_cli.container.list(rpc_endpoint=nodes[0].get_rpc_endpoint(), ttl=1)

    @allure.title("LocalOverride: Deny to DeleteContainer in root tenant")
    @pytest.mark.parametrize("multiple_containers_request", [MultipleContainersRequest([REP2, REP2])], indirect=True)
    def test_local_override_deny_to_delete_container_root(
        self,
        frostfs_cli_on_first_node: FrostfsCli,
        frostfs_cli: FrostfsCli,
        remove_local_ape_overrides: None,
        containers: list[str],
    ):
        """Container.Delete is denied only on the node that holds the local override.

        Two containers are requested: the second one is deleted through the
        second node while the rule is active, the first one is deleted through
        the first node after the rule has been removed.
        """
        nodes = self.cluster.storage_nodes

        with _root_namespace_rule(
            frostfs_cli_on_first_node,
            nodes[0].get_control_endpoint(),
            chain_id="denyContainerDelete",
            rule="deny Container.Delete *",
        ):
            with reporter.step("Check delete first container from the first node, expected access denied error"):
                with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_CONTAINER.format(operation=Operations.DELETE_CONTAINER)):
                    frostfs_cli.container.delete(nodes[0].get_rpc_endpoint(), containers[0], ttl=1)

            with reporter.step("Check delete a second container from the second node, expected allow"):
                with expect_not_raises():
                    frostfs_cli.container.delete(nodes[1].get_rpc_endpoint(), containers[1], ttl=1)

        with reporter.step("Check delete first container from the first node, expected allow"):
            with expect_not_raises():
                frostfs_cli.container.delete(nodes[0].get_rpc_endpoint(), containers[0], ttl=1)