from typing import Optional

import grpc

from frostfs_sdk.models.mappers.placement_policy_mapper import PlacementPolicyMapper
from frostfs_sdk.models.mappers.owner_id_mapper import OwnerIdMapper
from frostfs_sdk.models.mappers.version_mapper import VersionMapper
from frostfs_sdk.models.mappers.uuid_extension import UuidExtension
from frostfs_sdk.models.dto.container import Container
from frostfs_sdk.protos.models.container import types_pb2 as types_pb2_container


class ContainerMapper:
    """Maps between the Container DTO and the protobuf Container message."""

    @staticmethod
    def to_grpc_message(container: Container) -> Optional[types_pb2_container.Container]:
        """
        Converts Container DTO to gRPC message

        Args:
            container: Container DTO object

        Returns:
            gRPC Container message, or None when ``container`` is falsy
        """
        if not container:
            return None

        # Treat a missing attribute mapping as "no attributes" instead of
        # failing on ``None.items()``.
        attributes = [
            types_pb2_container.Container.Attribute(key=k, value=v)
            for k, v in (container.attributes or {}).items()
        ]

        # NOTE(review): the DTO is read via ``placementPolicy`` here, while
        # ``to_model`` constructs it with ``placement_policy=``; confirm which
        # spelling the Container DTO actually exposes.
        grpc_container = types_pb2_container.Container(
            nonce=container.nonce.bytes,
            placement_policy=PlacementPolicyMapper.to_grpc_message(container.placementPolicy),
            attributes=attributes,
        )

        # Protobuf forbids direct assignment to a singular sub-message field
        # (it raises AttributeError), so the mapped messages are copied in.
        if container.owner_id:
            owner_id_msg = OwnerIdMapper.to_grpc_message(container.owner_id)
            if owner_id_msg is not None:
                grpc_container.owner_id.CopyFrom(owner_id_msg)

        if container.version:
            version_msg = VersionMapper.to_grpc_message(container.version)
            if version_msg is not None:
                grpc_container.version.CopyFrom(version_msg)

        return grpc_container

    @staticmethod
    def to_model(container_grpc: types_pb2_container.Container) -> Optional[Container]:
        """
        Converts gRPC message to Container DTO

        Args:
            container_grpc: gRPC Container message

        Returns:
            Container DTO object, or None when the message is falsy or
            serializes to zero bytes
        """
        if not container_grpc or container_grpc.ByteSize() == 0:
            return None

        # Flatten the repeated key/value attribute messages into a plain dict;
        # a repeated key keeps the value of its last occurrence.
        attributes = {attr.key: attr.value for attr in container_grpc.attributes}

        return Container(
            nonce=UuidExtension.to_uuid(container_grpc.nonce),
            placement_policy=PlacementPolicyMapper.to_model(container_grpc.placement_policy),
            version=VersionMapper.to_model(container_grpc.version),
            owner_id=OwnerIdMapper.to_model(container_grpc.owner_id),
            attributes=attributes,
        )