from google.protobuf.message import Message from typing import Dict, Optional from frostfs_sdk.models.mappers.meta_header_mapper import MetaHeaderMapper from frostfs_sdk.models.mappers.session_mapper import SessionMapper from frostfs_sdk.models.dto.meta_header import MetaHeader from frostfs_sdk.protos.models.session import types_pb2 as types_pb2_session META_HEADER_FIELD_NAME = "meta_header" class RequestConstructor: @staticmethod def add_meta_header(request: Message, x_headers: Optional[Dict[str, str]] = None, session_token: types_pb2_session.SessionToken = None): """ Adds a meta header to the request. :param request: A Protobuf Message.Builder object. :param x_headers: Optional dictionary of custom headers. :param session_token: Optional session token. :raises ValueError: If the request or required fields are missing. """ if request is None: return descriptor = request.DESCRIPTOR if getattr(descriptor, META_HEADER_FIELD_NAME) is None: raise ValueError(f"Required Protobuf field is missing: {META_HEADER_FIELD_NAME}") meta_header = getattr(request, META_HEADER_FIELD_NAME) if meta_header.ByteSize() > 0: return meta_header_builder = MetaHeaderMapper.to_grpc_message(MetaHeader()) if session_token and session_token.ByteSize() > 0: meta_header_builder.session_token = session_token if x_headers: grpc_x_headers = [ types_pb2_session.XHeader(key=key, value=value) for key, value in x_headers.items() ] meta_header_builder.x_headers.extend(grpc_x_headers) setattr(request, META_HEADER_FIELD_NAME, meta_header_builder.build())