# Implementation of Container methods
from frostfs_sdk.client.models.client_environment import ClientEnvironment
from frostfs_sdk.client.services.context_accessor import ContextAccessor
from frostfs_sdk.client.services.session import SessionClient
from frostfs_sdk.client.parameters.container_param import ContainerCreateParam
from frostfs_sdk.client.parameters.call_context_param import CallContextParam
from frostfs_sdk.client.utils.session_cache import SessionToken
from frostfs_sdk.client.utils.request_constructor import RequestConstructor
from frostfs_sdk.cryptography.signer import Signer
from frostfs_sdk.models.dto.container import ContainerId, Container
from frostfs_sdk.models.mappers.container_mapper import ContainerMapper
from frostfs_sdk.models.mappers.owner_id_mapper import OwnerIdMapper
from frostfs_sdk.models.mappers.version_mapper import VersionMapper
from frostfs_sdk.client.parameters.create_session_param import CreateSessionParam

from frostfs_sdk.protos.models.container import service_pb2 as service_pb2_container
from frostfs_sdk.protos.models.container import service_pb2_grpc as service_pb2_grpc_container


class ContainerClient(ContextAccessor):
    """Client for the container gRPC service.

    Builds signed container requests and sends them through a
    ``ContainerServiceStub`` bound to the environment's channel.
    """

    def __init__(self, client_environment: ClientEnvironment):
        """Bind the container service stub and signing key.

        Args:
            client_environment: Environment providing the gRPC ``channel``
                and the ``ecdsa`` key object used to sign requests.
        """
        super().__init__(client_environment)
        self.container_stub = service_pb2_grpc_container.ContainerServiceStub(
            client_environment.channel
        )
        self.ecdsa = client_environment.ecdsa

    def create_container(
        self,
        container_create_param: ContainerCreateParam,
        ctx: CallContextParam,
    ) -> ContainerId:
        """Create a container and return its identifier.

        Args:
            container_create_param: Container definition together with the
                optional session token and extra headers.
            ctx: Call context forwarded to session creation when a new
                session has to be opened.

        Returns:
            The ``ContainerId`` built from the ``container_id`` value in the
            service response body.

        Raises:
            ValueError: If no session token could be obtained.
        """
        request = self._create_put_request(container_create_param, ctx)
        response: service_pb2_container.PutResponse = self.container_stub.Put(request)
        return ContainerId(value=response.body.container_id.value)

    def _create_put_request(
        self,
        param: ContainerCreateParam,
        ctx: CallContextParam,
    ) -> service_pb2_container.PutRequest:
        """Build and sign the PUT request that creates a container.

        Args:
            param: Container definition, optional session token and extra
                headers for the request.
            ctx: Call context used if a new session must be created.

        Returns:
            The result of ``Signer.sign`` applied to the assembled request.

        Raises:
            ValueError: If no session token could be obtained.
        """
        grpc_container = ContainerMapper().to_grpc_message(
            param.container, self.get_context
        )

        # The container message carries its own RFC 6979 signature, separate
        # from the signature applied to the request as a whole below.
        body = service_pb2_container.PutRequest.Body(
            container=grpc_container,
            signature=Signer.sign_message_rfc_6979(
                self.get_context.ecdsa, grpc_container
            ),
        )
        request = service_pb2_container.PutRequest(body=body)

        session_token = self.get_or_create_session(param.session_token, ctx)
        RequestConstructor.add_meta_header(request, param.x_headers, session_token)

        # NOTE(review): the body signature uses ``self.get_context.ecdsa`` while
        # the request signature uses ``self.ecdsa``; presumably both refer to
        # the same key object - confirm.
        return Signer.sign(self.ecdsa.private_key, request)

    def get_or_create_session(
        self,
        session_ctx: SessionToken,
        ctx: CallContextParam,
    ) -> bytes:
        """Return a session token, opening a new session only when needed.

        Resolution order:
          1. the token of ``session_ctx`` when it is truthy;
          2. the token cached under the context's session key;
          3. a freshly created session, which is then stored in the cache.

        Args:
            session_ctx: Caller-supplied session token; a falsy value
                (e.g. ``None``) falls through to the cache lookup.
            ctx: Call context forwarded to ``SessionClient.create_session``.

        Returns:
            The ``token`` attribute of the resolved session.

        Raises:
            ValueError: If session creation returns a falsy result.
        """
        if session_ctx:
            return session_ctx.token

        context = self.get_context
        session_key = context.get_session_key()

        cached_session = context.session_cache.try_get_value(session_key)
        if cached_session:
            return cached_session.token

        # NOTE(review): expiration=-1 presumably selects a default or
        # unlimited lifetime - confirm against CreateSessionParam.
        new_session = SessionClient(context).create_session(
            CreateSessionParam(expiration=-1), ctx
        )
        if not new_session:
            raise ValueError("cannot create session")

        context.session_cache.set_value(session_key, new_session)
        return new_session.token