http test with bearer token

Signed-off-by: Vladislav Karakozov <v.karakozov@yadro.com>
This commit is contained in:
Vladislav Karakozov 2022-12-29 10:17:51 +03:00 committed by Julia Kovshova
parent a2d272eaee
commit 5083b5adad
3 changed files with 156 additions and 7 deletions

View file

@ -0,0 +1,131 @@
import logging
import allure
import pytest
from container import create_container
from file_helper import generate_file
from http_gate import get_object_and_verify_hashes, upload_via_http_gate_curl
from python_keywords.acl import (
EACLAccess,
EACLOperation,
EACLRole,
EACLRule,
bearer_token_base64_from_file,
create_eacl,
form_bearertoken_file,
set_eacl,
sign_bearer,
wait_for_cache_expired,
)
from wellknown_acl import PUBLIC_ACL
from steps.cluster_test_base import ClusterTestBase
logger = logging.getLogger("NeoLogger")
@pytest.mark.sanity
@pytest.mark.http_gate
class Test_http_bearer(ClusterTestBase):
PLACEMENT_RULE = "REP 2 IN X CBF 1 SELECT 2 FROM * AS X"
@pytest.fixture(scope="class", autouse=True)
@allure.title("[Class/Autouse]: Prepare wallet and deposit")
def prepare_wallet(self, default_wallet):
Test_http_bearer.wallet = default_wallet
@pytest.fixture(scope="class")
def user_container(self) -> str:
return create_container(
wallet=self.wallet,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
rule=self.PLACEMENT_RULE,
basic_acl=PUBLIC_ACL,
)
@pytest.fixture(scope="class")
def eacl_deny_for_others(self, user_container: str) -> None:
with allure.step(f"Set deny all operations for {EACLRole.OTHERS} via eACL"):
eacl = EACLRule(
access=EACLAccess.DENY, role=EACLRole.OTHERS, operation=EACLOperation.PUT
)
set_eacl(
self.wallet,
user_container,
create_eacl(user_container, eacl, shell=self.shell),
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
)
wait_for_cache_expired()
@pytest.fixture(scope="class")
def bearer_token_no_limit_for_others(self, user_container: str) -> str:
with allure.step(f"Create bearer token for {EACLRole.OTHERS} with all operations allowed"):
bearer = form_bearertoken_file(
self.wallet,
user_container,
[
EACLRule(operation=op, access=EACLAccess.ALLOW, role=EACLRole.OTHERS)
for op in EACLOperation
],
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
sign=False,
)
bearer_signed = f"{bearer}_signed"
sign_bearer(
shell=self.shell,
wallet_path=self.wallet,
eacl_rules_file_from=bearer,
eacl_rules_file_to=bearer_signed,
json=False,
)
return bearer_token_base64_from_file(bearer_signed)
@allure.title(f"[negative] Put object without bearer token for {EACLRole.OTHERS}")
def test_unable_put_without_bearer_token(
self, simple_object_size: int, user_container: str, eacl_deny_for_others
):
eacl_deny_for_others
upload_via_http_gate_curl(
cid=user_container,
filepath=generate_file(simple_object_size),
endpoint=self.cluster.default_http_gate_endpoint,
error_pattern="access to object operation denied",
)
@pytest.mark.parametrize(
"object_size",
[pytest.lazy_fixture("simple_object_size"), pytest.lazy_fixture("complex_object_size")],
ids=["simple object", "complex object"],
)
def test_put_with_bearer_when_eacl_restrict(
self,
object_size: int,
user_container: str,
eacl_deny_for_others,
bearer_token_no_limit_for_others: str,
):
eacl_deny_for_others
bearer = bearer_token_no_limit_for_others
file_path = generate_file(object_size)
with allure.step(
f"Put object with bearer token for {EACLRole.OTHERS}, then get and verify hashes"
):
headers = [f" -H 'Authorization: Bearer {bearer}'"]
oid = upload_via_http_gate_curl(
cid=user_container,
filepath=file_path,
endpoint=self.cluster.default_http_gate_endpoint,
headers=headers,
)
get_object_and_verify_hashes(
oid=oid,
file_name=file_path,
wallet=self.wallet,
cid=user_container,
shell=self.shell,
nodes=self.cluster.storage_nodes,
endpoint=self.cluster.default_http_gate_endpoint,
)

View file

@ -172,6 +172,7 @@ def form_bearertoken_file(
eacl_rule_list: List[Union[EACLRule, EACLPubKey]], eacl_rule_list: List[Union[EACLRule, EACLPubKey]],
shell: Shell, shell: Shell,
endpoint: str, endpoint: str,
sign: Optional[bool] = True,
) -> str: ) -> str:
""" """
This function fetches eACL for given <cid> on behalf of <wif>, This function fetches eACL for given <cid> on behalf of <wif>,
@ -219,7 +220,14 @@ def form_bearertoken_file(
json.dump(eacl_result, eacl_file, ensure_ascii=False, indent=4) json.dump(eacl_result, eacl_file, ensure_ascii=False, indent=4)
logger.info(f"Got these extended ACL records: {eacl_result}") logger.info(f"Got these extended ACL records: {eacl_result}")
sign_bearer(shell, wif, file_path) if sign:
sign_bearer(
shell=shell,
wallet_path=wif,
eacl_rules_file_from=file_path,
eacl_rules_file_to=file_path,
json=True,
)
return file_path return file_path
@ -246,10 +254,12 @@ def eacl_rules(access: str, verbs: list, user: str) -> list[str]:
return rules return rules
def sign_bearer(shell: Shell, wallet_path: str, eacl_rules_file: str) -> None: def sign_bearer(
shell: Shell, wallet_path: str, eacl_rules_file_from: str, eacl_rules_file_to: str, json: bool
) -> None:
neofscli = NeofsCli(shell=shell, neofs_cli_exec_path=NEOFS_CLI_EXEC, config_file=WALLET_CONFIG) neofscli = NeofsCli(shell=shell, neofs_cli_exec_path=NEOFS_CLI_EXEC, config_file=WALLET_CONFIG)
neofscli.util.sign_bearer_token( neofscli.util.sign_bearer_token(
wallet=wallet_path, from_file=eacl_rules_file, to_file=eacl_rules_file, json=True wallet=wallet_path, from_file=eacl_rules_file_from, to_file=eacl_rules_file_to, json=json
) )
@ -257,3 +267,12 @@ def sign_bearer(shell: Shell, wallet_path: str, eacl_rules_file: str) -> None:
def wait_for_cache_expired(): def wait_for_cache_expired():
sleep(NEOFS_CONTRACT_CACHE_TIMEOUT) sleep(NEOFS_CONTRACT_CACHE_TIMEOUT)
return return
@allure.step("Return bearer token in base64 to caller")
def bearer_token_base64_from_file(
bearer_path: str,
) -> str:
with open(bearer_path, "rb") as file:
signed = file.read()
return base64.b64encode(signed).decode("utf-8")

View file

@ -1,3 +1,4 @@
import base64
import logging import logging
import os import os
import random import random
@ -19,8 +20,6 @@ from neofs_testlib.shell import Shell
from python_keywords.neofs_verbs import get_object from python_keywords.neofs_verbs import get_object
from python_keywords.storage_policy import get_nodes_without_object from python_keywords.storage_policy import get_nodes_without_object
from pytest_tests.steps.cluster_test_base import ClusterTestBase
logger = logging.getLogger("NeoLogger") logger = logging.getLogger("NeoLogger")
ASSETS_DIR = os.getenv("ASSETS_DIR", "TemporaryDir/") ASSETS_DIR = os.getenv("ASSETS_DIR", "TemporaryDir/")
@ -288,9 +287,9 @@ def get_object_and_verify_hashes(
) )
# for some reason we can face with case when nodes_list is empty due to object resides in all nodes # for some reason we can face with case when nodes_list is empty due to object resides in all nodes
if nodes_list: if nodes_list:
random_node = random.choice(nodes)
else:
random_node = random.choice(nodes_list) random_node = random.choice(nodes_list)
else:
random_node = random.choice(nodes)
object_getter = object_getter or get_via_http_gate object_getter = object_getter or get_via_http_gate