import json
import time

import allure
import pytest
import yaml
from frostfs_testlib import reporter
from frostfs_testlib.cli import FrostfsAdm, FrostfsCli
from frostfs_testlib.cli.netmap_parser import NetmapParser
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT, FROSTFS_ADM_CONFIG_PATH, FROSTFS_ADM_EXEC, FROSTFS_CLI_EXEC
from frostfs_testlib.resources.common import COMPLEX_OBJECT_CHUNKS_COUNT, COMPLEX_OBJECT_TAIL_SIZE, HOSTING_CONFIG_FILE, MORPH_BLOCK_TIME
from frostfs_testlib.s3 import AwsCliClient, S3ClientWrapper
from frostfs_testlib.s3.interfaces import BucketContainerResolver, VersioningStatus
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode
from frostfs_testlib.storage.controllers import ClusterStateController
from frostfs_testlib.storage.controllers.state_managers.config_state_manager import ConfigStateManager
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.storage.dataclasses.storage_object_info import Chunk
from frostfs_testlib.storage.grpc_operations.interfaces import GrpcClientWrapper
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.testing.test_control import wait_for_success
from frostfs_testlib.utils import datetime_utils
from frostfs_testlib.utils.file_utils import generate_file, get_file_hash


def pytest_generate_tests(metafunc: pytest.Metafunc) -> None:
    if "ec_policy" not in metafunc.fixturenames:
        return

    with open(HOSTING_CONFIG_FILE, "r") as file:
        hosting_config = yaml.full_load(file)

    node_count = len(hosting_config["hosts"])

    ec_map = {
        4: ["EC 1.1", "EC 2.1", "EC 3.1", "EC 2.2"],
        8: ["EC 5.3", "EC 3.2", "EC 7.1", "EC 4.4", "EC 3.1"],
        16: ["EC 12.4", "EC 8.4", "EC 5.3", "EC 4.4"],
        100: ["EC 12.4", "EC 8.4", "EC 5.3", "EC 4.4"],
    }

    metafunc.parametrize("ec_policy, node_count", ((ec_policy, node_count) for ec_policy in ec_map[node_count]))


@allure.title("Initialized remote FrostfsAdm")
@pytest.fixture
def frostfs_remote_adm(cluster: Cluster) -> FrostfsAdm:
    node = cluster.cluster_nodes[0]
    shell = node.host.get_shell()
    return FrostfsAdm(shell, frostfs_adm_exec_path=FROSTFS_ADM_EXEC, config_file=FROSTFS_ADM_CONFIG_PATH)


@pytest.mark.nightly
@pytest.mark.replication
@pytest.mark.ec_replication
class TestECReplication(ClusterTestBase):
    def get_node_cli(self, cluster_node: ClusterNode, config: str) -> FrostfsCli:
        shell = cluster_node.host.get_shell()
        cli = FrostfsCli(shell, frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=config)
        self.cli_change_shards_mode: dict[FrostfsCli, str] = {cli: cluster_node.storage_node.get_control_endpoint()}
        return cli

    @pytest.fixture()
    def restore_nodes_shards_mode(self):
        yield

        for cli, endpoint in self.cli_change_shards_mode.items():
            cli.shards.set_mode(endpoint, mode="read-write", all=True)

        time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))

    @pytest.fixture()
    def rep_count(self, object_size: ObjectSize) -> int:
        rep_count = 3
        if object_size.name == "complex":
            rep_count *= int(COMPLEX_OBJECT_CHUNKS_COUNT) + 1 if COMPLEX_OBJECT_TAIL_SIZE else int(COMPLEX_OBJECT_CHUNKS_COUNT)
        return rep_count

    @wait_for_success(120, 5)
    def wait_replication(self, total_chunks: int, client: GrpcClientWrapper, cid: str, oid: str, success: bool = True) -> None:
        if not success:
            assert not self.check_replication(total_chunks, client, cid, oid)
        else:
            assert self.check_replication(total_chunks, client, cid, oid)

    @allure.title("Restore chunk maximum params in network params ")
    @pytest.fixture
    def restore_network_config(self, frostfs_remote_adm: FrostfsAdm) -> None:
        yield
        frostfs_remote_adm.morph.set_config(set_key_value='"MaxECDataCount=12" "MaxECParityCount=5"')

    @reporter.step("Get object nodes output ")
    def get_object_nodes(self, cli: FrostfsCli, cid: str, oid: str, endpoint: str = None) -> dict:
        if not endpoint:
            endpoint = self.cluster.default_rpc_endpoint
        object_nodes = json.loads(cli.object.nodes(endpoint, cid, oid=oid, json=True, timeout=CLI_DEFAULT_TIMEOUT).stdout)
        if object_nodes.get("errors"):
            raise object_nodes["errors"]
        return object_nodes

    @reporter.step("Get parity chunk ")
    def get_parity_chunk_object(self, cli: FrostfsCli, cid: str, oid: str, endpoint: str = None) -> Chunk:
        chunks = self.get_object_nodes(cli, cid, oid, endpoint)["data_objects"]
        return Chunk(**chunks[-1])

    @reporter.step("Get data chunk ")
    def get_data_chunk_object(self, cli: FrostfsCli, cid: str, oid: str, endpoint: str = None) -> Chunk:
        chunks = self.get_object_nodes(cli, cid, oid, endpoint)["data_objects"]
        return Chunk(**chunks[0])

    @reporter.step("Check replication chunks={total_chunks} chunks ")
    def check_replication(self, total_chunks: int, client: GrpcClientWrapper, cid: str, oid: str) -> bool:
        object_nodes_info = client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid)
        return len(object_nodes_info) == total_chunks

    @pytest.fixture()
    def include_excluded_nodes(self, cluster_state_controller: ClusterStateController):
        yield

        cluster_state_controller.include_all_excluded_nodes()

    @allure.title("Disable Policer on all nodes")
    @pytest.fixture()
    def disable_policer(self, cluster_state_controller: ClusterStateController) -> None:
        with reporter.step("Disable policer for nodes"):
            cluster_state_controller.manager(ConfigStateManager).set_on_all_nodes(
                service_type=StorageNode, values={"policer": {"unsafe_disable": True}}
            )
        yield
        with reporter.step("Enable policer for nodes"):
            cluster_state_controller.start_stopped_hosts()
            cluster_state_controller.manager(ConfigStateManager).revert_all()

    @wait_for_success(300, 15)
    @reporter.step("Check count nodes chunks")
    def wait_sync_count_chunks_nodes(self, grpc_client: GrpcClientWrapper, cid: str, oid: str, count: int):
        all_chunks_after_include_node = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid)
        chunks_nodes = [node for chunk in all_chunks_after_include_node for node in chunk.confirmed_nodes]
        assert len(chunks_nodes) == count

    @allure.title("Create container with EC policy (size={object_size})")
    def test_create_container_with_ec_policy(self, object_size: ObjectSize, rep_count: int, grpc_client: GrpcClientWrapper) -> None:
        test_file = generate_file(object_size.value)

        with reporter.step("Create container."):
            cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 2.1", await_mode=True)

        with reporter.step("Put object in container."):
            oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)

        with reporter.step("Check replication chunks."):
            assert self.check_replication(rep_count, grpc_client, cid, oid)

    @allure.title("Lose node with chunk data")
    @pytest.mark.failover
    def test_lose_node_with_data_chunk(
        self,
        grpc_client: GrpcClientWrapper,
        simple_object_size: ObjectSize,
        cluster_state_controller: ClusterStateController,
        disable_policer: None,
    ) -> None:
        with reporter.step("Create container."):
            cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 3.1", await_mode=True)

        with reporter.step("Put object in container."):
            test_file = generate_file(simple_object_size.value)
            oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)

        with reporter.step("Check chunk replication on 4 nodes."):
            assert self.check_replication(4, grpc_client, cid, oid)

        with reporter.step("Search node data chunk"):
            chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, cid, oid=oid)
            chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, chunk)

        with reporter.step("Stop node with data chunk."):
            cluster_state_controller.stop_node_host(chunk_node[0], "hard")

        with reporter.step("Get object"):
            node = list(set(self.cluster.cluster_nodes) - {chunk_node[0]})[0]
            grpc_client.object.get(cid, oid, node.storage_node.get_rpc_endpoint())

        with reporter.step("Start stopped node, and check replication chunks."):
            cluster_state_controller.start_node_host(chunk_node[0])
            self.wait_replication(4, grpc_client, cid, oid)

    @allure.title("Lose node with chunk parity")
    @pytest.mark.failover
    def test_lose_node_with_parity_chunk(
        self,
        grpc_client: GrpcClientWrapper,
        simple_object_size: ObjectSize,
        cluster_state_controller: ClusterStateController,
        disable_policer: None,
    ) -> None:
        with reporter.step("Create container."):
            cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 3.1", await_mode=True)

        with reporter.step("Put object in container."):
            test_file = generate_file(simple_object_size.value)
            oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)

        with reporter.step("Check chunk replication on 4 nodes."):
            assert self.check_replication(4, grpc_client, cid, oid)

        with reporter.step("Search node with parity chunk"):
            chunk = grpc_client.object.chunks.get_parity(self.cluster.default_rpc_endpoint, cid, oid=oid)
            chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, chunk)[0]

        with reporter.step("Stop node parity chunk."):
            cluster_state_controller.stop_node_host(chunk_node, "hard")

        with reporter.step("Get object, expect success."):
            node = list(set(self.cluster.cluster_nodes) - {chunk_node})[0]
            grpc_client.object.get(cid, oid, node.storage_node.get_rpc_endpoint())

        with reporter.step("Start stoped node, and check replication chunks."):
            cluster_state_controller.start_node_host(chunk_node)
            self.wait_replication(4, grpc_client, cid, oid)

    @allure.title("Lose nodes with chunk data and parity")
    @pytest.mark.failover
    def test_lose_nodes_data_chunk_and_parity(
        self,
        grpc_client: GrpcClientWrapper,
        simple_object_size: ObjectSize,
        cluster_state_controller: ClusterStateController,
        disable_policer: None,
    ) -> None:
        with reporter.step("Create container."):
            cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 3.1", await_mode=True)

        with reporter.step("Put object in container."):
            test_file = generate_file(simple_object_size.value)
            oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)

        with reporter.step("Check count chunks, expect 4."):
            assert self.check_replication(4, grpc_client, cid, oid)

        with reporter.step("Search node data chunk and node parity chunk"):
            data_chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, cid, oid=oid)
            data_chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)[0]
            parity_chunk = grpc_client.object.chunks.get_parity(self.cluster.default_rpc_endpoint, cid, oid=oid)
            parity_chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, parity_chunk)[0]

        with reporter.step("Stop node with data chunk."):
            cluster_state_controller.stop_node_host(data_chunk_node, "hard")

        with reporter.step("Get object"):
            node = list(set(self.cluster.cluster_nodes) - {data_chunk_node, parity_chunk_node})[0]
            grpc_client.object.get(cid, oid, node.storage_node.get_rpc_endpoint())

        with reporter.step("Start stopped host and check chunks."):
            cluster_state_controller.start_node_host(data_chunk_node)
            self.wait_replication(4, grpc_client, cid, oid)

        with reporter.step("Stop node with parity chunk and one all node."):
            cluster_state_controller.stop_node_host(data_chunk_node, "hard")
            cluster_state_controller.stop_node_host(parity_chunk_node, "hard")

        with reporter.step("Get object, expect error."):
            with pytest.raises(RuntimeError):
                grpc_client.object.get(cid, oid, node.storage_node.get_rpc_endpoint())

        with reporter.step("Start stopped nodes and check replication chunk."):
            cluster_state_controller.start_stopped_hosts()
            self.wait_replication(4, grpc_client, cid, oid)

    @allure.title("Policer work with chunk")
    @pytest.mark.failover
    def test_work_policer_with_nodes(
        self,
        simple_object_size: ObjectSize,
        grpc_client: GrpcClientWrapper,
        cluster_state_controller: ClusterStateController,
        include_excluded_nodes: None,
    ) -> None:
        with reporter.step("Create container."):
            cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 2.1", await_mode=True)

        with reporter.step("Put object on container."):
            test_file = generate_file(simple_object_size.value)
            oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)

        with reporter.step("Check count chunks nodes on 3."):
            assert self.check_replication(3, grpc_client, cid, oid)

        with reporter.step("Search node with chunk."):
            data_chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, cid, oid=oid)
            node_data_chunk = grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)[0]
            first_all_chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid)

        with reporter.step("Remove chunk node from network map"):
            cluster_state_controller.remove_node_from_netmap([node_data_chunk.storage_node])

        with reporter.step("Tick epoch."):
            alive_node = list(set(self.cluster.cluster_nodes) - {node_data_chunk})[0]
            self.tick_epoch(alive_node.storage_node, 2)

        with reporter.step("Wait replication chunk with different node."):
            node = grpc_client.object.chunks.search_node_without_chunks(
                first_all_chunks, self.cluster, alive_node.storage_node.get_rpc_endpoint()
            )[0]
            self.wait_replication(3, grpc_client, cid, oid)

        with reporter.step("Get new chunks"):
            second_all_chunks = grpc_client.object.chunks.get_all(node.storage_node.get_rpc_endpoint(), cid, oid)

        with reporter.step("Check that oid no change."):
            assert [chunk for chunk in second_all_chunks if data_chunk.object_id == chunk.object_id]

        with reporter.step("Include node in netmap"):
            cluster_state_controller.include_node_to_netmap(node_data_chunk.storage_node, alive_node.storage_node)

        self.wait_sync_count_chunks_nodes(grpc_client, cid, oid, 3)

    @allure.title("EC X.Y combinations (nodes={node_count},policy={ec_policy},size={object_size})")
    def test_create_container_with_difference_count_nodes(
        self, node_count: int, ec_policy: str, object_size: ObjectSize, grpc_client: GrpcClientWrapper
    ) -> None:
        with reporter.step("Create container."):
            expected_chunks = int(ec_policy.split(" ")[1].split(".")[0]) + int(ec_policy.split(" ")[1].split(".")[1])
            if "complex" in object_size.name:
                expected_chunks *= 4
            cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy=ec_policy, await_mode=True)

        with reporter.step("Put object in container."):
            test_file = generate_file(object_size.value)
            oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)

        with reporter.step("Check count object chunks."):
            chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid)
            assert len(chunks) == expected_chunks

        with reporter.step("get object and check hash."):
            file_with_node = grpc_client.object.get(cid, oid, self.cluster.default_rpc_endpoint)
            assert get_file_hash(test_file) == get_file_hash(file_with_node)

    @allure.title("Request PUT with copies_number flag")
    def test_put_object_with_copies_number(self, grpc_client: GrpcClientWrapper, simple_object_size: ObjectSize) -> None:
        with reporter.step("Create container."):
            cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 2.1", await_mode=True)

        with reporter.step("Put object in container with copies number = 1"):
            test_file = generate_file(simple_object_size.value)
            oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint, copies_number=1)

        with reporter.step("Check that count chunks > 1."):
            chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid)
            assert len(chunks) > 1

    @allure.title("Request PUT and 1 node off")
    @pytest.mark.failover
    def test_put_object_with_off_cnr_node(
        self, grpc_client: GrpcClientWrapper, cluster_state_controller: ClusterStateController, simple_object_size: ObjectSize
    ) -> None:
        with reporter.step("Create container."):
            cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 3.1", await_mode=True)

        with reporter.step("Stop one node in container nodes"):
            cluster_state_controller.stop_node_host(self.cluster.cluster_nodes[1], "hard")

        with reporter.step("Put object in container, expect success for EC container."):
            test_file = generate_file(simple_object_size.value)
            grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint, copies_number=1)

    @allure.title("Request PUT (size={object_size})")
    def test_put_object_with_ec_cnr(self, grpc_client: GrpcClientWrapper, object_size: ObjectSize) -> None:
        with reporter.step("Create container."):
            cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 2.1", await_mode=True)

        with reporter.step("Put object in container"):
            test_file = generate_file(object_size.value)
            oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)

        with reporter.step("Get chunks object."):
            chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid)

        with reporter.step("Check header chunks object"):
            for chunk in chunks:
                chunk_head = grpc_client.object.head(
                    cid, chunk.object_id, self.cluster.default_rpc_endpoint, is_raw=True, json_output=False
                ).stdout
                assert "EC header:" in chunk_head

    @allure.title("Request GET (size={object_size})")
    def test_get_object_in_ec_cnr(self, grpc_client: GrpcClientWrapper, object_size: ObjectSize) -> None:
        with reporter.step("Create container."):
            cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 2.1 CBF 1", await_mode=True)

        with reporter.step("Put object in container"):
            test_file = generate_file(object_size.value)
            hash_origin_file = get_file_hash(test_file)
            oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)

        with reporter.step("Get id all chunks."):
            chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid)

        with reporter.step("Search chunk node and not chunks node."):
            chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, chunks[0])[0]
            not_chunk_node = grpc_client.object.chunks.search_node_without_chunks(chunks, self.cluster, self.cluster.default_rpc_endpoint)[
                0
            ]

        with reporter.step("GET request with chunk node, expect success"):
            file_one = grpc_client.object.get(cid, oid, chunk_node.storage_node.get_rpc_endpoint())
            hash_file_one = get_file_hash(file_one)
            assert hash_file_one == hash_origin_file

        with reporter.step("Get request with not chunk node"):
            file_two = grpc_client.object.get(cid, oid, not_chunk_node.storage_node.get_rpc_endpoint())
            hash_file_two = get_file_hash(file_two)
            assert hash_file_two == hash_file_one == hash_origin_file

    @allure.title("Request SEARCH with flags 'root' (size={object_size})")
    def test_search_object_in_ec_cnr_root_flags(self, grpc_client: GrpcClientWrapper, object_size: ObjectSize) -> None:
        with reporter.step("Create container."):
            cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 2.1", await_mode=True)

        with reporter.step("Put object in container"):
            test_file = generate_file(object_size.value)
            oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)

        with reporter.step("Search operation with --root flags"):
            search_output = grpc_client.object.search(cid, self.cluster.default_rpc_endpoint, root=True)
            assert search_output[0] == oid

    @allure.title("Request SEARCH check valid chunk id (size={object_size})")
    def test_search_object_in_ec_cnr_chunk_id(self, grpc_client: GrpcClientWrapper, object_size: ObjectSize) -> None:
        with reporter.step("Create container."):
            cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 2.1", await_mode=True)

        with reporter.step("Put object in container"):
            test_file = generate_file(object_size.value)
            oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)

        with reporter.step("Search operation object"):
            search_output = grpc_client.object.search(cid, self.cluster.default_rpc_endpoint)
            chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid)
            for chunk in chunks:
                assert chunk.object_id in search_output

    @allure.title("Request SEARCH check no chunk index info (size={object_size})")
    def test_search_object_in_ec_cnr(self, grpc_client: GrpcClientWrapper, object_size: ObjectSize) -> None:
        with reporter.step("Create container."):
            cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 2.1", await_mode=True)

        with reporter.step("Put object in container"):
            test_file = generate_file(object_size.value)
            oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)

        with reporter.step("Search operation all chunk"):
            chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid)
            for chunk in chunks:
                chunk_search = grpc_client.object.search(cid, self.cluster.default_rpc_endpoint, oid=chunk.object_id)
                assert "index" not in chunk_search

    @allure.title("Request DELETE (size={object_size})")
    @pytest.mark.failover
    def test_delete_object_in_ec_cnr(
        self, grpc_client: GrpcClientWrapper, object_size: ObjectSize, cluster_state_controller: ClusterStateController
    ) -> None:
        with reporter.step("Create container."):
            cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 2.1", await_mode=True)

        with reporter.step("Put object in container."):
            test_file = generate_file(object_size.value)
            oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)

        with reporter.step("Check object chunks nodes."):
            chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid)
            replication_count = 3 if object_size.name == "simple" else 3 * 4
            assert len(chunks) == replication_count

        with reporter.step("Delete object"):
            grpc_client.object.delete(cid, oid, self.cluster.default_rpc_endpoint)

        with reporter.step("Check that delete all chunks."):
            for chunk in chunks:
                with pytest.raises(RuntimeError, match="object already removed"):
                    grpc_client.object.head(cid, chunk.object_id, self.cluster.default_rpc_endpoint)

        with reporter.step("Put second object."):
            oid_second = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)

        with reporter.step("Check second object chunks nodes."):
            chunks_second_object = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid_second)
            assert len(chunks_second_object) == replication_count

        with reporter.step("Stop nodes with chunk."):
            chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, chunks_second_object[0])
            cluster_state_controller.stop_node_host(chunk_node[0], "hard")

        with reporter.step("Delete second object"):
            cluster_nodes = list(set(self.cluster.cluster_nodes) - {chunk_node[0]})
            grpc_client.object.delete(cid, oid_second, cluster_nodes[0].storage_node.get_rpc_endpoint())

        with reporter.step("Check that delete all chunk second object."):
            for chunk in chunks_second_object:
                with pytest.raises(RuntimeError, match="object already removed|object not found"):
                    grpc_client.object.head(cid, chunk.object_id, cluster_nodes[0].storage_node.get_rpc_endpoint())

    @allure.title("Request LOCK (size={object_size})")
    @pytest.mark.failover
    def test_lock_object_in_ec_cnr(
        self,
        grpc_client: GrpcClientWrapper,
        frostfs_cli: FrostfsCli,
        object_size: ObjectSize,
        cluster_state_controller: ClusterStateController,
        include_excluded_nodes: None,
    ) -> None:
        with reporter.step("Create container."):
            cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 2.1", await_mode=True)

        with reporter.step("Put object in container."):
            test_file = generate_file(object_size.value)
            oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)

        with reporter.step("Check object chunks nodes."):
            chunks_object = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid)
            replication_count = 3 if object_size.name == "simple" else 3 * 4
            assert len(chunks_object) == replication_count

        with reporter.step("Put LOCK in object."):
            # TODO Rework for the grpc_client when the netmap methods are implemented
            epoch = frostfs_cli.netmap.epoch(self.cluster.default_rpc_endpoint, timeout=CLI_DEFAULT_TIMEOUT).stdout.strip()
            grpc_client.object.lock(cid, oid, self.cluster.default_rpc_endpoint, expire_at=(int(epoch) + 5))

        with reporter.step("Check don`t delete chunk"):
            for chunk in chunks_object:
                with pytest.raises(RuntimeError, match="Lock EC chunk failed"):
                    grpc_client.object.delete(cid, chunk.object_id, self.cluster.default_rpc_endpoint)

        with reporter.step("Check enable LOCK object"):
            with pytest.raises(RuntimeError, match="object is locked"):
                grpc_client.object.delete(cid, oid, self.cluster.default_rpc_endpoint)

        with reporter.step("Remove node in netmap."):
            chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, chunks_object[0])[0]
            alive_node = list(set(self.cluster.cluster_nodes) - {chunk_node})[0]
            cluster_state_controller.remove_node_from_netmap([chunk_node.storage_node])

        with reporter.step("Check don`t delete chunk."):
            for chunk in chunks_object:
                with pytest.raises(RuntimeError, match="Lock EC chunk failed|object not found"):
                    grpc_client.object.delete(cid, chunk.object_id, alive_node.storage_node.get_rpc_endpoint())

        with reporter.step("Check enable LOCK object"):
            with pytest.raises(RuntimeError, match="object is locked"):
                grpc_client.object.delete(cid, oid, alive_node.storage_node.get_rpc_endpoint())

        with reporter.step("Include node in netmap"):
            cluster_state_controller.include_node_to_netmap(chunk_node.storage_node, alive_node.storage_node)

    @allure.title("Output MaxEC* params in frostf-scli (type={type_shards})")
    @pytest.mark.parametrize("type_shards", ["Maximum count of data shards", "Maximum count of parity shards"])
    def test_maxec_info_with_output_cli(self, frostfs_cli: FrostfsCli, type_shards: str) -> None:
        with reporter.step("Get and check params"):
            # TODO Rework for the grpc_client when the netmap methods are implemented
            net_info = frostfs_cli.netmap.netinfo(self.cluster.default_rpc_endpoint).stdout
            assert type_shards in net_info

    @allure.title("Change MaxEC*Count params")
    def test_change_max_data_shards_params(
        self, frostfs_remote_adm: FrostfsAdm, frostfs_cli: FrostfsCli, restore_network_config: None
    ) -> None:
        # TODO Rework for the grpc_client when the netmap methods are implemented
        with reporter.step("Get now params MaxECDataCount and MaxECParityCount"):
            node_netinfo = NetmapParser.netinfo(
                frostfs_cli.netmap.netinfo(self.cluster.default_rpc_endpoint, timeout=CLI_DEFAULT_TIMEOUT).stdout
            )

        with reporter.step("Change params"):
            frostfs_remote_adm.morph.set_config(set_key_value='"MaxECDataCount=5" "MaxECParityCount=3"')

        with reporter.step("Get update params"):
            update_net_info = NetmapParser.netinfo(
                frostfs_cli.netmap.netinfo(self.cluster.default_rpc_endpoint, timeout=CLI_DEFAULT_TIMEOUT).stdout
            )

        with reporter.step("Check old and new params difference"):
            assert (
                update_net_info.maximum_count_of_data_shards not in node_netinfo.maximum_count_of_data_shards
                and update_net_info.maximum_count_of_parity_shards not in node_netinfo.maximum_count_of_parity_shards
            )

    @allure.title("Check maximum count data and parity shards")
    def test_change_over_max_parity_shards_params(self, frostfs_remote_adm: FrostfsAdm) -> None:
        with reporter.step("Change over maximum params shards count."):
            with pytest.raises(RuntimeError, match="MaxECDataCount and MaxECParityCount must be <= 256"):
                frostfs_remote_adm.morph.set_config(set_key_value='"MaxECDataCount=130" "MaxECParityCount=130"')

    @allure.title("Create container with EC policy and SELECT (SELECT={select})")
    @pytest.mark.parametrize("select", [2, 4])
    def test_create_container_with_select(self, select: int, grpc_client: GrpcClientWrapper) -> None:
        with reporter.step("Create container"):
            policy = f"EC 1.1 CBF 1 SELECT {select} FROM *"
            cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy=policy, await_mode=True)

        with reporter.step("Check container nodes decomposed"):
            container_nodes = grpc_client.container.nodes(self.cluster.default_rpc_endpoint, cid, self.cluster)

            assert len(container_nodes) == select

    @allure.title("Create container with EC policy and CBF (CBF={cbf})")
    @pytest.mark.parametrize("cbf, expected_nodes", [(1, 2), (2, 4)])
    def test_create_container_with_cbf(self, cbf: int, expected_nodes: int, grpc_client: GrpcClientWrapper) -> None:
        with reporter.step("Create container."):
            policy = f"EC 1.1 CBF {cbf}"
            cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy=policy, await_mode=True)

        with reporter.step("Check expected container nodes."):
            container_nodes = grpc_client.container.nodes(self.cluster.default_rpc_endpoint, cid, self.cluster)
            assert len(container_nodes) == expected_nodes

    @allure.title("Create container with EC policy and FILTER")
    def test_create_container_with_filter(self, grpc_client: GrpcClientWrapper, simple_object_size: ObjectSize) -> None:
        with reporter.step("Create Container."):
            policy = "EC 1.1 IN RUS SELECT 2 FROM RU AS RUS FILTER Country EQ Russia AS RU"
            cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy=policy, await_mode=True)

        with reporter.step("Put object in container."):
            test_file = generate_file(simple_object_size.value)
            oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)

        with reporter.step("Check object is decomposed exclusively on Russian nodes"):
            data_chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, cid, oid=oid)
            parity_chunk = grpc_client.object.chunks.get_parity(self.cluster.default_rpc_endpoint, cid, oid=oid)
            node_data_chunk = grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)
            node_parity_chunk = grpc_client.object.chunks.get_chunk_node(self.cluster, parity_chunk)

            for node in [node_data_chunk[1], node_parity_chunk[1]]:
                assert "Russia" in node.country

    @allure.title("Evacuation shard with chunk (type={type})")
    @pytest.mark.parametrize("type, get_chunk", [("data", get_data_chunk_object), ("parity", get_parity_chunk_object)])
    def test_evacuation_data_shard(
        self,
        restore_nodes_shards_mode: None,
        frostfs_cli: FrostfsCli,
        grpc_client: GrpcClientWrapper,
        max_object_size: int,
        type: str,
        get_chunk,
    ) -> None:
        with reporter.step("Create container."):
            cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 1.1 CBF 1", await_mode=True)

        with reporter.step("Put object in container."):
            test_file = generate_file(max_object_size - 1000)
            oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)

        with reporter.step("Get object chunks."):
            chunk = get_chunk(self, frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint)
            chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, chunk)
            frostfs_node_cli = self.get_node_cli(chunk_node[0], config=chunk_node[0].storage_node.get_remote_wallet_config_path())

        with reporter.step("Search shards chunk"):
            time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME) * 2)
            shard_id = grpc_client.object.chunks.get_shard_chunk(chunk_node[0], chunk)

        with reporter.step("Enable evacuation for shard"):
            frostfs_node_cli.shards.set_mode(chunk_node[0].storage_node.get_control_endpoint(), mode="read-only", id=shard_id)
            frostfs_node_cli.shards.evacuation_start(chunk_node[0].storage_node.get_control_endpoint(), shard_id, await_mode=True)

        with reporter.step("Get object after evacuation shard"):
            grpc_client.object.get(cid, oid, self.cluster.default_rpc_endpoint)

    @allure.title("[NEGATIVE] Don`t create more 1 EC policy")
    def test_more_one_ec_policy(self, grpc_client: GrpcClientWrapper) -> None:
        with reporter.step("Create container with policy - 'EC 2.1 EC 1.1'"):
            with pytest.raises(RuntimeError, match="can't parse placement policy"):
                grpc_client.container.create(
                    self.cluster.default_rpc_endpoint, policy="EC 2.1 EC 1.1 CBF 1 SELECT 4 FROM *", await_mode=True
                )

    @allure.title("Create bucket with EC policy (s3_client={s3_client})")
    @pytest.mark.parametrize("s3_policy, s3_client", [("pytest_tests/resources/files/policy.json", AwsCliClient)], indirect=True)
    def test_create_bucket_with_ec_location(
        self, s3_client: S3ClientWrapper, bucket_container_resolver: BucketContainerResolver, grpc_client: GrpcClientWrapper
    ) -> None:
        with reporter.step("Create bucket with EC location constrain"):
            bucket = s3_client.create_bucket(location_constraint="ec3.1")

        with reporter.step("Resolve container bucket"):
            cid = bucket_container_resolver.resolve(self.cluster.cluster_nodes[0], bucket)

        with reporter.step("Validate container policy"):
            container = grpc_client.container.get(self.cluster.default_rpc_endpoint, cid, json_mode=True, timeout=CLI_DEFAULT_TIMEOUT)
            assert container

    @allure.title("Bucket object count chunks (s3_client={s3_client}, size={object_size})")
    @pytest.mark.parametrize("s3_policy, s3_client", [("pytest_tests/resources/files/policy.json", AwsCliClient)], indirect=True)
    def test_count_chunks_bucket_with_ec_location(
        self,
        s3_client: S3ClientWrapper,
        bucket_container_resolver: BucketContainerResolver,
        grpc_client: GrpcClientWrapper,
        object_size: ObjectSize,
    ) -> None:
        with reporter.step("Create bucket with EC location constrain"):
            bucket = s3_client.create_bucket(location_constraint="ec3.1")

        with reporter.step("Enable versioning object"):
            s3_client.put_bucket_versioning(bucket, VersioningStatus.ENABLED)
            bucket_status = s3_client.get_bucket_versioning_status(bucket)
            assert bucket_status == VersioningStatus.ENABLED.value

        with reporter.step("Put object in bucket"):
            test_file = generate_file(object_size.value)
            bucket_object = s3_client.put_object(bucket, test_file)

        with reporter.step("Watch replication count chunks"):
            cid = bucket_container_resolver.resolve(self.cluster.cluster_nodes[0], bucket)
            chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, bucket_object)
            expect_chunks = 4 if object_size.name == "simple" else 16
            assert len(chunks) == expect_chunks

    @allure.title("Replication chunk after drop (size={object_size})")
    def test_drop_chunk_and_replication(self, grpc_client: GrpcClientWrapper, object_size: ObjectSize, rep_count: int) -> None:
        with reporter.step("Create container"):
            cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 2.1 CBF 1", await_mode=True)

        with reporter.step("Put object"):
            test_file = generate_file(object_size.value)
            oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)

        with reporter.step("Get all chunks"):
            data_chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, cid, oid=oid)

        with reporter.step("Search chunk node"):
            chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)
            shell_chunk_node = chunk_node[0].host.get_shell()

        with reporter.step("Get replication count"):
            assert self.check_replication(rep_count, grpc_client, cid, oid)

        with reporter.step("Delete chunk"):
            frostfs_node_cli = FrostfsCli(
                shell_chunk_node,
                frostfs_cli_exec_path=FROSTFS_CLI_EXEC,
                config_file=chunk_node[0].storage_node.get_remote_wallet_config_path(),
            )
            frostfs_node_cli.control.drop_objects(chunk_node[0].storage_node.get_control_endpoint(), f"{cid}/{data_chunk.object_id}")

        with reporter.step("Wait replication count after drop one chunk"):
            self.wait_replication(rep_count, grpc_client, cid, oid)