import allure import pytest from frostfs_testlib import reporter from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli from frostfs_testlib.resources.error_patterns import OBJECT_ACCESS_DENIED from frostfs_testlib.steps.cli.object import put_object_to_random_node from frostfs_testlib.steps.node_management import drop_object from frostfs_testlib.storage.dataclasses import ape from frostfs_testlib.storage.dataclasses.wallet import WalletInfo from frostfs_testlib.storage.grpc_operations.interfaces_wrapper import GrpcClientWrapper from frostfs_testlib.testing.cluster_test_base import ClusterTestBase from frostfs_testlib.utils import wallet_utils from frostfs_testlib.utils.failover_utils import wait_object_replication from frostfs_testlib.utils.file_utils import TestFile from ....helpers.container_access import ( ALL_OBJECT_OPERATIONS, assert_access_to_container, assert_full_access_to_container, assert_no_access_to_container, ) from ....helpers.container_request import APE_EVERYONE_ALLOW_ALL, OWNER_ALLOW_ALL, ContainerRequest @pytest.fixture def denied_wallet(default_wallet: WalletInfo, other_wallet: WalletInfo, role: ape.Role) -> WalletInfo: return other_wallet if role == ape.Role.OTHERS else default_wallet @pytest.fixture def allowed_wallet(default_wallet: WalletInfo, other_wallet: WalletInfo, role: ape.Role) -> WalletInfo: return default_wallet if role == ape.Role.OTHERS else other_wallet @pytest.mark.nightly @pytest.mark.ape class TestApeContainer(ClusterTestBase): # TODO: Without PATCH operation, # since it requires specific permissions that do not apply when testing all operations at once @pytest.mark.sanity @allure.title("Deny operations via APE by role (role={role}, obj_size={object_size})") @pytest.mark.parametrize("role", [ape.Role.OWNER, ape.Role.OTHERS], indirect=True) @pytest.mark.parametrize("objects", [4], indirect=True) def test_deny_operations_via_ape_by_role( self, denied_wallet: WalletInfo, allowed_wallet: WalletInfo, frostfs_cli: FrostfsCli, container: str, objects: list[str], role: ape.Role, test_file: TestFile, rpc_endpoint: str, ): with reporter.step(f"Deny all operations for {role} via APE"): deny_rule = ape.Rule(ape.Verb.DENY, ALL_OBJECT_OPERATIONS, ape.Condition.by_role(role.value)) frostfs_cli.ape_manager.add( rpc_endpoint, deny_rule.chain_id, target_name=container, target_type="container", rule=deny_rule.as_string() ) with reporter.step("Wait for one block"): self.wait_for_blocks() with reporter.step(f"Assert denied role have no access to public container"): # access checks will try to remove object, so we use .pop() to ensure we have object before deletion assert_no_access_to_container(denied_wallet, container, objects.pop(), test_file, self.shell, self.cluster) with reporter.step(f"Assert allowed role have full access to public container"): assert_full_access_to_container(allowed_wallet, container, objects.pop(), test_file, self.shell, self.cluster) with reporter.step(f"Remove deny rule from APE"): frostfs_cli.ape_manager.remove(rpc_endpoint, deny_rule.chain_id, target_name=container, target_type="container") with reporter.step("Wait for one block"): self.wait_for_blocks() with reporter.step("Assert allowed role have full access to public container"): assert_full_access_to_container(allowed_wallet, container, objects.pop(), test_file, self.shell, self.cluster) with reporter.step("Assert denied role have full access to public container"): assert_full_access_to_container(denied_wallet, container, objects.pop(), test_file, self.shell, self.cluster) # TODO: Without PATCH operation, # since it requires specific permissions that do not apply when testing all operations at once @allure.title("Deny operations for others via APE excluding single pubkey (obj_size={object_size})") @pytest.mark.parametrize("objects", [2], indirect=True) def test_deny_opeartions_excluding_pubkey( self, frostfs_cli: FrostfsCli, default_wallet: WalletInfo, other_wallet: WalletInfo, other_wallet_2: WalletInfo, container: str, objects: list[str], rpc_endpoint: str, test_file: TestFile, ): with reporter.step("Add deny APE rules for others except single wallet"): rule_conditions = [ ape.Condition.by_role(ape.Role.OTHERS), ape.Condition.by_key( wallet_utils.get_wallet_public_key(other_wallet_2.path, other_wallet_2.password), match_type=ape.MatchType.NOT_EQUAL, ), ] rule = ape.Rule(ape.Verb.DENY, ALL_OBJECT_OPERATIONS, rule_conditions) frostfs_cli.ape_manager.add(rpc_endpoint, rule.chain_id, target_name=container, target_type="container", rule=rule.as_string()) with reporter.step("Wait for one block"): self.wait_for_blocks() with reporter.step("Assert others have no access to public container"): # access checks will try to remove object, so we use .pop() to ensure we have object before deletion assert_no_access_to_container(other_wallet, container, objects[0], test_file, self.shell, self.cluster) with reporter.step("Assert owner have full access to public container"): assert_full_access_to_container(default_wallet, container, objects.pop(), test_file, self.shell, self.cluster) with reporter.step("Assert allowed wallet have full access to public container"): assert_full_access_to_container(other_wallet_2, container, objects.pop(), test_file, self.shell, self.cluster) @allure.title("Replication works with APE deny rules on OWNER and OTHERS (obj_size={object_size})") @pytest.mark.parametrize( "container_request", [ContainerRequest(f"REP %NODE_COUNT% IN X CBF 1 SELECT %NODE_COUNT% FROM * AS X", APE_EVERYONE_ALLOW_ALL, "custom")], indirect=True, ) def test_replication_works_with_deny_rules( self, default_wallet: WalletInfo, frostfs_cli: FrostfsCli, container: str, rpc_endpoint: str, test_file: TestFile, ): with reporter.step("Put object to container"): oid = put_object_to_random_node(default_wallet, test_file, container, self.shell, self.cluster) with reporter.step("Wait for object replication after upload"): wait_object_replication(container, oid, len(self.cluster.cluster_nodes), self.shell, self.cluster.storage_nodes) with reporter.step("Add deny APE rules for owner and others"): rule_conditions = [ ape.Condition.by_role(ape.Role.OWNER), ape.Condition.by_role(ape.Role.OTHERS), ] for rule_condition in rule_conditions: rule = ape.Rule(ape.Verb.DENY, ALL_OBJECT_OPERATIONS, rule_condition) frostfs_cli.ape_manager.add( rpc_endpoint, rule.chain_id, target_name=container, target_type="container", rule=rule.as_string() ) with reporter.step("Wait for one block"): self.wait_for_blocks() with reporter.step("Drop object"): drop_object(self.cluster.storage_nodes[0], container, oid) with reporter.step("Wait for dropped object to be replicated"): wait_object_replication(container, oid, len(self.cluster.storage_nodes), self.shell, self.cluster.storage_nodes) # TODO: Without PATCH operation, # since it requires specific permissions that do not apply when testing all operations at once @allure.title("Deny operations via APE by role (role=ir, obj_size={object_size})") @pytest.mark.parametrize("container_request", [OWNER_ALLOW_ALL], indirect=True) @pytest.mark.parametrize("objects", [3], indirect=True) def test_deny_operations_via_ape_by_role_ir( self, frostfs_cli: FrostfsCli, ir_wallet: WalletInfo, container: str, objects: list[str], rpc_endpoint: str, test_file: TestFile ): default_ir_access = { ape.ObjectOperations.PUT: False, ape.ObjectOperations.GET: True, ape.ObjectOperations.HEAD: True, ape.ObjectOperations.GET_RANGE: True, ape.ObjectOperations.GET_RANGE_HASH: True, ape.ObjectOperations.SEARCH: True, ape.ObjectOperations.PATCH: False, ape.ObjectOperations.DELETE: False, } with reporter.step("Assert IR wallet access in default state"): assert_access_to_container(default_ir_access, ir_wallet, container, objects.pop(), test_file, self.shell, self.cluster) with reporter.step("Add deny APE rule with deny all operations for IR role"): rule = ape.Rule(ape.Verb.DENY, ALL_OBJECT_OPERATIONS, [ape.Condition.by_role(ape.Role.IR.value)]) frostfs_cli.ape_manager.add(rpc_endpoint, rule.chain_id, target_name=container, target_type="container", rule=rule.as_string()) with reporter.step("Wait for one block"): self.wait_for_blocks() with reporter.step("Assert IR wallet ignores APE rules"): assert_access_to_container(default_ir_access, ir_wallet, container, objects.pop(), test_file, self.shell, self.cluster) with reporter.step("Remove APE rule"): frostfs_cli.ape_manager.remove(rpc_endpoint, rule.chain_id, target_name=container, target_type="container") with reporter.step("Wait for one block"): self.wait_for_blocks() with reporter.step("Assert IR wallet access is restored"): assert_access_to_container(default_ir_access, ir_wallet, container, objects.pop(), test_file, self.shell, self.cluster) # TODO: Without PATCH operation, # since it requires specific permissions that do not apply when testing all operations at once @allure.title("Deny operations via APE by role (role=container, obj_size={object_size})") @pytest.mark.parametrize("container_request", [OWNER_ALLOW_ALL], indirect=True) @pytest.mark.parametrize("objects", [3], indirect=True) def test_deny_operations_via_ape_by_role_container( self, frostfs_cli: FrostfsCli, container_node_wallet: WalletInfo, container: str, objects: list[str], rpc_endpoint: str, test_file: TestFile, ): access_matrix = { ape.ObjectOperations.PUT: True, ape.ObjectOperations.GET: True, ape.ObjectOperations.HEAD: True, ape.ObjectOperations.GET_RANGE: True, ape.ObjectOperations.GET_RANGE_HASH: True, ape.ObjectOperations.SEARCH: True, ape.ObjectOperations.PATCH: True, ape.ObjectOperations.DELETE: True, } with reporter.step("Assert CONTAINER wallet access in default state"): assert_access_to_container(access_matrix, container_node_wallet, container, objects.pop(), test_file, self.shell, self.cluster) rule = ape.Rule(ape.Verb.DENY, ALL_OBJECT_OPERATIONS, ape.Condition.by_role(ape.Role.CONTAINER.value)) with reporter.step(f"Add APE rule with deny all operations for CONTAINER and IR roles"): frostfs_cli.ape_manager.add(rpc_endpoint, rule.chain_id, target_name=container, target_type="container", rule=rule.as_string()) with reporter.step("Wait for one block"): self.wait_for_blocks() with reporter.step("Assert CONTAINER wallet ignores APE rule"): assert_access_to_container(access_matrix, container_node_wallet, container, objects.pop(), test_file, self.shell, self.cluster) with reporter.step("Remove APE rule"): frostfs_cli.ape_manager.remove(rpc_endpoint, rule.chain_id, target_name=container, target_type="container") with reporter.step("Wait for one block"): self.wait_for_blocks() with reporter.step("Assert CONTAINER wallet access after rule was removed"): assert_access_to_container(access_matrix, container_node_wallet, container, objects.pop(), test_file, self.shell, self.cluster) # ^ @allure.title("Deny PATCH operation via APE (object_size={object_size})") @pytest.mark.parametrize("objects", [1], indirect=True) def test_patch_object_with_deny_rule( self, frostfs_cli: FrostfsCli, grpc_client: GrpcClientWrapper, grpc_client_with_other_wallet: GrpcClientWrapper, grpc_client_with_container_wallet: GrpcClientWrapper, grpc_client_with_ir_wallet: GrpcClientWrapper, container: str, objects: list[str], test_file: TestFile, ): patch_params = { "cid": container, "oid": objects[0], "endpoint": self.cluster.default_rpc_endpoint, "ranges": ["300:200"], "payloads": [test_file], "new_attrs": "owner=true", "timeout": "200s", } with reporter.step("Check that PATCH is available with owner wallet"): patched_oid = grpc_client.object.patch(**patch_params) assert patched_oid != patch_params["oid"], "OID of patched object must be different from original one" patch_params["oid"] = patched_oid with reporter.step("Check that PATCH is available with another wallet"): patch_params["ranges"] = ["100:50"] patch_params["new_attrs"] = "other=true" patched_oid = grpc_client_with_other_wallet.object.patch(**patch_params) assert patched_oid != patch_params["oid"], "OID of patched object must be different from original one" patch_params["oid"] = patched_oid with reporter.step("Check that PATCH is available with container wallet"): patch_params["ranges"] = ["600:0"] patch_params["new_attrs"] = "container=true" patched_oid = grpc_client_with_container_wallet.object.patch(**patch_params) assert patched_oid != patch_params["oid"], "OID of patched object must be different from original one" patch_params["oid"] = patched_oid with reporter.step("Check that PATCH is available with ir wallet"): patch_params["ranges"] = ["0:1000"] patch_params["new_attrs"] = "ir=true" patched_oid = grpc_client_with_ir_wallet.object.patch(**patch_params) assert patched_oid != patch_params["oid"], "OID of patched object must be different from original one" patch_params["oid"] = patched_oid rule = ape.Rule(ape.Verb.DENY, ape.ObjectOperations.PATCH) with reporter.step("Add APE rule with deny PATCH operation"): frostfs_cli.ape_manager.add( self.cluster.default_rpc_endpoint, rule.chain_id, target_name=container, target_type="container", rule=rule.as_string(), ) with reporter.step("Wait for one block"): self.wait_for_blocks(1) with reporter.step("Check that PATCH is not allowed with owner wallet"): patch_params["ranges"] = ["300:200"] patch_params["new_attrs"] = "owner_2=false" with pytest.raises(Exception, match=OBJECT_ACCESS_DENIED): grpc_client.object.patch(**patch_params) with reporter.step("Check that PATCH is not allowed with another wallet"): patch_params["ranges"] = ["100:50"] patch_params["new_attrs"] = "other_2=false" with pytest.raises(Exception, match=OBJECT_ACCESS_DENIED): grpc_client_with_other_wallet.object.patch(**patch_params) with reporter.step("Check that PATCH is allowed with container wallet as rule is ignored"): patch_params["ranges"] = ["600:0"] patch_params["new_attrs"] = "container_2=true" patched_oid = grpc_client_with_container_wallet.object.patch(**patch_params) assert patched_oid != patch_params["oid"], "OID of patched object must be different from original one" patch_params["oid"] = patched_oid with reporter.step("Check that PATCH is not allowed with ir waller"): patch_params["ranges"] = ["0:1000"] patch_params["new_attrs"] = "ir_2=true" with pytest.raises(Exception, match=OBJECT_ACCESS_DENIED): grpc_client_with_ir_wallet.object.patch(**patch_params) with reporter.step("Remove APE rule"): frostfs_cli.ape_manager.remove( self.cluster.default_rpc_endpoint, rule.chain_id, target_name=container, target_type="container", ) with reporter.step("Wait for one block"): self.wait_for_blocks(1) with reporter.step("Check that PATCH is available with owner wallet"): patch_params["ranges"] = ["300:200"] patch_params["new_attrs"] = "owner_3=true" patched_oid = grpc_client.object.patch(**patch_params) assert patched_oid != patch_params["oid"], "OID of patched object must be different from original one" patch_params["oid"] = patched_oid with reporter.step("Check that PATCH is available with another wallet"): patch_params["ranges"] = ["100:50"] patch_params["new_attrs"] = "other_3=true" patched_oid = grpc_client_with_other_wallet.object.patch(**patch_params) assert patched_oid != patch_params["oid"], "OID of patched object must be different from original one" patch_params["oid"] = patched_oid with reporter.step("Check that PATCH is available with container wallet"): patch_params["ranges"] = ["600:0"] patch_params["new_attrs"] = "container_3=true" patched_oid = grpc_client_with_container_wallet.object.patch(**patch_params) assert patched_oid != patch_params["oid"], "OID of patched object must be different from original one" patch_params["oid"] = patched_oid with reporter.step("Check that PATCH is available with ir wallet"): patch_params["ranges"] = ["0:1000"] patch_params["new_attrs"] = "ir_3=true" patched_oid = grpc_client_with_ir_wallet.object.patch(**patch_params) assert patched_oid != patch_params["oid"], "OID of patched object must be different from original one" patch_params["oid"] = patched_oid attrs = {"owner", "other", "container", "ir", "container_2", "owner_3", "other_3", "container_3", "ir_3"} with reporter.step("Ensure that all attributes match expected values"): object_info: dict = grpc_client.object.head(container, patch_params["oid"], self.cluster.default_rpc_endpoint) object_attrs: dict = object_info["header"]["attributes"] assert attrs <= {k for k in object_attrs.keys()}, f"Received attributes do not match expected ones: {object_attrs}" assert all( v == "true" for k, v in object_attrs.items() if k in attrs ), f"Received attributes do not match expected ones: {object_attrs}"