frostfs-sdk-python/frostfs_sdk/models/mappers/container_mapper.py
Ilyas Niyazov f8465e5b99 Added create container grpc method
Signed-off-by: Ilyas Niyazov <i.niyazov@yadro.com>
2025-03-10 13:46:17 +03:00

65 lines
2.2 KiB
Python

from typing import ByteString, Optional
from frostfs_sdk.models.mappers.placement_policy_mapper import PlacementPolicyMapper
import protos.models.container.types_pb2 as types_pb2_container
from frostfs_sdk.models.dto.container import Container
class ContainerMapper:
    """Converts between the Container DTO and its gRPC message form.

    Only the placement policy and the attributes are mapped in both
    directions; nonce, owner id and version are not mapped yet.
    """

    @staticmethod
    def to_grpc_message(container: Container) -> Optional[types_pb2_container.Container]:
        """
        Converts Container DTO to gRPC message

        Args:
            container: Container DTO object

        Returns:
            gRPC Container message, or None when ``container`` is falsy
        """
        if not container:
            return None

        # A DTO whose attributes are None is mapped to an empty attribute
        # list instead of failing on ``None.items()``.
        attributes = [
            types_pb2_container.Container.Attribute(key=k, value=v)
            for k, v in (container.attributes or {}).items()
        ]

        # NOTE(review): the DTO attribute is read as camelCase
        # ``placementPolicy`` here, while ``to_model`` passes the snake_case
        # keyword ``placement_policy`` to ``Container(...)`` -- confirm both
        # spellings against the Container DTO definition.
        # TODO: nonce, owner_id and version are not copied into the message yet.
        return types_pb2_container.Container(
            placement_policy=PlacementPolicyMapper.to_grpc_message(container.placementPolicy),
            attributes=attributes,
        )

    @staticmethod
    def to_model(container_grpc: types_pb2_container.Container) -> Optional[Container]:
        """
        Converts gRPC message to Container DTO

        Args:
            container_grpc: gRPC Container message

        Returns:
            Container DTO object, or None when the message is falsy or its
            ``ByteSize()`` is zero
        """
        if not container_grpc or container_grpc.ByteSize() == 0:
            return None

        # Repeated key/value attribute entries are collapsed into a plain
        # dict; a later entry with a duplicate key overrides an earlier one.
        attributes = {attr.key: attr.value for attr in container_grpc.attributes}

        # TODO: nonce, version and owner_id are not restored from the message yet.
        return Container(
            placement_policy=PlacementPolicyMapper.to_model(container_grpc.placement_policy),
            attributes=attributes,
        )