import allure
import pytest
from frostfs_testlib import reporter
from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli
from frostfs_testlib.storage.dataclasses import ape
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.utils.file_utils import TestFile

from ....helpers.bearer_token import create_bearer_token
from ....helpers.container_access import (
    ALL_OBJECT_OPERATIONS,
    assert_access_to_container,
    assert_full_access_to_container,
    assert_no_access_to_container,
)


@pytest.mark.nightly
@pytest.mark.sanity
@pytest.mark.bearer
@pytest.mark.ape
class TestApeBearer(ClusterTestBase):
    """Checks how bearer tokens interact with APE rules set on a container."""

    @allure.title("Operations with BearerToken (role={role}, obj_size={object_size})")
    @pytest.mark.parametrize("role", [ape.Role.OWNER, ape.Role.OTHERS], indirect=True)
    def test_bearer_token_operations(
        self,
        container: str,
        objects: list[str],
        frostfs_cli: FrostfsCli,
        temp_directory: str,
        test_wallet: WalletInfo,
        role: ape.Role,
        file_path: TestFile,
        rpc_endpoint: str,
    ) -> None:
        """
        A bearer token that allows all object operations must grant access
        while an APE rule denies all object operations on the container.
        After the deny rule is removed, access without the token must be back.

        Each access check consumes one object id from ``objects``.
        """
        with reporter.step(f"Check {role} has full access to container without bearer token"):
            assert_full_access_to_container(test_wallet, container, objects.pop(), file_path, self.shell, self.cluster)

        with reporter.step("Deny all operations for everyone via APE"):
            # No condition is attached to the rule, so it is not restricted to a single role.
            deny_rule = ape.Rule(ape.Verb.DENY, ALL_OBJECT_OPERATIONS)
            frostfs_cli.ape_manager.add(
                rpc_endpoint,
                deny_rule.chain_id,
                target_name=container,
                target_type="container",
                rule=deny_rule.as_string(),
            )

        with reporter.step("Wait for one block"):
            self.wait_for_blocks()

        with reporter.step("Create bearer token with all operations allowed"):
            bearer = create_bearer_token(
                frostfs_cli,
                temp_directory,
                container,
                rule=ape.Rule(ape.Verb.ALLOW, ALL_OBJECT_OPERATIONS),
                endpoint=rpc_endpoint,
            )

        with reporter.step(f"Check {role} without token has no access to all operations with container"):
            assert_no_access_to_container(test_wallet, container, objects.pop(), file_path, self.shell, self.cluster)

        with reporter.step(f"Check {role} with token has access to all operations with container"):
            assert_full_access_to_container(
                test_wallet, container, objects.pop(), file_path, self.shell, self.cluster, bearer
            )

        with reporter.step("Remove deny rule from APE"):
            # The chain added above is addressed by the chain id of the same deny rule.
            frostfs_cli.ape_manager.remove(
                rpc_endpoint,
                deny_rule.chain_id,
                target_name=container,
                target_type="container",
            )

        with reporter.step("Wait for one block"):
            self.wait_for_blocks()

        with reporter.step(f"Check {role} without token has access to all operations with container"):
            assert_full_access_to_container(test_wallet, container, objects.pop(), file_path, self.shell, self.cluster)

    @allure.title("BearerToken for compound operations (obj_size={object_size})")
    def test_bearer_token_compound_operations(
        self,
        frostfs_cli: FrostfsCli,
        temp_directory: str,
        default_wallet: WalletInfo,
        other_wallet: WalletInfo,
        container: str,
        objects: list[str],
        rpc_endpoint: str,
        file_path: TestFile,
    ) -> None:
        """
        Bearer Token COMPLETELY overrides chains set for the specific target.
        Thus, any restrictions or permissions should be explicitly defined in BT.
        """
        wallets_map = {
            ape.Role.OWNER: default_wallet,
            ape.Role.OTHERS: other_wallet,
        }

        # Expected access per role WITHOUT a bearer token (only the APE deny rules from deny_map apply)
        access_map = {
            ape.Role.OWNER: {
                ape.ObjectOperations.PUT: True,
                ape.ObjectOperations.GET: True,
                ape.ObjectOperations.HEAD: True,
                ape.ObjectOperations.GET_RANGE: True,
                ape.ObjectOperations.GET_RANGE_HASH: True,
                ape.ObjectOperations.SEARCH: True,
                ape.ObjectOperations.DELETE: False,
            },
            ape.Role.OTHERS: {
                ape.ObjectOperations.PUT: True,
                ape.ObjectOperations.GET: True,
                ape.ObjectOperations.HEAD: True,
                ape.ObjectOperations.GET_RANGE: False,
                ape.ObjectOperations.GET_RANGE_HASH: False,
                ape.ObjectOperations.SEARCH: False,
                ape.ObjectOperations.DELETE: True,
            },
        }

        # Expected access per role WITH the bearer token built from bearer_map / verb_map
        bt_access_map = {
            ape.Role.OWNER: {
                ape.ObjectOperations.PUT: True,
                ape.ObjectOperations.GET: False,
                ape.ObjectOperations.HEAD: True,
                ape.ObjectOperations.GET_RANGE: True,
                ape.ObjectOperations.GET_RANGE_HASH: False,
                ape.ObjectOperations.SEARCH: False,
                ape.ObjectOperations.DELETE: True,
            },
            # Bearer Token COMPLETELY overrides chains set for the specific target.
            # Thus, any restrictions or permissions should be explicitly defined in BT.
            # The token for OTHERS carries only a DENY rule, so no operation is expected to pass.
            ape.Role.OTHERS: {
                ape.ObjectOperations.PUT: False,
                ape.ObjectOperations.GET: False,
                ape.ObjectOperations.HEAD: False,
                ape.ObjectOperations.GET_RANGE: False,
                ape.ObjectOperations.GET_RANGE_HASH: False,
                ape.ObjectOperations.SEARCH: False,
                ape.ObjectOperations.DELETE: False,
            },
        }

        # Operations that we will deny for each role via APE
        deny_map = {
            ape.Role.OWNER: [ape.ObjectOperations.DELETE],
            ape.Role.OTHERS: [
                ape.ObjectOperations.SEARCH,
                ape.ObjectOperations.GET_RANGE_HASH,
                ape.ObjectOperations.GET_RANGE,
            ],
        }

        # Operations put into each role's bearer token rule; whether they are
        # allowed or denied there is decided by verb_map below
        bearer_map = {
            ape.Role.OWNER: [
                ape.ObjectOperations.PUT,
                ape.ObjectOperations.HEAD,
                ape.ObjectOperations.GET_RANGE,
                # Delete also requires PUT (to make tombstone) and HEAD (to get simple objects header)
                ape.ObjectOperations.DELETE,
            ],
            ape.Role.OTHERS: [
                ape.ObjectOperations.GET,
                ape.ObjectOperations.GET_RANGE,
                ape.ObjectOperations.GET_RANGE_HASH,
            ],
        }

        # Every rule (APE chain or bearer token) is scoped to its own role
        conditions_map = {
            ape.Role.OWNER: ape.Condition.by_role(ape.Role.OWNER),
            ape.Role.OTHERS: ape.Condition.by_role(ape.Role.OTHERS),
        }

        # Verb used in the bearer token rule of each role
        verb_map = {ape.Role.OWNER: ape.Verb.ALLOW, ape.Role.OTHERS: ape.Verb.DENY}

        for role, operations in deny_map.items():
            with reporter.step(f"Add APE deny rule for {role}"):
                deny_rule = ape.Rule(ape.Verb.DENY, operations, conditions_map[role])
                frostfs_cli.ape_manager.add(
                    rpc_endpoint,
                    deny_rule.chain_id,
                    target_name=container,
                    target_type="container",
                    rule=deny_rule.as_string(),
                )

        with reporter.step("Wait for one block"):
            self.wait_for_blocks()

        for role, wallet in wallets_map.items():
            with reporter.step(f"Assert access to container without bearer token for {role}"):
                assert_access_to_container(
                    access_map[role], wallet, container, objects.pop(), file_path, self.shell, self.cluster
                )

        bearer_tokens = {}
        for role in wallets_map:
            with reporter.step(f"Create bearer token for {role}"):
                bearer_rule = ape.Rule(verb_map[role], bearer_map[role], conditions_map[role])
                bearer_tokens[role] = create_bearer_token(
                    frostfs_cli, temp_directory, container, bearer_rule, rpc_endpoint
                )

        for role, wallet in wallets_map.items():
            with reporter.step(f"Assert access to container with bearer token for {role}"):
                assert_access_to_container(
                    bt_access_map[role],
                    wallet,
                    container,
                    objects.pop(),
                    file_path,
                    self.shell,
                    self.cluster,
                    bearer_tokens[role],
                )