"""HTTP gate tests: PUT through the gate with and without a bearer token on an eACL-restricted container."""

import logging

import allure
import pytest
from frostfs_testlib import reporter
from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL
from frostfs_testlib.steps.acl import (
    bearer_token_base64_from_file,
    create_eacl,
    form_bearertoken_file,
    set_eacl,
    sign_bearer,
    wait_for_cache_expired,
)
from frostfs_testlib.steps.cli.container import create_container
from frostfs_testlib.steps.http.http_gate import upload_via_http_gate_curl, verify_object_hash
from frostfs_testlib.storage.dataclasses.acl import EACLAccess, EACLOperation, EACLRole, EACLRule
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.utils.file_utils import generate_file

logger = logging.getLogger("NeoLogger")


@pytest.mark.http_gate
@pytest.mark.http_put
class Test_http_bearer(ClusterTestBase):
    """PUT-via-HTTP-gate scenarios for a container whose eACL denies PUT to OTHERS.

    All fixtures are class-scoped, so both tests share one container, one eACL
    table and one signed bearer token.
    """

    # Placement policy string handed to `create_container` as its `rule` argument.
    PLACEMENT_RULE = "REP 2 IN X CBF 1 SELECT 2 FROM * AS X"

    @pytest.fixture(scope="class", autouse=True)
    @allure.title("[Class/Autouse]: Prepare wallet and deposit")
    def prepare_wallet(self, default_wallet):
        """Store `default_wallet` on the class so other fixtures and tests can read `self.wallet`."""
        # Assigned on the class (not the instance) so that it is visible from
        # every test instance pytest creates for this class.
        # NOTE(review): no deposit happens here despite the title; presumably
        # `default_wallet` takes care of it - confirm.
        Test_http_bearer.wallet = default_wallet

    @pytest.fixture(scope="class")
    def user_container(self) -> str:
        """Create a container with `PUBLIC_ACL` and `PLACEMENT_RULE`; return the `create_container` result."""
        return create_container(
            wallet=self.wallet,
            shell=self.shell,
            endpoint=self.cluster.default_rpc_endpoint,
            rule=self.PLACEMENT_RULE,
            basic_acl=PUBLIC_ACL,
        )

    @pytest.fixture(scope="class")
    def eacl_deny_for_others(self, user_container: str) -> None:
        """Apply an eACL to `user_container` that denies PUT for the OTHERS role.

        The fixture has no return value; tests request it only for its side effect.
        """
        # Only PUT is denied, so the step message names that operation explicitly.
        with reporter.step(f"Set deny PUT operation for {EACLRole.OTHERS} via eACL"):
            eacl = EACLRule(access=EACLAccess.DENY, role=EACLRole.OTHERS, operation=EACLOperation.PUT)
            set_eacl(
                self.wallet,
                user_container,
                create_eacl(user_container, eacl, shell=self.shell),
                shell=self.shell,
                endpoint=self.cluster.default_rpc_endpoint,
            )
            # Presumably waits until the new eACL is picked up by the nodes - confirm in frostfs_testlib.
            wait_for_cache_expired()

    @pytest.fixture(scope="class")
    def bearer_token_no_limit_for_others(self, user_container: str) -> str:
        """Build and sign a bearer token allowing every `EACLOperation` for OTHERS.

        Returns:
            The value produced by `bearer_token_base64_from_file` for the signed token file.
        """
        with reporter.step(f"Create bearer token for {EACLRole.OTHERS} with all operations allowed"):
            # The token is formed unsigned first (sign=False) and signed as a separate step below.
            bearer = form_bearertoken_file(
                self.wallet,
                user_container,
                [EACLRule(operation=op, access=EACLAccess.ALLOW, role=EACLRole.OTHERS) for op in EACLOperation],
                shell=self.shell,
                endpoint=self.cluster.default_rpc_endpoint,
                sign=False,
            )
            bearer_signed = f"{bearer}_signed"
            sign_bearer(
                shell=self.shell,
                wallet=self.wallet,
                eacl_rules_file_from=bearer,
                eacl_rules_file_to=bearer_signed,
                json=False,
            )
            return bearer_token_base64_from_file(bearer_signed)

    @allure.title(f"[NEGATIVE] Put object without bearer token for {EACLRole.OTHERS}")
    def test_unable_put_without_bearer_token(
        self, simple_object_size: ObjectSize, user_container: str, eacl_deny_for_others
    ) -> None:
        """Upload through the HTTP gate without a token, passing the access-denied `error_pattern`.

        `eacl_deny_for_others` is requested only so the deny rule is in place; its value is unused.
        """
        upload_via_http_gate_curl(
            cid=user_container,
            filepath=generate_file(simple_object_size.value),
            endpoint=self.cluster.default_http_gate_endpoint,
            error_pattern="access to object operation denied",
        )

    def test_put_with_bearer_when_eacl_restrict(
        self,
        object_size: ObjectSize,
        user_container: str,
        eacl_deny_for_others,
        bearer_token_no_limit_for_others: str,
    ) -> None:
        """Upload through the HTTP gate with the bearer token, then verify the stored object's hash.

        `eacl_deny_for_others` is requested only so the deny rule is in place; its value is unused.
        """
        file_path = generate_file(object_size.value)
        with reporter.step(f"Put object with bearer token for {EACLRole.OTHERS}, then get and verify hashes"):
            # Header is passed as a ready-made curl argument fragment (note the leading space).
            headers = [f" -H 'Authorization: Bearer {bearer_token_no_limit_for_others}'"]
            oid = upload_via_http_gate_curl(
                cid=user_container,
                filepath=file_path,
                endpoint=self.cluster.default_http_gate_endpoint,
                headers=headers,
            )
            verify_object_hash(
                oid=oid,
                file_name=file_path,
                wallet=self.wallet,
                cid=user_container,
                shell=self.shell,
                nodes=self.cluster.storage_nodes,
                request_node=self.cluster.cluster_nodes[0],
            )