frostfs-sdk-python/frostfs_sdk/client/utils/request_constructor.py
Ilyas Niyazov 297e107b10 [#3] Added generate proto script create container method
Signed-off-by: Ilyas Niyazov <i.niyazov@yadro.com>
2025-03-25 11:13:48 +03:00

47 lines
1.8 KiB
Python

from google.protobuf.message import Message
from typing import Dict, Optional
from frostfs_sdk.models.mappers.meta_header_mapper import MetaHeaderMapper
from frostfs_sdk.models.mappers.session_mapper import SessionMapper
from frostfs_sdk.models.dto.meta_header import MetaHeader
from frostfs_sdk.protos.models.session import types_pb2 as types_pb2_session
META_HEADER_FIELD_NAME = "meta_header"
class RequestConstructor:
@staticmethod
def add_meta_header(request: Message, x_headers: Optional[Dict[str, str]] = None, session_token: types_pb2_session.SessionToken = None):
"""
Adds a meta header to the request.
:param request: A Protobuf Message.Builder object.
:param x_headers: Optional dictionary of custom headers.
:param session_token: Optional session token.
:raises ValueError: If the request or required fields are missing.
"""
if request is None:
return
descriptor = request.DESCRIPTOR
if getattr(descriptor, META_HEADER_FIELD_NAME) is None:
raise ValueError(f"Required Protobuf field is missing: {META_HEADER_FIELD_NAME}")
meta_header = getattr(request, META_HEADER_FIELD_NAME)
if meta_header.ByteSize() > 0:
return
meta_header_builder = MetaHeaderMapper.to_grpc_message(MetaHeader())
if session_token and session_token.ByteSize() > 0:
meta_header_builder.session_token = session_token
if x_headers:
grpc_x_headers = [
types_pb2_session.XHeader(key=key, value=value)
for key, value in x_headers.items()
]
meta_header_builder.x_headers.extend(grpc_x_headers)
setattr(request, META_HEADER_FIELD_NAME, meta_header_builder.build())