from typing import Optional import grpc from frostfs_sdk.client.models.client_environment import ClientEnvironment from frostfs_sdk.client.services.context_accessor import ContextAccessor from frostfs_sdk.models.mappers.placement_policy_mapper import PlacementPolicyMapper from frostfs_sdk.models.mappers.owner_id_mapper import OwnerIdMapper from frostfs_sdk.models.mappers.version_mapper import VersionMapper from frostfs_sdk.models.mappers.uuid_extension import UuidExtension from frostfs_sdk.models.dto.container import Container from frostfs_sdk.protos.models.container import types_pb2 as types_pb2_container class ContainerMapper: @staticmethod def to_grpc_message(container: Container, client_context: ClientEnvironment) -> Optional[types_pb2_container.Container]: """ Converts Container DTO to gRPC message Args: container: Container DTO object Returns: gRPC Container message builder """ if not container: return None attributes = [ types_pb2_container.Container.Attribute(key=k, value=v) for k, v in container.attributes.items() ] if container.owner_id: owner_id = OwnerIdMapper.to_grpc_message(container.owner_id) else: owner_id = OwnerIdMapper.to_grpc_message(client_context.owner_id) if container.version: version = VersionMapper.to_grpc_message(container.version) else: version = VersionMapper.to_grpc_message(client_context.version) grpc_container = types_pb2_container.Container( nonce=container.nonce.bytes, placement_policy=PlacementPolicyMapper.to_grpc_message(container.placementPolicy), owner_id=owner_id, version=version, attributes=attributes ) return grpc_container @staticmethod def to_model(container_grpc: types_pb2_container.Container) -> Optional[Container]: """ Converts gRPC message to Container DTO Args: container_grpc: gRPC Container message Returns: Container DTO object """ if not container_grpc or container_grpc.ByteSize() == 0: return None attributes = {attr.key: attr.value for attr in container_grpc.attributes} return Container( nonce=UuidExtension.to_uuid(container_grpc.nonce), placement_policy=PlacementPolicyMapper.to_model(container_grpc.placement_policy), version=VersionMapper.to_model(container_grpc.version), owner_id=OwnerIdMapper.to_model(container_grpc.owner_id), attributes=attributes )