import time import allure import pytest from frostfs_testlib.cli import FrostfsAdm, FrostfsCli from frostfs_testlib.credentials.interfaces import User from frostfs_testlib.reporter import get_reporter from frostfs_testlib.resources.cli import FROSTFS_ADM_CONFIG_PATH, FROSTFS_ADM_EXEC, FROSTFS_CLI_EXEC from frostfs_testlib.resources.common import MORPH_BLOCK_TIME from frostfs_testlib.resources.error_patterns import ( NO_RULE_FOUND_OBJECT, OBJECT_ACCESS_DENIED, RULE_ACCESS_DENIED_CONTAINER, RULE_ACCESS_DENIED_OBJECT, ) from frostfs_testlib.shell.interfaces import Shell from frostfs_testlib.steps.cli.object import delete_object, get_object, get_range, get_range_hash, head_object, put_object, search_object from frostfs_testlib.storage.cluster import Cluster, ClusterNode from frostfs_testlib.storage.dataclasses.ape import Operations from frostfs_testlib.storage.dataclasses.object_size import ObjectSize from frostfs_testlib.testing.cluster_test_base import ClusterTestBase from frostfs_testlib.testing.parallel import parallel from frostfs_testlib.testing.test_control import expect_not_raises from frostfs_testlib.utils import datetime_utils from frostfs_testlib.utils.file_utils import generate_file reporter = get_reporter() @pytest.fixture(scope="session") def remote_frostfs_adm_first_node(cluster: Cluster): node = cluster.cluster_nodes[0] shell = node.host.get_shell() adm = FrostfsAdm( shell=shell, frostfs_adm_exec_path=FROSTFS_ADM_EXEC, config_file=FROSTFS_ADM_CONFIG_PATH, ) return adm def morph_on_node(node: ClusterNode, id_chains_for_remove: list[str]): shell: Shell = node.host.get_shell() adm = FrostfsAdm( shell=shell, frostfs_adm_exec_path=FROSTFS_ADM_EXEC, config_file=FROSTFS_ADM_CONFIG_PATH, ) rules = adm.morph.list_rules(target_name="root", target_type="namespace").stdout chain_ids = [i.split(" ")[2].strip() for i in rules.split("\n") if "Chain ID" in i] need_wait = False with reporter.step(f"Delete rules on {node.storage_node.id} node"): for chain_id in chain_ids: if not chain_id in id_chains_for_remove: continue adm.morph.remove_rule( target_type="namespace", target_name="root", chain_id=chain_id, ) need_wait = True if need_wait: with reporter.step("Wait for one block"): time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME)) @pytest.fixture(scope="session") def remove_rule_ape_in_morph(cluster: Cluster) -> None: id_chains_for_remove = [] yield id_chains_for_remove with reporter.step("Check morph rules on nodes."): parallel(morph_on_node, cluster.cluster_nodes, id_chains_for_remove) def pre_create_container_object_adm( default_user: User, remote_frostfs_adm_first_node: FrostfsAdm, frostfs_cli: FrostfsCli, simple_object_size: ObjectSize, shell: Shell, cluster: Cluster, ): test_file = generate_file(simple_object_size.value) with reporter.step("Create a container on the first node"): cid = ( frostfs_cli.container.create( rpc_endpoint=cluster.storage_nodes[0].get_rpc_endpoint(), policy="REP 4", await_mode=True, ) .stdout.split(" ")[1] .strip() .split("\n")[0] ) with reporter.step("Create put rule for the first node"): remote_frostfs_adm_first_node.morph.add_rule( target_type="container", target_name=f"{cid}", chain_id="allowPutObject", rule=f"allow Object.Put *", ) with reporter.step("Put objects in container on the first node"): oid_1 = put_object(default_user.wallet, test_file, cid, shell, cluster.storage_nodes[0].get_rpc_endpoint()) oid_2 = put_object(default_user.wallet, test_file, cid, shell, cluster.storage_nodes[0].get_rpc_endpoint()) with reporter.step("Delete put rule for the first node"): remote_frostfs_adm_first_node.morph.remove_rule( target_type="container", target_name=f"{cid}", chain_id="allowPutObject", ) return cid, (oid_1, oid_2) @pytest.mark.ape @pytest.mark.ape_morph_rule class TestApeMorphRuleChain(ClusterTestBase): @allure.title("MorphRuleChain: Deny to GetContainer in root tenant") def test_morph_rule_chain_deny_to_get_container_root( self, remote_frostfs_adm_first_node: FrostfsAdm, frostfs_cli: FrostfsCli, remove_rule_ape_in_morph: None, ): chain_id = "denyContainerGet" remove_rule_ape_in_morph.append(chain_id) with reporter.step("Create a container on the first node"): cid = ( frostfs_cli.container.create( rpc_endpoint=self.cluster.storage_nodes[0].get_rpc_endpoint(), policy="REP 4", await_mode=True, basic_acl="public-read-write", ) .stdout.split(" ")[1] .strip() .split("\n")[0] ) with reporter.step("Create a namespace rule for the first node"): remote_frostfs_adm_first_node.morph.add_rule( target_type="namespace", target_name="root", chain_id=chain_id, rule="deny Container.Get *", ) with reporter.step("[NEGATIVE] Check get the container property on the first node"): with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_CONTAINER.format(operation=Operations.GET_CONTAINER.value)): frostfs_cli.container.get(self.cluster.storage_nodes[0].get_rpc_endpoint(), cid) with reporter.step("[NEGATIVE] Check et the container property on the second node"): with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_CONTAINER.format(operation=Operations.GET_CONTAINER.value)): frostfs_cli.container.get(self.cluster.storage_nodes[1].get_rpc_endpoint(), cid) with reporter.step("Delete a rule"): remote_frostfs_adm_first_node.morph.remove_rule( target_type="namespace", target_name="root", chain_id=chain_id, ) with reporter.step("Wait for one block"): time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME)) with reporter.step("Check get the container property on the first node"): with expect_not_raises(): frostfs_cli.container.get(self.cluster.storage_nodes[0].get_rpc_endpoint(), cid) @allure.title("MorphRuleChain: Deny to PutContainer in root tenant") def test_morph_rule_chain_deny_to_put_container_root( self, remote_frostfs_adm_first_node: FrostfsAdm, frostfs_cli: FrostfsCli, remove_rule_ape_in_morph: None, ): chain_id = "denyContainerPut" remove_rule_ape_in_morph.append(chain_id) with reporter.step("Create a namespace rule for the first node"): remote_frostfs_adm_first_node.morph.add_rule( target_type="namespace", target_name="root", chain_id=chain_id, rule="deny Container.Put *", ) with reporter.step("[NEGATIVE] Check create a container on the first node"): with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_CONTAINER.format(operation=Operations.PUT_CONTAINER.value)): frostfs_cli.container.create( rpc_endpoint=self.cluster.storage_nodes[0].get_rpc_endpoint(), policy="REP 4", await_mode=True, basic_acl="public-read-write", ) with reporter.step("[NEGATIVE] Check create a container on the second node"): with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_CONTAINER.format(operation=Operations.PUT_CONTAINER.value)): frostfs_cli.container.create( rpc_endpoint=self.cluster.storage_nodes[1].get_rpc_endpoint(), policy="REP 4", await_mode=True, basic_acl="public-read-write", ) with reporter.step("Delete a rule"): remote_frostfs_adm_first_node.morph.remove_rule( target_type="namespace", target_name="root", chain_id=chain_id, ) with reporter.step("Wait for one block"): time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME)) with reporter.step("Check create a container on the first node"): with expect_not_raises(): frostfs_cli.container.create( rpc_endpoint=self.cluster.storage_nodes[0].get_rpc_endpoint(), policy="REP 4", name="dcl3", await_mode=True, basic_acl="public-read-write", ) @allure.title("Deny to ListContainer in root tenant") def test_morph_rule_chain_deny_to_list_container_root( self, remote_frostfs_adm_first_node: FrostfsAdm, frostfs_cli: FrostfsCli, remove_rule_ape_in_morph: None, ): chain_id = "denyContainerList" remove_rule_ape_in_morph.append(chain_id) with reporter.step("Create a container on the first node"): cid = ( frostfs_cli.container.create( rpc_endpoint=self.cluster.storage_nodes[0].get_rpc_endpoint(), policy="REP 4", await_mode=True, basic_acl="public-read-write", ) .stdout.split(" ")[1] .strip() .split("\n")[0] ) with reporter.step("Create a namespace rule for the first node"): remote_frostfs_adm_first_node.morph.add_rule( target_type="namespace", target_name="root", chain_id=chain_id, rule="deny Container.List *", ) with reporter.step("[NEGATIVE] Check list the container properties on the first node"): with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_CONTAINER.format(operation=Operations.LIST_CONTAINER.value)): frostfs_cli.container.list(rpc_endpoint=self.cluster.storage_nodes[0].get_rpc_endpoint(), ttl=1) with reporter.step("[NEGATIVE] Check list the container properties on the second node"): with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_CONTAINER.format(operation=Operations.LIST_CONTAINER.value)): frostfs_cli.container.list(rpc_endpoint=self.cluster.storage_nodes[1].get_rpc_endpoint(), ttl=1) with reporter.step("Delete a rule"): remote_frostfs_adm_first_node.morph.remove_rule( target_type="namespace", target_name="root", chain_id=chain_id, ) with reporter.step("Wait for one block"): time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME)) with reporter.step("Check display a list of containers on the first node"): with expect_not_raises(): frostfs_cli.container.list(rpc_endpoint=self.cluster.storage_nodes[0].get_rpc_endpoint(), ttl=1) @allure.title("MorphRuleChain: Deny to DeleteContainer in root tenant") def test_morph_rule_chain_deny_to_delete_container_root( self, remote_frostfs_adm_first_node: FrostfsAdm, frostfs_cli: FrostfsCli, remove_rule_ape_in_morph: None, ): chain_id = "denyContainerDelete" remove_rule_ape_in_morph.append(chain_id) with reporter.step("Create a namespace rule for the first node"): remote_frostfs_adm_first_node.morph.add_rule( target_type="namespace", target_name="root", chain_id=chain_id, rule="deny Container.Delete *", ) with reporter.step("Create container on the first node"): cid = ( frostfs_cli.container.create( rpc_endpoint=self.cluster.storage_nodes[0].get_rpc_endpoint(), policy="REP 4", await_mode=True, basic_acl="public-read-write", ) .stdout.split(" ")[1] .strip() .split("\n")[0] ) with reporter.step("[NEGATIVE] Check delete first container from the first node"): with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_CONTAINER.format(operation=Operations.DELETE_CONTAINER.value)): frostfs_cli.container.delete(self.cluster.storage_nodes[0].get_rpc_endpoint(), cid, ttl=1) with reporter.step("[NEGATIVE] Check delete second container from the second node"): with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_CONTAINER.format(operation=Operations.DELETE_CONTAINER.value)): frostfs_cli.container.delete(self.cluster.storage_nodes[1].get_rpc_endpoint(), cid, ttl=1) with reporter.step("Delete a rule"): remote_frostfs_adm_first_node.morph.remove_rule( target_type="namespace", target_name="root", chain_id=chain_id, ) with reporter.step("Wait for one block"): time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME)) with reporter.step("Check delete first container from the first node"): with expect_not_raises(): frostfs_cli.container.delete(self.cluster.storage_nodes[0].get_rpc_endpoint(), cid, ttl=1) @allure.title("MorphRuleChain: Deny to GetObject in root tenant") def test_morph_rule_chain_deny_to_get_object_root( self, default_user: User, remote_frostfs_adm_first_node: FrostfsAdm, frostfs_cli: FrostfsCli, simple_object_size: ObjectSize, ): test_file = generate_file(simple_object_size.value) with reporter.step("Create a container on the first node"): cid = ( frostfs_cli.container.create( rpc_endpoint=self.cluster.storage_nodes[0].get_rpc_endpoint(), policy="REP 4", await_mode=True, basic_acl="public-read-write", ) .stdout.split(" ")[1] .strip() .split("\n")[0] ) with reporter.step("Create a container rule for the first node"): remote_frostfs_adm_first_node.morph.add_rule( target_type="container", target_name=f"{cid}", chain_id="denyGetObject", rule=f"deny Object.Get /{cid}/*", ) with reporter.step("Put object in container on the first node"): oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) with reporter.step("[NEGATIVE] Check get object from container on the first node"): with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_OBJECT.format(operation=Operations.GET_OBJECT.value)): get_object(default_user.wallet, cid, oid, self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) with reporter.step("[NEGATIVE] Check get object from container on the second node"): with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_OBJECT.format(operation=Operations.GET_OBJECT.value)): get_object(default_user.wallet, cid, oid, self.shell, self.cluster.storage_nodes[1].get_rpc_endpoint()) with reporter.step("Delete a rule"): remote_frostfs_adm_first_node.morph.remove_rule( target_type="container", target_name=f"{cid}", chain_id="denyGetObject", ) with reporter.step("Check get object in container on the first node"): with expect_not_raises(): get_object(default_user.wallet, cid, oid, self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) @allure.title("MorphRuleChain: Deny to PutObject in root tenant") def test_morph_rule_chain_deny_to_put_object_root( self, default_user: User, remote_frostfs_adm_first_node: FrostfsAdm, frostfs_cli: FrostfsCli, simple_object_size: ObjectSize, ): test_file = generate_file(simple_object_size.value) with reporter.step("Create a container on the first node"): cid = ( frostfs_cli.container.create( rpc_endpoint=self.cluster.storage_nodes[0].get_rpc_endpoint(), policy="REP 2 CBF 1", await_mode=True, basic_acl="public-read-write", ) .stdout.split(" ")[1] .strip() .split("\n")[0] ) with reporter.step("Create a container rule for the first node"): remote_frostfs_adm_first_node.morph.add_rule( target_type="container", target_name=f"{cid}", chain_id="denyPutObject", rule=f"deny Object.Put /{cid}/*", ) with reporter.step("[NEGATIVE] Check put object from container on the first node"): with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_OBJECT.format(operation=Operations.PUT_OBJECT.value)): put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) with reporter.step("[NEGATIVE] Check put object from container on the second node"): with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_OBJECT.format(operation=Operations.PUT_OBJECT.value)): put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.storage_nodes[1].get_rpc_endpoint()) with reporter.step("Delete a rule"): remote_frostfs_adm_first_node.morph.remove_rule( target_type="container", target_name=f"{cid}", chain_id="denyPutObject", ) with reporter.step("Check put object in container on the first node"): with expect_not_raises(): put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) @allure.title("MorphRuleChain: Deny to HeadObject in root tenant") def test_morph_rule_chain_deny_to_head_object_root( self, default_user: User, remote_frostfs_adm_first_node: FrostfsAdm, frostfs_cli: FrostfsCli, simple_object_size: ObjectSize, ): test_file = generate_file(simple_object_size.value) with reporter.step("Create a container on the first node"): cid = ( frostfs_cli.container.create( rpc_endpoint=self.cluster.storage_nodes[0].get_rpc_endpoint(), policy="REP 4", await_mode=True, basic_acl="public-read-write", ) .stdout.split(" ")[1] .strip() .split("\n")[0] ) with reporter.step("Create a container rule for the first node"): remote_frostfs_adm_first_node.morph.add_rule( target_type="container", target_name=f"{cid}", chain_id="denyHeadObject", rule=f"deny Object.Head /{cid}/*", ) with reporter.step("Put object in container on the first node"): oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) with reporter.step("[NEGATIVE] Check head object from container on the first node"): with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_OBJECT.format(operation=Operations.HEAD_OBJECT.value)): head_object(default_user.wallet, cid, oid, self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) with reporter.step("[NEGATIVE] Check head object from container on the second node"): with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_OBJECT.format(operation=Operations.HEAD_OBJECT.value)): head_object(default_user.wallet, cid, oid, self.shell, self.cluster.storage_nodes[1].get_rpc_endpoint()) with reporter.step("Delete a rule"): remote_frostfs_adm_first_node.morph.remove_rule( target_type="container", target_name=f"{cid}", chain_id="denyHeadObject", ) with reporter.step("Check head object in container on the first node"): with expect_not_raises(): head_object(default_user.wallet, cid, oid, self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) @allure.title("MorphRuleChain: Deny to SearchObject in root tenant") def test_morph_rule_chain_deny_to_search_object_root( self, default_user: User, remote_frostfs_adm_first_node: FrostfsAdm, frostfs_cli: FrostfsCli, simple_object_size: ObjectSize, ): with reporter.step("Create a container on the first node"): cid = ( frostfs_cli.container.create( rpc_endpoint=self.cluster.storage_nodes[0].get_rpc_endpoint(), policy="REP 2", await_mode=True, basic_acl="public-read-write", ) .stdout.split(" ")[1] .strip() .split("\n")[0] ) with reporter.step("Create a container rule for the first node"): remote_frostfs_adm_first_node.morph.add_rule( target_type="container", target_name=f"{cid}", chain_id="denySearchObject", rule=f"deny Object.Search /{cid}/*", ) with reporter.step("[NEGATIVE] Check search object from container on the first node"): with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_OBJECT.format(operation=Operations.SEARCH_OBJECT.value)): search_object(default_user.wallet, cid, self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) with reporter.step("[NEGATIVE] Check search object from container on the second node"): with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_OBJECT.format(operation=Operations.SEARCH_OBJECT.value)): search_object(default_user.wallet, cid, self.shell, self.cluster.storage_nodes[1].get_rpc_endpoint()) with reporter.step("Delete a rule"): remote_frostfs_adm_first_node.morph.remove_rule( target_type="container", target_name=f"{cid}", chain_id="denySearchObject", ) with reporter.step("Check search object in container on the first node"): with expect_not_raises(): search_object(default_user.wallet, cid, self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) @allure.title("MorphRuleChain: Deny to RangeObject in root tenant") def test_morph_rule_chain_deny_to_range_object_root( self, default_user: User, remote_frostfs_adm_first_node: FrostfsAdm, frostfs_cli: FrostfsCli, simple_object_size: ObjectSize, ): test_file = generate_file(simple_object_size.value) with reporter.step("Create a container on the first node"): cid = ( frostfs_cli.container.create( rpc_endpoint=self.cluster.storage_nodes[0].get_rpc_endpoint(), policy="REP 2", await_mode=True, basic_acl="public-read-write", ) .stdout.split(" ")[1] .strip() .split("\n")[0] ) with reporter.step("Create a container rule for the first node"): remote_frostfs_adm_first_node.morph.add_rule( target_type="container", target_name=f"{cid}", chain_id="denyRangeObject", rule=f"deny Object.Range /{cid}/*", ) with reporter.step("Put object in container on the first node"): oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) with reporter.step("[NEGATIVE] Check range object from container on the first node"): with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_OBJECT.format(operation=Operations.RANGE_OBJECT.value)): get_range(default_user.wallet, cid, oid, "0:10", self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) with reporter.step("[NEGATIVE] Check range object from container on the second node"): with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_OBJECT.format(operation=Operations.RANGE_OBJECT.value)): get_range(default_user.wallet, cid, oid, "0:10", self.shell, self.cluster.storage_nodes[1].get_rpc_endpoint()) with reporter.step("Delete a rule"): remote_frostfs_adm_first_node.morph.remove_rule( target_type="container", target_name=f"{cid}", chain_id="denyRangeObject", ) with reporter.step("Check range object in container on the first node,expected allow"): with expect_not_raises(): get_range(default_user.wallet, cid, oid, "0:10", self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) @allure.title("MorphRuleChain: Deny to HashObject in root tenant") def test_morph_rule_chain_deny_to_hash_object_root( self, default_user: User, remote_frostfs_adm_first_node: FrostfsAdm, frostfs_cli: FrostfsCli, simple_object_size: ObjectSize, ): test_file = generate_file(simple_object_size.value) with reporter.step("Create a container on the first node"): cid = ( frostfs_cli.container.create( rpc_endpoint=self.cluster.storage_nodes[0].get_rpc_endpoint(), policy="REP 2", await_mode=True, basic_acl="public-read-write", ) .stdout.split(" ")[1] .strip() .split("\n")[0] ) with reporter.step("Create a container rule for the first node"): remote_frostfs_adm_first_node.morph.add_rule( target_type="container", target_name=f"{cid}", chain_id="denyHashObject", rule=f"deny Object.Hash /{cid}/*", ) with reporter.step("Put object in container on the first node"): oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) with reporter.step("[NEGATIVE] Check range hash object from container on the first node"): with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_OBJECT.format(operation=Operations.HASH_OBJECT.value)): get_range_hash(default_user.wallet, cid, oid, "0:10", self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) with reporter.step("[NEGATIVE] Check range hash object from container on the second node"): with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_OBJECT.format(operation=Operations.HASH_OBJECT.value)): get_range_hash(default_user.wallet, cid, oid, "0:10", self.shell, self.cluster.storage_nodes[1].get_rpc_endpoint()) with reporter.step("Delete a rule"): remote_frostfs_adm_first_node.morph.remove_rule( target_type="container", target_name=f"{cid}", chain_id="denyHashObject", ) with reporter.step("Check range hash object in container on the first node"): with expect_not_raises(): get_range_hash(default_user.wallet, cid, oid, "0:10", self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) @allure.title("MorphRuleChain: Deny to DeleteObject in root tenant") def test_morph_rule_chain_deny_to_delete_object_root( self, default_user: User, remote_frostfs_adm_first_node: FrostfsAdm, frostfs_cli: FrostfsCli, simple_object_size: ObjectSize, ): test_file = generate_file(simple_object_size.value) with reporter.step("Create a container on the first node"): cid = ( frostfs_cli.container.create( rpc_endpoint=self.cluster.storage_nodes[0].get_rpc_endpoint(), policy="REP 2", await_mode=True, basic_acl="public-read-write", ) .stdout.split(" ")[1] .strip() .split("\n")[0] ) with reporter.step("Create a container rule for the first node"): remote_frostfs_adm_first_node.morph.add_rule( target_type="container", target_name=f"{cid}", chain_id="denyDeleteObject", rule=f"deny Object.Delete /{cid}/*", ) with reporter.step("Put two objects in container on the first node"): oid_1 = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) oid_2 = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) with reporter.step("Search object in container on the first node"): search_object_in_container_1 = search_object( default_user.wallet, cid, self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint() ) assert oid_1 in search_object_in_container_1, f"Object {oid_1} was not found" assert oid_2 in search_object_in_container_1, f"Object {oid_2} was not found" with reporter.step("Search object from container on the second node"): search_object_in_container_2 = search_object( default_user.wallet, cid, self.shell, self.cluster.storage_nodes[1].get_rpc_endpoint() ) assert oid_1 in search_object_in_container_2, f"Object {oid_1} was not found" assert oid_2 in search_object_in_container_2, f"Object {oid_2} was not found" with reporter.step("[NEGATIVE] Check delete object from container on the first node"): with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_OBJECT.format(operation=Operations.DELETE_OBJECT.value)): delete_object(default_user.wallet, cid, oid_1, self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) with reporter.step("[NEGATIVE] Check delete object from container on the second node"): with pytest.raises(RuntimeError, match=RULE_ACCESS_DENIED_OBJECT.format(operation=Operations.DELETE_OBJECT.value)): delete_object(default_user.wallet, cid, oid_2, self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) with reporter.step("Delete a rule"): remote_frostfs_adm_first_node.morph.remove_rule( target_type="container", target_name=f"{cid}", chain_id="denyDeleteObject", ) with reporter.step("Check delete object in container on the first node"): with expect_not_raises(): delete_object(default_user.wallet, cid, oid_1, self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) @allure.title("MorphRuleChain: Allow to GetObject in root tenant") def test_morph_rule_chain_allow_to_get_object_root( self, default_user: User, remote_frostfs_adm_first_node: FrostfsAdm, frostfs_cli: FrostfsCli, simple_object_size: ObjectSize, ): cid, oids = pre_create_container_object_adm( default_user, remote_frostfs_adm_first_node, frostfs_cli, simple_object_size, self.shell, self.cluster ) with reporter.step("Create a container rule for the first node"): remote_frostfs_adm_first_node.morph.add_rule( target_type="container", target_name=f"{cid}", chain_id="allowGetObject", rule=f"allow Object.Get *", ) with reporter.step("Check get object from container on the first node"): with expect_not_raises(): get_object(default_user.wallet, cid, oids[0], self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) with reporter.step("Check get object from container on the second node"): with expect_not_raises(): get_object(default_user.wallet, cid, oids[0], self.shell, self.cluster.storage_nodes[1].get_rpc_endpoint()) with reporter.step("Delete a rule"): remote_frostfs_adm_first_node.morph.remove_rule( target_type="namespace", target_name=f"kapusta", chain_id="allowGetObject", ) @allure.title("MorphRuleChain: allow to PutObject in root tenant") def test_morph_rule_chain_allow_to_put_object_root( self, default_user: User, remote_frostfs_adm_first_node: FrostfsAdm, frostfs_cli: FrostfsCli, simple_object_size: ObjectSize, ): test_file = generate_file(simple_object_size.value) with reporter.step("Create a container on the first node"): cid = ( frostfs_cli.container.create( rpc_endpoint=self.cluster.storage_nodes[0].get_rpc_endpoint(), policy="REP 1 IN MOW CBF 1 SELECT 1 FROM MSK AS MOW FILTER SubDivCode EQ MOW AS MSK", name="dcl1", await_mode=True, basic_acl="0", ) .stdout.split(" ")[1] .strip() .split("\n")[0] ) with reporter.step("Create a container rule for the first node"): remote_frostfs_adm_first_node.morph.add_rule( target_type="container", target_name=f"{cid}", chain_id="allowPutObject", rule=f"allow Object.Put *", ) with reporter.step("Check put object to container on the first node"): with expect_not_raises(): put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) with reporter.step("Check put object to container on the second node"): with expect_not_raises(): put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.storage_nodes[1].get_rpc_endpoint()) with reporter.step("Delete a rule"): remote_frostfs_adm_first_node.morph.remove_rule( target_type="container", target_name=f"{cid}", chain_id="allowPutObject", ) @allure.title("MorphRuleChain: Allow to HeadObject in root tenant") def test_morph_rule_chain_allow_to_head_object_root( self, default_user: User, remote_frostfs_adm_first_node: FrostfsAdm, frostfs_cli: FrostfsCli, simple_object_size: ObjectSize, ): cid, oids = pre_create_container_object_adm( default_user, remote_frostfs_adm_first_node, frostfs_cli, simple_object_size, self.shell, self.cluster ) with reporter.step("Create a container rule for the first node"): remote_frostfs_adm_first_node.morph.add_rule( target_type="container", target_name=f"{cid}", chain_id="allowHeadObject", rule=f"allow Object.Head *", ) with reporter.step("Check head object from container on the first node"): with expect_not_raises(): head_object(default_user.wallet, cid, oids[0], self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) with reporter.step("Check head object from container on the second node"): with expect_not_raises(): head_object(default_user.wallet, cid, oids[0], self.shell, self.cluster.storage_nodes[1].get_rpc_endpoint()) with reporter.step("Delete a rule"): remote_frostfs_adm_first_node.morph.remove_rule( target_type="namespace", target_name=f"kapusta", chain_id="allowHeadObject", ) @allure.title("MorphRuleChain: Allow to SearchObject in root tenant") def test_morph_rule_chain_allow_to_search_object_root( self, default_user: User, remote_frostfs_adm_first_node: FrostfsAdm, frostfs_cli: FrostfsCli, simple_object_size: ObjectSize, ): cid, oids = pre_create_container_object_adm( default_user, remote_frostfs_adm_first_node, frostfs_cli, simple_object_size, self.shell, self.cluster ) with reporter.step("Create a container rule for the first node"): remote_frostfs_adm_first_node.morph.add_rule( target_type="container", target_name=f"{cid}", chain_id="allowSearchObject", rule=f"allow Object.Search *", ) with reporter.step("Check search object from container on the first node"): with expect_not_raises(): search_object(default_user.wallet, cid, self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) with reporter.step("Check search object from container on the second node"): with expect_not_raises(): search_object(default_user.wallet, cid, self.shell, self.cluster.storage_nodes[1].get_rpc_endpoint()) with reporter.step("Delete a rule"): remote_frostfs_adm_first_node.morph.remove_rule( target_type="namespace", target_name=f"kapusta", chain_id="allowSearchObject", ) @allure.title("MorphRuleChain: Allow to RangehObject in root tenant") def test_morph_rule_chain_allow_to_range_object_root( self, default_user: User, remote_frostfs_adm_first_node: FrostfsAdm, frostfs_cli: FrostfsCli, simple_object_size: ObjectSize, ): cid, oids = pre_create_container_object_adm( default_user, remote_frostfs_adm_first_node, frostfs_cli, simple_object_size, self.shell, self.cluster ) with reporter.step("Create a container rule for the first node"): remote_frostfs_adm_first_node.morph.add_rule( target_type="container", target_name=f"{cid}", chain_id="allowRangeObject", rule=f"allow Object.Range *", ) with reporter.step("Check range object from container on the first node"): with expect_not_raises(): get_range(default_user.wallet, cid, oids[0], "0:10", self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) with reporter.step("Check range object from container on the second node"): with expect_not_raises(): get_range(default_user.wallet, cid, oids[0], "0:10", self.shell, self.cluster.storage_nodes[1].get_rpc_endpoint()) with reporter.step("Delete a rule"): remote_frostfs_adm_first_node.morph.remove_rule( target_type="namespace", target_name=f"kapusta", chain_id="allowRangeObject", ) @allure.title("MorphRuleChain: Allow to Hash Object in root tenant") def test_morph_rule_chain_allow_to_hash_object_root( self, default_user: User, remote_frostfs_adm_first_node: FrostfsAdm, frostfs_cli: FrostfsCli, simple_object_size: ObjectSize, ): cid, oids = pre_create_container_object_adm( default_user, remote_frostfs_adm_first_node, frostfs_cli, simple_object_size, self.shell, self.cluster ) with reporter.step("Create a container rule for the first node"): remote_frostfs_adm_first_node.morph.add_rule( target_type="container", target_name=f"{cid}", chain_id="allowHashObject", rule=f"allow Object.Hash *", ) with reporter.step("Check range hash object from container on the first node"): with expect_not_raises(): get_range_hash(default_user.wallet, cid, oids[0], "0:10", self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) with reporter.step("Check range hash object from container on the second node"): with expect_not_raises(): get_range_hash(default_user.wallet, cid, oids[0], "0:10", self.shell, self.cluster.storage_nodes[1].get_rpc_endpoint()) with reporter.step("Delete a rule"): remote_frostfs_adm_first_node.morph.remove_rule( target_type="namespace", target_name=f"kapusta", chain_id="allowHashObject", ) @allure.title("MorphRuleChain: Allow to Delete Object in root tenant") def test_morph_rule_chain_allow_to_delete_object_root( self, default_user: User, remote_frostfs_adm_first_node: FrostfsAdm, frostfs_cli: FrostfsCli, simple_object_size: ObjectSize, ): cid, oids = pre_create_container_object_adm( default_user, remote_frostfs_adm_first_node, frostfs_cli, simple_object_size, self.shell, self.cluster ) with reporter.step("Create a container rule for the first node"): remote_frostfs_adm_first_node.morph.add_rule( target_type="container", target_name=f"{cid}", chain_id="allowDeleteObject", rule=f"allow Object.Head Object.Delete *", ) with reporter.step("Check delete object from container on the first node"): with expect_not_raises(): delete_object(default_user.wallet, cid, oids[0], self.shell, self.cluster.storage_nodes[0].get_rpc_endpoint()) with reporter.step("Check delete object from container on the second node"): with expect_not_raises(): delete_object(default_user.wallet, cid, oids[1], self.shell, self.cluster.storage_nodes[1].get_rpc_endpoint()) with reporter.step("Delete a rule"): remote_frostfs_adm_first_node.morph.remove_rule( target_type="namespace", target_name=f"kapusta", chain_id="allowDeleteObject", )