Add static session tests for object

Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
This commit is contained in:
Andrey Berezin 2022-11-10 17:56:25 +03:00 committed by abereziny
parent 013cf8fcc8
commit 21f1c3a922
10 changed files with 1002 additions and 72 deletions

View file

@ -5,6 +5,7 @@ CONTAINER_NOT_FOUND = "code = 3072.*message = container not found"
# Regex patterns of status codes of Object service (https://github.com/nspcc-dev/neofs-spec/blob/98b154848116223e486ce8b43eaa35fec08b4a99/20-api-v2/object.md) # Regex patterns of status codes of Object service (https://github.com/nspcc-dev/neofs-spec/blob/98b154848116223e486ce8b43eaa35fec08b4a99/20-api-v2/object.md)
MALFORMED_REQUEST = "code = 1024.*message = malformed request"
OBJECT_ACCESS_DENIED = "code = 2048.*message = access to object operation denied" OBJECT_ACCESS_DENIED = "code = 2048.*message = access to object operation denied"
OBJECT_NOT_FOUND = "code = 2049.*message = object not found" OBJECT_NOT_FOUND = "code = 2049.*message = object not found"
OBJECT_ALREADY_REMOVED = "code = 2052.*message = object already removed" OBJECT_ALREADY_REMOVED = "code = 2052.*message = object already removed"

View file

@ -0,0 +1,27 @@
import logging
from dataclasses import dataclass
from time import sleep, time
import allure
import pytest
from common import NEOFS_NETMAP, STORAGE_NODE_SERVICE_NAME_REGEX
from epoch import tick_epoch
from grpc_responses import OBJECT_ALREADY_REMOVED
from neofs_testlib.hosting import Hosting
from neofs_testlib.shell import Shell
from python_keywords.neofs_verbs import delete_object, get_object, head_object
from tombstone import verify_head_tombstone
logger = logging.getLogger("NeoLogger")
@dataclass
class StorageObjectInfo:
size: str = None
cid: str = None
wallet: str = None
file_path: str = None
file_hash: str = None
attributes: list[dict[str, str]] = None
oid: str = None
tombstone: str = None

View file

@ -0,0 +1,59 @@
import os
import uuid
from dataclasses import dataclass
from typing import Optional
from common import FREE_STORAGE, WALLET_PASS
from neofs_testlib.shell import Shell
from neofs_testlib.utils.wallet import get_last_address_from_wallet, init_wallet
from python_keywords.payment_neogo import deposit_gas, transfer_gas
@dataclass
class WalletFile:
path: str
password: str
containers: Optional[list[str]] = None
def get_address(self) -> str:
"""
Extracts the last address from wallet.
Returns:
The address of the wallet.
"""
return get_last_address_from_wallet(self.path, self.password)
class WalletFactory:
def __init__(self, wallets_dir: str, shell: Shell) -> None:
self.shell = shell
self.wallets_dir = wallets_dir
def create_wallet(self, password: str = WALLET_PASS) -> WalletFile:
"""
Creates new default wallet
Args:
password: wallet password
Returns:
WalletFile object of new wallet
"""
wallet_path = os.path.join(self.wallets_dir, f"{str(uuid.uuid4())}.json")
init_wallet(wallet_path, password)
if not FREE_STORAGE:
deposit = 30
transfer_gas(
shell=self.shell,
amount=deposit + 1,
wallet_to_path=wallet_path,
wallet_to_password=password,
)
deposit_gas(
shell=self.shell,
amount=deposit,
wallet_from_path=wallet_path,
wallet_from_password=password,
)
return WalletFile(wallet_path, password)

View file

@ -25,6 +25,7 @@ markers =
long: long tests (with long execution time) long: long tests (with long execution time)
node_mgmt: neofs control commands node_mgmt: neofs control commands
session_token: tests for operations with session token session_token: tests for operations with session token
static_session: tests for operations with static session token
acl: All tests for ACL acl: All tests for ACL
acl_basic: tests for basic ACL acl_basic: tests for basic ACL
acl_bearer: tests for ACL with bearer acl_bearer: tests for ACL with bearer

View file

@ -3,16 +3,45 @@ import json
import logging import logging
import os import os
import uuid import uuid
from dataclasses import dataclass
from typing import Optional
import allure import allure
import json_transformers import json_transformers
from common import ASSETS_DIR, NEOFS_CLI_EXEC, NEOFS_ENDPOINT, WALLET_CONFIG from common import ASSETS_DIR, NEOFS_CLI_EXEC, NEOFS_ENDPOINT, WALLET_CONFIG
from data_formatters import get_wallet_public_key
from json_transformers import encode_for_json
from neo3 import wallet from neo3 import wallet
from neofs_testlib.cli import NeofsCli from neofs_testlib.cli import NeofsCli
from neofs_testlib.shell import Shell from neofs_testlib.shell import Shell
from storage_object_info import StorageObjectInfo
from wallet import WalletFile
logger = logging.getLogger("NeoLogger") logger = logging.getLogger("NeoLogger")
PUT_VERB = "PUT"
DELETE_VERB = "DELETE"
LOCK_VERB = "LOCK"
GET_VERB = "GET"
RANGEHASH_VERB = "RANGEHASH"
RANGE_VERB = "RANGE"
HEAD_VERB = "HEAD"
SEARCH_VERB = "SEARCH"
UNRELATED_KEY = "unrelated key in the session"
UNRELATED_OBJECT = "unrelated object in the session"
UNRELATED_CONTAINER = "unrelated container in the session"
WRONG_VERB = "wrong verb of the session"
INVALID_SIGNATURE = "invalid signature of the session data"
@dataclass
class Lifetime:
exp: int = 100000000
nbf: int = 0
iat: int = 0
@allure.step("Generate Session Token") @allure.step("Generate Session Token")
def generate_session_token(owner: str, session_wallet: str, cid: str = "") -> str: def generate_session_token(owner: str, session_wallet: str, cid: str = "") -> str:
@ -50,15 +79,7 @@ def generate_session_token(owner: str, session_wallet: str, cid: str = "") -> st
"container": { "container": {
"verb": "PUT", "verb": "PUT",
"wildcard": cid != "", "wildcard": cid != "",
**( **({"containerID": {"value": f"{encode_for_json(cid)}"}} if cid != "" else {}),
{
"containerID": {
"value": f"{base64.b64encode(cid.encode('utf-8')).decode('utf-8')}"
}
}
if cid != ""
else {}
),
}, },
} }
} }
@ -70,6 +91,95 @@ def generate_session_token(owner: str, session_wallet: str, cid: str = "") -> st
return file_path return file_path
@allure.step("Generate Session Token For Object")
def generate_object_session_token(
owner_wallet: WalletFile,
session_wallet: WalletFile,
oids: list[str],
cid: str,
verb: str,
tokens_dir: str,
lifetime: Optional[Lifetime] = None,
) -> str:
"""
This function generates session token for ObjectSessionContext
and writes it to the file. It is able to prepare session token file
for a specific container (<cid>) or for every container (adds
"wildcard" field).
Args:
owner_wallet: wallet of container owner
session_wallet: wallet to which we grant the
access via session token
cid: container ID of the container
oids: list of objectIDs to put into session
verb: verb to grant access to;
Valid verbs are: GET, RANGE, RANGEHASH, HEAD, SEARCH.
lifetime: lifetime options for session
Returns:
The path to the generated session token file
"""
file_path = os.path.join(tokens_dir, str(uuid.uuid4()))
pub_key_64 = get_wallet_public_key(session_wallet.path, session_wallet.password, "base64")
lifetime = lifetime if lifetime else Lifetime()
session_token = {
"body": {
"id": f"{base64.b64encode(uuid.uuid4().bytes).decode('utf-8')}",
"ownerID": {
"value": f"{json_transformers.encode_for_json(owner_wallet.get_address())}"
},
"lifetime": {
"exp": f"{lifetime.exp}",
"nbf": f"{lifetime.nbf}",
"iat": f"{lifetime.iat}",
},
"sessionKey": pub_key_64,
"object": {
"verb": verb,
"target": {
"container": {"value": encode_for_json(cid)},
"objects": [{"value": encode_for_json(oid)} for oid in oids],
},
},
}
}
logger.info(f"Got this Session Token: {session_token}")
with open(file_path, "w", encoding="utf-8") as session_token_file:
json.dump(session_token, session_token_file, ensure_ascii=False, indent=4)
return file_path
@allure.step("Get signed token for object session")
def get_object_signed_token(
owner_wallet: WalletFile,
user_wallet: WalletFile,
storage_objects: list[StorageObjectInfo],
verb: str,
shell: Shell,
tokens_dir: str,
lifetime: Optional[Lifetime] = None,
) -> str:
"""
Returns signed token file path for static object session
"""
storage_object_ids = [storage_object.oid for storage_object in storage_objects]
session_token_file = generate_object_session_token(
owner_wallet,
user_wallet,
storage_object_ids,
owner_wallet.containers[0],
verb,
tokens_dir,
lifetime=lifetime,
)
return sign_session_token(shell, session_token_file, owner_wallet.path)
@allure.step("Create Session Token") @allure.step("Create Session Token")
def create_session_token( def create_session_token(
shell: Shell, shell: Shell,

View file

@ -0,0 +1,85 @@
import logging
from time import sleep, time
import allure
import pytest
from common import STORAGE_NODE_SERVICE_NAME_REGEX
from epoch import tick_epoch
from grpc_responses import OBJECT_ALREADY_REMOVED
from neofs_testlib.hosting import Hosting
from neofs_testlib.shell import Shell
from python_keywords.neofs_verbs import delete_object, get_object, head_object
from storage_object_info import StorageObjectInfo
from tombstone import verify_head_tombstone
logger = logging.getLogger("NeoLogger")
CLEANUP_TIMEOUT = 10
@allure.step("Waiting until object will be available on all nodes")
def wait_until_objects_available_on_all_nodes(
hosting: Hosting,
storage_objects: list[StorageObjectInfo],
shell: Shell,
max_wait_time: int = 60,
) -> None:
start = time()
def wait_for_objects():
for service_config in hosting.find_service_configs(STORAGE_NODE_SERVICE_NAME_REGEX):
endpoint = service_config.attributes["rpc_endpoint"]
for storage_object in storage_objects:
head_object(
storage_object.wallet,
storage_object.cid,
storage_object.oid,
shell,
endpoint=endpoint,
)
while start + max_wait_time >= time():
try:
wait_for_objects()
return
except Exception as ex:
logger.debug(ex)
sleep(1)
raise ex
def delete_objects(storage_objects: list[StorageObjectInfo], shell: Shell) -> None:
"""
Deletes given storage objects.
Args:
storage_objects: list of objects to delete
shell: executor for cli command
"""
with allure.step("Delete objects"):
for storage_object in storage_objects:
storage_object.tombstone = delete_object(
storage_object.wallet, storage_object.cid, storage_object.oid, shell
)
verify_head_tombstone(
wallet_path=storage_object.wallet,
cid=storage_object.cid,
oid_ts=storage_object.tombstone,
oid=storage_object.oid,
shell=shell,
)
tick_epoch(shell=shell)
sleep(CLEANUP_TIMEOUT)
with allure.step("Get objects and check errors"):
for storage_object in storage_objects:
with pytest.raises(Exception, match=OBJECT_ALREADY_REMOVED):
get_object(
storage_object.wallet,
storage_object.cid,
storage_object.oid,
shell=shell,
)

View file

@ -16,6 +16,7 @@ from neofs_testlib.shell import LocalShell, Shell
from neofs_testlib.utils.wallet import init_wallet from neofs_testlib.utils.wallet import init_wallet
from payment_neogo import deposit_gas, transfer_gas from payment_neogo import deposit_gas, transfer_gas
from python_keywords.node_management import node_healthcheck from python_keywords.node_management import node_healthcheck
from wallet import WalletFactory
logger = logging.getLogger("NeoLogger") logger = logging.getLogger("NeoLogger")
@ -61,6 +62,11 @@ def require_multiple_hosts(hosting: Hosting):
yield yield
@pytest.fixture(scope="session")
def wallet_factory(prepare_tmp_dir: str, client_shell: Shell) -> WalletFactory:
return WalletFactory(prepare_tmp_dir, client_shell)
@pytest.fixture(scope="session", autouse=True) @pytest.fixture(scope="session", autouse=True)
@allure.title("Check binary versions") @allure.title("Check binary versions")
def check_binary_versions(request, hosting: Hosting, client_shell: Shell): def check_binary_versions(request, hosting: Hosting, client_shell: Shell):
@ -74,10 +80,14 @@ def check_binary_versions(request, hosting: Hosting, client_shell: Shell):
@pytest.fixture(scope="session") @pytest.fixture(scope="session")
@allure.title("Prepare tmp directory") @allure.title("Prepare tmp directory")
def prepare_tmp_dir(): def prepare_tmp_dir():
with allure.step("Prepare tmp directory"):
full_path = os.path.join(os.getcwd(), ASSETS_DIR) full_path = os.path.join(os.getcwd(), ASSETS_DIR)
shutil.rmtree(full_path, ignore_errors=True) shutil.rmtree(full_path, ignore_errors=True)
os.mkdir(full_path) os.mkdir(full_path)
yield full_path yield full_path
with allure.step("Remove tmp directory"):
shutil.rmtree(full_path) shutil.rmtree(full_path)

View file

@ -1,20 +1,16 @@
import logging import logging
import random import random
import sys import sys
from dataclasses import dataclass
from time import sleep
import allure import allure
import pytest import pytest
from common import COMPLEX_OBJ_SIZE, SIMPLE_OBJ_SIZE from common import COMPLEX_OBJ_SIZE, SIMPLE_OBJ_SIZE
from container import create_container from container import create_container
from epoch import tick_epoch
from file_helper import generate_file, get_file_content, get_file_hash from file_helper import generate_file, get_file_content, get_file_hash
from grpc_responses import OBJECT_ALREADY_REMOVED, OUT_OF_RANGE, error_matches_status from grpc_responses import OUT_OF_RANGE
from neofs_testlib.shell import Shell from neofs_testlib.shell import Shell
from pytest import FixtureRequest from pytest import FixtureRequest
from python_keywords.neofs_verbs import ( from python_keywords.neofs_verbs import (
delete_object,
get_netmap_netinfo, get_netmap_netinfo,
get_object, get_object,
get_range, get_range,
@ -24,7 +20,9 @@ from python_keywords.neofs_verbs import (
search_object, search_object,
) )
from python_keywords.storage_policy import get_complex_object_copies, get_simple_object_copies from python_keywords.storage_policy import get_complex_object_copies, get_simple_object_copies
from tombstone import verify_head_tombstone
from helpers.storage_object_info import StorageObjectInfo
from steps.storage_object import delete_objects
logger = logging.getLogger("NeoLogger") logger = logging.getLogger("NeoLogger")
@ -48,18 +46,6 @@ STATIC_RANGES = {
} }
@dataclass
class StorageObjectInfo:
size: str = None
cid: str = None
wallet: str = None
file_path: str = None
file_hash: str = None
attributes: list[dict[str, str]] = None
oid: str = None
tombstone: str = None
def generate_ranges(file_size: int, max_object_size: int) -> list[(int, int)]: def generate_ranges(file_size: int, max_object_size: int) -> list[(int, int)]:
file_range_step = file_size / RANGES_COUNT file_range_step = file_size / RANGES_COUNT
@ -94,39 +80,11 @@ def generate_ranges(file_size: int, max_object_size: int) -> list[(int, int)]:
return file_ranges_to_test return file_ranges_to_test
def delete_objects(storage_objects: list, client_shell: Shell) -> None:
with allure.step("Delete objects"):
for storage_object in storage_objects:
storage_object.tombstone = delete_object(
storage_object.wallet, storage_object.cid, storage_object.oid, client_shell
)
verify_head_tombstone(
wallet_path=storage_object.wallet,
cid=storage_object.cid,
oid_ts=storage_object.tombstone,
oid=storage_object.oid,
shell=client_shell,
)
tick_epoch(shell=client_shell)
sleep(CLEANUP_TIMEOUT)
with allure.step("Get objects and check errors"):
for storage_object in storage_objects:
get_object_and_check_error(
storage_object.wallet,
storage_object.cid,
storage_object.oid,
error_pattern=OBJECT_ALREADY_REMOVED,
shell=client_shell,
)
@pytest.fixture( @pytest.fixture(
params=[SIMPLE_OBJ_SIZE, COMPLEX_OBJ_SIZE], params=[SIMPLE_OBJ_SIZE, COMPLEX_OBJ_SIZE],
ids=["simple object", "complex object"], ids=["simple object", "complex object"],
# Scope session to upload/delete each files set only once # Scope session to upload/delete each files set only once
scope="session", scope="module",
) )
def storage_objects( def storage_objects(
prepare_wallet_and_deposit: str, client_shell: Shell, request: FixtureRequest prepare_wallet_and_deposit: str, client_shell: Shell, request: FixtureRequest
@ -500,17 +458,6 @@ def test_object_get_range_hash_negatives(
get_range_hash(wallet, cid, oid, shell=client_shell, range_cut=range_cut) get_range_hash(wallet, cid, oid, shell=client_shell, range_cut=range_cut)
def get_object_and_check_error(
wallet: str, cid: str, oid: str, error_pattern: str, shell: Shell
) -> None:
try:
get_object(wallet=wallet, cid=cid, oid=oid, shell=shell)
raise AssertionError(f"Expected object {oid} removed, but it is not")
except Exception as err:
logger.info(f"Error is {err}")
assert error_matches_status(err, error_pattern), f"Expected {err} to match {error_pattern}"
def check_header_is_presented(head_info: dict, object_header: dict) -> None: def check_header_is_presented(head_info: dict, object_header: dict) -> None:
for key_to_check, val_to_check in object_header.items(): for key_to_check, val_to_check in object_header.items():
assert ( assert (

View file

@ -9,7 +9,8 @@ from neofs_testlib.shell import Shell
from neofs_testlib.utils.wallet import get_last_address_from_wallet from neofs_testlib.utils.wallet import get_last_address_from_wallet
from python_keywords.container import create_container from python_keywords.container import create_container
from python_keywords.neofs_verbs import delete_object, put_object from python_keywords.neofs_verbs import delete_object, put_object
from python_keywords.session_token import create_session_token
from steps.session_token import create_session_token
@allure.title("Test Object Operations with Session Token") @allure.title("Test Object Operations with Session Token")

View file

@ -0,0 +1,689 @@
import logging
import allure
import pytest
from common import COMPLEX_OBJ_SIZE, SIMPLE_OBJ_SIZE
from epoch import get_epoch, tick_epoch
from file_helper import generate_file
from grpc_responses import MALFORMED_REQUEST, OBJECT_ACCESS_DENIED, OBJECT_NOT_FOUND
from neofs_testlib.hosting import Hosting
from neofs_testlib.shell import Shell
from pytest import FixtureRequest
from python_keywords.container import create_container
from python_keywords.neofs_verbs import (
delete_object,
get_netmap_netinfo,
get_object,
get_range,
get_range_hash,
head_object,
put_object,
search_object,
)
from wallet import WalletFactory, WalletFile
from helpers.storage_object_info import StorageObjectInfo
from steps.session_token import (
DELETE_VERB,
GET_VERB,
HEAD_VERB,
INVALID_SIGNATURE,
PUT_VERB,
RANGE_VERB,
RANGEHASH_VERB,
SEARCH_VERB,
UNRELATED_CONTAINER,
UNRELATED_KEY,
UNRELATED_OBJECT,
WRONG_VERB,
Lifetime,
generate_object_session_token,
get_object_signed_token,
sign_session_token,
)
from steps.storage_object import delete_objects, wait_until_objects_available_on_all_nodes
logger = logging.getLogger("NeoLogger")
RANGE_OFFSET_FOR_COMPLEX_OBJECT = 200
@allure.step("Ensure fresh epoch")
def ensure_fresh_epoch(shell: Shell) -> int:
# ensure new fresh epoch to avoid epoch switch during test session
current_epoch = get_epoch(shell)
tick_epoch(shell)
epoch = get_epoch(shell)
assert epoch > current_epoch, "Epoch wasn't ticked"
return epoch
@pytest.fixture(
params=[SIMPLE_OBJ_SIZE, COMPLEX_OBJ_SIZE],
ids=["simple object", "complex object"],
# Scope session to upload/delete each files set only once
scope="module",
)
def storage_objects(
hosting: Hosting, owner_wallet: WalletFile, client_shell: Shell, request: FixtureRequest
) -> list[StorageObjectInfo]:
file_path = generate_file(request.param)
storage_objects = []
# Separate containers for complex/simple objects to avoid side-effects
cid = create_container(owner_wallet.path, shell=client_shell)
other_cid = create_container(owner_wallet.path, shell=client_shell)
owner_wallet.containers = [cid, other_cid]
with allure.step("Put objects"):
# upload couple objects
for i in range(3):
storage_object = StorageObjectInfo()
storage_object.size = request.param
storage_object.cid = cid
storage_object.wallet = owner_wallet.path
storage_object.file_path = file_path
storage_object.oid = put_object(
wallet=owner_wallet.path,
path=file_path,
cid=cid,
shell=client_shell,
)
storage_objects.append(storage_object)
wait_until_objects_available_on_all_nodes(hosting, storage_objects, client_shell)
yield storage_objects
# Teardown after all tests done with current param
delete_objects(storage_objects, client_shell)
@allure.step("Get ranges for test")
def get_ranges(storage_object: StorageObjectInfo, shell: Shell) -> list[str]:
"""
Returns ranges to test range/hash methods via static session
"""
object_size = storage_object.size
if object_size == COMPLEX_OBJ_SIZE:
net_info = get_netmap_netinfo(storage_object.wallet, shell)
max_object_size = net_info["maximum_object_size"]
# make sure to test multiple parts of complex object
assert object_size >= max_object_size + RANGE_OFFSET_FOR_COMPLEX_OBJECT
return [
"0:10",
f"{object_size-10}:10",
f"{max_object_size - RANGE_OFFSET_FOR_COMPLEX_OBJECT}:{RANGE_OFFSET_FOR_COMPLEX_OBJECT * 2}",
]
else:
return ["0:10", f"{object_size-10}:10"]
@pytest.fixture(scope="module")
def owner_wallet(wallet_factory: WalletFactory) -> WalletFile:
"""
Returns wallet which owns containers and objects
"""
return wallet_factory.create_wallet()
@pytest.fixture(scope="module")
def user_wallet(wallet_factory: WalletFactory) -> WalletFile:
"""
Returns wallet which will use objects from owner via static session
"""
return wallet_factory.create_wallet()
@pytest.fixture(scope="module")
def stranger_wallet(wallet_factory: WalletFactory) -> WalletFile:
"""
Returns stranger wallet which should fail to obtain data
"""
return wallet_factory.create_wallet()
@pytest.fixture(scope="module")
def static_sessions(
owner_wallet: WalletFile,
user_wallet: WalletFile,
storage_objects: list[StorageObjectInfo],
client_shell: Shell,
prepare_tmp_dir: str,
) -> dict[str, str]:
"""
Returns dict with static session token file paths for all verbs with default lifetime with valid container and first two objects
"""
verbs = [GET_VERB, RANGEHASH_VERB, RANGE_VERB, HEAD_VERB, SEARCH_VERB, DELETE_VERB, PUT_VERB]
sessions = {}
for verb in verbs:
sessions[verb] = get_object_signed_token(
owner_wallet, user_wallet, storage_objects[0:2], verb, client_shell, prepare_tmp_dir
)
return sessions
@allure.title("Validate static session with read operations")
@pytest.mark.static_session
@pytest.mark.parametrize(
"method_under_test,verb",
[
(head_object, HEAD_VERB),
(get_object, GET_VERB),
],
)
def test_static_session_read(
user_wallet: WalletFile,
client_shell: Shell,
storage_objects: list[StorageObjectInfo],
static_sessions: list[str],
method_under_test,
verb: str,
request: FixtureRequest,
):
"""
Validate static session with read operations
"""
allure.dynamic.title(
f"Validate static session with read operations for {request.node.callspec.id}"
)
for storage_object in storage_objects[0:2]:
method_under_test(
user_wallet.path,
storage_object.cid,
storage_object.oid,
client_shell,
session=static_sessions[verb],
)
@allure.title("Validate static session with range operations")
@pytest.mark.static_session
@pytest.mark.parametrize(
"method_under_test,verb",
[
(get_range, RANGE_VERB),
# (get_range_hash, RANGEHASH_VERB), session is absent in neofs-cli object hash command (see https://github.com/nspcc-dev/neofs-node/issues/2029)
],
)
def test_static_session_range(
user_wallet: WalletFile,
client_shell: Shell,
storage_objects: list[StorageObjectInfo],
static_sessions: list[str],
method_under_test,
verb: str,
request: FixtureRequest,
):
"""
Validate static session with range operations
"""
allure.dynamic.title(
f"Validate static session with range operations for {request.node.callspec.id}"
)
storage_object = storage_objects[0]
ranges_to_test = get_ranges(storage_object, client_shell)
for range_to_test in ranges_to_test:
with allure.step(f"Check range {range_to_test}"):
method_under_test(
user_wallet.path,
storage_object.cid,
storage_object.oid,
shell=client_shell,
session=static_sessions[verb],
range_cut=range_to_test,
)
@allure.title("Validate static session with search operation")
@pytest.mark.static_session
@pytest.mark.xfail
# (see https://github.com/nspcc-dev/neofs-node/issues/2030)
def test_static_session_search(
user_wallet: WalletFile,
client_shell: Shell,
storage_objects: list[StorageObjectInfo],
static_sessions: list[str],
request: FixtureRequest,
):
"""
Validate static session with search operations
"""
allure.dynamic.title(f"Validate static session with search for {request.node.callspec.id}")
cid = storage_objects[0].cid
expected_object_ids = [storage_object.oid for storage_object in storage_objects[0:2]]
actual_object_ids = search_object(
user_wallet.path, cid, client_shell, session=static_sessions[SEARCH_VERB], root=True
)
assert expected_object_ids == actual_object_ids
@allure.title("Validate static session with object id not in session")
@pytest.mark.static_session
def test_static_session_unrelated_object(
user_wallet: WalletFile,
client_shell: Shell,
storage_objects: list[StorageObjectInfo],
static_sessions: list[str],
request: FixtureRequest,
):
"""
Validate static session with object id not in session
"""
allure.dynamic.title(
f"Validate static session with object id not in session for {request.node.callspec.id}"
)
with pytest.raises(Exception, match=UNRELATED_OBJECT):
head_object(
user_wallet.path,
storage_objects[2].cid,
storage_objects[2].oid,
client_shell,
session=static_sessions[HEAD_VERB],
)
@allure.title("Validate static session with user id not in session")
@pytest.mark.static_session
def test_static_session_head_unrelated_user(
stranger_wallet: WalletFile,
client_shell: Shell,
storage_objects: list[StorageObjectInfo],
static_sessions: list[str],
request: FixtureRequest,
):
"""
Validate static session with user id not in session
"""
allure.dynamic.title(
f"Validate static session with user id not in session for {request.node.callspec.id}"
)
storage_object = storage_objects[0]
with pytest.raises(Exception, match=UNRELATED_KEY):
head_object(
stranger_wallet.path,
storage_object.cid,
storage_object.oid,
client_shell,
session=static_sessions[HEAD_VERB],
)
@allure.title("Validate static session with wrong verb in session")
@pytest.mark.static_session
def test_static_session_head_wrong_verb(
user_wallet: WalletFile,
client_shell: Shell,
storage_objects: list[StorageObjectInfo],
static_sessions: list[str],
request: FixtureRequest,
):
"""
Validate static session with wrong verb in session
"""
allure.dynamic.title(
f"Validate static session with wrong verb in session for {request.node.callspec.id}"
)
storage_object = storage_objects[0]
with pytest.raises(Exception, match=WRONG_VERB):
get_object(
user_wallet.path,
storage_object.cid,
storage_object.oid,
client_shell,
session=static_sessions[HEAD_VERB],
)
@allure.title("Validate static session with container id not in session")
@pytest.mark.static_session
def test_static_session_unrelated_container(
owner_wallet: WalletFile,
user_wallet: WalletFile,
client_shell: Shell,
storage_objects: list[StorageObjectInfo],
static_sessions: list[str],
request: FixtureRequest,
):
"""
Validate static session with container id not in session
"""
allure.dynamic.title(
f"Validate static session with container id not in session for {request.node.callspec.id}"
)
storage_object = storage_objects[0]
with pytest.raises(Exception, match=UNRELATED_CONTAINER):
get_object(
user_wallet.path,
owner_wallet.containers[1],
storage_object.oid,
client_shell,
session=static_sessions[GET_VERB],
)
@allure.title("Validate static session which signed by another wallet")
@pytest.mark.static_session
def test_static_session_signed_by_other(
owner_wallet: WalletFile,
user_wallet: WalletFile,
stranger_wallet: WalletFile,
client_shell: Shell,
storage_objects: list[StorageObjectInfo],
prepare_tmp_dir: str,
request: FixtureRequest,
):
"""
Validate static session which signed by another wallet
"""
allure.dynamic.title(
f"Validate static session which signed by another wallet for {request.node.callspec.id}"
)
storage_object = storage_objects[0]
session_token_file = generate_object_session_token(
owner_wallet,
user_wallet,
[storage_object.oid],
owner_wallet.containers[0],
HEAD_VERB,
prepare_tmp_dir,
)
signed_token_file = sign_session_token(client_shell, session_token_file, stranger_wallet.path)
with pytest.raises(Exception, match=OBJECT_ACCESS_DENIED):
head_object(
user_wallet.path,
storage_object.cid,
storage_object.oid,
client_shell,
session=signed_token_file,
)
@allure.title("Validate static session which signed for another container")
@pytest.mark.static_session
def test_static_session_signed_for_other_container(
owner_wallet: WalletFile,
user_wallet: WalletFile,
client_shell: Shell,
storage_objects: list[StorageObjectInfo],
prepare_tmp_dir: str,
request: FixtureRequest,
):
"""
Validate static session which signed for another container
"""
allure.dynamic.title(
f"Validate static session which signed for another container for {request.node.callspec.id}"
)
storage_object = storage_objects[0]
container = owner_wallet.containers[1]
session_token_file = generate_object_session_token(
owner_wallet, user_wallet, [storage_object.oid], container, HEAD_VERB, prepare_tmp_dir
)
signed_token_file = sign_session_token(client_shell, session_token_file, owner_wallet.path)
with pytest.raises(Exception, match=OBJECT_NOT_FOUND):
head_object(
user_wallet.path, container, storage_object.oid, client_shell, session=signed_token_file
)
@allure.title("Validate static session which wasn't signed")
@pytest.mark.static_session
def test_static_session_without_sign(
owner_wallet: WalletFile,
user_wallet: WalletFile,
client_shell: Shell,
storage_objects: list[StorageObjectInfo],
prepare_tmp_dir: str,
request: FixtureRequest,
):
"""
Validate static session which wasn't signed
"""
allure.dynamic.title(
f"Validate static session which wasn't signed for {request.node.callspec.id}"
)
storage_object = storage_objects[0]
session_token_file = generate_object_session_token(
owner_wallet,
user_wallet,
[storage_object.oid],
owner_wallet.containers[0],
HEAD_VERB,
prepare_tmp_dir,
)
with pytest.raises(Exception, match=INVALID_SIGNATURE):
head_object(
user_wallet.path,
storage_object.cid,
storage_object.oid,
client_shell,
session=session_token_file,
)
@allure.title("Validate static session which expires at next epoch")
@pytest.mark.static_session
def test_static_session_expiration_at_next(
owner_wallet: WalletFile,
user_wallet: WalletFile,
client_shell: Shell,
storage_objects: list[StorageObjectInfo],
prepare_tmp_dir: str,
request: FixtureRequest,
):
"""
Validate static session expires which at next epoch
"""
allure.dynamic.title(
f"Validate static session expires which at next epoch for {request.node.callspec.id}"
)
epoch = ensure_fresh_epoch(client_shell)
container = owner_wallet.containers[0]
object_id = storage_objects[0].oid
expiration = Lifetime(epoch + 1, epoch, epoch)
token_expire_at_next_epoch = get_object_signed_token(
owner_wallet,
user_wallet,
storage_objects,
HEAD_VERB,
client_shell,
prepare_tmp_dir,
expiration,
)
head_object(
user_wallet.path, container, object_id, client_shell, session=token_expire_at_next_epoch
)
tick_epoch(client_shell)
with pytest.raises(Exception, match=MALFORMED_REQUEST):
head_object(
user_wallet.path, container, object_id, client_shell, session=token_expire_at_next_epoch
)
@allure.title("Validate static session which is valid starting from next epoch")
@pytest.mark.static_session
def test_static_session_start_at_next(
owner_wallet: WalletFile,
user_wallet: WalletFile,
client_shell: Shell,
storage_objects: list[StorageObjectInfo],
prepare_tmp_dir: str,
request: FixtureRequest,
):
"""
Validate static session which is valid starting from next epoch
"""
allure.dynamic.title(
f"Validate static session which is valid starting from next epoch for {request.node.callspec.id}"
)
epoch = ensure_fresh_epoch(client_shell)
container = owner_wallet.containers[0]
object_id = storage_objects[0].oid
expiration = Lifetime(epoch + 10, epoch + 1, epoch)
token_start_at_next_epoch = get_object_signed_token(
owner_wallet,
user_wallet,
storage_objects,
HEAD_VERB,
client_shell,
prepare_tmp_dir,
expiration,
)
with pytest.raises(Exception, match=MALFORMED_REQUEST):
head_object(
user_wallet.path, container, object_id, client_shell, session=token_start_at_next_epoch
)
tick_epoch(client_shell)
head_object(
user_wallet.path, container, object_id, client_shell, session=token_start_at_next_epoch
)
@allure.title("Validate static session which is already expired")
@pytest.mark.static_session
def test_static_session_already_expired(
owner_wallet: WalletFile,
user_wallet: WalletFile,
client_shell: Shell,
storage_objects: list[StorageObjectInfo],
prepare_tmp_dir: str,
request: FixtureRequest,
):
"""
Validate static session which is already expired
"""
allure.dynamic.title(
f"Validate static session which is already expired for {request.node.callspec.id}"
)
epoch = ensure_fresh_epoch(client_shell)
container = owner_wallet.containers[0]
object_id = storage_objects[0].oid
expiration = Lifetime(epoch - 1, epoch - 2, epoch - 2)
token_already_expired = get_object_signed_token(
owner_wallet,
user_wallet,
storage_objects,
HEAD_VERB,
client_shell,
prepare_tmp_dir,
expiration,
)
with pytest.raises(Exception, match=MALFORMED_REQUEST):
head_object(
user_wallet.path, container, object_id, client_shell, session=token_already_expired
)
@allure.title("Delete verb should be restricted for static session")
def test_static_session_delete_verb(
user_wallet: WalletFile,
client_shell: Shell,
storage_objects: list[StorageObjectInfo],
static_sessions: list[str],
request: FixtureRequest,
):
"""
Delete verb should be restricted for static session
"""
allure.dynamic.title(
f"Delete verb should be restricted for static session for {request.node.callspec.id}"
)
storage_object = storage_objects[0]
with pytest.raises(Exception, match=OBJECT_ACCESS_DENIED):
delete_object(
user_wallet.path,
storage_object.cid,
storage_object.oid,
client_shell,
session=static_sessions[DELETE_VERB],
)
@allure.title("Put verb should be restricted for static session")
def test_static_session_put_verb(
user_wallet: WalletFile,
client_shell: Shell,
storage_objects: list[StorageObjectInfo],
static_sessions: list[str],
request: FixtureRequest,
):
"""
Put verb should be restricted for static session
"""
allure.dynamic.title(
f"Put verb should be restricted for static session for {request.node.callspec.id}"
)
storage_object = storage_objects[0]
with pytest.raises(Exception, match=OBJECT_ACCESS_DENIED):
put_object(
user_wallet.path,
storage_object.file_path,
storage_object.cid,
client_shell,
session=static_sessions[PUT_VERB],
)
@allure.title("Validate static session which is issued in future epoch")
@pytest.mark.static_session
def test_static_session_invalid_issued_epoch(
owner_wallet: WalletFile,
user_wallet: WalletFile,
client_shell: Shell,
storage_objects: list[StorageObjectInfo],
prepare_tmp_dir: str,
request: FixtureRequest,
):
"""
Validate static session which is issued in future epoch
"""
allure.dynamic.title(
f"Validate static session which is issued in future epoch for {request.node.callspec.id}"
)
epoch = ensure_fresh_epoch(client_shell)
container = owner_wallet.containers[0]
object_id = storage_objects[0].oid
expiration = Lifetime(epoch + 10, 0, epoch + 1)
token_invalid_issue_time = get_object_signed_token(
owner_wallet,
user_wallet,
storage_objects,
HEAD_VERB,
client_shell,
prepare_tmp_dir,
expiration,
)
with pytest.raises(Exception, match=MALFORMED_REQUEST):
head_object(
user_wallet.path, container, object_id, client_shell, session=token_invalid_issue_time
)