import json import time import allure import pytest import yaml from frostfs_testlib import reporter from frostfs_testlib.cli import FrostfsAdm, FrostfsCli from frostfs_testlib.cli.netmap_parser import NetmapParser from frostfs_testlib.clients import AwsCliClient, S3ClientWrapper from frostfs_testlib.clients.s3 import BucketContainerResolver, VersioningStatus from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT, FROSTFS_ADM_CONFIG_PATH, FROSTFS_ADM_EXEC, FROSTFS_CLI_EXEC from frostfs_testlib.resources.common import COMPLEX_OBJECT_CHUNKS_COUNT, COMPLEX_OBJECT_TAIL_SIZE, HOSTING_CONFIG_FILE, MORPH_BLOCK_TIME from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode from frostfs_testlib.storage.controllers import ClusterStateController from frostfs_testlib.storage.controllers.state_managers.config_state_manager import ConfigStateManager from frostfs_testlib.storage.dataclasses.object_size import ObjectSize from frostfs_testlib.storage.dataclasses.storage_object_info import Chunk from frostfs_testlib.storage.grpc_operations.interfaces import GrpcClientWrapper from frostfs_testlib.testing.cluster_test_base import ClusterTestBase from frostfs_testlib.testing.test_control import wait_for_success from frostfs_testlib.utils import datetime_utils from frostfs_testlib.utils.file_utils import TestFile, generate_file, get_file_hash from ...helpers.container_request import PUBLIC_WITH_POLICY, requires_container from ...resources.common import S3_POLICY_FILE_LOCATION def pytest_generate_tests(metafunc: pytest.Metafunc) -> None: if "ec_policy" not in metafunc.fixturenames: return with open(HOSTING_CONFIG_FILE, "r") as file: hosting_config = yaml.full_load(file) node_count = len(hosting_config["hosts"]) ec_map = { 4: ["EC 1.1", "EC 2.1", "EC 3.1", "EC 2.2"], 8: ["EC 5.3", "EC 3.2", "EC 7.1", "EC 4.4", "EC 3.1"], 16: ["EC 12.4", "EC 8.4", "EC 5.3", "EC 4.4"], 100: ["EC 12.4", "EC 8.4", "EC 5.3", "EC 4.4"], } nearest_node_count = ([4] + (list(filter(lambda x: x <= node_count, ec_map.keys()))))[-1] metafunc.parametrize("ec_policy, node_count", ((ec_policy, node_count) for ec_policy in ec_map[nearest_node_count])) @allure.title("Initialized remote FrostfsAdm") @pytest.fixture def frostfs_remote_adm(cluster: Cluster) -> FrostfsAdm: node = cluster.cluster_nodes[0] shell = node.host.get_shell() return FrostfsAdm(shell, frostfs_adm_exec_path=FROSTFS_ADM_EXEC, config_file=FROSTFS_ADM_CONFIG_PATH) @pytest.mark.nightly @pytest.mark.replication @pytest.mark.ec_replication class TestECReplication(ClusterTestBase): def get_node_cli(self, cluster_node: ClusterNode, config: str) -> FrostfsCli: shell = cluster_node.host.get_shell() cli = FrostfsCli(shell, frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=config) self.cli_change_shards_mode: dict[FrostfsCli, str] = {cli: cluster_node.storage_node.get_control_endpoint()} return cli @pytest.fixture() def restore_nodes_shards_mode(self): yield for cli, endpoint in self.cli_change_shards_mode.items(): cli.shards.set_mode(endpoint, mode="read-write", all=True) time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME)) @pytest.fixture() def rep_count(self, object_size: ObjectSize) -> int: rep_count = 3 if object_size.name == "complex": rep_count *= int(COMPLEX_OBJECT_CHUNKS_COUNT) + 1 if COMPLEX_OBJECT_TAIL_SIZE else int(COMPLEX_OBJECT_CHUNKS_COUNT) return rep_count @wait_for_success(120, 5) def wait_replication(self, total_chunks: int, client: GrpcClientWrapper, cid: str, oid: str, success: bool = True) -> None: if not success: assert not self.check_replication(total_chunks, client, cid, oid) else: assert self.check_replication(total_chunks, client, cid, oid) @allure.title("Restore chunk maximum params in network params ") @pytest.fixture def restore_network_config(self, frostfs_remote_adm: FrostfsAdm) -> None: yield frostfs_remote_adm.morph.set_config(set_key_value='"MaxECDataCount=12" "MaxECParityCount=5"') @reporter.step("Get object nodes output ") def get_object_nodes(self, cli: FrostfsCli, cid: str, oid: str, endpoint: str = None) -> dict: if not endpoint: endpoint = self.cluster.default_rpc_endpoint object_nodes = json.loads(cli.object.nodes(endpoint, cid, oid=oid, json=True, timeout=CLI_DEFAULT_TIMEOUT).stdout) if object_nodes.get("errors"): raise object_nodes["errors"] return object_nodes @reporter.step("Get parity chunk ") def get_parity_chunk_object(self, cli: FrostfsCli, cid: str, oid: str, endpoint: str = None) -> Chunk: chunks = self.get_object_nodes(cli, cid, oid, endpoint)["data_objects"] return Chunk(**chunks[-1]) @reporter.step("Get data chunk ") def get_data_chunk_object(self, cli: FrostfsCli, cid: str, oid: str, endpoint: str = None) -> Chunk: chunks = self.get_object_nodes(cli, cid, oid, endpoint)["data_objects"] return Chunk(**chunks[0]) @reporter.step("Check replication chunks={total_chunks} chunks ") def check_replication(self, total_chunks: int, client: GrpcClientWrapper, cid: str, oid: str) -> bool: object_nodes_info = client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid) return len(object_nodes_info) == total_chunks @pytest.fixture() def include_excluded_nodes(self, cluster_state_controller: ClusterStateController): yield cluster_state_controller.include_all_excluded_nodes() @allure.title("Disable Policer on all nodes") @pytest.fixture() def disable_policer(self, cluster_state_controller: ClusterStateController) -> None: with reporter.step("Disable policer for nodes"): cluster_state_controller.manager(ConfigStateManager).set_on_all_nodes( service_type=StorageNode, values={"policer": {"unsafe_disable": True}} ) yield with reporter.step("Enable policer for nodes"): cluster_state_controller.start_stopped_hosts() cluster_state_controller.manager(ConfigStateManager).revert_all() @wait_for_success(300, 15) @reporter.step("Check count nodes chunks") def wait_sync_count_chunks_nodes(self, grpc_client: GrpcClientWrapper, cid: str, oid: str, count: int): all_chunks_after_include_node = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid) chunks_nodes = [node for chunk in all_chunks_after_include_node for node in chunk.confirmed_nodes] assert len(chunks_nodes) == count @allure.title("Create container with EC policy (size={object_size})") @requires_container(PUBLIC_WITH_POLICY("EC 2.1")) def test_create_container_with_ec_policy( self, container: str, rep_count: int, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, test_file: TestFile ) -> None: with reporter.step("Put object in container."): oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint) with reporter.step("Check replication chunks."): assert self.check_replication(rep_count, remote_grpc_client, container, oid) @allure.title("Lose node with chunk data") @pytest.mark.failover @requires_container(PUBLIC_WITH_POLICY("EC 3.1")) def test_lose_node_with_data_chunk( self, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, simple_object_size: ObjectSize, cluster_state_controller: ClusterStateController, container: str, disable_policer: None, ) -> None: with reporter.step("Put object in container."): test_file = generate_file(simple_object_size.value) oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint) with reporter.step("Check chunk replication on 4 nodes."): assert self.check_replication(4, remote_grpc_client, container, oid) with reporter.step("Search node data chunk"): chunk = remote_grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid) chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, chunk) with reporter.step("Stop node with data chunk."): cluster_state_controller.stop_node_host(chunk_node[0], "hard") with reporter.step("Get object"): node = list(set(self.cluster.cluster_nodes) - {chunk_node[0]})[0] grpc_client.object.get(container, oid, node.storage_node.get_rpc_endpoint()) with reporter.step("Start stopped node, and check replication chunks."): cluster_state_controller.start_node_host(chunk_node[0]) self.wait_replication(4, remote_grpc_client, container, oid) @allure.title("Lose node with chunk parity") @pytest.mark.failover @requires_container(PUBLIC_WITH_POLICY("EC 3.1")) def test_lose_node_with_parity_chunk( self, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, simple_object_size: ObjectSize, cluster_state_controller: ClusterStateController, container: str, disable_policer: None, ) -> None: with reporter.step("Put object in container."): test_file = generate_file(simple_object_size.value) oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint) with reporter.step("Check chunk replication on 4 nodes."): assert self.check_replication(4, remote_grpc_client, container, oid) with reporter.step("Search node with parity chunk"): chunk = remote_grpc_client.object.chunks.get_parity(self.cluster.default_rpc_endpoint, container, oid=oid) chunk_node = remote_grpc_client.object.chunks.get_chunk_node(self.cluster, chunk)[0] with reporter.step("Stop node parity chunk."): cluster_state_controller.stop_node_host(chunk_node, "hard") with reporter.step("Get object, expect success."): node = list(set(self.cluster.cluster_nodes) - {chunk_node})[0] grpc_client.object.get(container, oid, node.storage_node.get_rpc_endpoint()) with reporter.step("Start stoped node, and check replication chunks."): cluster_state_controller.start_node_host(chunk_node) self.wait_replication(4, remote_grpc_client, container, oid) @allure.title("Lose nodes with chunk data and parity") @pytest.mark.failover @requires_container(PUBLIC_WITH_POLICY("EC 3.1")) def test_lose_nodes_data_chunk_and_parity( self, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, simple_object_size: ObjectSize, cluster_state_controller: ClusterStateController, container: str, disable_policer: None, ) -> None: with reporter.step("Put object in container."): test_file = generate_file(simple_object_size.value) oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint) with reporter.step("Check count chunks, expect 4."): assert self.check_replication(4, remote_grpc_client, container, oid) with reporter.step("Search node data chunk and node parity chunk"): data_chunk = remote_grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid) data_chunk_node = remote_grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)[0] parity_chunk = remote_grpc_client.object.chunks.get_parity(self.cluster.default_rpc_endpoint, container, oid=oid) parity_chunk_node = remote_grpc_client.object.chunks.get_chunk_node(self.cluster, parity_chunk)[0] with reporter.step("Stop node with data chunk."): cluster_state_controller.stop_node_host(data_chunk_node, "hard") with reporter.step("Get object"): node = list(set(self.cluster.cluster_nodes) - {data_chunk_node, parity_chunk_node})[0] grpc_client.object.get(container, oid, node.storage_node.get_rpc_endpoint()) with reporter.step("Start stopped host and check chunks."): cluster_state_controller.start_node_host(data_chunk_node) self.wait_replication(4, remote_grpc_client, container, oid) with reporter.step("Stop node with parity chunk and one all node."): cluster_state_controller.stop_node_host(data_chunk_node, "hard") cluster_state_controller.stop_node_host(parity_chunk_node, "hard") with reporter.step("Get object, expect error."): with pytest.raises(RuntimeError): grpc_client.object.get(container, oid, node.storage_node.get_rpc_endpoint()) with reporter.step("Start stopped nodes and check replication chunk."): cluster_state_controller.start_stopped_hosts() self.wait_replication(4, remote_grpc_client, container, oid) @allure.title("Policer work with chunk") @pytest.mark.failover @requires_container(PUBLIC_WITH_POLICY("EC 2.1")) def test_work_policer_with_nodes( self, simple_object_size: ObjectSize, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, cluster_state_controller: ClusterStateController, container: str, include_excluded_nodes: None, ) -> None: with reporter.step("Put object on container."): test_file = generate_file(simple_object_size.value) oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint) with reporter.step("Check count chunks nodes on 3."): assert self.check_replication(3, remote_grpc_client, container, oid) with reporter.step("Search node with chunk."): data_chunk = remote_grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid) node_data_chunk = remote_grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)[0] first_all_chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid) with reporter.step("Remove chunk node from network map"): cluster_state_controller.remove_node_from_netmap([node_data_chunk.storage_node]) with reporter.step("Tick epoch."): alive_node = list(set(self.cluster.cluster_nodes) - {node_data_chunk})[0] self.tick_epoch(alive_node.storage_node, 2) with reporter.step("Wait replication chunk with different node."): node = grpc_client.object.chunks.search_node_without_chunks( first_all_chunks, self.cluster, alive_node.storage_node.get_rpc_endpoint() )[0] self.wait_replication(3, remote_grpc_client, container, oid) with reporter.step("Get new chunks"): second_all_chunks = remote_grpc_client.object.chunks.get_all(node.storage_node.get_rpc_endpoint(), container, oid) with reporter.step("Check that oid no change."): assert [chunk for chunk in second_all_chunks if data_chunk.object_id == chunk.object_id] with reporter.step("Include node in netmap"): cluster_state_controller.include_node_to_netmap(node_data_chunk.storage_node, alive_node.storage_node) self.wait_sync_count_chunks_nodes(remote_grpc_client, container, oid, 3) @allure.title("EC X.Y combinations (nodes={node_count},policy={ec_policy},size={object_size})") def test_create_container_with_difference_count_nodes( self, frostfs_cli: FrostfsCli, node_count: int, ec_policy: str, object_size: ObjectSize, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, ) -> None: with reporter.step("Create container."): expected_chunks = int(ec_policy.split(" ")[1].split(".")[0]) + int(ec_policy.split(" ")[1].split(".")[1]) if "complex" in object_size.name: expected_chunks *= 4 cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy=ec_policy, await_mode=True) with reporter.step("Apply Ape rule for container"): frostfs_cli.ape_manager.add( self.cluster.default_rpc_endpoint, chain_id="allowAll", target_name=cid, target_type="container", rule="allow Object.* *" ) with reporter.step("Wait for one block"): self.wait_for_blocks() with reporter.step("Put object in container."): test_file = generate_file(object_size.value) oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint) with reporter.step("Check count object chunks."): chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid) assert len(chunks) == expected_chunks with reporter.step("get object and check hash."): file_with_node = grpc_client.object.get(cid, oid, self.cluster.default_rpc_endpoint) assert get_file_hash(test_file) == get_file_hash(file_with_node) @allure.title("Request PUT with copies_number flag") @requires_container(PUBLIC_WITH_POLICY("EC 2.1")) def test_put_object_with_copies_number( self, container: str, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, simple_object_size: ObjectSize ) -> None: with reporter.step("Put object in container with copies number = 1"): test_file = generate_file(simple_object_size.value) oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint, copies_number=1) with reporter.step("Check that count chunks > 1."): chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid) assert len(chunks) > 1 @allure.title("Request PUT and 1 node off") @pytest.mark.failover @requires_container(PUBLIC_WITH_POLICY("EC 3.1")) def test_put_object_with_off_cnr_node( self, container: str, grpc_client: GrpcClientWrapper, cluster_state_controller: ClusterStateController, simple_object_size: ObjectSize, ) -> None: with reporter.step("Stop one node in container nodes"): cluster_state_controller.stop_node_host(self.cluster.cluster_nodes[1], "hard") with reporter.step("Put object in container, expect success for EC container."): test_file = generate_file(simple_object_size.value) grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint, copies_number=1) @allure.title("Request PUT (size={object_size})") @requires_container(PUBLIC_WITH_POLICY("EC 2.1")) def test_put_object_with_ec_cnr( self, container: str, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, object_size: ObjectSize ) -> None: with reporter.step("Put object in container"): test_file = generate_file(object_size.value) oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint) with reporter.step("Get chunks object."): chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid) with reporter.step("Check header chunks object"): for chunk in chunks: chunk_head = grpc_client.object.head( container, chunk.object_id, self.cluster.default_rpc_endpoint, is_raw=True, json_output=False ).stdout assert "EC header:" in chunk_head @allure.title("Request GET (size={object_size})") @requires_container(PUBLIC_WITH_POLICY("EC 2.1 CBF 1")) def test_get_object_in_ec_cnr( self, container: str, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, object_size: ObjectSize ) -> None: with reporter.step("Put object in container"): test_file = generate_file(object_size.value) hash_origin_file = get_file_hash(test_file) oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint) with reporter.step("Get id all chunks."): chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid) with reporter.step("Search chunk node and not chunks node."): chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, chunks[0])[0] not_chunk_node = grpc_client.object.chunks.search_node_without_chunks(chunks, self.cluster, self.cluster.default_rpc_endpoint)[ 0 ] with reporter.step("GET request with chunk node, expect success"): file_one = grpc_client.object.get(container, oid, chunk_node.storage_node.get_rpc_endpoint()) hash_file_one = get_file_hash(file_one) assert hash_file_one == hash_origin_file with reporter.step("Get request with not chunk node"): file_two = grpc_client.object.get(container, oid, not_chunk_node.storage_node.get_rpc_endpoint()) hash_file_two = get_file_hash(file_two) assert hash_file_two == hash_file_one == hash_origin_file @allure.title("Request SEARCH with flags 'root' (size={object_size})") @requires_container(PUBLIC_WITH_POLICY("EC 2.1")) def test_search_object_in_ec_cnr_root_flags(self, container: str, grpc_client: GrpcClientWrapper, object_size: ObjectSize) -> None: with reporter.step("Put object in container"): test_file = generate_file(object_size.value) oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint) with reporter.step("Search operation with --root flags"): search_output = grpc_client.object.search(container, self.cluster.default_rpc_endpoint, root=True) assert search_output[0] == oid @allure.title("Request SEARCH check valid chunk id (size={object_size})") @requires_container(PUBLIC_WITH_POLICY("EC 2.1")) def test_search_object_in_ec_cnr_chunk_id( self, container: str, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, object_size: ObjectSize ) -> None: with reporter.step("Put object in container"): test_file = generate_file(object_size.value) oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint) with reporter.step("Search operation object"): search_output = grpc_client.object.search(container, self.cluster.default_rpc_endpoint) chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid) for chunk in chunks: assert chunk.object_id in search_output @allure.title("Request SEARCH check no chunk index info (size={object_size})") @requires_container(PUBLIC_WITH_POLICY("EC 2.1")) def test_search_object_in_ec_cnr( self, container: str, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, object_size: ObjectSize ) -> None: with reporter.step("Put object in container"): test_file = generate_file(object_size.value) oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint) with reporter.step("Search operation all chunk"): chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid) for chunk in chunks: chunk_search = grpc_client.object.search(container, self.cluster.default_rpc_endpoint, oid=chunk.object_id) assert "index" not in chunk_search @allure.title("Request DELETE (size={object_size})") @pytest.mark.failover @requires_container(PUBLIC_WITH_POLICY("EC 2.1")) def test_delete_object_in_ec_cnr( self, container: str, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, object_size: ObjectSize, cluster_state_controller: ClusterStateController, ) -> None: with reporter.step("Put object in container."): test_file = generate_file(object_size.value) oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint) with reporter.step("Check object chunks nodes."): chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid) replication_count = 3 if object_size.name == "simple" else 3 * 4 assert len(chunks) == replication_count with reporter.step("Delete object"): grpc_client.object.delete(container, oid, self.cluster.default_rpc_endpoint) with reporter.step("Check that delete all chunks."): for chunk in chunks: with pytest.raises(RuntimeError, match="object already removed"): grpc_client.object.head(container, chunk.object_id, self.cluster.default_rpc_endpoint) with reporter.step("Put second object."): oid_second = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint) with reporter.step("Check second object chunks nodes."): chunks_second_object = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid_second) assert len(chunks_second_object) == replication_count with reporter.step("Stop nodes with chunk."): chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, chunks_second_object[0]) cluster_state_controller.stop_node_host(chunk_node[0], "hard") with reporter.step("Delete second object"): cluster_nodes = list(set(self.cluster.cluster_nodes) - {chunk_node[0]}) grpc_client.object.delete(container, oid_second, cluster_nodes[0].storage_node.get_rpc_endpoint()) with reporter.step("Check that delete all chunk second object."): for chunk in chunks_second_object: with pytest.raises(RuntimeError, match="object already removed|object not found"): grpc_client.object.head(container, chunk.object_id, cluster_nodes[0].storage_node.get_rpc_endpoint()) @allure.title("Request LOCK (size={object_size})") @pytest.mark.failover @requires_container(PUBLIC_WITH_POLICY("EC 2.1")) def test_lock_object_in_ec_cnr( self, container: str, test_file: TestFile, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, frostfs_cli: FrostfsCli, object_size: ObjectSize, cluster_state_controller: ClusterStateController, include_excluded_nodes: None, ) -> None: with reporter.step("Put object in container."): oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint) with reporter.step("Check object chunks nodes."): chunks_object = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid) replication_count = 3 if object_size.name == "simple" else 3 * 4 assert len(chunks_object) == replication_count with reporter.step("Put LOCK in object."): # TODO Rework for the grpc_client when the netmap methods are implemented epoch = frostfs_cli.netmap.epoch(self.cluster.default_rpc_endpoint, timeout=CLI_DEFAULT_TIMEOUT).stdout.strip() grpc_client.object.lock(container, oid, self.cluster.default_rpc_endpoint, expire_at=(int(epoch) + 5)) with reporter.step("Check don`t delete chunk"): for chunk in chunks_object: with pytest.raises(RuntimeError, match="Lock EC chunk failed"): grpc_client.object.delete(container, chunk.object_id, self.cluster.default_rpc_endpoint) with reporter.step("Check enable LOCK object"): with pytest.raises(RuntimeError, match="object is locked"): grpc_client.object.delete(container, oid, self.cluster.default_rpc_endpoint) with reporter.step("Remove node in netmap."): chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, chunks_object[0])[0] alive_node = list(set(self.cluster.cluster_nodes) - {chunk_node})[0] cluster_state_controller.remove_node_from_netmap([chunk_node.storage_node]) with reporter.step("Check don`t delete chunk."): for chunk in chunks_object: with pytest.raises(RuntimeError, match="Lock EC chunk failed|object not found"): grpc_client.object.delete(container, chunk.object_id, alive_node.storage_node.get_rpc_endpoint()) with reporter.step("Check enable LOCK object"): with pytest.raises(RuntimeError, match="object is locked"): grpc_client.object.delete(container, oid, alive_node.storage_node.get_rpc_endpoint()) with reporter.step("Include node in netmap"): cluster_state_controller.include_node_to_netmap(chunk_node.storage_node, alive_node.storage_node) @allure.title("Output MaxEC* params in frostf-scli (type={type_shards})") @pytest.mark.parametrize("type_shards", ["Maximum count of data shards", "Maximum count of parity shards"]) def test_maxec_info_with_output_cli(self, frostfs_cli: FrostfsCli, type_shards: str) -> None: with reporter.step("Get and check params"): # TODO Rework for the grpc_client when the netmap methods are implemented net_info = frostfs_cli.netmap.netinfo(self.cluster.default_rpc_endpoint).stdout assert type_shards in net_info @allure.title("Change MaxEC*Count params") def test_change_max_data_shards_params( self, frostfs_remote_adm: FrostfsAdm, frostfs_cli: FrostfsCli, restore_network_config: None ) -> None: # TODO Rework for the grpc_client when the netmap methods are implemented with reporter.step("Get now params MaxECDataCount and MaxECParityCount"): node_netinfo = NetmapParser.netinfo( frostfs_cli.netmap.netinfo(self.cluster.default_rpc_endpoint, timeout=CLI_DEFAULT_TIMEOUT).stdout ) with reporter.step("Change params"): frostfs_remote_adm.morph.set_config(set_key_value='"MaxECDataCount=5" "MaxECParityCount=3"') with reporter.step("Get update params"): update_net_info = NetmapParser.netinfo( frostfs_cli.netmap.netinfo(self.cluster.default_rpc_endpoint, timeout=CLI_DEFAULT_TIMEOUT).stdout ) with reporter.step("Check old and new params difference"): assert ( update_net_info.maximum_count_of_data_shards not in node_netinfo.maximum_count_of_data_shards and update_net_info.maximum_count_of_parity_shards not in node_netinfo.maximum_count_of_parity_shards ) @allure.title("Check maximum count data and parity shards") def test_change_over_max_parity_shards_params(self, frostfs_remote_adm: FrostfsAdm) -> None: with reporter.step("Change over maximum params shards count."): with pytest.raises(RuntimeError, match="MaxECDataCount and MaxECParityCount must be <= 256"): frostfs_remote_adm.morph.set_config(set_key_value='"MaxECDataCount=130" "MaxECParityCount=130"') @allure.title("Create container with EC policy and SELECT (SELECT={select})") @pytest.mark.parametrize("select", [2, 4]) def test_create_container_with_select(self, select: int, grpc_client: GrpcClientWrapper) -> None: with reporter.step("Create container"): policy = f"EC 1.1 CBF 1 SELECT {select} FROM *" cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy=policy, await_mode=True) with reporter.step("Check container nodes decomposed"): container_nodes = grpc_client.container.nodes(self.cluster.default_rpc_endpoint, cid, self.cluster) assert len(container_nodes) == select @allure.title("Create container with EC policy and CBF (CBF={cbf})") @pytest.mark.parametrize("cbf, expected_nodes", [(1, 2), (2, 4)]) def test_create_container_with_cbf(self, cbf: int, expected_nodes: int, grpc_client: GrpcClientWrapper) -> None: with reporter.step("Create container."): policy = f"EC 1.1 CBF {cbf}" cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy=policy, await_mode=True) with reporter.step("Check expected container nodes."): container_nodes = grpc_client.container.nodes(self.cluster.default_rpc_endpoint, cid, self.cluster) assert len(container_nodes) == expected_nodes @allure.title("Create container with EC policy and FILTER") @requires_container(PUBLIC_WITH_POLICY("EC 1.1 IN RUS SELECT 2 FROM RU AS RUS FILTER Country EQ Russia AS RU")) def test_create_container_with_filter( self, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, simple_object_size: ObjectSize, container: str, ) -> None: with reporter.step("Put object in container."): test_file = generate_file(simple_object_size.value) oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint) with reporter.step("Check object is decomposed exclusively on Russian nodes"): data_chunk = remote_grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid) parity_chunk = remote_grpc_client.object.chunks.get_parity(self.cluster.default_rpc_endpoint, container, oid=oid) node_data_chunk = remote_grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk) node_parity_chunk = remote_grpc_client.object.chunks.get_chunk_node(self.cluster, parity_chunk) for node in [node_data_chunk[1], node_parity_chunk[1]]: assert "Russia" in node.country @allure.title("Evacuation shard with chunk (type={type})") @pytest.mark.parametrize("type, get_chunk", [("data", get_data_chunk_object), ("parity", get_parity_chunk_object)]) @requires_container(PUBLIC_WITH_POLICY("EC 1.1 CBF 1")) def test_evacuation_data_shard( self, container: str, restore_nodes_shards_mode: None, frostfs_cli: FrostfsCli, remote_frostfs_cli: FrostfsCli, grpc_client: GrpcClientWrapper, max_object_size: int, type: str, get_chunk, ) -> None: with reporter.step("Put object in container."): test_file = generate_file(max_object_size - 1000) oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint) with reporter.step("Get object chunks."): chunk = get_chunk(self, remote_frostfs_cli, container, oid, self.cluster.default_rpc_endpoint) chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, chunk) frostfs_node_cli = self.get_node_cli(chunk_node[0], config=chunk_node[0].storage_node.get_remote_wallet_config_path()) with reporter.step("Search shards chunk"): time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME) * 2) shard_id = grpc_client.object.chunks.get_shard_chunk(chunk_node[0], chunk) with reporter.step("Enable evacuation for shard"): frostfs_node_cli.shards.set_mode(chunk_node[0].storage_node.get_control_endpoint(), mode="read-only", id=shard_id) frostfs_node_cli.shards.evacuation_start(chunk_node[0].storage_node.get_control_endpoint(), shard_id, await_mode=True) with reporter.step("Get object after evacuation shard"): grpc_client.object.get(container, oid, self.cluster.default_rpc_endpoint) @allure.title("[NEGATIVE] Don`t create more 1 EC policy") def test_more_one_ec_policy(self, grpc_client: GrpcClientWrapper) -> None: with reporter.step("Create container with policy - 'EC 2.1 EC 1.1'"): with pytest.raises(RuntimeError, match="can't parse placement policy"): grpc_client.container.create( self.cluster.default_rpc_endpoint, policy="EC 2.1 EC 1.1 CBF 1 SELECT 4 FROM *", await_mode=True ) @allure.title("Create bucket with EC policy (s3_client={s3_client})") @pytest.mark.parametrize("s3_policy, s3_client", [(S3_POLICY_FILE_LOCATION, AwsCliClient)], indirect=True) def test_create_bucket_with_ec_location( self, s3_client: S3ClientWrapper, bucket_container_resolver: BucketContainerResolver, grpc_client: GrpcClientWrapper ) -> None: with reporter.step("Create bucket with EC location constrain"): bucket = s3_client.create_bucket(location_constraint="ec3.1") with reporter.step("Resolve container bucket"): cid = bucket_container_resolver.resolve(self.cluster.cluster_nodes[0], bucket) with reporter.step("Validate container policy"): container = grpc_client.container.get(self.cluster.default_rpc_endpoint, cid, json_mode=True, timeout=CLI_DEFAULT_TIMEOUT) assert container @allure.title("Bucket object count chunks (s3_client={s3_client}, size={object_size})") @pytest.mark.parametrize("s3_policy, s3_client", [(S3_POLICY_FILE_LOCATION, AwsCliClient)], indirect=True) def test_count_chunks_bucket_with_ec_location( self, test_file: TestFile, s3_client: S3ClientWrapper, bucket_container_resolver: BucketContainerResolver, remote_grpc_client: GrpcClientWrapper, object_size: ObjectSize, ) -> None: with reporter.step("Create bucket with EC location constrain"): bucket = s3_client.create_bucket(location_constraint="ec3.1") with reporter.step("Enable versioning object"): s3_client.put_bucket_versioning(bucket, VersioningStatus.ENABLED) bucket_status = s3_client.get_bucket_versioning_status(bucket) assert bucket_status == VersioningStatus.ENABLED.value with reporter.step("Put object in bucket"): bucket_object = s3_client.put_object(bucket, test_file) with reporter.step("Watch replication count chunks"): cid = bucket_container_resolver.resolve(self.cluster.cluster_nodes[0], bucket) chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, bucket_object) expect_chunks = 4 if object_size.name == "simple" else 16 assert len(chunks) == expect_chunks @allure.title("Replication chunk after drop (size={object_size})") @requires_container(PUBLIC_WITH_POLICY("EC 2.1 CBF 1")) def test_drop_chunk_and_replication( self, test_file: TestFile, container: str, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, rep_count: int ) -> None: with reporter.step("Put object"): oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint) with reporter.step("Get all chunks"): data_chunk = remote_grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid) with reporter.step("Search chunk node"): chunk_node = remote_grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk) shell_chunk_node = chunk_node[0].host.get_shell() with reporter.step("Get replication count"): assert self.check_replication(rep_count, remote_grpc_client, container, oid) with reporter.step("Delete chunk"): frostfs_node_cli = FrostfsCli( shell_chunk_node, frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=chunk_node[0].storage_node.get_remote_wallet_config_path(), ) frostfs_node_cli.control.drop_objects(chunk_node[0].storage_node.get_control_endpoint(), f"{container}/{data_chunk.object_id}") with reporter.step("Wait replication count after drop one chunk"): self.wait_replication(rep_count, remote_grpc_client, container, oid)