Add eACL tests using bearer token

Signed-off-by: Vladimir Avdeev <v.avdeev@yadro.com>
This commit is contained in:
Vladimir Avdeev 2022-09-05 15:12:56 +03:00 committed by Vladimir Domnich
parent 92cbc2e11b
commit 926a7a5779
6 changed files with 806 additions and 267 deletions

View file

@ -6,6 +6,7 @@ import time
from contextlib import contextmanager
from datetime import datetime
from typing import Optional
from requests import HTTPError
import docker
@ -83,8 +84,11 @@ class LocalDevEnvStorageServiceHelper:
client = self._get_docker_client(first_node_name)
for container_name in self.ALL_CONTAINERS:
logs = client.logs(container_name, since=since, until=until)
try:
logs = client.logs(container_name, since=since, until=until)
except HTTPError as exc:
logger.info(f"Got exception while dumping container '{container_name}' logs: {exc}")
continue
# Dump logs to the directory
file_path = os.path.join(directory_path, f"{container_name}-log.txt")
with open(file_path, "wb") as file:

View file

@ -34,27 +34,26 @@ class Wallets:
@pytest.fixture(scope="module")
def wallets(prepare_wallet_and_deposit):
yield Wallets(wallets={
EACLRole.USER: [
Wallet(
wallet_path=prepare_wallet_and_deposit,
config_path=WALLET_CONFIG
)],
EACLRole.OTHERS: [
Wallet(
wallet_path=init_wallet(ASSETS_DIR)[0],
config_path=WALLET_CONFIG
),
Wallet(
wallet_path=init_wallet(ASSETS_DIR)[0],
config_path=WALLET_CONFIG
)],
EACLRole.SYSTEM: [
Wallet(
wallet_path=IR_WALLET_PATH,
config_path=IR_WALLET_CONFIG
)],
})
yield Wallets(
wallets={
EACLRole.USER: [
Wallet(
wallet_path=prepare_wallet_and_deposit, config_path=WALLET_CONFIG
)
],
EACLRole.OTHERS: [
Wallet(
wallet_path=init_wallet(ASSETS_DIR)[0], config_path=WALLET_CONFIG
),
Wallet(
wallet_path=init_wallet(ASSETS_DIR)[0], config_path=WALLET_CONFIG
),
],
EACLRole.SYSTEM: [
Wallet(wallet_path=IR_WALLET_PATH, config_path=IR_WALLET_CONFIG)
],
}
)
@pytest.fixture(scope="module")
@ -62,17 +61,22 @@ def file_path():
yield generate_file()
@pytest.fixture(scope='function')
@pytest.fixture(scope="function")
def eacl_container_with_objects(wallets, file_path):
user_wallet = wallets.get_wallet()
with allure.step('Create eACL public container'):
with allure.step("Create eACL public container"):
cid = create_container(user_wallet.wallet_path, basic_acl=PUBLIC_ACL)
with allure.step('Add test objects to container'):
with allure.step("Add test objects to container"):
objects_oids = [
put_object(
user_wallet.wallet_path, file_path, cid,
attributes={'key1': 'val1', 'key': val, 'key2': 'abc'}) for val in range(OBJECT_COUNT)]
user_wallet.wallet_path,
file_path,
cid,
attributes={"key1": "val1", "key": val, "key2": "abc"},
)
for val in range(OBJECT_COUNT)
]
yield cid, objects_oids, file_path

View file

@ -3,21 +3,23 @@ import pytest
from python_keywords.acl import EACLRole
from python_keywords.container import create_container
from python_keywords.container_access import (check_full_access_to_container, check_no_access_to_container,
check_read_only_container)
from python_keywords.container_access import (
check_full_access_to_container,
check_no_access_to_container,
check_read_only_container,
)
from python_keywords.neofs_verbs import put_object
from wellknown_acl import PRIVATE_ACL_F, PUBLIC_ACL_F, READONLY_ACL_F
@pytest.mark.sanity
@pytest.mark.acl
@pytest.mark.acl_container
@pytest.mark.acl_basic
class TestACLBasic:
@pytest.fixture(scope='function')
@pytest.fixture(scope="function")
def public_container(self, wallets):
user_wallet = wallets.get_wallet()
with allure.step('Create public container'):
with allure.step("Create public container"):
cid_public = create_container(user_wallet.wallet_path, basic_acl=PUBLIC_ACL_F)
yield cid_public
@ -25,29 +27,33 @@ class TestACLBasic:
# with allure.step('Delete public container'):
# delete_container(user_wallet.wallet_path, cid_public)
@pytest.fixture(scope='function')
@pytest.fixture(scope="function")
def private_container(self, wallets):
user_wallet = wallets.get_wallet()
with allure.step('Create private container'):
cid_private = create_container(user_wallet.wallet_path, basic_acl=PRIVATE_ACL_F)
with allure.step("Create private container"):
cid_private = create_container(
user_wallet.wallet_path, basic_acl=PRIVATE_ACL_F
)
yield cid_private
# with allure.step('Delete private container'):
# delete_container(user_wallet.wallet_path, cid_private)
@pytest.fixture(scope='function')
@pytest.fixture(scope="function")
def read_only_container(self, wallets):
user_wallet = wallets.get_wallet()
with allure.step('Create public readonly container'):
cid_read_only = create_container(user_wallet.wallet_path, basic_acl=READONLY_ACL_F)
with allure.step("Create public readonly container"):
cid_read_only = create_container(
user_wallet.wallet_path, basic_acl=READONLY_ACL_F
)
yield cid_read_only
# with allure.step('Delete public readonly container'):
# delete_container(user_wallet.wallet_path, cid_read_only)
@allure.title('Test basic ACL on public container')
@allure.title("Test basic ACL on public container")
def test_basic_acl_public(self, wallets, public_container, file_path):
"""
Test basic ACL set during public container creation.
@ -55,16 +61,31 @@ class TestACLBasic:
user_wallet = wallets.get_wallet()
other_wallet = wallets.get_wallet(role=EACLRole.OTHERS)
cid = public_container
for wallet, desc in ((user_wallet, 'owner'), (other_wallet, 'other users')):
with allure.step('Add test objects to container'):
# We create new objects for each wallet because check_full_access_to_container deletes the object
owner_object_oid = put_object(user_wallet.wallet_path, file_path, cid, attributes={'created': 'owner'})
other_object_oid = put_object(other_wallet.wallet_path, file_path, cid, attributes={'created': 'other'})
with allure.step(f'Check {desc} has full access to public container'):
check_full_access_to_container(wallet.wallet_path, cid, owner_object_oid, file_path)
check_full_access_to_container(wallet.wallet_path, cid, other_object_oid, file_path)
for wallet, desc in ((user_wallet, "owner"), (other_wallet, "other users")):
with allure.step("Add test objects to container"):
# We create new objects for each wallet because check_full_access_to_container
# deletes the object
owner_object_oid = put_object(
user_wallet.wallet_path,
file_path,
cid,
attributes={"created": "owner"},
)
other_object_oid = put_object(
other_wallet.wallet_path,
file_path,
cid,
attributes={"created": "other"},
)
with allure.step(f"Check {desc} has full access to public container"):
check_full_access_to_container(
wallet.wallet_path, cid, owner_object_oid, file_path
)
check_full_access_to_container(
wallet.wallet_path, cid, other_object_oid, file_path
)
@allure.title('Test basic ACL on private container')
@allure.title("Test basic ACL on private container")
def test_basic_acl_private(self, wallets, private_container, file_path):
"""
Test basic ACL set during private container creation.
@ -72,18 +93,23 @@ class TestACLBasic:
user_wallet = wallets.get_wallet()
other_wallet = wallets.get_wallet(role=EACLRole.OTHERS)
cid = private_container
with allure.step('Add test objects to container'):
with allure.step("Add test objects to container"):
owner_object_oid = put_object(user_wallet.wallet_path, file_path, cid)
with allure.step('Check only owner has full access to private container'):
with allure.step('Check no one except owner has access to operations with container'):
check_no_access_to_container(other_wallet.wallet_path, cid, owner_object_oid, file_path)
with allure.step("Check only owner has full access to private container"):
with allure.step(
"Check no one except owner has access to operations with container"
):
check_no_access_to_container(
other_wallet.wallet_path, cid, owner_object_oid, file_path
)
with allure.step('Check owner has full access to private container'):
with allure.step("Check owner has full access to private container"):
check_full_access_to_container(
user_wallet.wallet_path, cid, owner_object_oid, file_path)
user_wallet.wallet_path, cid, owner_object_oid, file_path
)
@allure.title('Test basic ACL on readonly container')
@allure.title("Test basic ACL on readonly container")
def test_basic_acl_readonly(self, wallets, read_only_container, file_path):
"""
Test basic ACL Operations for Read-Only Container.
@ -92,11 +118,17 @@ class TestACLBasic:
other_wallet = wallets.get_wallet(role=EACLRole.OTHERS)
cid = read_only_container
with allure.step('Add test objects to container'):
with allure.step("Add test objects to container"):
object_oid = put_object(user_wallet.wallet_path, file_path, cid)
with allure.step('Check other has read-only access to operations with container'):
check_read_only_container(other_wallet.wallet_path, cid, object_oid, file_path)
with allure.step(
"Check other has read-only access to operations with container"
):
check_read_only_container(
other_wallet.wallet_path, cid, object_oid, file_path
)
with allure.step('Check owner has full access to public container'):
check_full_access_to_container(user_wallet.wallet_path, cid, object_oid, file_path)
with allure.step("Check owner has full access to public container"):
check_full_access_to_container(
user_wallet.wallet_path, cid, object_oid, file_path
)

View file

@ -1,59 +1,115 @@
import allure
import pytest
from python_keywords.acl import (EACLAccess, EACLOperation, EACLRole, EACLRule, create_eacl, form_bearertoken_file,
set_eacl, wait_for_cache_expired)
from python_keywords.container_access import (check_custom_access_to_container, check_full_access_to_container,
check_no_access_to_container)
from python_keywords.acl import (
EACLAccess,
EACLOperation,
EACLRole,
EACLRule,
create_eacl,
form_bearertoken_file,
set_eacl,
wait_for_cache_expired,
)
from python_keywords.container_access import (
check_custom_access_to_container,
check_full_access_to_container,
check_no_access_to_container,
)
@pytest.mark.sanity
@pytest.mark.acl
@pytest.mark.acl_bearer
class TestACLBearer:
@pytest.mark.parametrize('role', [EACLRole.USER, EACLRole.OTHERS])
@pytest.mark.parametrize("role", [EACLRole.USER, EACLRole.OTHERS])
def test_bearer_token_operations(self, wallets, eacl_container_with_objects, role):
allure.dynamic.title(f"Testcase to validate NeoFS operations with {role.value} BearerToken")
allure.dynamic.title(
f"Testcase to validate NeoFS operations with {role.value} BearerToken"
)
cid, objects_oids, file_path = eacl_container_with_objects
user_wallet = wallets.get_wallet()
deny_wallet = wallets.get_wallet(role)
with allure.step(f'Check {role.value} has full access to container without bearer token'):
check_full_access_to_container(deny_wallet.wallet_path, cid, objects_oids.pop(), file_path,
wallet_config=deny_wallet.config_path)
with allure.step(
f"Check {role.value} has full access to container without bearer token"
):
check_full_access_to_container(
deny_wallet.wallet_path,
cid,
objects_oids.pop(),
file_path,
wallet_config=deny_wallet.config_path,
)
with allure.step(f'Set deny all operations for {role.value} via eACL'):
eacl = [EACLRule(access=EACLAccess.DENY, role=role, operation=op) for op in EACLOperation]
with allure.step(f"Set deny all operations for {role.value} via eACL"):
eacl = [
EACLRule(access=EACLAccess.DENY, role=role, operation=op)
for op in EACLOperation
]
eacl_file = create_eacl(cid, eacl)
set_eacl(user_wallet.wallet_path, cid, eacl_file)
wait_for_cache_expired()
with allure.step(f'Create bearer token for {role.value} with all operations allowed'):
bearer_token = form_bearertoken_file(user_wallet.wallet_path, cid, [
EACLRule(operation=op, access=EACLAccess.ALLOW, role=role)
for op in EACLOperation])
with allure.step(
f"Create bearer token for {role.value} with all operations allowed"
):
bearer_token = form_bearertoken_file(
user_wallet.wallet_path,
cid,
[
EACLRule(operation=op, access=EACLAccess.ALLOW, role=role)
for op in EACLOperation
],
)
with allure.step(f'Check {role.value} without token has no access to all operations with container'):
with allure.step(
f"Check {role.value} without token has no access to all operations with container"
):
check_no_access_to_container(
deny_wallet.wallet_path, cid, objects_oids.pop(), file_path,
wallet_config=deny_wallet.config_path)
deny_wallet.wallet_path,
cid,
objects_oids.pop(),
file_path,
wallet_config=deny_wallet.config_path,
)
with allure.step(f'Check {role.value} with token has access to all operations with container'):
check_full_access_to_container(deny_wallet.wallet_path, cid, objects_oids.pop(), file_path,
bearer=bearer_token, wallet_config=deny_wallet.config_path)
with allure.step(
f"Check {role.value} with token has access to all operations with container"
):
check_full_access_to_container(
deny_wallet.wallet_path,
cid,
objects_oids.pop(),
file_path,
bearer=bearer_token,
wallet_config=deny_wallet.config_path,
)
with allure.step(f'Set allow all operations for {role.value} via eACL'):
eacl = [EACLRule(access=EACLAccess.ALLOW, role=role, operation=op) for op in EACLOperation]
with allure.step(f"Set allow all operations for {role.value} via eACL"):
eacl = [
EACLRule(access=EACLAccess.ALLOW, role=role, operation=op)
for op in EACLOperation
]
eacl_file = create_eacl(cid, eacl)
set_eacl(user_wallet.wallet_path, cid, eacl_file)
wait_for_cache_expired()
with allure.step(f'Check {role.value} without token has access to all operations with container'):
check_full_access_to_container(deny_wallet.wallet_path, cid, objects_oids.pop(), file_path,
wallet_config=deny_wallet.config_path)
with allure.step(
f"Check {role.value} without token has access to all operations with container"
):
check_full_access_to_container(
deny_wallet.wallet_path,
cid,
objects_oids.pop(),
file_path,
wallet_config=deny_wallet.config_path,
)
@allure.title('BearerToken Operations for compound Operations')
def test_bearer_token_compound_operations(self, wallets, eacl_container_with_objects):
@allure.title("BearerToken Operations for compound Operations")
def test_bearer_token_compound_operations(
self, wallets, eacl_container_with_objects
):
cid, objects_oids, file_path = eacl_container_with_objects
user_wallet = wallets.get_wallet()
other_wallet = wallets.get_wallet(role=EACLRole.OTHERS)
@ -61,48 +117,105 @@ class TestACLBearer:
# Operations that we will deny for each role via eACL
deny_map = {
EACLRole.USER: [EACLOperation.DELETE],
EACLRole.OTHERS: [EACLOperation.GET, EACLOperation.PUT, EACLOperation.GET_RANGE]
EACLRole.OTHERS: [
EACLOperation.SEARCH,
EACLOperation.GET_RANGE_HASH,
EACLOperation.GET_RANGE,
],
}
# Operations that we will allow for each role with bearer token
bearer_map = {
EACLRole.USER: [EACLOperation.DELETE, EACLOperation.PUT, EACLOperation.GET_RANGE],
EACLRole.OTHERS: [EACLOperation.GET, EACLOperation.GET_RANGE],
EACLRole.USER: [
EACLOperation.DELETE,
EACLOperation.PUT,
EACLOperation.GET_RANGE,
],
EACLRole.OTHERS: [
EACLOperation.GET,
EACLOperation.GET_RANGE,
EACLOperation.GET_RANGE_HASH,
],
}
deny_map_with_bearer = {
EACLRole.USER: [op for op in deny_map[EACLRole.USER] if op not in bearer_map[EACLRole.USER]],
EACLRole.OTHERS: [op for op in deny_map[EACLRole.OTHERS] if op not in bearer_map[EACLRole.OTHERS]],
EACLRole.USER: [
op
for op in deny_map[EACLRole.USER]
if op not in bearer_map[EACLRole.USER]
],
EACLRole.OTHERS: [
op
for op in deny_map[EACLRole.OTHERS]
if op not in bearer_map[EACLRole.OTHERS]
],
}
eacl_deny = []
for role, operations in deny_map.items():
eacl_deny += [EACLRule(access=EACLAccess.DENY, role=role, operation=op) for op in operations]
set_eacl(user_wallet.wallet_path, cid, eacl_table_path=create_eacl(cid, eacl_deny))
eacl_deny += [
EACLRule(access=EACLAccess.DENY, role=role, operation=op)
for op in operations
]
set_eacl(
user_wallet.wallet_path, cid, eacl_table_path=create_eacl(cid, eacl_deny)
)
wait_for_cache_expired()
with allure.step('Check rule consistency without bearer'):
check_custom_access_to_container(user_wallet.wallet_path, cid, objects_oids.pop(), file_path,
deny_operations=deny_map[EACLRole.USER],
wallet_config=user_wallet.config_path)
check_custom_access_to_container(other_wallet.wallet_path, cid, objects_oids.pop(), file_path,
deny_operations=deny_map[EACLRole.OTHERS],
wallet_config=other_wallet.config_path)
with allure.step("Check rule consistency without bearer"):
check_custom_access_to_container(
user_wallet.wallet_path,
cid,
objects_oids.pop(),
file_path,
deny_operations=deny_map[EACLRole.USER],
wallet_config=user_wallet.config_path,
)
check_custom_access_to_container(
other_wallet.wallet_path,
cid,
objects_oids.pop(),
file_path,
deny_operations=deny_map[EACLRole.OTHERS],
wallet_config=other_wallet.config_path,
)
with allure.step('Check rule consistency with bearer'):
bearer_token_user = form_bearertoken_file(user_wallet.wallet_path, cid, [
EACLRule(operation=op, access=EACLAccess.ALLOW, role=EACLRole.USER)
for op in bearer_map[EACLRole.USER]])
with allure.step("Check rule consistency using bearer token"):
bearer_token_user = form_bearertoken_file(
user_wallet.wallet_path,
cid,
[
EACLRule(operation=op, access=EACLAccess.ALLOW, role=EACLRole.USER)
for op in bearer_map[EACLRole.USER]
],
)
bearer_token_other = form_bearertoken_file(user_wallet.wallet_path, cid, [
EACLRule(operation=op, access=EACLAccess.ALLOW, role=EACLRole.OTHERS)
for op in bearer_map[EACLRole.OTHERS]])
bearer_token_other = form_bearertoken_file(
user_wallet.wallet_path,
cid,
[
EACLRule(
operation=op, access=EACLAccess.ALLOW, role=EACLRole.OTHERS
)
for op in bearer_map[EACLRole.OTHERS]
],
)
check_custom_access_to_container(user_wallet.wallet_path, cid, objects_oids.pop(), file_path,
deny_operations=deny_map_with_bearer[EACLRole.USER],
bearer=bearer_token_user,
wallet_config=user_wallet.config_path)
check_custom_access_to_container(other_wallet.wallet_path, cid, objects_oids.pop(), file_path,
deny_operations=deny_map_with_bearer[EACLRole.OTHERS],
bearer=bearer_token_other,
wallet_config=other_wallet.config_path)
check_custom_access_to_container(
user_wallet.wallet_path,
cid,
objects_oids.pop(),
file_path,
deny_operations=deny_map_with_bearer[EACLRole.USER],
bearer=bearer_token_user,
wallet_config=user_wallet.config_path,
)
check_custom_access_to_container(
other_wallet.wallet_path,
cid,
objects_oids.pop(),
file_path,
deny_operations=deny_map_with_bearer[EACLRole.OTHERS],
bearer=bearer_token_other,
wallet_config=other_wallet.config_path,
)

View file

@ -3,10 +3,20 @@ import pytest
from common import NEOFS_NETMAP_DICT
from failover_utils import wait_object_replication_on_nodes
from python_keywords.acl import (EACLAccess, EACLOperation, EACLRole, EACLRule, create_eacl, set_eacl,
wait_for_cache_expired)
from python_keywords.acl import (
EACLAccess,
EACLOperation,
EACLRole,
EACLRule,
create_eacl,
set_eacl,
wait_for_cache_expired,
)
from python_keywords.container import create_container
from python_keywords.container_access import check_full_access_to_container, check_no_access_to_container
from python_keywords.container_access import (
check_full_access_to_container,
check_no_access_to_container,
)
from python_keywords.neofs_verbs import put_object
from python_keywords.node_management import drop_object
from wellknown_acl import PUBLIC_ACL
@ -14,92 +24,155 @@ from wellknown_acl import PUBLIC_ACL
@pytest.mark.sanity
@pytest.mark.acl
@pytest.mark.acl_container
@pytest.mark.acl_extended
class TestEACLContainer:
NODE_COUNT = len(NEOFS_NETMAP_DICT.keys())
@pytest.fixture(scope='function')
@pytest.fixture(scope="function")
def eacl_full_placement_container_with_object(self, wallets, file_path):
user_wallet = wallets.get_wallet()
with allure.step('Create eACL public container with full placement rule'):
full_placement_rule = f'REP {self.NODE_COUNT} IN X CBF 1 SELECT {self.NODE_COUNT} FROM * AS X'
cid = create_container(user_wallet.wallet_path, full_placement_rule, basic_acl=PUBLIC_ACL)
with allure.step("Create eACL public container with full placement rule"):
full_placement_rule = (
f"REP {self.NODE_COUNT} IN X CBF 1 SELECT {self.NODE_COUNT} FROM * AS X"
)
cid = create_container(
user_wallet.wallet_path, full_placement_rule, basic_acl=PUBLIC_ACL
)
with allure.step('Add test object to container'):
with allure.step("Add test object to container"):
oid = put_object(user_wallet.wallet_path, file_path, cid)
wait_object_replication_on_nodes(user_wallet.wallet_path, cid, oid, self.NODE_COUNT)
wait_object_replication_on_nodes(
user_wallet.wallet_path, cid, oid, self.NODE_COUNT
)
yield cid, oid, file_path
@pytest.mark.parametrize('deny_role', [EACLRole.USER, EACLRole.OTHERS])
def test_extended_acl_deny_all_operations(self, wallets, eacl_container_with_objects, deny_role):
@pytest.mark.parametrize("deny_role", [EACLRole.USER, EACLRole.OTHERS])
def test_extended_acl_deny_all_operations(
self, wallets, eacl_container_with_objects, deny_role
):
user_wallet = wallets.get_wallet()
other_wallet = wallets.get_wallet(EACLRole.OTHERS)
deny_role_wallet = other_wallet if deny_role == EACLRole.OTHERS else user_wallet
not_deny_role_wallet = user_wallet if deny_role == EACLRole.OTHERS else other_wallet
deny_role_str = 'all others' if deny_role == EACLRole.OTHERS else 'user'
not_deny_role_str = 'user' if deny_role == EACLRole.OTHERS else 'all others'
allure.dynamic.title(f'Testcase to deny NeoFS operations for {deny_role_str}.')
not_deny_role_wallet = (
user_wallet if deny_role == EACLRole.OTHERS else other_wallet
)
deny_role_str = "all others" if deny_role == EACLRole.OTHERS else "user"
not_deny_role_str = "user" if deny_role == EACLRole.OTHERS else "all others"
allure.dynamic.title(f"Testcase to deny NeoFS operations for {deny_role_str}.")
cid, object_oids, file_path = eacl_container_with_objects
with allure.step(f'Deny all operations for {deny_role_str} via eACL'):
eacl_deny = [EACLRule(access=EACLAccess.DENY, role=deny_role, operation=op) for op in EACLOperation]
with allure.step(f"Deny all operations for {deny_role_str} via eACL"):
eacl_deny = [
EACLRule(access=EACLAccess.DENY, role=deny_role, operation=op)
for op in EACLOperation
]
set_eacl(user_wallet.wallet_path, cid, create_eacl(cid, eacl_deny))
wait_for_cache_expired()
with allure.step(f'Check only {not_deny_role_str} has full access to container'):
with allure.step(f'Check {deny_role_str} has not access to any operations with container'):
check_no_access_to_container(deny_role_wallet.wallet_path, cid, object_oids[0], file_path)
with allure.step(
f"Check only {not_deny_role_str} has full access to container"
):
with allure.step(
f"Check {deny_role_str} has not access to any operations with container"
):
check_no_access_to_container(
deny_role_wallet.wallet_path, cid, object_oids[0], file_path
)
with allure.step(f'Check {not_deny_role_wallet} has full access to eACL public container'):
check_full_access_to_container(not_deny_role_wallet.wallet_path, cid, object_oids.pop(), file_path)
with allure.step(
f"Check {not_deny_role_wallet} has full access to eACL public container"
):
check_full_access_to_container(
not_deny_role_wallet.wallet_path, cid, object_oids.pop(), file_path
)
with allure.step(f'Allow all operations for {deny_role_str} via eACL'):
eacl_deny = [EACLRule(access=EACLAccess.ALLOW, role=deny_role, operation=op) for op in EACLOperation]
with allure.step(f"Allow all operations for {deny_role_str} via eACL"):
eacl_deny = [
EACLRule(access=EACLAccess.ALLOW, role=deny_role, operation=op)
for op in EACLOperation
]
set_eacl(user_wallet.wallet_path, cid, create_eacl(cid, eacl_deny))
wait_for_cache_expired()
with allure.step(f'Check all have full access to eACL public container'):
check_full_access_to_container(user_wallet.wallet_path, cid, object_oids.pop(), file_path)
check_full_access_to_container(other_wallet.wallet_path, cid, object_oids.pop(), file_path)
with allure.step(f"Check all have full access to eACL public container"):
check_full_access_to_container(
user_wallet.wallet_path, cid, object_oids.pop(), file_path
)
check_full_access_to_container(
other_wallet.wallet_path, cid, object_oids.pop(), file_path
)
@allure.title('Testcase to allow NeoFS operations for only one other pubkey.')
def test_extended_acl_deny_all_operations_exclude_pubkey(self, wallets, eacl_container_with_objects):
@allure.title("Testcase to allow NeoFS operations for only one other pubkey.")
def test_extended_acl_deny_all_operations_exclude_pubkey(
self, wallets, eacl_container_with_objects
):
user_wallet = wallets.get_wallet()
other_wallet, other_wallet_allow = wallets.get_wallets_list(EACLRole.OTHERS)[0:2]
other_wallet, other_wallet_allow = wallets.get_wallets_list(EACLRole.OTHERS)[
0:2
]
cid, object_oids, file_path = eacl_container_with_objects
with allure.step('Deny all operations for others except single wallet via eACL'):
eacl = [EACLRule(access=EACLAccess.ALLOW, role=other_wallet_allow.wallet_path, operation=op)
for op in EACLOperation]
eacl += [EACLRule(access=EACLAccess.DENY, role=EACLRole.OTHERS, operation=op) for op in EACLOperation]
with allure.step(
"Deny all operations for others except single wallet via eACL"
):
eacl = [
EACLRule(
access=EACLAccess.ALLOW,
role=other_wallet_allow.wallet_path,
operation=op,
)
for op in EACLOperation
]
eacl += [
EACLRule(access=EACLAccess.DENY, role=EACLRole.OTHERS, operation=op)
for op in EACLOperation
]
set_eacl(user_wallet.wallet_path, cid, create_eacl(cid, eacl))
wait_for_cache_expired()
with allure.step('Check only owner and allowed other have full access to public container'):
with allure.step('Check other has not access to operations with container'):
check_no_access_to_container(other_wallet.wallet_path, cid, object_oids[0], file_path)
with allure.step(
"Check only owner and allowed other have full access to public container"
):
with allure.step("Check other has not access to operations with container"):
check_no_access_to_container(
other_wallet.wallet_path, cid, object_oids[0], file_path
)
with allure.step('Check owner has full access to public container'):
check_full_access_to_container(user_wallet.wallet_path, cid, object_oids.pop(), file_path)
with allure.step("Check owner has full access to public container"):
check_full_access_to_container(
user_wallet.wallet_path, cid, object_oids.pop(), file_path
)
with allure.step('Check allowed other has full access to public container'):
check_full_access_to_container(other_wallet_allow.wallet_path, cid, object_oids.pop(), file_path)
with allure.step("Check allowed other has full access to public container"):
check_full_access_to_container(
other_wallet_allow.wallet_path, cid, object_oids.pop(), file_path
)
@allure.title('Testcase to validate NeoFS replication with eACL deny rules.')
def test_extended_acl_deny_replication(self, wallets, eacl_full_placement_container_with_object, file_path):
@allure.title("Testcase to validate NeoFS replication with eACL deny rules.")
def test_extended_acl_deny_replication(
self, wallets, eacl_full_placement_container_with_object, file_path
):
user_wallet = wallets.get_wallet()
cid, oid, file_path = eacl_full_placement_container_with_object
with allure.step('Deny all operations for user via eACL'):
eacl_deny = [EACLRule(access=EACLAccess.DENY, role=EACLRole.USER, operation=op) for op in EACLOperation]
eacl_deny += [EACLRule(access=EACLAccess.DENY, role=EACLRole.OTHERS, operation=op) for op in EACLOperation]
with allure.step("Deny all operations for user via eACL"):
eacl_deny = [
EACLRule(access=EACLAccess.DENY, role=EACLRole.USER, operation=op)
for op in EACLOperation
]
eacl_deny += [
EACLRule(access=EACLAccess.DENY, role=EACLRole.OTHERS, operation=op)
for op in EACLOperation
]
set_eacl(user_wallet.wallet_path, cid, create_eacl(cid, eacl_deny))
wait_for_cache_expired()
with allure.step('Drop object to check replication'):
with allure.step("Drop object to check replication"):
drop_object(node_name=[*NEOFS_NETMAP_DICT][0], cid=cid, oid=oid)
storage_wallet_path = NEOFS_NETMAP_DICT[[*NEOFS_NETMAP_DICT][0]]["wallet_path"]
with allure.step('Wait for dropped object replicated'):
wait_object_replication_on_nodes(storage_wallet_path, cid, oid, self.NODE_COUNT)
with allure.step("Wait for dropped object replicated"):
wait_object_replication_on_nodes(
storage_wallet_path, cid, oid, self.NODE_COUNT
)

View file

@ -1,162 +1,412 @@
import allure
import pytest
from python_keywords.acl import (EACLAccess, EACLFilter, EACLFilters, EACLHeaderType, EACLMatchType, EACLOperation,
EACLRole, EACLRule, create_eacl, set_eacl, wait_for_cache_expired)
from python_keywords.acl import (
EACLAccess,
EACLFilter,
EACLFilters,
EACLHeaderType,
EACLMatchType,
EACLOperation,
EACLRole,
EACLRule,
create_eacl,
form_bearertoken_file,
set_eacl,
wait_for_cache_expired,
)
from python_keywords.container import create_container, delete_container
from python_keywords.container_access import check_full_access_to_container, check_no_access_to_container
from python_keywords.container_access import (
check_full_access_to_container,
check_no_access_to_container,
)
from python_keywords.neofs_verbs import put_object
from python_keywords.object_access import can_get_object, can_get_head_object, can_put_object
from python_keywords.object_access import (
can_get_head_object,
can_get_object,
can_put_object,
)
from wellknown_acl import PUBLIC_ACL
@pytest.mark.sanity
@pytest.mark.acl
@pytest.mark.acl_container
@pytest.mark.acl_bearer
@pytest.mark.acl_filters
class TestEACLFilters:
# SPEC: https://github.com/nspcc-dev/neofs-spec/blob/master/01-arch/07-acl.md
ATTRIBUTE = {'check_key': 'check_value'}
OTHER_ATTRIBUTE = {'check_key': 'other_value'}
SET_HEADERS = {'key_one': 'check_value', 'x_key': 'xvalue', 'check_key': 'check_value'}
OTHER_HEADERS = {'key_one': 'check_value', 'x_key': 'other_value', 'check_key': 'other_value'}
REQ_EQUAL_FILTER = EACLFilter(key='check_key', value='check_value', header_type=EACLHeaderType.REQUEST)
NOT_REQ_EQUAL_FILTER = EACLFilter(key='check_key', value='other_value', match_type=EACLMatchType.STRING_NOT_EQUAL,
header_type=EACLHeaderType.REQUEST)
OBJ_EQUAL_FILTER = EACLFilter(key='check_key', value='check_value', header_type=EACLHeaderType.OBJECT)
NOT_OBJ_EQUAL_FILTER = EACLFilter(key='check_key', value='other_value', match_type=EACLMatchType.STRING_NOT_EQUAL,
header_type=EACLHeaderType.OBJECT)
OBJECT_COUNT = 3
OBJECT_ATTRIBUTES_FILTER_SUPPORTED_OPERATIONS = [EACLOperation.GET, EACLOperation.HEAD, EACLOperation.PUT]
ATTRIBUTE = {"check_key": "check_value"}
OTHER_ATTRIBUTE = {"check_key": "other_value"}
SET_HEADERS = {
"key_one": "check_value",
"x_key": "xvalue",
"check_key": "check_value",
}
OTHER_HEADERS = {
"key_one": "check_value",
"x_key": "other_value",
"check_key": "other_value",
}
REQ_EQUAL_FILTER = EACLFilter(
key="check_key", value="check_value", header_type=EACLHeaderType.REQUEST
)
NOT_REQ_EQUAL_FILTER = EACLFilter(
key="check_key",
value="other_value",
match_type=EACLMatchType.STRING_NOT_EQUAL,
header_type=EACLHeaderType.REQUEST,
)
OBJ_EQUAL_FILTER = EACLFilter(
key="check_key", value="check_value", header_type=EACLHeaderType.OBJECT
)
NOT_OBJ_EQUAL_FILTER = EACLFilter(
key="check_key",
value="other_value",
match_type=EACLMatchType.STRING_NOT_EQUAL,
header_type=EACLHeaderType.OBJECT,
)
OBJECT_COUNT = 5
OBJECT_ATTRIBUTES_FILTER_SUPPORTED_OPERATIONS = [
EACLOperation.GET,
EACLOperation.HEAD,
EACLOperation.PUT,
]
@pytest.fixture(scope='function')
@pytest.fixture(scope="function")
def eacl_container_with_objects(self, wallets, file_path):
user_wallet = wallets.get_wallet()
with allure.step('Create eACL public container'):
with allure.step("Create eACL public container"):
cid = create_container(user_wallet.wallet_path, basic_acl=PUBLIC_ACL)
with allure.step('Add test objects to container'):
with allure.step("Add test objects to container"):
objects_with_header = [
put_object(user_wallet.wallet_path, file_path, cid,
attributes={**self.SET_HEADERS, 'key': val}) for val in range(self.OBJECT_COUNT)]
put_object(
user_wallet.wallet_path,
file_path,
cid,
attributes={**self.SET_HEADERS, "key": val},
)
for val in range(self.OBJECT_COUNT)
]
objects_with_other_header = [
put_object(user_wallet.wallet_path, file_path, cid,
attributes={**self.OTHER_HEADERS, 'key': val}) for val in range(self.OBJECT_COUNT)]
put_object(
user_wallet.wallet_path,
file_path,
cid,
attributes={**self.OTHER_HEADERS, "key": val},
)
for val in range(self.OBJECT_COUNT)
]
objects_without_header = [
put_object(user_wallet.wallet_path, file_path, cid) for _ in range(self.OBJECT_COUNT)]
put_object(user_wallet.wallet_path, file_path, cid)
for _ in range(self.OBJECT_COUNT)
]
yield cid, objects_with_header, objects_with_other_header, objects_without_header, file_path
with allure.step('Delete eACL public container'):
with allure.step("Delete eACL public container"):
delete_container(user_wallet.wallet_path, cid)
@pytest.mark.parametrize('match_type', [EACLMatchType.STRING_EQUAL, EACLMatchType.STRING_NOT_EQUAL])
def test_extended_acl_filters_request(self, wallets, eacl_container_with_objects, match_type):
allure.dynamic.title(f"Validate NeoFS operations with request filter: {match_type.name}")
@pytest.mark.parametrize(
"match_type", [EACLMatchType.STRING_EQUAL, EACLMatchType.STRING_NOT_EQUAL]
)
def test_extended_acl_filters_request(
self, wallets, eacl_container_with_objects, match_type
):
allure.dynamic.title(
f"Validate NeoFS operations with request filter: {match_type.name}"
)
user_wallet = wallets.get_wallet()
other_wallet = wallets.get_wallet(EACLRole.OTHERS)
cid, objects_with_header, objects_with_other_header, objects_without_header, file_path =\
eacl_container_with_objects
(
cid,
objects_with_header,
objects_with_other_header,
objects_without_header,
file_path,
) = eacl_container_with_objects
with allure.step('Deny all operations for other with eACL request filter'):
with allure.step("Deny all operations for other with eACL request filter"):
equal_filter = EACLFilter(**self.REQ_EQUAL_FILTER.__dict__)
equal_filter.match_type = match_type
eacl_deny = [EACLRule(access=EACLAccess.DENY,
role=EACLRole.OTHERS,
filters=EACLFilters([equal_filter]),
operation=op) for op in EACLOperation]
eacl_deny = [
EACLRule(
access=EACLAccess.DENY,
role=EACLRole.OTHERS,
filters=EACLFilters([equal_filter]),
operation=op,
)
for op in EACLOperation
]
set_eacl(user_wallet.wallet_path, cid, create_eacl(cid, eacl_deny))
wait_for_cache_expired()
# Filter denies requests where "check_key {match_type} ATTRIBUTE", so when match_type
# is STRING_EQUAL, then requests with "check_key=OTHER_ATTRIBUTE" will be allowed while
# requests with "check_key=ATTRIBUTE" will be denied, and vice versa
allow_headers = self.OTHER_ATTRIBUTE if match_type == EACLMatchType.STRING_EQUAL else self.ATTRIBUTE
deny_headers = self.ATTRIBUTE if match_type == EACLMatchType.STRING_EQUAL else self.OTHER_ATTRIBUTE
# We test on 3 groups of objects with various headers, but eACL rule should ignore object headers and
allow_headers = (
self.OTHER_ATTRIBUTE
if match_type == EACLMatchType.STRING_EQUAL
else self.ATTRIBUTE
)
deny_headers = (
self.ATTRIBUTE
if match_type == EACLMatchType.STRING_EQUAL
else self.OTHER_ATTRIBUTE
)
# We test on 3 groups of objects with various headers,
# but eACL rule should ignore object headers and
# work only based on request headers
for oid in (objects_with_header, objects_with_other_header, objects_without_header):
with allure.step('Check other has full access when sending request without headers'):
check_full_access_to_container(other_wallet.wallet_path, cid, oid.pop(), file_path)
for oid in (
objects_with_header,
objects_with_other_header,
objects_without_header,
):
with allure.step(
"Check other has full access when sending request without headers"
):
check_full_access_to_container(
other_wallet.wallet_path, cid, oid.pop(), file_path
)
with allure.step('Check other has full access when sending request with allowed headers'):
check_full_access_to_container(other_wallet.wallet_path, cid, oid.pop(), file_path, xhdr=allow_headers)
with allure.step(
"Check other has full access when sending request with allowed headers"
):
check_full_access_to_container(
other_wallet.wallet_path,
cid,
oid.pop(),
file_path,
xhdr=allow_headers,
)
with allure.step('Check other has no access when sending request with denied headers'):
check_no_access_to_container(other_wallet.wallet_path, cid, oid.pop(), file_path, xhdr=deny_headers)
with allure.step(
"Check other has no access when sending request with denied headers"
):
check_no_access_to_container(
other_wallet.wallet_path,
cid,
oid.pop(),
file_path,
xhdr=deny_headers,
)
@pytest.mark.parametrize('match_type', [EACLMatchType.STRING_EQUAL, EACLMatchType.STRING_NOT_EQUAL])
def test_extended_acl_deny_filters_object(self, wallets, eacl_container_with_objects, match_type):
allure.dynamic.title(f"Validate NeoFS operations with deny user headers filter: {match_type.name}")
with allure.step(
"Check other has full access when sending request "
"with denied headers and using bearer token"
):
bearer_token_other = form_bearertoken_file(
user_wallet.wallet_path,
cid,
[
EACLRule(
operation=op, access=EACLAccess.ALLOW, role=EACLRole.OTHERS
)
for op in EACLOperation
],
)
check_full_access_to_container(
other_wallet.wallet_path,
cid,
oid.pop(),
file_path,
xhdr=deny_headers,
bearer=bearer_token_other,
)
@pytest.mark.parametrize(
"match_type", [EACLMatchType.STRING_EQUAL, EACLMatchType.STRING_NOT_EQUAL]
)
def test_extended_acl_deny_filters_object(
self, wallets, eacl_container_with_objects, match_type
):
allure.dynamic.title(
f"Validate NeoFS operations with deny user headers filter: {match_type.name}"
)
user_wallet = wallets.get_wallet()
other_wallet = wallets.get_wallet(EACLRole.OTHERS)
cid, objects_with_header, objects_with_other_header, objs_without_header, file_path = \
eacl_container_with_objects
(
cid,
objects_with_header,
objects_with_other_header,
objs_without_header,
file_path,
) = eacl_container_with_objects
with allure.step('Deny all operations for other with object filter'):
with allure.step("Deny all operations for other with object filter"):
equal_filter = EACLFilter(**self.OBJ_EQUAL_FILTER.__dict__)
equal_filter.match_type = match_type
eacl_deny = [EACLRule(access=EACLAccess.DENY,
role=EACLRole.OTHERS,
filters=EACLFilters([equal_filter]),
operation=op) for op in self.OBJECT_ATTRIBUTES_FILTER_SUPPORTED_OPERATIONS]
eacl_deny = [
EACLRule(
access=EACLAccess.DENY,
role=EACLRole.OTHERS,
filters=EACLFilters([equal_filter]),
operation=op,
)
for op in self.OBJECT_ATTRIBUTES_FILTER_SUPPORTED_OPERATIONS
]
set_eacl(user_wallet.wallet_path, cid, create_eacl(cid, eacl_deny))
wait_for_cache_expired()
allow_objects = objects_with_other_header if match_type == EACLMatchType.STRING_EQUAL else objects_with_header
deny_objects = objects_with_header if match_type == EACLMatchType.STRING_EQUAL else objects_with_other_header
allow_objects = (
objects_with_other_header
if match_type == EACLMatchType.STRING_EQUAL
else objects_with_header
)
deny_objects = (
objects_with_header
if match_type == EACLMatchType.STRING_EQUAL
else objects_with_other_header
)
# We will attempt requests with various headers, but eACL rule should ignore request headers and validate
# We will attempt requests with various headers,
# but eACL rule should ignore request headers and validate
# only object headers
for xhdr in (self.ATTRIBUTE, self.OTHER_ATTRIBUTE, None):
with allure.step(f'Check other have full access to objects without attributes'):
with allure.step(
"Check other have full access to objects without attributes"
):
check_full_access_to_container(
other_wallet.wallet_path, cid, objs_without_header.pop(), file_path, xhdr=xhdr)
other_wallet.wallet_path,
cid,
objs_without_header.pop(),
file_path,
xhdr=xhdr,
)
with allure.step(f'Check other have full access to objects without deny attribute'):
with allure.step(
"Check other have full access to objects without deny attribute"
):
check_full_access_to_container(
other_wallet.wallet_path, cid, allow_objects.pop(), file_path, xhdr=xhdr)
other_wallet.wallet_path,
cid,
allow_objects.pop(),
file_path,
xhdr=xhdr,
)
with allure.step(f'Check other have no access to objects with deny attribute'):
oid = deny_objects.pop()
with allure.step(
"Check other have no access to objects with deny attribute"
):
with pytest.raises(AssertionError):
assert can_get_head_object(other_wallet.wallet_path, cid, oid, xhdr=xhdr)
assert can_get_head_object(
other_wallet.wallet_path, cid, deny_objects[0], xhdr=xhdr
)
with pytest.raises(AssertionError):
assert can_get_object(other_wallet.wallet_path, cid, oid, file_path, xhdr=xhdr)
assert can_get_object(
other_wallet.wallet_path,
cid,
deny_objects[0],
file_path,
xhdr=xhdr,
)
allow_attribute = self.OTHER_ATTRIBUTE if match_type == EACLMatchType.STRING_EQUAL else self.ATTRIBUTE
with allure.step('Check other can PUT objects without denied attribute'):
assert can_put_object(other_wallet.wallet_path, cid, file_path, attributes=allow_attribute)
with allure.step(
"Check other have access to objects with deny attribute and using bearer token"
):
bearer_token_other = form_bearertoken_file(
user_wallet.wallet_path,
cid,
[
EACLRule(
operation=op,
access=EACLAccess.ALLOW,
role=EACLRole.OTHERS,
)
for op in EACLOperation
],
)
check_full_access_to_container(
other_wallet.wallet_path,
cid,
deny_objects.pop(),
file_path,
xhdr=xhdr,
bearer=bearer_token_other,
)
allow_attribute = (
self.OTHER_ATTRIBUTE
if match_type == EACLMatchType.STRING_EQUAL
else self.ATTRIBUTE
)
with allure.step("Check other can PUT objects without denied attribute"):
assert can_put_object(
other_wallet.wallet_path, cid, file_path, attributes=allow_attribute
)
assert can_put_object(other_wallet.wallet_path, cid, file_path)
deny_attribute = self.ATTRIBUTE if match_type == EACLMatchType.STRING_EQUAL else self.OTHER_ATTRIBUTE
with allure.step('Check other can not PUT objects with denied attribute'):
deny_attribute = (
self.ATTRIBUTE
if match_type == EACLMatchType.STRING_EQUAL
else self.OTHER_ATTRIBUTE
)
with allure.step("Check other can not PUT objects with denied attribute"):
with pytest.raises(AssertionError):
assert can_put_object(other_wallet.wallet_path, cid, file_path, attributes=deny_attribute)
assert can_put_object(
other_wallet.wallet_path, cid, file_path, attributes=deny_attribute
)
@pytest.mark.parametrize('match_type', [EACLMatchType.STRING_EQUAL, EACLMatchType.STRING_NOT_EQUAL])
def test_extended_acl_allow_filters_object(self, wallets, eacl_container_with_objects, match_type):
with allure.step(
"Check other can PUT objects with denied attribute and using bearer token"
):
bearer_token_other_for_put = form_bearertoken_file(
user_wallet.wallet_path,
cid,
[
EACLRule(
operation=EACLOperation.PUT,
access=EACLAccess.ALLOW,
role=EACLRole.OTHERS,
)
],
)
assert can_put_object(
other_wallet.wallet_path,
cid,
file_path,
attributes=deny_attribute,
bearer=bearer_token_other_for_put,
)
@pytest.mark.parametrize(
"match_type", [EACLMatchType.STRING_EQUAL, EACLMatchType.STRING_NOT_EQUAL]
)
def test_extended_acl_allow_filters_object(
self, wallets, eacl_container_with_objects, match_type
):
allure.dynamic.title(
f"Testcase to validate NeoFS operation with allow eACL user headers filters: {match_type.name}")
"Testcase to validate NeoFS operation with allow eACL user headers filters:"
f"{match_type.name}"
)
user_wallet = wallets.get_wallet()
other_wallet = wallets.get_wallet(EACLRole.OTHERS)
cid, objects_with_header, objects_with_other_header, objects_without_header, file_path = \
eacl_container_with_objects
(
cid,
objects_with_header,
objects_with_other_header,
objects_without_header,
file_path,
) = eacl_container_with_objects
with allure.step('Deny all operations for others except few operations allowed by object filter'):
with allure.step(
"Deny all operations for others except few operations allowed by object filter"
):
equal_filter = EACLFilter(**self.OBJ_EQUAL_FILTER.__dict__)
equal_filter.match_type = match_type
eacl = [
EACLRule(access=EACLAccess.ALLOW,
role=EACLRole.OTHERS,
filters=EACLFilters([equal_filter]),
operation=op) for op in self.OBJECT_ATTRIBUTES_FILTER_SUPPORTED_OPERATIONS
] + [
EACLRule(access=EACLAccess.DENY,
role=EACLRole.OTHERS,
operation=op) for op in self.OBJECT_ATTRIBUTES_FILTER_SUPPORTED_OPERATIONS
EACLRule(
access=EACLAccess.ALLOW,
role=EACLRole.OTHERS,
filters=EACLFilters([equal_filter]),
operation=op,
)
for op in self.OBJECT_ATTRIBUTES_FILTER_SUPPORTED_OPERATIONS
] + [
EACLRule(access=EACLAccess.DENY, role=EACLRole.OTHERS, operation=op)
for op in self.OBJECT_ATTRIBUTES_FILTER_SUPPORTED_OPERATIONS
]
set_eacl(user_wallet.wallet_path, cid, create_eacl(cid, eacl))
wait_for_cache_expired()
@ -172,7 +422,7 @@ class TestEACLFilters:
allow_attribute = self.OTHER_ATTRIBUTE
deny_attribute = self.ATTRIBUTE
with allure.step(f'Check other cannot get and put objects without attributes'):
with allure.step(f"Check other cannot get and put objects without attributes"):
oid = objects_without_header.pop()
with pytest.raises(AssertionError):
assert can_get_head_object(other_wallet.wallet_path, cid, oid)
@ -181,17 +431,80 @@ class TestEACLFilters:
with pytest.raises(AssertionError):
assert can_put_object(other_wallet.wallet_path, cid, file_path)
with allure.step(f'Check other can get objects with attributes matching the filter'):
with allure.step(
"Check other can get and put objects without attributes "
"and using bearer token"
):
bearer_token_other = form_bearertoken_file(
user_wallet.wallet_path,
cid,
[
EACLRule(
operation=op,
access=EACLAccess.ALLOW,
role=EACLRole.OTHERS,
)
for op in EACLOperation
],
)
assert can_get_head_object(
other_wallet.wallet_path,
cid,
objects_without_header[0],
bearer=bearer_token_other,
)
assert can_get_object(
other_wallet.wallet_path,
cid,
objects_without_header[0],
file_path,
bearer=bearer_token_other,
)
assert can_put_object(
other_wallet.wallet_path, cid, file_path, bearer=bearer_token_other
)
with allure.step(
f"Check other can get objects with attributes matching the filter"
):
oid = allow_objects.pop()
assert can_get_head_object(other_wallet.wallet_path, cid, oid)
assert can_get_object(other_wallet.wallet_path, cid, oid, file_path)
assert can_put_object(other_wallet.wallet_path, cid, file_path, attributes=allow_attribute)
assert can_put_object(
other_wallet.wallet_path, cid, file_path, attributes=allow_attribute
)
with allure.step(f'Check other cannot get objects without attributes matching the filter'):
with allure.step(
"Check other cannot get objects without attributes matching the filter"
):
with pytest.raises(AssertionError):
assert can_get_head_object(
other_wallet.wallet_path, cid, deny_objects[0]
)
with pytest.raises(AssertionError):
assert can_get_object(
other_wallet.wallet_path, cid, deny_objects[0], file_path
)
with pytest.raises(AssertionError):
assert can_put_object(
other_wallet.wallet_path, cid, file_path, attributes=deny_attribute
)
with allure.step(
"Check other can get objects without attributes matching the filter "
"and using bearer token"
):
oid = deny_objects.pop()
with pytest.raises(AssertionError):
assert can_get_head_object(other_wallet.wallet_path, cid, oid)
with pytest.raises(AssertionError):
assert can_get_object(other_wallet.wallet_path, cid, oid, file_path)
with pytest.raises(AssertionError):
assert can_put_object(other_wallet.wallet_path, cid, file_path, attributes=deny_attribute)
assert can_get_head_object(
other_wallet.wallet_path, cid, oid, bearer=bearer_token_other
)
assert can_get_object(
other_wallet.wallet_path, cid, oid, file_path, bearer=bearer_token_other
)
assert can_put_object(
other_wallet.wallet_path,
cid,
file_path,
attributes=deny_attribute,
bearer=bearer_token_other,
)