diff --git a/pytest_tests/testsuites/ape/test_ape.py b/pytest_tests/testsuites/ape/test_ape.py new file mode 100644 index 0000000..5d79d4c --- /dev/null +++ b/pytest_tests/testsuites/ape/test_ape.py @@ -0,0 +1,732 @@ +import time + +import allure +import pytest +from frostfs_testlib.cli import FrostfsAdm, FrostfsCli +from frostfs_testlib.credentials.interfaces import User +from frostfs_testlib.reporter import get_reporter +from frostfs_testlib.resources.cli import FROSTFS_ADM_CONFIG_PATH, FROSTFS_ADM_EXEC, FROSTFS_CLI_EXEC +from frostfs_testlib.resources.common import MORPH_BLOCK_TIME +from frostfs_testlib.resources.error_patterns import ( + NO_RULE_FOUND_OBJECT, + OBJECT_ACCESS_DENIED, + RULE_ACCESS_DENIED_CONTAINER, + RULE_ACCESS_DENIED_OBJECT, +) +from frostfs_testlib.shell.interfaces import Shell +from frostfs_testlib.steps.cli.object import delete_object, get_object, get_range, get_range_hash, head_object, put_object, search_object +from frostfs_testlib.storage.cluster import Cluster, ClusterNode +from frostfs_testlib.storage.dataclasses.ape import Operations +from frostfs_testlib.storage.dataclasses.object_size import ObjectSize +from frostfs_testlib.testing.cluster_test_base import ClusterTestBase +from frostfs_testlib.testing.parallel import parallel +from frostfs_testlib.testing.test_control import expect_not_raises +from frostfs_testlib.utils import datetime_utils +from frostfs_testlib.utils.file_utils import generate_file + +reporter = get_reporter() + + +@pytest.fixture(scope="session") +def remote_frostfs_adm_first_node(cluster: Cluster): + node = cluster.cluster_nodes[0] + shell = node.host.get_shell() + adm = FrostfsAdm( + shell=shell, + frostfs_adm_exec_path=FROSTFS_ADM_EXEC, + config_file=FROSTFS_ADM_CONFIG_PATH, + ) + return adm + + +def morph_on_node(node: ClusterNode, id_chains_for_remove: list[str]): + shell: Shell = node.host.get_shell() + adm = FrostfsAdm( + shell=shell, + frostfs_adm_exec_path=FROSTFS_ADM_EXEC, + config_file=FROSTFS_ADM_CONFIG_PATH, + ) + rules = adm.morph.list_rules(target_name="root", target_type="namespace").stdout + chain_ids = [i.split(" ")[2].strip() for i in rules.split("\n") if "Chain ID" in i] + need_wait = False + with reporter.step(f"Delete rules on {node.storage_node.id} node"): + for chain_id in chain_ids: + if not chain_id in id_chains_for_remove: + continue + adm.morph.remove_rule( + target_type="namespace", + target_name="root", + chain_id=chain_id, + ) + need_wait = True + if need_wait: + with reporter.step("Wait for one block"): + time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME)) + + +@pytest.fixture(scope="session") +def remove_rule_ape_in_morph(cluster: Cluster) -> None: + id_chains_for_remove = [] + yield id_chains_for_remove + with reporter.step("Check morph rules on nodes."): + parallel(morph_on_node, cluster.cluster_nodes, id_chains_for_remove) + + +def pre_create_container_object_adm( + default_user: User, + remote_frostfs_adm_first_node: FrostfsAdm, + frostfs_cli: FrostfsCli, + simple_object_size: ObjectSize, + shell: Shell, + cluster: Cluster, +): + + test_file = generate_file(simple_object_size.value) + + with reporter.step("Create a container on the first node"): + cid = ( + frostfs_cli.container.create( + rpc_endpoint=cluster.storage_nodes[0].get_rpc_endpoint(), + policy="REP 4", + name="dcl1", + await_mode=True, + ) + .stdout.split(" ")[1] + .strip() + .split("\n")[0] + ) + + with reporter.step("Create put rule for the first node"): + remote_frostfs_adm_first_node.morph.add_rule( + target_type="container", + target_name=f"{cid}", + chain_id="allowPutObject", + rule=f"allow Object.Put *", + ) + + with reporter.step("Put objects in container on the first node"): + oid_1 = put_object(default_user.wallet, test_file, cid, shell, cluster.storage_nodes[0].get_rpc_endpoint()) + oid_2 = put_object(default_user.wallet, test_file, cid, shell, cluster.storage_nodes[0].get_rpc_endpoint()) + + with reporter.step("Delete put rule for the first node"): + remote_frostfs_adm_first_node.morph.remove_rule( + target_type="container", + target_name=f"{cid}", + chain_id="allowPutObject", + ) + + return cid, (oid_1, oid_2) + + +@pytest.mark.ape +@pytest.mark.ape_morph_rule +class TestApeMorphRuleChain(ClusterTestBase): + @allure.title("MorphRuleChain: Deny to GetContainer in root tenant") + def test_morph_rule_chain_deny_to_get_container_root( + self, + remote_frostfs_adm_first_node: FrostfsAdm, + frostfs_cli: FrostfsCli, + remove_rule_ape_in_morph: None, + ): + chain_id = "denyContainerGet" + remove_rule_ape_in_morph.append(chain_id) + + with reporter.step("Create a container on the first node"): + cid = ( + frostfs_cli.container.create( + rpc_endpoint=self.cluster.storage_nodes[0].get_rpc_endpoint(), + policy="REP 4", + name="dcl1", + await_mode=True, + basic_acl="public-read-write", + ) + .stdout.split(" ")[1] + .strip() + .split("\n")[0] + ) + + with reporter.step("Create a namespace rule for the first node"): + remote_frostfs_adm_first_node.morph.add_rule( + target_type="namespace", + target_name="root", + chain_id=chain_id, + rule="deny Container.Get *", + ) + + with reporter.step("[NEGATIVE] Check get the container property on the first node"): + with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_CONTAINER.format(operation=Operations.GET_CONTAINER.value)): + frostfs_cli.container.get(self.cluster.storage_nodes[0].get_rpc_endpoint(), cid) + + with reporter.step("[NEGATIVE] Check et the container property on the second node"): + with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_CONTAINER.format(operation=Operations.GET_CONTAINER.value)): + frostfs_cli.container.get(self.cluster.storage_nodes[1].get_rpc_endpoint(), cid) + + with reporter.step("Delete a rule"): + remote_frostfs_adm_first_node.morph.remove_rule( + target_type="namespace", + target_name="root", + chain_id=chain_id, + ) + + with reporter.step("Wait for one block"): + time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME)) + + with reporter.step("Check get the container property on the first node"): + with expect_not_raises(): + frostfs_cli.container.get(self.cluster.storage_nodes[0].get_rpc_endpoint(), cid) + + @allure.title("MorphRuleChain: Deny to PutContainer in root tenant") + def test_morph_rule_chain_deny_to_put_container_root( + self, + remote_frostfs_adm_first_node: FrostfsAdm, + frostfs_cli: FrostfsCli, + remove_rule_ape_in_morph: None, + ): + chain_id = "denyContainerPut" + remove_rule_ape_in_morph.append(chain_id) + + with reporter.step("Create a namespace rule for the first node"): + remote_frostfs_adm_first_node.morph.add_rule( + target_type="namespace", + target_name="root", + chain_id=chain_id, + rule="deny Container.Put *", + ) + + with reporter.step("[NEGATIVE] Check create a container on the first node"): + with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_CONTAINER.format(operation=Operations.PUT_CONTAINER.value)): + frostfs_cli.container.create( + rpc_endpoint=self.cluster.storage_nodes[0].get_rpc_endpoint(), + policy="REP 4", + name="dcl1", + await_mode=True, + basic_acl="public-read-write", + ) + + with reporter.step("[NEGATIVE] Check create a container on the second node"): + with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_CONTAINER.format(operation=Operations.PUT_CONTAINER.value)): + frostfs_cli.container.create( + rpc_endpoint=self.cluster.storage_nodes[1].get_rpc_endpoint(), + policy="REP 4", + name="dcl2", + await_mode=True, + basic_acl="public-read-write", + ) + + with reporter.step("Delete a rule"): + remote_frostfs_adm_first_node.morph.remove_rule( + target_type="namespace", + target_name="root", + chain_id=chain_id, + ) + + with reporter.step("Wait for one block"): + time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME)) + + with reporter.step("Check create a container on the first node"): + with expect_not_raises(): + frostfs_cli.container.create( + rpc_endpoint=self.cluster.storage_nodes[0].get_rpc_endpoint(), + policy="REP 4", + name="dcl3", + await_mode=True, + basic_acl="public-read-write", + ) + + @allure.title("Deny to ListContainer in root tenant") + def test_morph_rule_chain_deny_to_list_container_root( + self, + remote_frostfs_adm_first_node: FrostfsAdm, + frostfs_cli: FrostfsCli, + remove_rule_ape_in_morph: None, + ): + + chain_id = "denyContainerList" + remove_rule_ape_in_morph.append(chain_id) + + with reporter.step("Create a container on the first node"): + cid = ( + frostfs_cli.container.create( + rpc_endpoint=self.cluster.storage_nodes[0].get_rpc_endpoint(), + policy="REP 4", + name="dcl1", + await_mode=True, + basic_acl="public-read-write", + ) + .stdout.split(" ")[1] + .strip() + .split("\n")[0] + ) + + with reporter.step("Create a namespace rule for the first node"): + remote_frostfs_adm_first_node.morph.add_rule( + target_type="namespace", + target_name="root", + chain_id=chain_id, + rule="deny Container.List *", + ) + + with reporter.step("[NEGATIVE] Check list the container properties on the first node"): + with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_CONTAINER.format(operation=Operations.LIST_CONTAINER.value)): + frostfs_cli.container.list(rpc_endpoint=self.cluster.storage_nodes[0].get_rpc_endpoint(), ttl=1) + + with reporter.step("[NEGATIVE] Check list the container properties on the second node"): + with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_CONTAINER.format(operation=Operations.LIST_CONTAINER.value)): + frostfs_cli.container.list(rpc_endpoint=self.cluster.storage_nodes[1].get_rpc_endpoint(), ttl=1) + + with reporter.step("Delete a rule"): + remote_frostfs_adm_first_node.morph.remove_rule( + target_type="namespace", + target_name="root", + chain_id=chain_id, + ) + + with reporter.step("Wait for one block"): + time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME)) + + with reporter.step("Check display a list of containers on the first node"): + with expect_not_raises(): + frostfs_cli.container.list(rpc_endpoint=self.cluster.storage_nodes[0].get_rpc_endpoint(), ttl=1) + + @allure.title("MorphRuleChain: Deny to DeleteContainer in root tenant") + def test_morph_rule_chain_deny_to_delete_container_root( + self, + remote_frostfs_adm_first_node: FrostfsAdm, + frostfs_cli: FrostfsCli, + remove_rule_ape_in_morph: None, + ): + chain_id = "denyContainerDelete" + remove_rule_ape_in_morph.append(chain_id) + + with reporter.step("Create a namespace rule for the first node"): + remote_frostfs_adm_first_node.morph.add_rule( + target_type="namespace", + target_name="root", + chain_id=chain_id, + rule="deny Container.Delete *", + ) + + with reporter.step("Create container on the first node"): + cid = ( + frostfs_cli.container.create( + rpc_endpoint=self.cluster.storage_nodes[0].get_rpc_endpoint(), + policy="REP 4", + name="dcl1", + await_mode=True, + basic_acl="public-read-write", + ) + .stdout.split(" ")[1] + .strip() + .split("\n")[0] + ) + + with reporter.step("[NEGATIVE] Check delete first container from the first node"): + with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_CONTAINER.format(operation=Operations.DELETE_CONTAINER.value)): + frostfs_cli.container.delete(self.cluster.storage_nodes[0].get_rpc_endpoint(), cid, ttl=1) + + with reporter.step("[NEGATIVE] Check delete second container from the second node"): + with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_CONTAINER.format(operation=Operations.DELETE_CONTAINER.value)): + frostfs_cli.container.delete(self.cluster.storage_nodes[1].get_rpc_endpoint(), cid, ttl=1) + + with reporter.step("Delete a rule"): + remote_frostfs_adm_first_node.morph.remove_rule( + target_type="namespace", + target_name="root", + chain_id=chain_id, + ) + + with reporter.step("Wait for one block"): + time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME)) + + with reporter.step("Check delete first container from the first node"): + with expect_not_raises(): + frostfs_cli.container.delete(self.cluster.storage_nodes[0].get_rpc_endpoint(), cid, ttl=1) + + @allure.title("MorphRuleChain: Deny to GetObject in root tenant") + def test_morph_rule_chain_deny_to_get_object_root( + self, + default_user: User, + remote_frostfs_adm_first_node: FrostfsAdm, + frostfs_cli: FrostfsCli, + simple_object_size: ObjectSize, + ): + test_file = generate_file(simple_object_size.value) + + with reporter.step("Create a container on the first node"): + cid = ( + frostfs_cli.container.create( + rpc_endpoint=self.cluster.storage_nodes[0].get_rpc_endpoint(), + policy="REP 4", + name="dcl1", + await_mode=True, + basic_acl="public-read-write", + ) + .stdout.split(" ")[1] + .strip() + .split("\n")[0] + ) + + with reporter.step("Create a container rule for the first node"): + remote_frostfs_adm_first_node.morph.add_rule( + target_type="container", + target_name=f"{cid}", + chain_id="denyGetObject", + rule=f"deny Object.Get /{cid}/*", + ) + + with reporter.step("Put object in container on the first node"): + oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) + + with reporter.step("[NEGATIVE] Check get object from container on the first node"): + with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_OBJECT.format(operation=Operations.GET_OBJECT.value)): + get_object(default_user.wallet, cid, oid, self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) + + with reporter.step("[NEGATIVE] Check get object from container on the second node"): + with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_OBJECT.format(operation=Operations.GET_OBJECT.value)): + get_object(default_user.wallet, cid, oid, self.shell, self.cluster.storage_nodes[1].get_rpc_endpoint()) + + with reporter.step("Delete a rule"): + remote_frostfs_adm_first_node.morph.remove_rule( + target_type="container", + target_name=f"{cid}", + chain_id="denyGetObject", + ) + + with reporter.step("Check get object in container on the first node"): + with expect_not_raises(): + get_object(default_user.wallet, cid, oid, self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) + + @allure.title("MorphRuleChain: Deny to PutObject in root tenant") + def test_morph_rule_chain_deny_to_put_object_root( + self, + default_user: User, + remote_frostfs_adm_first_node: FrostfsAdm, + frostfs_cli: FrostfsCli, + simple_object_size: ObjectSize, + ): + + test_file = generate_file(simple_object_size.value) + + with reporter.step("Create a container on the first node"): + cid = ( + frostfs_cli.container.create( + rpc_endpoint=self.cluster.storage_nodes[0].get_rpc_endpoint(), + policy="REP 2 CBF 1", + name="dcl1", + await_mode=True, + basic_acl="public-read-write", + ) + .stdout.split(" ")[1] + .strip() + .split("\n")[0] + ) + + with reporter.step("Create a container rule for the first node"): + remote_frostfs_adm_first_node.morph.add_rule( + target_type="container", + target_name=f"{cid}", + chain_id="denyPutObject", + rule=f"deny Object.Put /{cid}/*", + ) + + with reporter.step("[NEGATIVE] Check put object from container on the first node"): + with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_OBJECT.format(operation=Operations.PUT_OBJECT.value)): + put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) + + with reporter.step("[NEGATIVE] Check put object from container on the second node"): + with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_OBJECT.format(operation=Operations.PUT_OBJECT.value)): + put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.storage_nodes[1].get_rpc_endpoint()) + + with reporter.step("Delete a rule"): + remote_frostfs_adm_first_node.morph.remove_rule( + target_type="container", + target_name=f"{cid}", + chain_id="denyPutObject", + ) + + with reporter.step("Check put object in container on the first node"): + with expect_not_raises(): + put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) + + @allure.title("MorphRuleChain: Deny to HeadObject in root tenant") + def test_morph_rule_chain_deny_to_head_object_root( + self, + default_user: User, + remote_frostfs_adm_first_node: FrostfsAdm, + frostfs_cli: FrostfsCli, + simple_object_size: ObjectSize, + ): + test_file = generate_file(simple_object_size.value) + + with reporter.step("Create a container on the first node"): + cid = ( + frostfs_cli.container.create( + rpc_endpoint=self.cluster.storage_nodes[0].get_rpc_endpoint(), + policy="REP 4", + name="dcl1", + await_mode=True, + basic_acl="public-read-write", + ) + .stdout.split(" ")[1] + .strip() + .split("\n")[0] + ) + + with reporter.step("Create a container rule for the first node"): + remote_frostfs_adm_first_node.morph.add_rule( + target_type="container", + target_name=f"{cid}", + chain_id="denyHeadObject", + rule=f"deny Object.Head /{cid}/*", + ) + + with reporter.step("Put object in container on the first node"): + oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) + + with reporter.step("[NEGATIVE] Check head object from container on the first node"): + with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_OBJECT.format(operation=Operations.HEAD_OBJECT.value)): + head_object(default_user.wallet, cid, oid, self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) + + with reporter.step("[NEGATIVE] Check head object from container on the second node"): + with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_OBJECT.format(operation=Operations.HEAD_OBJECT.value)): + head_object(default_user.wallet, cid, oid, self.shell, self.cluster.storage_nodes[1].get_rpc_endpoint()) + + with reporter.step("Delete a rule"): + remote_frostfs_adm_first_node.morph.remove_rule( + target_type="container", + target_name=f"{cid}", + chain_id="denyHeadObject", + ) + + with reporter.step("Check head object in container on the first node"): + with expect_not_raises(): + head_object(default_user.wallet, cid, oid, self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) + + @allure.title("MorphRuleChain: Deny to SearchObject in root tenant") + def test_morph_rule_chain_deny_to_search_object_root( + self, + default_user: User, + remote_frostfs_adm_first_node: FrostfsAdm, + frostfs_cli: FrostfsCli, + simple_object_size: ObjectSize, + ): + + with reporter.step("Create a container on the first node"): + cid = ( + frostfs_cli.container.create( + rpc_endpoint=self.cluster.storage_nodes[0].get_rpc_endpoint(), + policy="REP 2", + name="dcl1", + await_mode=True, + basic_acl="public-read-write", + ) + .stdout.split(" ")[1] + .strip() + .split("\n")[0] + ) + + with reporter.step("Create a container rule for the first node"): + remote_frostfs_adm_first_node.morph.add_rule( + target_type="container", + target_name=f"{cid}", + chain_id="denySearchObject", + rule=f"deny Object.Search /{cid}/*", + ) + + with reporter.step("[NEGATIVE] Check search object from container on the first node"): + with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_OBJECT.format(operation=Operations.SEARCH_OBJECT.value)): + search_object(default_user.wallet, cid, self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) + + with reporter.step("[NEGATIVE] Check search object from container on the second node"): + with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_OBJECT.format(operation=Operations.SEARCH_OBJECT.value)): + search_object(default_user.wallet, cid, self.shell, self.cluster.storage_nodes[1].get_rpc_endpoint()) + + with reporter.step("Delete a rule"): + remote_frostfs_adm_first_node.morph.remove_rule( + target_type="container", + target_name=f"{cid}", + chain_id="denySearchObject", + ) + + with reporter.step("Check search object in container on the first node"): + with expect_not_raises(): + search_object(default_user.wallet, cid, self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) + + @allure.title("MorphRuleChain: Deny to RangeObject in root tenant") + def test_morph_rule_chain_deny_to_range_object_root( + self, + default_user: User, + remote_frostfs_adm_first_node: FrostfsAdm, + frostfs_cli: FrostfsCli, + simple_object_size: ObjectSize, + ): + test_file = generate_file(simple_object_size.value) + + with reporter.step("Create a container on the first node"): + cid = ( + frostfs_cli.container.create( + rpc_endpoint=self.cluster.storage_nodes[0].get_rpc_endpoint(), + policy="REP 2", + name="dcl1", + await_mode=True, + basic_acl="public-read-write", + ) + .stdout.split(" ")[1] + .strip() + .split("\n")[0] + ) + + with reporter.step("Create a container rule for the first node"): + remote_frostfs_adm_first_node.morph.add_rule( + target_type="container", + target_name=f"{cid}", + chain_id="denyRangeObject", + rule=f"deny Object.Range /{cid}/*", + ) + + with reporter.step("Put object in container on the first node"): + oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) + + with reporter.step("[NEGATIVE] Check range object from container on the first node"): + with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_OBJECT.format(operation=Operations.RANGE_OBJECT.value)): + get_range(default_user.wallet, cid, oid, "0:10", self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) + + with reporter.step("[NEGATIVE] Check range object from container on the second node"): + with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_OBJECT.format(operation=Operations.RANGE_OBJECT.value)): + get_range(default_user.wallet, cid, oid, "0:10", self.shell, self.cluster.storage_nodes[1].get_rpc_endpoint()) + + with reporter.step("Delete a rule"): + remote_frostfs_adm_first_node.morph.remove_rule( + target_type="container", + target_name=f"{cid}", + chain_id="denyRangeObject", + ) + + with reporter.step("Check range object in container on the first node,expected allow"): + with expect_not_raises(): + get_range(default_user.wallet, cid, oid, "0:10", self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) + + @allure.title("MorphRuleChain: Deny to HashObject in root tenant") + def test_morph_rule_chain_deny_to_hash_object_root( + self, + default_user: User, + remote_frostfs_adm_first_node: FrostfsAdm, + frostfs_cli: FrostfsCli, + simple_object_size: ObjectSize, + ): + + test_file = generate_file(simple_object_size.value) + + with reporter.step("Create a container on the first node"): + cid = ( + frostfs_cli.container.create( + rpc_endpoint=self.cluster.storage_nodes[0].get_rpc_endpoint(), + policy="REP 2", + name="dcl1", + await_mode=True, + basic_acl="public-read-write", + ) + .stdout.split(" ")[1] + .strip() + .split("\n")[0] + ) + + with reporter.step("Create a container rule for the first node"): + remote_frostfs_adm_first_node.morph.add_rule( + target_type="container", + target_name=f"{cid}", + chain_id="denyHashObject", + rule=f"deny Object.Hash /{cid}/*", + ) + + with reporter.step("Put object in container on the first node"): + oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) + + with reporter.step("[NEGATIVE] Check range hash object from container on the first node"): + with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_OBJECT.format(operation=Operations.HASH_OBJECT.value)): + get_range_hash(default_user.wallet, cid, oid, "0:10", self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) + + with reporter.step("[NEGATIVE] Check range hash object from container on the second node"): + with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_OBJECT.format(operation=Operations.HASH_OBJECT.value)): + get_range_hash(default_user.wallet, cid, oid, "0:10", self.shell, self.cluster.storage_nodes[1].get_rpc_endpoint()) + + with reporter.step("Delete a rule"): + remote_frostfs_adm_first_node.morph.remove_rule( + target_type="container", + target_name=f"{cid}", + chain_id="denyHashObject", + ) + + with reporter.step("Check range hash object in container on the first node"): + with expect_not_raises(): + get_range_hash(default_user.wallet, cid, oid, "0:10", self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) + + @allure.title("MorphRuleChain: Deny to DeleteObject in root tenant") + def test_morph_rule_chain_deny_to_delete_object_root( + self, + default_user: User, + remote_frostfs_adm_first_node: FrostfsAdm, + frostfs_cli: FrostfsCli, + simple_object_size: ObjectSize, + ): + test_file = generate_file(simple_object_size.value) + + with reporter.step("Create a container on the first node"): + cid = ( + frostfs_cli.container.create( + rpc_endpoint=self.cluster.storage_nodes[0].get_rpc_endpoint(), + policy="REP 2", + name="dcl1", + await_mode=True, + basic_acl="public-read-write", + ) + .stdout.split(" ")[1] + .strip() + .split("\n")[0] + ) + + with reporter.step("Create a container rule for the first node"): + remote_frostfs_adm_first_node.morph.add_rule( + target_type="container", + target_name=f"{cid}", + chain_id="denyDeleteObject", + rule=f"deny Object.Delete /{cid}/*", + ) + + with reporter.step("Put two objects in container on the first node"): + oid_1 = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) + + oid_2 = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) + + with reporter.step("Search object in container on the first node"): + search_object_in_container_1 = search_object( + default_user.wallet, cid, self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint() + ) + assert oid_1 in search_object_in_container_1, f"Object {oid_1} was not found" + assert oid_2 in search_object_in_container_1, f"Object {oid_2} was not found" + + with reporter.step("Search object from container on the second node"): + search_object_in_container_2 = search_object( + default_user.wallet, cid, self.shell, self.cluster.storage_nodes[1].get_rpc_endpoint() + ) + assert oid_1 in search_object_in_container_2, f"Object {oid_1} was not found" + assert oid_2 in search_object_in_container_2, f"Object {oid_2} was not found" + + with reporter.step("[NEGATIVE] Check delete object from container on the first node"): + with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_OBJECT.format(operation=Operations.DELETE_OBJECT.value)): + delete_object(default_user.wallet, cid, oid_1, self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) + + with reporter.step("[NEGATIVE] Check delete object from container on the second node"): + with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_OBJECT.format(operation=Operations.DELETE_OBJECT.value)): + delete_object(default_user.wallet, cid, oid_2, self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) + + with reporter.step("Delete a rule"): + remote_frostfs_adm_first_node.morph.remove_rule( + target_type="container", + target_name=f"{cid}", + chain_id="denyDeleteObject", + ) + + with reporter.step("Check delete object in container on the first node"): + with expect_not_raises(): + delete_object(default_user.wallet, cid, oid_1, self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint())