From 926a7a5779c60c1b89262bfe0a05d40551f91f43 Mon Sep 17 00:00:00 2001 From: Vladimir Avdeev Date: Mon, 5 Sep 2022 15:12:56 +0300 Subject: [PATCH] Add eACL tests using bearer token Signed-off-by: Vladimir Avdeev --- pytest_tests/helpers/service_helper.py | 8 +- pytest_tests/testsuites/acl/conftest.py | 56 +- pytest_tests/testsuites/acl/test_acl.py | 100 ++-- pytest_tests/testsuites/acl/test_bearer.py | 227 ++++++-- pytest_tests/testsuites/acl/test_eacl.py | 173 ++++-- .../testsuites/acl/test_eacl_filters.py | 509 ++++++++++++++---- 6 files changed, 806 insertions(+), 267 deletions(-) diff --git a/pytest_tests/helpers/service_helper.py b/pytest_tests/helpers/service_helper.py index 4c3bacfa..22e486c7 100644 --- a/pytest_tests/helpers/service_helper.py +++ b/pytest_tests/helpers/service_helper.py @@ -6,6 +6,7 @@ import time from contextlib import contextmanager from datetime import datetime from typing import Optional +from requests import HTTPError import docker @@ -83,8 +84,11 @@ class LocalDevEnvStorageServiceHelper: client = self._get_docker_client(first_node_name) for container_name in self.ALL_CONTAINERS: - logs = client.logs(container_name, since=since, until=until) - + try: + logs = client.logs(container_name, since=since, until=until) + except HTTPError as exc: + logger.info(f"Got exception while dumping container '{container_name}' logs: {exc}") + continue # Dump logs to the directory file_path = os.path.join(directory_path, f"{container_name}-log.txt") with open(file_path, "wb") as file: diff --git a/pytest_tests/testsuites/acl/conftest.py b/pytest_tests/testsuites/acl/conftest.py index bdd79dfe..f449be0c 100644 --- a/pytest_tests/testsuites/acl/conftest.py +++ b/pytest_tests/testsuites/acl/conftest.py @@ -34,27 +34,26 @@ class Wallets: @pytest.fixture(scope="module") def wallets(prepare_wallet_and_deposit): - yield Wallets(wallets={ - EACLRole.USER: [ - Wallet( - wallet_path=prepare_wallet_and_deposit, - config_path=WALLET_CONFIG - )], - EACLRole.OTHERS: [ - Wallet( - wallet_path=init_wallet(ASSETS_DIR)[0], - config_path=WALLET_CONFIG - ), - Wallet( - wallet_path=init_wallet(ASSETS_DIR)[0], - config_path=WALLET_CONFIG - )], - EACLRole.SYSTEM: [ - Wallet( - wallet_path=IR_WALLET_PATH, - config_path=IR_WALLET_CONFIG - )], - }) + yield Wallets( + wallets={ + EACLRole.USER: [ + Wallet( + wallet_path=prepare_wallet_and_deposit, config_path=WALLET_CONFIG + ) + ], + EACLRole.OTHERS: [ + Wallet( + wallet_path=init_wallet(ASSETS_DIR)[0], config_path=WALLET_CONFIG + ), + Wallet( + wallet_path=init_wallet(ASSETS_DIR)[0], config_path=WALLET_CONFIG + ), + ], + EACLRole.SYSTEM: [ + Wallet(wallet_path=IR_WALLET_PATH, config_path=IR_WALLET_CONFIG) + ], + } + ) @pytest.fixture(scope="module") @@ -62,17 +61,22 @@ def file_path(): yield generate_file() -@pytest.fixture(scope='function') +@pytest.fixture(scope="function") def eacl_container_with_objects(wallets, file_path): user_wallet = wallets.get_wallet() - with allure.step('Create eACL public container'): + with allure.step("Create eACL public container"): cid = create_container(user_wallet.wallet_path, basic_acl=PUBLIC_ACL) - with allure.step('Add test objects to container'): + with allure.step("Add test objects to container"): objects_oids = [ put_object( - user_wallet.wallet_path, file_path, cid, - attributes={'key1': 'val1', 'key': val, 'key2': 'abc'}) for val in range(OBJECT_COUNT)] + user_wallet.wallet_path, + file_path, + cid, + attributes={"key1": "val1", "key": val, "key2": "abc"}, + ) + for val in range(OBJECT_COUNT) + ] yield cid, objects_oids, file_path diff --git a/pytest_tests/testsuites/acl/test_acl.py b/pytest_tests/testsuites/acl/test_acl.py index 8b5cade5..62fc1bd6 100644 --- a/pytest_tests/testsuites/acl/test_acl.py +++ b/pytest_tests/testsuites/acl/test_acl.py @@ -3,21 +3,23 @@ import pytest from python_keywords.acl import EACLRole from python_keywords.container import create_container -from python_keywords.container_access import (check_full_access_to_container, check_no_access_to_container, - check_read_only_container) +from python_keywords.container_access import ( + check_full_access_to_container, + check_no_access_to_container, + check_read_only_container, +) from python_keywords.neofs_verbs import put_object from wellknown_acl import PRIVATE_ACL_F, PUBLIC_ACL_F, READONLY_ACL_F @pytest.mark.sanity @pytest.mark.acl -@pytest.mark.acl_container +@pytest.mark.acl_basic class TestACLBasic: - - @pytest.fixture(scope='function') + @pytest.fixture(scope="function") def public_container(self, wallets): user_wallet = wallets.get_wallet() - with allure.step('Create public container'): + with allure.step("Create public container"): cid_public = create_container(user_wallet.wallet_path, basic_acl=PUBLIC_ACL_F) yield cid_public @@ -25,29 +27,33 @@ class TestACLBasic: # with allure.step('Delete public container'): # delete_container(user_wallet.wallet_path, cid_public) - @pytest.fixture(scope='function') + @pytest.fixture(scope="function") def private_container(self, wallets): user_wallet = wallets.get_wallet() - with allure.step('Create private container'): - cid_private = create_container(user_wallet.wallet_path, basic_acl=PRIVATE_ACL_F) + with allure.step("Create private container"): + cid_private = create_container( + user_wallet.wallet_path, basic_acl=PRIVATE_ACL_F + ) yield cid_private # with allure.step('Delete private container'): # delete_container(user_wallet.wallet_path, cid_private) - @pytest.fixture(scope='function') + @pytest.fixture(scope="function") def read_only_container(self, wallets): user_wallet = wallets.get_wallet() - with allure.step('Create public readonly container'): - cid_read_only = create_container(user_wallet.wallet_path, basic_acl=READONLY_ACL_F) + with allure.step("Create public readonly container"): + cid_read_only = create_container( + user_wallet.wallet_path, basic_acl=READONLY_ACL_F + ) yield cid_read_only # with allure.step('Delete public readonly container'): # delete_container(user_wallet.wallet_path, cid_read_only) - @allure.title('Test basic ACL on public container') + @allure.title("Test basic ACL on public container") def test_basic_acl_public(self, wallets, public_container, file_path): """ Test basic ACL set during public container creation. @@ -55,16 +61,31 @@ class TestACLBasic: user_wallet = wallets.get_wallet() other_wallet = wallets.get_wallet(role=EACLRole.OTHERS) cid = public_container - for wallet, desc in ((user_wallet, 'owner'), (other_wallet, 'other users')): - with allure.step('Add test objects to container'): - # We create new objects for each wallet because check_full_access_to_container deletes the object - owner_object_oid = put_object(user_wallet.wallet_path, file_path, cid, attributes={'created': 'owner'}) - other_object_oid = put_object(other_wallet.wallet_path, file_path, cid, attributes={'created': 'other'}) - with allure.step(f'Check {desc} has full access to public container'): - check_full_access_to_container(wallet.wallet_path, cid, owner_object_oid, file_path) - check_full_access_to_container(wallet.wallet_path, cid, other_object_oid, file_path) + for wallet, desc in ((user_wallet, "owner"), (other_wallet, "other users")): + with allure.step("Add test objects to container"): + # We create new objects for each wallet because check_full_access_to_container + # deletes the object + owner_object_oid = put_object( + user_wallet.wallet_path, + file_path, + cid, + attributes={"created": "owner"}, + ) + other_object_oid = put_object( + other_wallet.wallet_path, + file_path, + cid, + attributes={"created": "other"}, + ) + with allure.step(f"Check {desc} has full access to public container"): + check_full_access_to_container( + wallet.wallet_path, cid, owner_object_oid, file_path + ) + check_full_access_to_container( + wallet.wallet_path, cid, other_object_oid, file_path + ) - @allure.title('Test basic ACL on private container') + @allure.title("Test basic ACL on private container") def test_basic_acl_private(self, wallets, private_container, file_path): """ Test basic ACL set during private container creation. @@ -72,18 +93,23 @@ class TestACLBasic: user_wallet = wallets.get_wallet() other_wallet = wallets.get_wallet(role=EACLRole.OTHERS) cid = private_container - with allure.step('Add test objects to container'): + with allure.step("Add test objects to container"): owner_object_oid = put_object(user_wallet.wallet_path, file_path, cid) - with allure.step('Check only owner has full access to private container'): - with allure.step('Check no one except owner has access to operations with container'): - check_no_access_to_container(other_wallet.wallet_path, cid, owner_object_oid, file_path) + with allure.step("Check only owner has full access to private container"): + with allure.step( + "Check no one except owner has access to operations with container" + ): + check_no_access_to_container( + other_wallet.wallet_path, cid, owner_object_oid, file_path + ) - with allure.step('Check owner has full access to private container'): + with allure.step("Check owner has full access to private container"): check_full_access_to_container( - user_wallet.wallet_path, cid, owner_object_oid, file_path) + user_wallet.wallet_path, cid, owner_object_oid, file_path + ) - @allure.title('Test basic ACL on readonly container') + @allure.title("Test basic ACL on readonly container") def test_basic_acl_readonly(self, wallets, read_only_container, file_path): """ Test basic ACL Operations for Read-Only Container. @@ -92,11 +118,17 @@ class TestACLBasic: other_wallet = wallets.get_wallet(role=EACLRole.OTHERS) cid = read_only_container - with allure.step('Add test objects to container'): + with allure.step("Add test objects to container"): object_oid = put_object(user_wallet.wallet_path, file_path, cid) - with allure.step('Check other has read-only access to operations with container'): - check_read_only_container(other_wallet.wallet_path, cid, object_oid, file_path) + with allure.step( + "Check other has read-only access to operations with container" + ): + check_read_only_container( + other_wallet.wallet_path, cid, object_oid, file_path + ) - with allure.step('Check owner has full access to public container'): - check_full_access_to_container(user_wallet.wallet_path, cid, object_oid, file_path) + with allure.step("Check owner has full access to public container"): + check_full_access_to_container( + user_wallet.wallet_path, cid, object_oid, file_path + ) diff --git a/pytest_tests/testsuites/acl/test_bearer.py b/pytest_tests/testsuites/acl/test_bearer.py index 1e832880..93e5636a 100644 --- a/pytest_tests/testsuites/acl/test_bearer.py +++ b/pytest_tests/testsuites/acl/test_bearer.py @@ -1,59 +1,115 @@ import allure import pytest -from python_keywords.acl import (EACLAccess, EACLOperation, EACLRole, EACLRule, create_eacl, form_bearertoken_file, - set_eacl, wait_for_cache_expired) -from python_keywords.container_access import (check_custom_access_to_container, check_full_access_to_container, - check_no_access_to_container) +from python_keywords.acl import ( + EACLAccess, + EACLOperation, + EACLRole, + EACLRule, + create_eacl, + form_bearertoken_file, + set_eacl, + wait_for_cache_expired, +) +from python_keywords.container_access import ( + check_custom_access_to_container, + check_full_access_to_container, + check_no_access_to_container, +) @pytest.mark.sanity @pytest.mark.acl @pytest.mark.acl_bearer class TestACLBearer: - @pytest.mark.parametrize('role', [EACLRole.USER, EACLRole.OTHERS]) + @pytest.mark.parametrize("role", [EACLRole.USER, EACLRole.OTHERS]) def test_bearer_token_operations(self, wallets, eacl_container_with_objects, role): - allure.dynamic.title(f"Testcase to validate NeoFS operations with {role.value} BearerToken") + allure.dynamic.title( + f"Testcase to validate NeoFS operations with {role.value} BearerToken" + ) cid, objects_oids, file_path = eacl_container_with_objects user_wallet = wallets.get_wallet() deny_wallet = wallets.get_wallet(role) - with allure.step(f'Check {role.value} has full access to container without bearer token'): - check_full_access_to_container(deny_wallet.wallet_path, cid, objects_oids.pop(), file_path, - wallet_config=deny_wallet.config_path) + with allure.step( + f"Check {role.value} has full access to container without bearer token" + ): + check_full_access_to_container( + deny_wallet.wallet_path, + cid, + objects_oids.pop(), + file_path, + wallet_config=deny_wallet.config_path, + ) - with allure.step(f'Set deny all operations for {role.value} via eACL'): - eacl = [EACLRule(access=EACLAccess.DENY, role=role, operation=op) for op in EACLOperation] + with allure.step(f"Set deny all operations for {role.value} via eACL"): + eacl = [ + EACLRule(access=EACLAccess.DENY, role=role, operation=op) + for op in EACLOperation + ] eacl_file = create_eacl(cid, eacl) set_eacl(user_wallet.wallet_path, cid, eacl_file) wait_for_cache_expired() - with allure.step(f'Create bearer token for {role.value} with all operations allowed'): - bearer_token = form_bearertoken_file(user_wallet.wallet_path, cid, [ - EACLRule(operation=op, access=EACLAccess.ALLOW, role=role) - for op in EACLOperation]) + with allure.step( + f"Create bearer token for {role.value} with all operations allowed" + ): + bearer_token = form_bearertoken_file( + user_wallet.wallet_path, + cid, + [ + EACLRule(operation=op, access=EACLAccess.ALLOW, role=role) + for op in EACLOperation + ], + ) - with allure.step(f'Check {role.value} without token has no access to all operations with container'): + with allure.step( + f"Check {role.value} without token has no access to all operations with container" + ): check_no_access_to_container( - deny_wallet.wallet_path, cid, objects_oids.pop(), file_path, - wallet_config=deny_wallet.config_path) + deny_wallet.wallet_path, + cid, + objects_oids.pop(), + file_path, + wallet_config=deny_wallet.config_path, + ) - with allure.step(f'Check {role.value} with token has access to all operations with container'): - check_full_access_to_container(deny_wallet.wallet_path, cid, objects_oids.pop(), file_path, - bearer=bearer_token, wallet_config=deny_wallet.config_path) + with allure.step( + f"Check {role.value} with token has access to all operations with container" + ): + check_full_access_to_container( + deny_wallet.wallet_path, + cid, + objects_oids.pop(), + file_path, + bearer=bearer_token, + wallet_config=deny_wallet.config_path, + ) - with allure.step(f'Set allow all operations for {role.value} via eACL'): - eacl = [EACLRule(access=EACLAccess.ALLOW, role=role, operation=op) for op in EACLOperation] + with allure.step(f"Set allow all operations for {role.value} via eACL"): + eacl = [ + EACLRule(access=EACLAccess.ALLOW, role=role, operation=op) + for op in EACLOperation + ] eacl_file = create_eacl(cid, eacl) set_eacl(user_wallet.wallet_path, cid, eacl_file) wait_for_cache_expired() - with allure.step(f'Check {role.value} without token has access to all operations with container'): - check_full_access_to_container(deny_wallet.wallet_path, cid, objects_oids.pop(), file_path, - wallet_config=deny_wallet.config_path) + with allure.step( + f"Check {role.value} without token has access to all operations with container" + ): + check_full_access_to_container( + deny_wallet.wallet_path, + cid, + objects_oids.pop(), + file_path, + wallet_config=deny_wallet.config_path, + ) - @allure.title('BearerToken Operations for compound Operations') - def test_bearer_token_compound_operations(self, wallets, eacl_container_with_objects): + @allure.title("BearerToken Operations for compound Operations") + def test_bearer_token_compound_operations( + self, wallets, eacl_container_with_objects + ): cid, objects_oids, file_path = eacl_container_with_objects user_wallet = wallets.get_wallet() other_wallet = wallets.get_wallet(role=EACLRole.OTHERS) @@ -61,48 +117,105 @@ class TestACLBearer: # Operations that we will deny for each role via eACL deny_map = { EACLRole.USER: [EACLOperation.DELETE], - EACLRole.OTHERS: [EACLOperation.GET, EACLOperation.PUT, EACLOperation.GET_RANGE] + EACLRole.OTHERS: [ + EACLOperation.SEARCH, + EACLOperation.GET_RANGE_HASH, + EACLOperation.GET_RANGE, + ], } # Operations that we will allow for each role with bearer token bearer_map = { - EACLRole.USER: [EACLOperation.DELETE, EACLOperation.PUT, EACLOperation.GET_RANGE], - EACLRole.OTHERS: [EACLOperation.GET, EACLOperation.GET_RANGE], + EACLRole.USER: [ + EACLOperation.DELETE, + EACLOperation.PUT, + EACLOperation.GET_RANGE, + ], + EACLRole.OTHERS: [ + EACLOperation.GET, + EACLOperation.GET_RANGE, + EACLOperation.GET_RANGE_HASH, + ], } deny_map_with_bearer = { - EACLRole.USER: [op for op in deny_map[EACLRole.USER] if op not in bearer_map[EACLRole.USER]], - EACLRole.OTHERS: [op for op in deny_map[EACLRole.OTHERS] if op not in bearer_map[EACLRole.OTHERS]], + EACLRole.USER: [ + op + for op in deny_map[EACLRole.USER] + if op not in bearer_map[EACLRole.USER] + ], + EACLRole.OTHERS: [ + op + for op in deny_map[EACLRole.OTHERS] + if op not in bearer_map[EACLRole.OTHERS] + ], } eacl_deny = [] for role, operations in deny_map.items(): - eacl_deny += [EACLRule(access=EACLAccess.DENY, role=role, operation=op) for op in operations] - set_eacl(user_wallet.wallet_path, cid, eacl_table_path=create_eacl(cid, eacl_deny)) + eacl_deny += [ + EACLRule(access=EACLAccess.DENY, role=role, operation=op) + for op in operations + ] + set_eacl( + user_wallet.wallet_path, cid, eacl_table_path=create_eacl(cid, eacl_deny) + ) wait_for_cache_expired() - with allure.step('Check rule consistency without bearer'): - check_custom_access_to_container(user_wallet.wallet_path, cid, objects_oids.pop(), file_path, - deny_operations=deny_map[EACLRole.USER], - wallet_config=user_wallet.config_path) - check_custom_access_to_container(other_wallet.wallet_path, cid, objects_oids.pop(), file_path, - deny_operations=deny_map[EACLRole.OTHERS], - wallet_config=other_wallet.config_path) + with allure.step("Check rule consistency without bearer"): + check_custom_access_to_container( + user_wallet.wallet_path, + cid, + objects_oids.pop(), + file_path, + deny_operations=deny_map[EACLRole.USER], + wallet_config=user_wallet.config_path, + ) + check_custom_access_to_container( + other_wallet.wallet_path, + cid, + objects_oids.pop(), + file_path, + deny_operations=deny_map[EACLRole.OTHERS], + wallet_config=other_wallet.config_path, + ) - with allure.step('Check rule consistency with bearer'): - bearer_token_user = form_bearertoken_file(user_wallet.wallet_path, cid, [ - EACLRule(operation=op, access=EACLAccess.ALLOW, role=EACLRole.USER) - for op in bearer_map[EACLRole.USER]]) + with allure.step("Check rule consistency using bearer token"): + bearer_token_user = form_bearertoken_file( + user_wallet.wallet_path, + cid, + [ + EACLRule(operation=op, access=EACLAccess.ALLOW, role=EACLRole.USER) + for op in bearer_map[EACLRole.USER] + ], + ) - bearer_token_other = form_bearertoken_file(user_wallet.wallet_path, cid, [ - EACLRule(operation=op, access=EACLAccess.ALLOW, role=EACLRole.OTHERS) - for op in bearer_map[EACLRole.OTHERS]]) + bearer_token_other = form_bearertoken_file( + user_wallet.wallet_path, + cid, + [ + EACLRule( + operation=op, access=EACLAccess.ALLOW, role=EACLRole.OTHERS + ) + for op in bearer_map[EACLRole.OTHERS] + ], + ) - check_custom_access_to_container(user_wallet.wallet_path, cid, objects_oids.pop(), file_path, - deny_operations=deny_map_with_bearer[EACLRole.USER], - bearer=bearer_token_user, - wallet_config=user_wallet.config_path) - check_custom_access_to_container(other_wallet.wallet_path, cid, objects_oids.pop(), file_path, - deny_operations=deny_map_with_bearer[EACLRole.OTHERS], - bearer=bearer_token_other, - wallet_config=other_wallet.config_path) + check_custom_access_to_container( + user_wallet.wallet_path, + cid, + objects_oids.pop(), + file_path, + deny_operations=deny_map_with_bearer[EACLRole.USER], + bearer=bearer_token_user, + wallet_config=user_wallet.config_path, + ) + check_custom_access_to_container( + other_wallet.wallet_path, + cid, + objects_oids.pop(), + file_path, + deny_operations=deny_map_with_bearer[EACLRole.OTHERS], + bearer=bearer_token_other, + wallet_config=other_wallet.config_path, + ) diff --git a/pytest_tests/testsuites/acl/test_eacl.py b/pytest_tests/testsuites/acl/test_eacl.py index 88bf1671..b18de119 100644 --- a/pytest_tests/testsuites/acl/test_eacl.py +++ b/pytest_tests/testsuites/acl/test_eacl.py @@ -3,10 +3,20 @@ import pytest from common import NEOFS_NETMAP_DICT from failover_utils import wait_object_replication_on_nodes -from python_keywords.acl import (EACLAccess, EACLOperation, EACLRole, EACLRule, create_eacl, set_eacl, - wait_for_cache_expired) +from python_keywords.acl import ( + EACLAccess, + EACLOperation, + EACLRole, + EACLRule, + create_eacl, + set_eacl, + wait_for_cache_expired, +) from python_keywords.container import create_container -from python_keywords.container_access import check_full_access_to_container, check_no_access_to_container +from python_keywords.container_access import ( + check_full_access_to_container, + check_no_access_to_container, +) from python_keywords.neofs_verbs import put_object from python_keywords.node_management import drop_object from wellknown_acl import PUBLIC_ACL @@ -14,92 +24,155 @@ from wellknown_acl import PUBLIC_ACL @pytest.mark.sanity @pytest.mark.acl -@pytest.mark.acl_container +@pytest.mark.acl_extended class TestEACLContainer: NODE_COUNT = len(NEOFS_NETMAP_DICT.keys()) - @pytest.fixture(scope='function') + @pytest.fixture(scope="function") def eacl_full_placement_container_with_object(self, wallets, file_path): user_wallet = wallets.get_wallet() - with allure.step('Create eACL public container with full placement rule'): - full_placement_rule = f'REP {self.NODE_COUNT} IN X CBF 1 SELECT {self.NODE_COUNT} FROM * AS X' - cid = create_container(user_wallet.wallet_path, full_placement_rule, basic_acl=PUBLIC_ACL) + with allure.step("Create eACL public container with full placement rule"): + full_placement_rule = ( + f"REP {self.NODE_COUNT} IN X CBF 1 SELECT {self.NODE_COUNT} FROM * AS X" + ) + cid = create_container( + user_wallet.wallet_path, full_placement_rule, basic_acl=PUBLIC_ACL + ) - with allure.step('Add test object to container'): + with allure.step("Add test object to container"): oid = put_object(user_wallet.wallet_path, file_path, cid) - wait_object_replication_on_nodes(user_wallet.wallet_path, cid, oid, self.NODE_COUNT) + wait_object_replication_on_nodes( + user_wallet.wallet_path, cid, oid, self.NODE_COUNT + ) yield cid, oid, file_path - @pytest.mark.parametrize('deny_role', [EACLRole.USER, EACLRole.OTHERS]) - def test_extended_acl_deny_all_operations(self, wallets, eacl_container_with_objects, deny_role): + @pytest.mark.parametrize("deny_role", [EACLRole.USER, EACLRole.OTHERS]) + def test_extended_acl_deny_all_operations( + self, wallets, eacl_container_with_objects, deny_role + ): user_wallet = wallets.get_wallet() other_wallet = wallets.get_wallet(EACLRole.OTHERS) deny_role_wallet = other_wallet if deny_role == EACLRole.OTHERS else user_wallet - not_deny_role_wallet = user_wallet if deny_role == EACLRole.OTHERS else other_wallet - deny_role_str = 'all others' if deny_role == EACLRole.OTHERS else 'user' - not_deny_role_str = 'user' if deny_role == EACLRole.OTHERS else 'all others' - allure.dynamic.title(f'Testcase to deny NeoFS operations for {deny_role_str}.') + not_deny_role_wallet = ( + user_wallet if deny_role == EACLRole.OTHERS else other_wallet + ) + deny_role_str = "all others" if deny_role == EACLRole.OTHERS else "user" + not_deny_role_str = "user" if deny_role == EACLRole.OTHERS else "all others" + allure.dynamic.title(f"Testcase to deny NeoFS operations for {deny_role_str}.") cid, object_oids, file_path = eacl_container_with_objects - with allure.step(f'Deny all operations for {deny_role_str} via eACL'): - eacl_deny = [EACLRule(access=EACLAccess.DENY, role=deny_role, operation=op) for op in EACLOperation] + with allure.step(f"Deny all operations for {deny_role_str} via eACL"): + eacl_deny = [ + EACLRule(access=EACLAccess.DENY, role=deny_role, operation=op) + for op in EACLOperation + ] set_eacl(user_wallet.wallet_path, cid, create_eacl(cid, eacl_deny)) wait_for_cache_expired() - with allure.step(f'Check only {not_deny_role_str} has full access to container'): - with allure.step(f'Check {deny_role_str} has not access to any operations with container'): - check_no_access_to_container(deny_role_wallet.wallet_path, cid, object_oids[0], file_path) + with allure.step( + f"Check only {not_deny_role_str} has full access to container" + ): + with allure.step( + f"Check {deny_role_str} has not access to any operations with container" + ): + check_no_access_to_container( + deny_role_wallet.wallet_path, cid, object_oids[0], file_path + ) - with allure.step(f'Check {not_deny_role_wallet} has full access to eACL public container'): - check_full_access_to_container(not_deny_role_wallet.wallet_path, cid, object_oids.pop(), file_path) + with allure.step( + f"Check {not_deny_role_wallet} has full access to eACL public container" + ): + check_full_access_to_container( + not_deny_role_wallet.wallet_path, cid, object_oids.pop(), file_path + ) - with allure.step(f'Allow all operations for {deny_role_str} via eACL'): - eacl_deny = [EACLRule(access=EACLAccess.ALLOW, role=deny_role, operation=op) for op in EACLOperation] + with allure.step(f"Allow all operations for {deny_role_str} via eACL"): + eacl_deny = [ + EACLRule(access=EACLAccess.ALLOW, role=deny_role, operation=op) + for op in EACLOperation + ] set_eacl(user_wallet.wallet_path, cid, create_eacl(cid, eacl_deny)) wait_for_cache_expired() - with allure.step(f'Check all have full access to eACL public container'): - check_full_access_to_container(user_wallet.wallet_path, cid, object_oids.pop(), file_path) - check_full_access_to_container(other_wallet.wallet_path, cid, object_oids.pop(), file_path) + with allure.step(f"Check all have full access to eACL public container"): + check_full_access_to_container( + user_wallet.wallet_path, cid, object_oids.pop(), file_path + ) + check_full_access_to_container( + other_wallet.wallet_path, cid, object_oids.pop(), file_path + ) - @allure.title('Testcase to allow NeoFS operations for only one other pubkey.') - def test_extended_acl_deny_all_operations_exclude_pubkey(self, wallets, eacl_container_with_objects): + @allure.title("Testcase to allow NeoFS operations for only one other pubkey.") + def test_extended_acl_deny_all_operations_exclude_pubkey( + self, wallets, eacl_container_with_objects + ): user_wallet = wallets.get_wallet() - other_wallet, other_wallet_allow = wallets.get_wallets_list(EACLRole.OTHERS)[0:2] + other_wallet, other_wallet_allow = wallets.get_wallets_list(EACLRole.OTHERS)[ + 0:2 + ] cid, object_oids, file_path = eacl_container_with_objects - with allure.step('Deny all operations for others except single wallet via eACL'): - eacl = [EACLRule(access=EACLAccess.ALLOW, role=other_wallet_allow.wallet_path, operation=op) - for op in EACLOperation] - eacl += [EACLRule(access=EACLAccess.DENY, role=EACLRole.OTHERS, operation=op) for op in EACLOperation] + with allure.step( + "Deny all operations for others except single wallet via eACL" + ): + eacl = [ + EACLRule( + access=EACLAccess.ALLOW, + role=other_wallet_allow.wallet_path, + operation=op, + ) + for op in EACLOperation + ] + eacl += [ + EACLRule(access=EACLAccess.DENY, role=EACLRole.OTHERS, operation=op) + for op in EACLOperation + ] set_eacl(user_wallet.wallet_path, cid, create_eacl(cid, eacl)) wait_for_cache_expired() - with allure.step('Check only owner and allowed other have full access to public container'): - with allure.step('Check other has not access to operations with container'): - check_no_access_to_container(other_wallet.wallet_path, cid, object_oids[0], file_path) + with allure.step( + "Check only owner and allowed other have full access to public container" + ): + with allure.step("Check other has not access to operations with container"): + check_no_access_to_container( + other_wallet.wallet_path, cid, object_oids[0], file_path + ) - with allure.step('Check owner has full access to public container'): - check_full_access_to_container(user_wallet.wallet_path, cid, object_oids.pop(), file_path) + with allure.step("Check owner has full access to public container"): + check_full_access_to_container( + user_wallet.wallet_path, cid, object_oids.pop(), file_path + ) - with allure.step('Check allowed other has full access to public container'): - check_full_access_to_container(other_wallet_allow.wallet_path, cid, object_oids.pop(), file_path) + with allure.step("Check allowed other has full access to public container"): + check_full_access_to_container( + other_wallet_allow.wallet_path, cid, object_oids.pop(), file_path + ) - @allure.title('Testcase to validate NeoFS replication with eACL deny rules.') - def test_extended_acl_deny_replication(self, wallets, eacl_full_placement_container_with_object, file_path): + @allure.title("Testcase to validate NeoFS replication with eACL deny rules.") + def test_extended_acl_deny_replication( + self, wallets, eacl_full_placement_container_with_object, file_path + ): user_wallet = wallets.get_wallet() cid, oid, file_path = eacl_full_placement_container_with_object - with allure.step('Deny all operations for user via eACL'): - eacl_deny = [EACLRule(access=EACLAccess.DENY, role=EACLRole.USER, operation=op) for op in EACLOperation] - eacl_deny += [EACLRule(access=EACLAccess.DENY, role=EACLRole.OTHERS, operation=op) for op in EACLOperation] + with allure.step("Deny all operations for user via eACL"): + eacl_deny = [ + EACLRule(access=EACLAccess.DENY, role=EACLRole.USER, operation=op) + for op in EACLOperation + ] + eacl_deny += [ + EACLRule(access=EACLAccess.DENY, role=EACLRole.OTHERS, operation=op) + for op in EACLOperation + ] set_eacl(user_wallet.wallet_path, cid, create_eacl(cid, eacl_deny)) wait_for_cache_expired() - with allure.step('Drop object to check replication'): + with allure.step("Drop object to check replication"): drop_object(node_name=[*NEOFS_NETMAP_DICT][0], cid=cid, oid=oid) storage_wallet_path = NEOFS_NETMAP_DICT[[*NEOFS_NETMAP_DICT][0]]["wallet_path"] - with allure.step('Wait for dropped object replicated'): - wait_object_replication_on_nodes(storage_wallet_path, cid, oid, self.NODE_COUNT) + with allure.step("Wait for dropped object replicated"): + wait_object_replication_on_nodes( + storage_wallet_path, cid, oid, self.NODE_COUNT + ) diff --git a/pytest_tests/testsuites/acl/test_eacl_filters.py b/pytest_tests/testsuites/acl/test_eacl_filters.py index a07945ab..8d85ef34 100644 --- a/pytest_tests/testsuites/acl/test_eacl_filters.py +++ b/pytest_tests/testsuites/acl/test_eacl_filters.py @@ -1,162 +1,412 @@ import allure import pytest -from python_keywords.acl import (EACLAccess, EACLFilter, EACLFilters, EACLHeaderType, EACLMatchType, EACLOperation, - EACLRole, EACLRule, create_eacl, set_eacl, wait_for_cache_expired) +from python_keywords.acl import ( + EACLAccess, + EACLFilter, + EACLFilters, + EACLHeaderType, + EACLMatchType, + EACLOperation, + EACLRole, + EACLRule, + create_eacl, + form_bearertoken_file, + set_eacl, + wait_for_cache_expired, +) from python_keywords.container import create_container, delete_container -from python_keywords.container_access import check_full_access_to_container, check_no_access_to_container +from python_keywords.container_access import ( + check_full_access_to_container, + check_no_access_to_container, +) from python_keywords.neofs_verbs import put_object -from python_keywords.object_access import can_get_object, can_get_head_object, can_put_object +from python_keywords.object_access import ( + can_get_head_object, + can_get_object, + can_put_object, +) from wellknown_acl import PUBLIC_ACL @pytest.mark.sanity @pytest.mark.acl -@pytest.mark.acl_container +@pytest.mark.acl_bearer +@pytest.mark.acl_filters class TestEACLFilters: # SPEC: https://github.com/nspcc-dev/neofs-spec/blob/master/01-arch/07-acl.md - ATTRIBUTE = {'check_key': 'check_value'} - OTHER_ATTRIBUTE = {'check_key': 'other_value'} - SET_HEADERS = {'key_one': 'check_value', 'x_key': 'xvalue', 'check_key': 'check_value'} - OTHER_HEADERS = {'key_one': 'check_value', 'x_key': 'other_value', 'check_key': 'other_value'} - REQ_EQUAL_FILTER = EACLFilter(key='check_key', value='check_value', header_type=EACLHeaderType.REQUEST) - NOT_REQ_EQUAL_FILTER = EACLFilter(key='check_key', value='other_value', match_type=EACLMatchType.STRING_NOT_EQUAL, - header_type=EACLHeaderType.REQUEST) - OBJ_EQUAL_FILTER = EACLFilter(key='check_key', value='check_value', header_type=EACLHeaderType.OBJECT) - NOT_OBJ_EQUAL_FILTER = EACLFilter(key='check_key', value='other_value', match_type=EACLMatchType.STRING_NOT_EQUAL, - header_type=EACLHeaderType.OBJECT) - OBJECT_COUNT = 3 - OBJECT_ATTRIBUTES_FILTER_SUPPORTED_OPERATIONS = [EACLOperation.GET, EACLOperation.HEAD, EACLOperation.PUT] + ATTRIBUTE = {"check_key": "check_value"} + OTHER_ATTRIBUTE = {"check_key": "other_value"} + SET_HEADERS = { + "key_one": "check_value", + "x_key": "xvalue", + "check_key": "check_value", + } + OTHER_HEADERS = { + "key_one": "check_value", + "x_key": "other_value", + "check_key": "other_value", + } + REQ_EQUAL_FILTER = EACLFilter( + key="check_key", value="check_value", header_type=EACLHeaderType.REQUEST + ) + NOT_REQ_EQUAL_FILTER = EACLFilter( + key="check_key", + value="other_value", + match_type=EACLMatchType.STRING_NOT_EQUAL, + header_type=EACLHeaderType.REQUEST, + ) + OBJ_EQUAL_FILTER = EACLFilter( + key="check_key", value="check_value", header_type=EACLHeaderType.OBJECT + ) + NOT_OBJ_EQUAL_FILTER = EACLFilter( + key="check_key", + value="other_value", + match_type=EACLMatchType.STRING_NOT_EQUAL, + header_type=EACLHeaderType.OBJECT, + ) + OBJECT_COUNT = 5 + OBJECT_ATTRIBUTES_FILTER_SUPPORTED_OPERATIONS = [ + EACLOperation.GET, + EACLOperation.HEAD, + EACLOperation.PUT, + ] - @pytest.fixture(scope='function') + @pytest.fixture(scope="function") def eacl_container_with_objects(self, wallets, file_path): user_wallet = wallets.get_wallet() - with allure.step('Create eACL public container'): + with allure.step("Create eACL public container"): cid = create_container(user_wallet.wallet_path, basic_acl=PUBLIC_ACL) - with allure.step('Add test objects to container'): + with allure.step("Add test objects to container"): objects_with_header = [ - put_object(user_wallet.wallet_path, file_path, cid, - attributes={**self.SET_HEADERS, 'key': val}) for val in range(self.OBJECT_COUNT)] + put_object( + user_wallet.wallet_path, + file_path, + cid, + attributes={**self.SET_HEADERS, "key": val}, + ) + for val in range(self.OBJECT_COUNT) + ] objects_with_other_header = [ - put_object(user_wallet.wallet_path, file_path, cid, - attributes={**self.OTHER_HEADERS, 'key': val}) for val in range(self.OBJECT_COUNT)] + put_object( + user_wallet.wallet_path, + file_path, + cid, + attributes={**self.OTHER_HEADERS, "key": val}, + ) + for val in range(self.OBJECT_COUNT) + ] objects_without_header = [ - put_object(user_wallet.wallet_path, file_path, cid) for _ in range(self.OBJECT_COUNT)] + put_object(user_wallet.wallet_path, file_path, cid) + for _ in range(self.OBJECT_COUNT) + ] yield cid, objects_with_header, objects_with_other_header, objects_without_header, file_path - with allure.step('Delete eACL public container'): + with allure.step("Delete eACL public container"): delete_container(user_wallet.wallet_path, cid) - @pytest.mark.parametrize('match_type', [EACLMatchType.STRING_EQUAL, EACLMatchType.STRING_NOT_EQUAL]) - def test_extended_acl_filters_request(self, wallets, eacl_container_with_objects, match_type): - allure.dynamic.title(f"Validate NeoFS operations with request filter: {match_type.name}") + @pytest.mark.parametrize( + "match_type", [EACLMatchType.STRING_EQUAL, EACLMatchType.STRING_NOT_EQUAL] + ) + def test_extended_acl_filters_request( + self, wallets, eacl_container_with_objects, match_type + ): + allure.dynamic.title( + f"Validate NeoFS operations with request filter: {match_type.name}" + ) user_wallet = wallets.get_wallet() other_wallet = wallets.get_wallet(EACLRole.OTHERS) - cid, objects_with_header, objects_with_other_header, objects_without_header, file_path =\ - eacl_container_with_objects + ( + cid, + objects_with_header, + objects_with_other_header, + objects_without_header, + file_path, + ) = eacl_container_with_objects - with allure.step('Deny all operations for other with eACL request filter'): + with allure.step("Deny all operations for other with eACL request filter"): equal_filter = EACLFilter(**self.REQ_EQUAL_FILTER.__dict__) equal_filter.match_type = match_type - eacl_deny = [EACLRule(access=EACLAccess.DENY, - role=EACLRole.OTHERS, - filters=EACLFilters([equal_filter]), - operation=op) for op in EACLOperation] + eacl_deny = [ + EACLRule( + access=EACLAccess.DENY, + role=EACLRole.OTHERS, + filters=EACLFilters([equal_filter]), + operation=op, + ) + for op in EACLOperation + ] set_eacl(user_wallet.wallet_path, cid, create_eacl(cid, eacl_deny)) wait_for_cache_expired() # Filter denies requests where "check_key {match_type} ATTRIBUTE", so when match_type # is STRING_EQUAL, then requests with "check_key=OTHER_ATTRIBUTE" will be allowed while # requests with "check_key=ATTRIBUTE" will be denied, and vice versa - allow_headers = self.OTHER_ATTRIBUTE if match_type == EACLMatchType.STRING_EQUAL else self.ATTRIBUTE - deny_headers = self.ATTRIBUTE if match_type == EACLMatchType.STRING_EQUAL else self.OTHER_ATTRIBUTE - # We test on 3 groups of objects with various headers, but eACL rule should ignore object headers and + allow_headers = ( + self.OTHER_ATTRIBUTE + if match_type == EACLMatchType.STRING_EQUAL + else self.ATTRIBUTE + ) + deny_headers = ( + self.ATTRIBUTE + if match_type == EACLMatchType.STRING_EQUAL + else self.OTHER_ATTRIBUTE + ) + # We test on 3 groups of objects with various headers, + # but eACL rule should ignore object headers and # work only based on request headers - for oid in (objects_with_header, objects_with_other_header, objects_without_header): - with allure.step('Check other has full access when sending request without headers'): - check_full_access_to_container(other_wallet.wallet_path, cid, oid.pop(), file_path) + for oid in ( + objects_with_header, + objects_with_other_header, + objects_without_header, + ): + with allure.step( + "Check other has full access when sending request without headers" + ): + check_full_access_to_container( + other_wallet.wallet_path, cid, oid.pop(), file_path + ) - with allure.step('Check other has full access when sending request with allowed headers'): - check_full_access_to_container(other_wallet.wallet_path, cid, oid.pop(), file_path, xhdr=allow_headers) + with allure.step( + "Check other has full access when sending request with allowed headers" + ): + check_full_access_to_container( + other_wallet.wallet_path, + cid, + oid.pop(), + file_path, + xhdr=allow_headers, + ) - with allure.step('Check other has no access when sending request with denied headers'): - check_no_access_to_container(other_wallet.wallet_path, cid, oid.pop(), file_path, xhdr=deny_headers) + with allure.step( + "Check other has no access when sending request with denied headers" + ): + check_no_access_to_container( + other_wallet.wallet_path, + cid, + oid.pop(), + file_path, + xhdr=deny_headers, + ) - @pytest.mark.parametrize('match_type', [EACLMatchType.STRING_EQUAL, EACLMatchType.STRING_NOT_EQUAL]) - def test_extended_acl_deny_filters_object(self, wallets, eacl_container_with_objects, match_type): - allure.dynamic.title(f"Validate NeoFS operations with deny user headers filter: {match_type.name}") + with allure.step( + "Check other has full access when sending request " + "with denied headers and using bearer token" + ): + bearer_token_other = form_bearertoken_file( + user_wallet.wallet_path, + cid, + [ + EACLRule( + operation=op, access=EACLAccess.ALLOW, role=EACLRole.OTHERS + ) + for op in EACLOperation + ], + ) + check_full_access_to_container( + other_wallet.wallet_path, + cid, + oid.pop(), + file_path, + xhdr=deny_headers, + bearer=bearer_token_other, + ) + + @pytest.mark.parametrize( + "match_type", [EACLMatchType.STRING_EQUAL, EACLMatchType.STRING_NOT_EQUAL] + ) + def test_extended_acl_deny_filters_object( + self, wallets, eacl_container_with_objects, match_type + ): + allure.dynamic.title( + f"Validate NeoFS operations with deny user headers filter: {match_type.name}" + ) user_wallet = wallets.get_wallet() other_wallet = wallets.get_wallet(EACLRole.OTHERS) - cid, objects_with_header, objects_with_other_header, objs_without_header, file_path = \ - eacl_container_with_objects + ( + cid, + objects_with_header, + objects_with_other_header, + objs_without_header, + file_path, + ) = eacl_container_with_objects - with allure.step('Deny all operations for other with object filter'): + with allure.step("Deny all operations for other with object filter"): equal_filter = EACLFilter(**self.OBJ_EQUAL_FILTER.__dict__) equal_filter.match_type = match_type - eacl_deny = [EACLRule(access=EACLAccess.DENY, - role=EACLRole.OTHERS, - filters=EACLFilters([equal_filter]), - operation=op) for op in self.OBJECT_ATTRIBUTES_FILTER_SUPPORTED_OPERATIONS] + eacl_deny = [ + EACLRule( + access=EACLAccess.DENY, + role=EACLRole.OTHERS, + filters=EACLFilters([equal_filter]), + operation=op, + ) + for op in self.OBJECT_ATTRIBUTES_FILTER_SUPPORTED_OPERATIONS + ] set_eacl(user_wallet.wallet_path, cid, create_eacl(cid, eacl_deny)) wait_for_cache_expired() - allow_objects = objects_with_other_header if match_type == EACLMatchType.STRING_EQUAL else objects_with_header - deny_objects = objects_with_header if match_type == EACLMatchType.STRING_EQUAL else objects_with_other_header + allow_objects = ( + objects_with_other_header + if match_type == EACLMatchType.STRING_EQUAL + else objects_with_header + ) + deny_objects = ( + objects_with_header + if match_type == EACLMatchType.STRING_EQUAL + else objects_with_other_header + ) - # We will attempt requests with various headers, but eACL rule should ignore request headers and validate + # We will attempt requests with various headers, + # but eACL rule should ignore request headers and validate # only object headers for xhdr in (self.ATTRIBUTE, self.OTHER_ATTRIBUTE, None): - with allure.step(f'Check other have full access to objects without attributes'): + with allure.step( + "Check other have full access to objects without attributes" + ): check_full_access_to_container( - other_wallet.wallet_path, cid, objs_without_header.pop(), file_path, xhdr=xhdr) + other_wallet.wallet_path, + cid, + objs_without_header.pop(), + file_path, + xhdr=xhdr, + ) - with allure.step(f'Check other have full access to objects without deny attribute'): + with allure.step( + "Check other have full access to objects without deny attribute" + ): check_full_access_to_container( - other_wallet.wallet_path, cid, allow_objects.pop(), file_path, xhdr=xhdr) + other_wallet.wallet_path, + cid, + allow_objects.pop(), + file_path, + xhdr=xhdr, + ) - with allure.step(f'Check other have no access to objects with deny attribute'): - oid = deny_objects.pop() + with allure.step( + "Check other have no access to objects with deny attribute" + ): with pytest.raises(AssertionError): - assert can_get_head_object(other_wallet.wallet_path, cid, oid, xhdr=xhdr) + assert can_get_head_object( + other_wallet.wallet_path, cid, deny_objects[0], xhdr=xhdr + ) with pytest.raises(AssertionError): - assert can_get_object(other_wallet.wallet_path, cid, oid, file_path, xhdr=xhdr) + assert can_get_object( + other_wallet.wallet_path, + cid, + deny_objects[0], + file_path, + xhdr=xhdr, + ) - allow_attribute = self.OTHER_ATTRIBUTE if match_type == EACLMatchType.STRING_EQUAL else self.ATTRIBUTE - with allure.step('Check other can PUT objects without denied attribute'): - assert can_put_object(other_wallet.wallet_path, cid, file_path, attributes=allow_attribute) + with allure.step( + "Check other have access to objects with deny attribute and using bearer token" + ): + bearer_token_other = form_bearertoken_file( + user_wallet.wallet_path, + cid, + [ + EACLRule( + operation=op, + access=EACLAccess.ALLOW, + role=EACLRole.OTHERS, + ) + for op in EACLOperation + ], + ) + check_full_access_to_container( + other_wallet.wallet_path, + cid, + deny_objects.pop(), + file_path, + xhdr=xhdr, + bearer=bearer_token_other, + ) + + allow_attribute = ( + self.OTHER_ATTRIBUTE + if match_type == EACLMatchType.STRING_EQUAL + else self.ATTRIBUTE + ) + with allure.step("Check other can PUT objects without denied attribute"): + assert can_put_object( + other_wallet.wallet_path, cid, file_path, attributes=allow_attribute + ) assert can_put_object(other_wallet.wallet_path, cid, file_path) - deny_attribute = self.ATTRIBUTE if match_type == EACLMatchType.STRING_EQUAL else self.OTHER_ATTRIBUTE - with allure.step('Check other can not PUT objects with denied attribute'): + deny_attribute = ( + self.ATTRIBUTE + if match_type == EACLMatchType.STRING_EQUAL + else self.OTHER_ATTRIBUTE + ) + with allure.step("Check other can not PUT objects with denied attribute"): with pytest.raises(AssertionError): - assert can_put_object(other_wallet.wallet_path, cid, file_path, attributes=deny_attribute) + assert can_put_object( + other_wallet.wallet_path, cid, file_path, attributes=deny_attribute + ) - @pytest.mark.parametrize('match_type', [EACLMatchType.STRING_EQUAL, EACLMatchType.STRING_NOT_EQUAL]) - def test_extended_acl_allow_filters_object(self, wallets, eacl_container_with_objects, match_type): + with allure.step( + "Check other can PUT objects with denied attribute and using bearer token" + ): + bearer_token_other_for_put = form_bearertoken_file( + user_wallet.wallet_path, + cid, + [ + EACLRule( + operation=EACLOperation.PUT, + access=EACLAccess.ALLOW, + role=EACLRole.OTHERS, + ) + ], + ) + assert can_put_object( + other_wallet.wallet_path, + cid, + file_path, + attributes=deny_attribute, + bearer=bearer_token_other_for_put, + ) + + @pytest.mark.parametrize( + "match_type", [EACLMatchType.STRING_EQUAL, EACLMatchType.STRING_NOT_EQUAL] + ) + def test_extended_acl_allow_filters_object( + self, wallets, eacl_container_with_objects, match_type + ): allure.dynamic.title( - f"Testcase to validate NeoFS operation with allow eACL user headers filters: {match_type.name}") + "Testcase to validate NeoFS operation with allow eACL user headers filters:" + f"{match_type.name}" + ) user_wallet = wallets.get_wallet() other_wallet = wallets.get_wallet(EACLRole.OTHERS) - cid, objects_with_header, objects_with_other_header, objects_without_header, file_path = \ - eacl_container_with_objects + ( + cid, + objects_with_header, + objects_with_other_header, + objects_without_header, + file_path, + ) = eacl_container_with_objects - with allure.step('Deny all operations for others except few operations allowed by object filter'): + with allure.step( + "Deny all operations for others except few operations allowed by object filter" + ): equal_filter = EACLFilter(**self.OBJ_EQUAL_FILTER.__dict__) equal_filter.match_type = match_type eacl = [ - EACLRule(access=EACLAccess.ALLOW, - role=EACLRole.OTHERS, - filters=EACLFilters([equal_filter]), - operation=op) for op in self.OBJECT_ATTRIBUTES_FILTER_SUPPORTED_OPERATIONS - ] + [ - EACLRule(access=EACLAccess.DENY, - role=EACLRole.OTHERS, - operation=op) for op in self.OBJECT_ATTRIBUTES_FILTER_SUPPORTED_OPERATIONS - + EACLRule( + access=EACLAccess.ALLOW, + role=EACLRole.OTHERS, + filters=EACLFilters([equal_filter]), + operation=op, + ) + for op in self.OBJECT_ATTRIBUTES_FILTER_SUPPORTED_OPERATIONS + ] + [ + EACLRule(access=EACLAccess.DENY, role=EACLRole.OTHERS, operation=op) + for op in self.OBJECT_ATTRIBUTES_FILTER_SUPPORTED_OPERATIONS ] set_eacl(user_wallet.wallet_path, cid, create_eacl(cid, eacl)) wait_for_cache_expired() @@ -172,7 +422,7 @@ class TestEACLFilters: allow_attribute = self.OTHER_ATTRIBUTE deny_attribute = self.ATTRIBUTE - with allure.step(f'Check other cannot get and put objects without attributes'): + with allure.step(f"Check other cannot get and put objects without attributes"): oid = objects_without_header.pop() with pytest.raises(AssertionError): assert can_get_head_object(other_wallet.wallet_path, cid, oid) @@ -181,17 +431,80 @@ class TestEACLFilters: with pytest.raises(AssertionError): assert can_put_object(other_wallet.wallet_path, cid, file_path) - with allure.step(f'Check other can get objects with attributes matching the filter'): + with allure.step( + "Check other can get and put objects without attributes " + "and using bearer token" + ): + bearer_token_other = form_bearertoken_file( + user_wallet.wallet_path, + cid, + [ + EACLRule( + operation=op, + access=EACLAccess.ALLOW, + role=EACLRole.OTHERS, + ) + for op in EACLOperation + ], + ) + assert can_get_head_object( + other_wallet.wallet_path, + cid, + objects_without_header[0], + bearer=bearer_token_other, + ) + assert can_get_object( + other_wallet.wallet_path, + cid, + objects_without_header[0], + file_path, + bearer=bearer_token_other, + ) + assert can_put_object( + other_wallet.wallet_path, cid, file_path, bearer=bearer_token_other + ) + + with allure.step( + f"Check other can get objects with attributes matching the filter" + ): oid = allow_objects.pop() assert can_get_head_object(other_wallet.wallet_path, cid, oid) assert can_get_object(other_wallet.wallet_path, cid, oid, file_path) - assert can_put_object(other_wallet.wallet_path, cid, file_path, attributes=allow_attribute) + assert can_put_object( + other_wallet.wallet_path, cid, file_path, attributes=allow_attribute + ) - with allure.step(f'Check other cannot get objects without attributes matching the filter'): + with allure.step( + "Check other cannot get objects without attributes matching the filter" + ): + with pytest.raises(AssertionError): + assert can_get_head_object( + other_wallet.wallet_path, cid, deny_objects[0] + ) + with pytest.raises(AssertionError): + assert can_get_object( + other_wallet.wallet_path, cid, deny_objects[0], file_path + ) + with pytest.raises(AssertionError): + assert can_put_object( + other_wallet.wallet_path, cid, file_path, attributes=deny_attribute + ) + + with allure.step( + "Check other can get objects without attributes matching the filter " + "and using bearer token" + ): oid = deny_objects.pop() - with pytest.raises(AssertionError): - assert can_get_head_object(other_wallet.wallet_path, cid, oid) - with pytest.raises(AssertionError): - assert can_get_object(other_wallet.wallet_path, cid, oid, file_path) - with pytest.raises(AssertionError): - assert can_put_object(other_wallet.wallet_path, cid, file_path, attributes=deny_attribute) + assert can_get_head_object( + other_wallet.wallet_path, cid, oid, bearer=bearer_token_other + ) + assert can_get_object( + other_wallet.wallet_path, cid, oid, file_path, bearer=bearer_token_other + ) + assert can_put_object( + other_wallet.wallet_path, + cid, + file_path, + attributes=deny_attribute, + bearer=bearer_token_other, + )