Add eACL test for system account

Signed-off-by: Vladimir Avdeev <v.avdeev@yadro.com>
This commit is contained in:
Vladimir Avdeev 2022-09-19 18:54:00 +03:00 committed by Julia Kovshova
parent c53e48d1f8
commit 2a175b5824
3 changed files with 144 additions and 21 deletions

View file

@ -5,6 +5,7 @@ import allure
import pytest
from common import ASSETS_DIR, IR_WALLET_CONFIG, IR_WALLET_PATH, WALLET_CONFIG
from common import STORAGE_WALLET_PATH, STORAGE_WALLET_CONFIG
from python_keywords.acl import EACLRole
from python_keywords.container import create_container
from python_keywords.neofs_verbs import put_object
@ -50,7 +51,8 @@ def wallets(prepare_wallet_and_deposit):
),
],
EACLRole.SYSTEM: [
Wallet(wallet_path=IR_WALLET_PATH, config_path=IR_WALLET_CONFIG)
Wallet(wallet_path=IR_WALLET_PATH, config_path=IR_WALLET_CONFIG),
Wallet(wallet_path=STORAGE_WALLET_PATH, config_path=STORAGE_WALLET_CONFIG)
],
}
)

View file

@ -19,6 +19,15 @@ from python_keywords.container_access import (
)
from python_keywords.neofs_verbs import put_object
from python_keywords.node_management import drop_object
from python_keywords.object_access import (
can_get_object,
can_put_object,
can_delete_object,
can_get_head_object,
can_get_range_hash_of_object,
can_get_range_of_object,
can_search_object,
)
from wellknown_acl import PUBLIC_ACL
@ -41,9 +50,7 @@ class TestEACLContainer:
with allure.step("Add test object to container"):
oid = put_object(user_wallet.wallet_path, file_path, cid)
wait_object_replication_on_nodes(
user_wallet.wallet_path, cid, oid, self.NODE_COUNT
)
wait_object_replication_on_nodes(user_wallet.wallet_path, cid, oid, self.NODE_COUNT)
yield cid, oid, file_path
@ -54,9 +61,7 @@ class TestEACLContainer:
user_wallet = wallets.get_wallet()
other_wallet = wallets.get_wallet(EACLRole.OTHERS)
deny_role_wallet = other_wallet if deny_role == EACLRole.OTHERS else user_wallet
not_deny_role_wallet = (
user_wallet if deny_role == EACLRole.OTHERS else other_wallet
)
not_deny_role_wallet = user_wallet if deny_role == EACLRole.OTHERS else other_wallet
deny_role_str = "all others" if deny_role == EACLRole.OTHERS else "user"
not_deny_role_str = "user" if deny_role == EACLRole.OTHERS else "all others"
allure.dynamic.title(f"Testcase to deny NeoFS operations for {deny_role_str}.")
@ -70,9 +75,7 @@ class TestEACLContainer:
set_eacl(user_wallet.wallet_path, cid, create_eacl(cid, eacl_deny))
wait_for_cache_expired()
with allure.step(
f"Check only {not_deny_role_str} has full access to container"
):
with allure.step(f"Check only {not_deny_role_str} has full access to container"):
with allure.step(
f"Check {deny_role_str} has not access to any operations with container"
):
@ -108,14 +111,10 @@ class TestEACLContainer:
self, wallets, eacl_container_with_objects
):
user_wallet = wallets.get_wallet()
other_wallet, other_wallet_allow = wallets.get_wallets_list(EACLRole.OTHERS)[
0:2
]
other_wallet, other_wallet_allow = wallets.get_wallets_list(EACLRole.OTHERS)[0:2]
cid, object_oids, file_path = eacl_container_with_objects
with allure.step(
"Deny all operations for others except single wallet via eACL"
):
with allure.step("Deny all operations for others except single wallet via eACL"):
eacl = [
EACLRule(
access=EACLAccess.ALLOW,
@ -131,9 +130,7 @@ class TestEACLContainer:
set_eacl(user_wallet.wallet_path, cid, create_eacl(cid, eacl))
wait_for_cache_expired()
with allure.step(
"Check only owner and allowed other have full access to public container"
):
with allure.step("Check only owner and allowed other have full access to public container"):
with allure.step("Check other has not access to operations with container"):
check_no_access_to_container(
other_wallet.wallet_path, cid, object_oids[0], file_path
@ -173,6 +170,129 @@ class TestEACLContainer:
storage_wallet_path = NEOFS_NETMAP_DICT[[*NEOFS_NETMAP_DICT][0]]["wallet_path"]
with allure.step("Wait for dropped object replicated"):
wait_object_replication_on_nodes(
storage_wallet_path, cid, oid, self.NODE_COUNT
wait_object_replication_on_nodes(storage_wallet_path, cid, oid, self.NODE_COUNT)
@allure.title("Testcase to validate NeoFS system operations with extended ACL")
def test_extended_actions_system(self, wallets, eacl_container_with_objects):
user_wallet = wallets.get_wallet()
ir_wallet, storage_wallet = wallets.get_wallets_list(role=EACLRole.SYSTEM)[:2]
cid, object_oids, file_path = eacl_container_with_objects
with allure.step("Check IR and STORAGE rules compliance"):
assert not can_put_object(ir_wallet.wallet_path, cid, file_path)
assert can_put_object(storage_wallet.wallet_path, cid, file_path)
assert can_get_object(ir_wallet.wallet_path, cid, object_oids[0], file_path)
assert can_get_object(storage_wallet.wallet_path, cid, object_oids[0], file_path)
assert can_get_head_object(ir_wallet.wallet_path, cid, object_oids[0])
assert can_get_head_object(storage_wallet.wallet_path, cid, object_oids[0])
assert can_search_object(ir_wallet.wallet_path, cid, object_oids[0])
assert can_search_object(storage_wallet.wallet_path, cid, object_oids[0])
with pytest.raises(AssertionError):
assert can_get_range_of_object(ir_wallet.wallet_path, cid, object_oids[0])
with pytest.raises(AssertionError):
assert can_get_range_of_object(storage_wallet.wallet_path, cid, object_oids[0])
with pytest.raises(AssertionError):
assert can_get_range_hash_of_object(ir_wallet.wallet_path, cid, object_oids[0])
with pytest.raises(AssertionError):
assert can_get_range_hash_of_object(storage_wallet.wallet_path, cid, object_oids[0])
with pytest.raises(AssertionError):
assert can_delete_object(ir_wallet.wallet_path, cid, object_oids[0])
with pytest.raises(AssertionError):
assert can_delete_object(storage_wallet.wallet_path, cid, object_oids[0])
with allure.step("Deny all operations for SYSTEM via eACL"):
set_eacl(
user_wallet.wallet_path,
cid,
create_eacl(
cid,
[
EACLRule(access=EACLAccess.DENY, role=EACLRole.SYSTEM, operation=op)
for op in EACLOperation
],
),
)
wait_for_cache_expired()
with allure.step("Check IR and STORAGE rules compliance with deny eACL"):
assert not can_put_object(ir_wallet.wallet_path, cid, file_path)
assert not can_put_object(storage_wallet.wallet_path, cid, file_path)
with pytest.raises(AssertionError):
assert can_get_object(ir_wallet.wallet_path, cid, object_oids[0], file_path)
with pytest.raises(AssertionError):
assert can_get_object(storage_wallet.wallet_path, cid, object_oids[0], file_path)
with pytest.raises(AssertionError):
assert can_get_head_object(ir_wallet.wallet_path, cid, object_oids[0])
with pytest.raises(AssertionError):
assert can_get_head_object(storage_wallet.wallet_path, cid, object_oids[0])
with pytest.raises(AssertionError):
assert can_search_object(ir_wallet.wallet_path, cid, object_oids[0])
with pytest.raises(AssertionError):
assert can_search_object(storage_wallet.wallet_path, cid, object_oids[0])
with pytest.raises(AssertionError):
assert can_get_range_of_object(ir_wallet.wallet_path, cid, object_oids[0])
with pytest.raises(AssertionError):
assert can_get_range_of_object(storage_wallet.wallet_path, cid, object_oids[0])
with pytest.raises(AssertionError):
assert can_get_range_hash_of_object(ir_wallet.wallet_path, cid, object_oids[0])
with pytest.raises(AssertionError):
assert can_get_range_hash_of_object(storage_wallet.wallet_path, cid, object_oids[0])
with pytest.raises(AssertionError):
assert can_delete_object(ir_wallet.wallet_path, cid, object_oids[0])
with pytest.raises(AssertionError):
assert can_delete_object(storage_wallet.wallet_path, cid, object_oids[0])
with allure.step("Allow all operations for SYSTEM via eACL"):
set_eacl(
user_wallet.wallet_path,
cid,
create_eacl(
cid,
[
EACLRule(access=EACLAccess.ALLOW, role=EACLRole.SYSTEM, operation=op)
for op in EACLOperation
],
),
)
wait_for_cache_expired()
with allure.step("Check IR and STORAGE rules compliance with allow eACL"):
assert not can_put_object(ir_wallet.wallet_path, cid, file_path)
assert can_put_object(storage_wallet.wallet_path, cid, file_path)
assert can_get_object(ir_wallet.wallet_path, cid, object_oids[0], file_path)
assert can_get_object(storage_wallet.wallet_path, cid, object_oids[0], file_path)
assert can_get_head_object(ir_wallet.wallet_path, cid, object_oids[0])
assert can_get_head_object(storage_wallet.wallet_path, cid, object_oids[0])
assert can_search_object(ir_wallet.wallet_path, cid, object_oids[0])
assert can_search_object(storage_wallet.wallet_path, cid, object_oids[0])
with pytest.raises(AssertionError):
assert can_get_range_of_object(ir_wallet.wallet_path, cid, object_oids[0])
with pytest.raises(AssertionError):
assert can_get_range_of_object(storage_wallet.wallet_path, cid, object_oids[0])
with pytest.raises(AssertionError):
assert can_get_range_hash_of_object(ir_wallet.wallet_path, cid, object_oids[0])
with pytest.raises(AssertionError):
assert can_get_range_hash_of_object(storage_wallet.wallet_path, cid, object_oids[0])
with pytest.raises(AssertionError):
assert can_delete_object(ir_wallet.wallet_path, cid, object_oids[0])
with pytest.raises(AssertionError):
assert can_delete_object(storage_wallet.wallet_path, cid, object_oids[0])

View file

@ -51,6 +51,7 @@ STORAGE_WALLET_PATH_3 = os.getenv("STORAGE_WALLET_PATH_3", f"{DEVENV_PATH}/servi
STORAGE_WALLET_PATH_4 = os.getenv("STORAGE_WALLET_PATH_4", f"{DEVENV_PATH}/services/storage/wallet04.json")
STORAGE_WALLET_PATH = STORAGE_WALLET_PATH_1
STORAGE_WALLET_PASS = os.getenv("STORAGE_WALLET_PASS", "")
STORAGE_WALLET_CONFIG = f"{CLI_CONFIGS_PATH}/empty_passwd.yml"
NEOFS_NETMAP_DICT = {
's01': {