http test with bearer token

Signed-off-by: Vladislav Karakozov <v.karakozov@yadro.com>
This commit is contained in:
Vladislav Karakozov 2022-12-29 10:17:51 +03:00 committed by Vlad K
parent ced72602ef
commit 690323e85d
3 changed files with 156 additions and 7 deletions

View file

@ -172,6 +172,7 @@ def form_bearertoken_file(
eacl_rule_list: List[Union[EACLRule, EACLPubKey]],
shell: Shell,
endpoint: str,
sign: Optional[bool] = True,
) -> str:
"""
This function fetches eACL for given <cid> on behalf of <wif>,
@ -219,7 +220,14 @@ def form_bearertoken_file(
json.dump(eacl_result, eacl_file, ensure_ascii=False, indent=4)
logger.info(f"Got these extended ACL records: {eacl_result}")
sign_bearer(shell, wif, file_path)
if sign:
sign_bearer(
shell=shell,
wallet_path=wif,
eacl_rules_file_from=file_path,
eacl_rules_file_to=file_path,
json=True,
)
return file_path
@ -246,10 +254,12 @@ def eacl_rules(access: str, verbs: list, user: str) -> list[str]:
return rules
def sign_bearer(shell: Shell, wallet_path: str, eacl_rules_file: str) -> None:
def sign_bearer(
shell: Shell, wallet_path: str, eacl_rules_file_from: str, eacl_rules_file_to: str, json: bool
) -> None:
neofscli = NeofsCli(shell=shell, neofs_cli_exec_path=NEOFS_CLI_EXEC, config_file=WALLET_CONFIG)
neofscli.util.sign_bearer_token(
wallet=wallet_path, from_file=eacl_rules_file, to_file=eacl_rules_file, json=True
wallet=wallet_path, from_file=eacl_rules_file_from, to_file=eacl_rules_file_to, json=json
)
@ -257,3 +267,12 @@ def sign_bearer(shell: Shell, wallet_path: str, eacl_rules_file: str) -> None:
def wait_for_cache_expired():
sleep(NEOFS_CONTRACT_CACHE_TIMEOUT)
return
@allure.step("Return bearer token in base64 to caller")
def bearer_token_base64_from_file(
bearer_path: str,
) -> str:
with open(bearer_path, "rb") as file:
signed = file.read()
return base64.b64encode(signed).decode("utf-8")

View file

@ -1,3 +1,4 @@
import base64
import logging
import os
import random
@ -19,8 +20,6 @@ from neofs_testlib.shell import Shell
from python_keywords.neofs_verbs import get_object
from python_keywords.storage_policy import get_nodes_without_object
from pytest_tests.steps.cluster_test_base import ClusterTestBase
logger = logging.getLogger("NeoLogger")
ASSETS_DIR = os.getenv("ASSETS_DIR", "TemporaryDir/")
@ -288,9 +287,9 @@ def get_object_and_verify_hashes(
)
# for some reason we can face with case when nodes_list is empty due to object resides in all nodes
if nodes_list:
random_node = random.choice(nodes)
else:
random_node = random.choice(nodes_list)
else:
random_node = random.choice(nodes)
object_getter = object_getter or get_via_http_gate