PKrTO2Yapidaora/__init__.py""" ASGI App using dataclasses module for request/response objects """ __version__ = '0.10.0' from apidaora.app import appdaora from apidaora.bodies import gzip_body from apidaora.content import ContentType from apidaora.exceptions import BadRequestError from apidaora.header import header from apidaora.method import MethodType from apidaora.responses import html, json, text from apidaora.route.decorator import route __all__ = [ BadRequestError.__name__, ContentType.__name__, MethodType.__name__, appdaora.__name__, gzip_body.__name__, header.__name__, html.__name__, json.__name__, text.__name__, route.__name__, ] PKSO*'00apidaora/app.pyfrom typing import Sequence, Union from .asgi.app import asgi_app from .asgi.base import ASGIApp from .asgi.router import Controller, make_router from .controllers.background_task import BackgroundTask Controllers = Union[ Controller, BackgroundTask, Sequence[Union[Controller, BackgroundTask]], Sequence[Controller], Sequence[BackgroundTask], ] def appdaora(controllers: Controllers) -> ASGIApp: routes = [] if not isinstance(controllers, Sequence): controllers = [controllers] for controller in controllers: if isinstance(controller, BackgroundTask): routes.append(controller.create.route) routes.append(controller.get_results.route) else: routes.append(controller.route) return asgi_app(make_router(routes)) PKjTO%)apidaora/bodies.pyimport io from typing import IO, Optional, Type from dictdaora import DictDaora from jsondaora import jsondaora try: import gzip except Exception: gzip = None # type: ignore class _GZipFactory(DictDaora): # type: ignore mode: str compresslevel: int encoding: Optional[str] errors: Optional[str] newline: Optional[str] value: bytes def open(self) -> IO[bytes]: if self.value: return gzip.open( io.BytesIO(self.value), self.mode, self.compresslevel, self.encoding, self.errors, self.newline, ) raise ValueError(self.value) def gzip_body( mode: str = 'rb', compresslevel: int = 9, encoding: Optional[str] = None, errors: Optional[str] = None, newline: Optional[str] = None, ) -> Type[_GZipFactory]: cls_attributes = dict( mode=mode, compresslevel=compresslevel, encoding=encoding, errors=errors, newline=newline, ) factory = type('GZipFactory', (_GZipFactory,), cls_attributes) return jsondaora(factory) # type: ignore PK5O^0Oapidaora/content.pyfrom enum import Enum class ContentType(Enum): APPLICATION_JSON = 'application/json' TEXT_HTML = 'text/html; charset=utf-8' TEXT_PLAIN = 'text/plain' PKSOHapidaora/exceptions.pyfrom dataclasses import dataclass from typing import Any, Dict, Optional, Sequence from .header import _Header class APIDaoraError(Exception): ... class MethodNotFoundError(APIDaoraError): ... class PathNotFoundError(APIDaoraError): ... class InvalidReturnError(APIDaoraError): def __str__(self) -> str: return ( f"handler_name='{self.args[1].__name__}', " f"return_type='{type(self.args[0]).__name__}', " f"return_value='{self.args[0]}'" ) @dataclass class BadRequestError(APIDaoraError): name: str info: Dict[str, Any] headers: Optional[Sequence[_Header]] = None def __str__(self) -> str: return f"name='{self.name}', info={self.info}" @property def dict(self) -> Dict[str, Any]: return {'name': self.name, 'info': self.info} class InvalidTasksRepositoryError(APIDaoraError): ... class InvalidRouteArgumentsError(APIDaoraError): ... PKI{GOyHapidaora/header.pyfrom typing import Any, ClassVar, Optional, Type from dictdaora import DictDaora from jsondaora import as_typed_dict_field BUILTIN_TYPE = type class _Header(DictDaora): # type: ignore def __init__(self, value: Any): value = as_typed_dict_field(value, 'value', type(self).type) super().__init__(value=value) def header( type: Type[Any], http_name: Optional[str] = None ) -> Type[DictDaora]: annotations = { 'type': ClassVar[Type[Any]], 'http_name': ClassVar[Optional[str]], 'value': Any, } return BUILTIN_TYPE( 'Header', (_Header,), {'__annotations__': annotations, 'http_name': http_name, 'type': type}, ) PKS 1Oc:#apidaora/method.pyfrom enum import Enum class MethodType(Enum): GET = 'GET' POST = 'POST' PATCH = 'PATCH' PUT = 'PUT' DELETE = 'DELETE' OPTIONS = 'OPTIONS' HEAD = 'HEAD' TRACE = 'TRACE' CONNECT = 'CONNECT' PKȾGO_~l((apidaora/responses.pyfrom http import HTTPStatus from typing import Any, Sequence from dictdaora import DictDaora from .content import ContentType from .header import _Header class Response(DictDaora): # type: ignore status: HTTPStatus headers: Sequence[_Header] content_type: ContentType def json( body: Any, status: HTTPStatus = HTTPStatus.OK, headers: Sequence[_Header] = (), ) -> Response: return Response( body=body, status=status, headers=headers, content_type=ContentType.APPLICATION_JSON, ) def text( body: Any, status: HTTPStatus = HTTPStatus.OK, headers: Sequence[_Header] = (), ) -> Response: return Response( body=body, status=status, headers=headers, content_type=ContentType.TEXT_PLAIN, ) def html( body: Any, status: HTTPStatus = HTTPStatus.OK, headers: Sequence[_Header] = (), ) -> Response: return Response( body=body, status=status, headers=headers, content_type=ContentType.TEXT_HTML, ) PKFOapidaora/asgi/__init__.pyPKSOapidaora/asgi/app.pyimport asyncio from logging import getLogger from typing import Any, Awaitable, Callable, Dict from urllib import parse from ..exceptions import MethodNotFoundError, PathNotFoundError from .base import ASGIApp, Receiver, Scope, Sender from .responses import ( send_method_not_allowed_response, send_not_found, send_response, ) from .router import ResolvedRoute logger = getLogger(__name__) def asgi_app(router: Callable[[str, str], ResolvedRoute]) -> ASGIApp: async def controller( scope: Scope, receive: Receiver, send: Sender ) -> None: try: resolved = router(scope['path'], scope['method']) except PathNotFoundError: await send_not_found(send) except MethodNotFoundError: await send_method_not_allowed_response(send) else: route = resolved.route if route.has_query: query_dict = _get_query_dict(scope) else: query_dict = {} if route.has_body: body = await _read_body(receive) else: body = b'' if route.has_headers: headers = scope['headers'] else: headers = [] response_and_body = route.controller( resolved.path_args, query_dict, headers, body ) while asyncio.iscoroutine(response_and_body): response_and_body = await response_and_body response, body = ( (response_and_body[0], response_and_body[1]) if len(response_and_body) > 1 else (response_and_body[0], b'') ) await send_response(send, response, body) return controller def _get_query_dict(scope: Dict[str, Any]) -> Dict[str, Any]: qs = parse.parse_qs(scope['query_string'].decode()) return qs async def _read_body(receive: Callable[[], Awaitable[Dict[str, Any]]]) -> Any: body = b'' more_body = True while more_body: message = await receive() body += message.get('body', b'') more_body = message.get('more_body', False) return body PKI{GOm`apidaora/asgi/base.pyfrom typing import ( Any, Awaitable, Callable, Coroutine, Dict, List, Tuple, TypedDict, ) Scope = Dict[str, Any] Receiver = Callable[[], Awaitable[Dict[str, Any]]] Sender = Callable[[Dict[str, Any]], Awaitable[None]] ASGIApp = Callable[[Scope, Receiver, Sender], Coroutine[Any, Any, None]] ASGIPathArgs = Dict[str, str] ASGIQueryDict = Dict[str, List[str]] ASGIHeaders = Tuple[Tuple[bytes, bytes], ...] ASGIBody = bytes class ASGIResponse(TypedDict): headers: ASGIHeaders type: str status: int # ASGICallableResults = Union[ASGIResponse, Tuple[ASGIResponse, ASGIBody]] ASGICallableResults = Any ASGICallable = Callable[ [ASGIPathArgs, ASGIQueryDict, ASGIHeaders, ASGIBody], ASGICallableResults ] PKSOapidaora/asgi/controller.pyPKȾGO#` ` apidaora/asgi/responses.pyfrom http import HTTPStatus from typing import Awaitable, Optional, Tuple from ..content import ContentType from .base import ASGIHeaders, ASGIResponse, Sender HTTP_RESPONSE_START = 'http.response.start' JSON_CONTENT_HEADER = ( b'content-type', ContentType.APPLICATION_JSON.value.encode(), ) HTML_CONTENT_HEADER = (b'content-type', ContentType.TEXT_HTML.value.encode()) PLAINTEXT_CONTENT_HEADER = ( b'content-type', ContentType.TEXT_PLAIN.value.encode(), ) JSON_RESPONSE: ASGIResponse = { 'type': HTTP_RESPONSE_START, 'status': HTTPStatus.OK.value, 'headers': (JSON_CONTENT_HEADER,), } HTML_RESPONSE: ASGIResponse = { 'type': HTTP_RESPONSE_START, 'status': HTTPStatus.OK.value, 'headers': (HTML_CONTENT_HEADER,), } PLAINTEXT_RESPONSE: ASGIResponse = { 'type': HTTP_RESPONSE_START, 'status': HTTPStatus.OK.value, 'headers': (PLAINTEXT_CONTENT_HEADER,), } NOTFOUND_RESPONSE: ASGIResponse = { 'type': HTTP_RESPONSE_START, 'status': HTTPStatus.NOT_FOUND.value, 'headers': (), } METHOD_NOT_ALLOWED_RESPONSE: ASGIResponse = { 'type': HTTP_RESPONSE_START, 'status': HTTPStatus.METHOD_NOT_ALLOWED.value, 'headers': (), } NO_CONTENT_RESPONSE: ASGIResponse = { 'type': HTTP_RESPONSE_START, 'status': HTTPStatus.NO_CONTENT.value, 'headers': (), } def make_response( content_length: Optional[int], status: HTTPStatus, headers: Optional[ASGIHeaders], default_value: ASGIResponse, default_content_header: Tuple[bytes, bytes], ) -> ASGIResponse: if content_length is None: return default_value default_headers: ASGIHeaders = ( default_content_header, (b'content-length', str(content_length).encode()), ) if headers: if isinstance(headers, tuple): headers = default_headers + headers else: headers = default_headers + tuple(headers) return { 'type': HTTP_RESPONSE_START, 'status': status.value, 'headers': default_headers if not headers else headers, } def make_json_response( content_length: Optional[int] = None, status: HTTPStatus = HTTPStatus.OK, headers: Optional[ASGIHeaders] = None, ) -> ASGIResponse: return make_response( content_length, status, headers, JSON_RESPONSE, JSON_CONTENT_HEADER ) def make_text_response( content_length: Optional[int] = None, status: HTTPStatus = HTTPStatus.OK, headers: Optional[ASGIHeaders] = None, ) -> ASGIResponse: return make_response( content_length, status, headers, PLAINTEXT_RESPONSE, PLAINTEXT_CONTENT_HEADER, ) def make_html_response( content_length: Optional[int] = None, status: HTTPStatus = HTTPStatus.OK, headers: Optional[ASGIHeaders] = None, ) -> ASGIResponse: return make_response( content_length, status, headers, HTML_RESPONSE, HTML_CONTENT_HEADER ) async def send_response( send: Sender, response: ASGIResponse, body: bytes ) -> None: await send(response) # type: ignore await send( {'type': 'http.response.body', 'body': body, 'more_body': False} ) def send_not_found(send: Sender) -> Awaitable[None]: return send_response(send, NOTFOUND_RESPONSE, b'') def send_method_not_allowed_response(send: Sender) -> Awaitable[None]: return send_response(send, METHOD_NOT_ALLOWED_RESPONSE, b'') PKSOr}  apidaora/asgi/router.pyimport re from abc import ABC, abstractmethod from dataclasses import dataclass from typing import ( Any, Callable, DefaultDict, Dict, Iterable, Optional, Pattern, ) from apidaora.exceptions import MethodNotFoundError, PathNotFoundError from apidaora.method import MethodType from .base import ( ASGIBody, ASGICallableResults, ASGIHeaders, ASGIPathArgs, ASGIQueryDict, ) class Controller(ABC): route: 'Route' @abstractmethod def __call__( self, path_args: ASGIPathArgs, query_dict: ASGIQueryDict, headers: ASGIHeaders, body: ASGIBody, ) -> ASGICallableResults: ... @dataclass class Route: path_pattern: str method: MethodType controller: Controller has_path_args: bool = False has_query: bool = False has_headers: bool = False has_body: bool = False @dataclass class ResolvedRoute: route: Route path_args: Dict[str, Any] path: str @dataclass class RoutesTreeRegex: name: str compiled_re: Optional[Pattern[Any]] class RoutesTree(DefaultDict[str, Any]): regex: Optional[RoutesTreeRegex] = None def __init__(self) -> None: super().__init__(RoutesTree) PATH_RE = re.compile(r'\{(?P[^/:]+)(:(?P[^/:]+))?\}') def make_router( routes: Iterable[Route], ) -> Callable[[str, str], ResolvedRoute]: routes_tree = RoutesTree() for route in routes: path_pattern_parts = split_path(route.path_pattern) routes_tree_tmp = routes_tree for path_pattern_part in path_pattern_parts: match = PATH_RE.match(path_pattern_part) if match: group = match.groupdict() pattern = group.get('pattern') regex: Optional[Pattern[Any]] = re.compile( pattern ) if pattern else None routes_tree_tmp.regex = RoutesTreeRegex( group['name'], compiled_re=regex ) routes_tree_tmp = routes_tree_tmp[routes_tree_tmp.regex.name] continue routes_tree_tmp = routes_tree_tmp[path_pattern_part] routes_tree_tmp[route.method.value] = route def route_(path: str, method: str) -> ResolvedRoute: path_parts = split_path(path) path_args = {} routes_tree_ = routes_tree for path_part in path_parts: if path_part in routes_tree_: routes_tree_ = routes_tree_[path_part] continue if routes_tree_.regex: compiled_re = routes_tree_.regex.compiled_re match = compiled_re.match(path_part) if compiled_re else None if not match and compiled_re: raise PathNotFoundError(path) path_args[routes_tree_.regex.name] = path_part routes_tree_ = routes_tree_[routes_tree_.regex.name] continue raise PathNotFoundError(path) if method not in routes_tree_: raise MethodNotFoundError(method, path) return ResolvedRoute( route=routes_tree_[method], path_args=path_args, path=path ) return route_ def split_path(path: str) -> Iterable[str]: return path.strip(' /').split('/') PKSO apidaora/controllers/__init__.pyPKSO̝ a#a#'apidaora/controllers/background_task.pyimport asyncio import dataclasses import datetime import logging import uuid from concurrent.futures import ThreadPoolExecutor from enum import Enum from functools import partial from typing import Any, Callable, Coroutine, Dict, Type, TypedDict import orjson from jsondaora import as_typed_dict, jsondaora, typed_dict_asjson from ..asgi.router import Controller from ..exceptions import BadRequestError, InvalidTasksRepositoryError from ..method import MethodType from ..route.factory import make_route logger = logging.getLogger(__name__) try: import aioredis except Exception: aioredis = None class TaskStatusType(Enum): RUNNING = 'running' FINISHED = 'finished' ERROR = 'error' @jsondaora class TaskInfo(TypedDict): task_id: str start_time: str status: str @dataclasses.dataclass class BackgroundTask: create: Controller get_results: Controller def make_background_task( controller: Callable[..., Any], path_pattern: str, max_workers: int = 10, tasks_repository: Any = None, ) -> BackgroundTask: if asyncio.iscoroutinefunction(controller): logger.warning( 'Async tasks can potentially block your application, use with care. ' 'It use is recommended just for small tasks or non-blocking operations.' ) tasks_repository = get_tasks_repository(tasks_repository) annotations = getattr(controller, '__annotations__', {}) result_annotation = annotations.get('return', str) @jsondaora class FinishedTaskInfo(TaskInfo): end_time: str result: result_annotation # type: ignore create_task = make_create_task( controller, tasks_repository, FinishedTaskInfo, max_workers ) get_task_results = make_get_task_results( tasks_repository, FinishedTaskInfo ) create_task.__annotations__ = { name: type_ for name, type_ in annotations.items() if name != 'return' } return BackgroundTask( make_route(path_pattern, MethodType.POST, create_task).controller, make_route(path_pattern, MethodType.GET, get_task_results).controller, ) def get_iso_time() -> str: return ( datetime.datetime.now() .replace(microsecond=0, tzinfo=datetime.timezone.utc) .isoformat() ) @dataclasses.dataclass class BaseTasksRepository: async def set( self, key: Any, value: Any, task_cls: Type[Any] = TaskInfo ) -> None: ... async def get(self, key: Any, finished_task_cls: Type[Any]) -> Any: ... @dataclasses.dataclass class SimpleTasksRepository(BaseTasksRepository): data_source: Dict[str, Any] async def set( self, key: Any, value: Any, task_cls: Type[Any] = TaskInfo ) -> None: self.data_source[key] = value async def get(self, key: Any, finished_task_cls: Type[Any]) -> Any: return self.data_source[key] def get_tasks_repository(tasks_repository: Any) -> Any: if tasks_repository is None: return SimpleTasksRepository({}) elif isinstance(tasks_repository, str) and tasks_repository.startswith( 'redis://' ): if aioredis is None: raise InvalidTasksRepositoryError("'aioredis' package not found!") return partial(get_redis_tasks_repository, tasks_repository) elif isinstance(tasks_repository, BaseTasksRepository): return tasks_repository raise InvalidTasksRepositoryError(tasks_repository) def make_create_task( controller: Callable[..., Any], tasks_repository: Any, finished_task_info_cls: Any, max_workers: int, ) -> Callable[..., Coroutine[Any, Any, TaskInfo]]: executor = ThreadPoolExecutor(max_workers) async def create_task(*args: Any, **kwargs: Any) -> TaskInfo: task_id = uuid.uuid4() if asyncio.iscoroutinefunction(controller): loop = asyncio.get_running_loop() if isinstance(tasks_repository, partial): tasks_repository_ = await tasks_repository() # noqa else: tasks_repository_ = tasks_repository wrapper = make_task_wrapper( tasks_repository, task_id, finished_task_info_cls, controller, *args, **kwargs, ) future = asyncio.run_coroutine_threadsafe(wrapper(), loop) else: done_callback = make_done_callback( tasks_repository, task_id, finished_task_info_cls ) future = executor.submit(controller, *args, **kwargs) future.add_done_callback(done_callback) start_time = get_iso_time() task = TaskInfo( task_id=str(task_id), start_time=start_time, status=TaskStatusType.RUNNING.value, ) if isinstance(tasks_repository, partial): tasks_repository_ = await tasks_repository() # noqa else: tasks_repository_ = tasks_repository await tasks_repository_.set( task_id, task, task_cls=finished_task_info_cls ) return task return create_task def make_task_wrapper( tasks_repository: Any, task_id: uuid.UUID, finished_task_info_cls: Any, controller: Callable[..., Any], *args: Any, **kwargs: Any, ) -> Callable[..., Coroutine[Any, Any, TaskInfo]]: async def wrapper() -> Any: result = await controller(*args, **kwargs) task = await tasks_repository.get(task_id, finished_task_info_cls) finished_task = finished_task_info_cls( end_time=get_iso_time(), result=result, status=TaskStatusType.FINISHED.value, task_id=task['task_id'], start_time=task['start_time'], ) await tasks_repository.set( task_id, finished_task, task_cls=finished_task_info_cls ) return wrapper def make_done_callback( tasks_repository: Any, task_id: uuid.UUID, finished_task_info_cls: Any ) -> Callable[[Any], None]: def done_callback(future: Any) -> None: result = future.result() policy = asyncio.get_event_loop_policy() loop = policy.new_event_loop() if isinstance(tasks_repository, partial): tasks_repository_ = loop.run_until_complete(tasks_repository()) else: tasks_repository_ = tasks_repository task = loop.run_until_complete( tasks_repository_.get(task_id, finished_task_info_cls) ) finished_task = finished_task_info_cls( end_time=get_iso_time(), result=result, status=TaskStatusType.FINISHED.value, task_id=task['task_id'], start_time=task['start_time'], ) loop.run_until_complete( tasks_repository_.set( task_id, finished_task, task_cls=finished_task_info_cls ) ) return done_callback def make_get_task_results( tasks_repository: Any, finished_task_info_cls: Any ) -> Callable[..., Coroutine[Any, Any, TaskInfo]]: async def get_task_results(task_id: str) -> finished_task_info_cls: # type: ignore if isinstance(tasks_repository, partial): tasks_repository_ = await tasks_repository() # noqa else: tasks_repository_ = tasks_repository try: return await tasks_repository_.get( # type: ignore uuid.UUID(task_id), finished_task_info_cls ) except KeyError: raise BadRequestError( name='invalid_task_id', info={'task_id': task_id} ) except ValueError as error: if error.args == ('badly formed hexadecimal UUID string',): raise BadRequestError( name='invalid_task_id', info={'task_id': task_id} ) raise error from None return get_task_results if aioredis is not None: @dataclasses.dataclass class RedisTasksRepository(BaseTasksRepository): data_source: aioredis.Redis async def set( self, key: Any, value: Any, task_cls: Type[Any] = TaskInfo ) -> None: await self.data_source.set( str(key), typed_dict_asjson(value, task_cls) ) async def get(self, key: Any, finished_task_cls: Type[Any]) -> Any: value = await self.data_source.get(str(key)) if value: value = orjson.loads(value) if value['status'] == TaskStatusType.RUNNING.value: return as_typed_dict(value, TaskInfo) if value['status'] == TaskStatusType.FINISHED.value: return as_typed_dict(value, finished_task_cls) raise KeyError(key) async def get_redis_tasks_repository(uri: str) -> RedisTasksRepository: data_source = await aioredis.create_redis_pool(uri) return RedisTasksRepository(data_source) PKFOapidaora/route/__init__.pyPKcSO]$49%apidaora/route/annotations_getters.pyfrom typing import Any, Dict, Type from ..header import _Header def get_annotations_path_args( path_pattern: str, annotations: Dict[str, Type[Any]] ) -> Dict[str, Type[Any]]: return { name: type_ for name, type_ in annotations.items() if f'{{{name}}}' in path_pattern } def get_annotations_query_dict( path_args_annotations: Dict[str, Type[Any]], headers_annotations: Dict[str, Type[Any]], annotations: Dict[str, Type[Any]], ) -> Dict[str, Type[Any]]: return { name: type_ for name, type_ in annotations.items() if name not in path_args_annotations and name not in headers_annotations and name != 'return' } def get_annotations_headers( headers_name_map: Dict[str, str], annotations: Dict[str, Type[Any]] ) -> Dict[str, Type[Any]]: annotations_headers: Dict[str, Type[_Header]] = {} for name, type_ in annotations.items(): if isinstance(type_, type) and issubclass(type_, _Header): if type_.http_name is None: http_name = ( '' if name.startswith('x-') or name.startswith('x_') else 'x-' ) http_name += name.replace('_', '-') type_.http_name = http_name else: http_name = type_.http_name annotations_headers[name] = type_ headers_name_map[http_name] = name return annotations_headers def get_annotations_body( annotations: Dict[str, Type[Any]] ) -> Dict[str, Type[Any]]: body_type = annotations.get('body') if body_type: return {'body': body_type} return {} PKȾGOtD& "apidaora/route/controller_input.pyfrom dataclasses import dataclass from typing import Any, Callable, ClassVar, Dict, Type from dictdaora import DictDaora from jsondaora import jsondaora from .annotations_getters import ( get_annotations_body, get_annotations_headers, get_annotations_path_args, get_annotations_query_dict, ) @dataclass class AnnotationsInfo: has_input: bool = False has_path_args: bool = False has_query_dict: bool = False has_headers: bool = False has_body: bool = False class ControllerInput(DictDaora): # type: ignore __annotations_info__: ClassVar[AnnotationsInfo] = AnnotationsInfo() __annotations_path_args__: ClassVar[Dict[str, Type[Any]]] = {} __annotations_query_dict__: ClassVar[Dict[str, Type[Any]]] = {} __annotations_headers__: ClassVar[Dict[str, Type[Any]]] = {} __annotations_body__: ClassVar[Dict[str, Type[Any]]] = {} __headers_name_map__: ClassVar[Dict[str, str]] = {} def controller_input( controller: Callable[..., Any], path_pattern: str ) -> Type[ControllerInput]: annotations_info = AnnotationsInfo() headers_name_map: Dict[str, str] = {} if hasattr(controller, '__annotations__'): annotations_path_args = get_annotations_path_args( path_pattern, controller.__annotations__ ) annotations_headers = get_annotations_headers( headers_name_map, controller.__annotations__ ) annotations_query_dict = get_annotations_query_dict( annotations_path_args, annotations_headers, controller.__annotations__, ) annotations_body = get_annotations_body(controller.__annotations__) all_annotations = ( annotations_path_args, annotations_query_dict, annotations_headers, annotations_body, ) annotations = { name: type_ for partial_annotations in all_annotations for name, type_ in partial_annotations.items() } if annotations: annotations_info.has_input = True if annotations_path_args: annotations_info.has_path_args = True if annotations_query_dict: annotations_info.has_query_dict = True if annotations_headers: annotations_info.has_headers = True if annotations_body: annotations_info.has_body = True AnnotatedControllerInput = jsondaora( type( 'AnnotatedControllerInput', (ControllerInput,), { '__annotations__': annotations, '__annotations_info__': annotations_info, '__annotations_path_args__': annotations_path_args, '__annotations_query_dict__': annotations_query_dict, '__annotations_headers__': annotations_headers, '__annotations_body__': annotations_body, '__headers_name_map__': headers_name_map, }, ) ) return AnnotatedControllerInput # type: ignore return ControllerInput PKSOapidaora/route/decorator.pyimport functools from typing import Any, Callable, Union from ..asgi.router import Controller from ..controllers.background_task import BackgroundTask, make_background_task from ..exceptions import InvalidRouteArgumentsError, MethodNotFoundError from ..method import MethodType from .factory import make_route class _RouteDecorator: def __getattr__(self, attr_name: str) -> Any: if attr_name == '__name__': return type(self).__name__ if attr_name == 'background': brackground = True else: brackground = False method = attr_name.upper() if method not in MethodType.__members__: raise MethodNotFoundError(attr_name) def decorator( path_pattern: str, **kwargs: Any ) -> Callable[[Callable[..., Any]], Controller]: if len(kwargs) > 0 and tuple(kwargs.keys()) != ( 'tasks_repository', ): raise InvalidRouteArgumentsError(kwargs) @functools.wraps(make_route) def wrapper( controller: Callable[..., Any] ) -> Union[Controller, BackgroundTask]: if brackground: tasks_repository = kwargs.get('tasks_repository') return make_background_task( controller, path_pattern, tasks_repository=tasks_repository, ) else: route = make_route( path_pattern, MethodType[method], controller ) return route.controller return wrapper return decorator route = _RouteDecorator() PKjTO+7J+#+#apidaora/route/factory.pyimport functools from asyncio import iscoroutine from dataclasses import is_dataclass from http import HTTPStatus from json import JSONDecodeError from typing import ( Any, Awaitable, Callable, Dict, Optional, Sequence, Type, Union, ) import orjson from jsondaora import as_typed_dict, dataclass_asjson, typed_dict_asjson from jsondaora.exceptions import DeserializationError from ..asgi.base import ( ASGIBody, ASGICallableResults, ASGIHeaders, ASGIPathArgs, ASGIQueryDict, ) from ..asgi.responses import ( make_html_response, make_json_response, make_text_response, ) from ..asgi.router import Controller, Route from ..bodies import _GZipFactory from ..content import ContentType from ..exceptions import BadRequestError, InvalidReturnError from ..header import _Header from ..method import MethodType from ..responses import Response from .controller_input import controller_input RESPONSES_MAP = { ContentType.APPLICATION_JSON: make_json_response, ContentType.TEXT_PLAIN: make_text_response, ContentType.TEXT_HTML: make_html_response, } def make_route( path_pattern: str, method: MethodType, controller: Callable[..., Any], has_content_length: bool = True, ) -> Route: ControllerInput = controller_input(controller, path_pattern) annotations_info = ControllerInput.__annotations_info__ annotations_path_args = ControllerInput.__annotations_path_args__ annotations_query_dict = ControllerInput.__annotations_query_dict__ annotations_headers = ControllerInput.__annotations_headers__ body_type = ControllerInput.__annotations_body__.get('body') return_type = controller.__annotations__.get('return') def parse_asgi_input( path_args: ASGIPathArgs, query_dict: ASGIQueryDict, headers: ASGIHeaders, body: ASGIBody, ) -> Dict[str, Any]: kwargs: Dict[str, Any] if annotations_info.has_input: if annotations_info.has_path_args: kwargs = { name: path_args.get(name) for name in annotations_path_args.keys() } else: kwargs = {} if annotations_info.has_query_dict: kwargs.update( { name: value if ( (value := query_dict.get(name)) # noqa and len(value) > 1 # type: ignore ) else value[0] if isinstance(value, list) else None for name in annotations_query_dict.keys() } ) if annotations_info.has_headers: headers_map = ControllerInput.__headers_name_map__ kwargs.update( { name: annotations_headers[name](h_value.decode()) # type: ignore for h_name, h_value in headers if (name := headers_map.get(h_name.decode())) # noqa in annotations_headers } ) if annotations_info.has_body: if isinstance(body_type, type) and issubclass( body_type, _GZipFactory ): kwargs['body'] = body_type(value=body) input_ = as_typed_dict(kwargs, ControllerInput) return input_ # type: ignore else: kwargs['body'] = make_json_request_body(body, body_type) return as_typed_dict( # type: ignore kwargs, ControllerInput ) return {} async def build_asgi_output( controller_output: Any, status: HTTPStatus = HTTPStatus.OK, headers: Optional[Sequence[_Header]] = None, content_type: ContentType = ContentType.APPLICATION_JSON, return_type_: Any = None, ) -> ASGICallableResults: while iscoroutine(controller_output): controller_output = await controller_output if return_type_ is None and return_type: return_type_ = return_type if isinstance(controller_output, Response): return build_asgi_output( controller_output['body'], controller_output['status'], controller_output['headers'], controller_output['content_type'], controller_output.__annotations__.get('body'), ) elif isinstance(controller_output, dict): if return_type_: body = typed_dict_asjson(controller_output, return_type_) else: body = orjson.dumps(controller_output) elif is_dataclass(controller_output): body = dataclass_asjson(controller_output) elif ( isinstance(controller_output, tuple) or isinstance(controller_output, str) or isinstance(controller_output, list) or isinstance(controller_output, int) or isinstance(controller_output, bool) or isinstance(controller_output, float) ): if content_type == content_type.APPLICATION_JSON: body = orjson.dumps(controller_output) else: body = str(controller_output).encode() elif controller_output is None: content_length = 0 if has_content_length else None if headers: return RESPONSES_MAP[content_type]( content_length, headers=make_asgi_headers(headers) ) return RESPONSES_MAP[content_type](content_length) else: raise InvalidReturnError(controller_output, controller) content_length = len(body) if has_content_length else None return ( RESPONSES_MAP[content_type]( content_length, status, make_asgi_headers(headers) ), body, ) class WrappedController(Controller): @functools.wraps(controller) def __call__( self, path_args: ASGIPathArgs, query_dict: ASGIQueryDict, headers: ASGIHeaders, body: ASGIBody, ) -> Union[Awaitable[ASGICallableResults], ASGICallableResults]: try: kwargs = parse_asgi_input(path_args, query_dict, headers, body) controller_output = controller(**kwargs) return build_asgi_output(controller_output) except BadRequestError as error: return send_bad_request_response( error.dict, has_content_length, error.headers ) except DeserializationError as error: field = { 'name': error.args[0].name, 'type': error.args[0].type.__name__, 'invalid_value': error.args[1], } error_dict = { 'name': 'field-parsing', 'info': {'field': field}, } return send_bad_request_response( error_dict, has_content_length ) wrapped_controller = WrappedController() route = Route( path_pattern, method, wrapped_controller, annotations_info.has_path_args, annotations_info.has_query_dict, annotations_info.has_headers, annotations_info.has_body, ) wrapped_controller.route = route return route def make_json_request_body(body: bytes, body_type: Optional[Type[Any]]) -> Any: try: return orjson.loads(body) except JSONDecodeError: schema = ( getattr(body_type, '__annotations__', {}) if body_type else None ) schema = {k: t.__name__ for k, t in schema.items()} raise BadRequestError(name='invalid-body', info={'schema': schema}) def send_bad_request_response( error_dict: Dict[str, Any], has_content_length: bool, headers: Optional[Sequence[_Header]] = None, content_type: ContentType = ContentType.APPLICATION_JSON, ) -> ASGICallableResults: body = orjson.dumps({'error': error_dict}) content_length = len(body) if has_content_length else None return ( RESPONSES_MAP[content_type]( content_length, HTTPStatus.BAD_REQUEST, make_asgi_headers(headers) ), body, ) def make_asgi_headers( headers: Optional[Sequence[_Header]], ) -> Optional[ASGIHeaders]: if not headers: return None return tuple( ( header.http_name.encode(), str(header.value).encode() if not isinstance(header.value, bytes) else header.value, ) for header in headers ) PKϺ0O?,,!apidaora-0.10.0.dist-info/LICENSEMIT License Copyright (c) 2019 Diogo Dutra Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!HPOapidaora-0.10.0.dist-info/WHEEL HM K-*ϳR03rOK-J,/RH,szd&Y)r$[)T&UrPK!H^ V"apidaora-0.10.0.dist-info/METADATAXs۸ο;Ev^m97$ IHHO$%Yi>܌eO6:=&Oh#*$unt8-W1g0DDK^bK,\S+4M!KhH"w44 p:KRL$M<4a$~/sc(2ƹ+*ْy}tp$Lͣ6JW%R{86"ڍ$ n=G$ũx< Rii m~Aԙ$^ qS-3rQ8rE u2mS9\EHՋ.vr4tUwbf_)j;fDOarf1+c**:cwZpPzC)KbP­OraXJ8t_мqthiܸUy}hFYE'cD+hXBf{ Q"ff1vwQ.|3?P%F!µ-h׬e/[FUI!ByZcMqX ꁑmɘ3t, bM6 ݁ˣ~Q32"Ҁ x{#`(r4ŭzu+荑arw~rdY]f9[xnjw;a\ N6*P̠a~B}Q0YhK#ECe6~x)4EFudxOguv  b(;v.)'=b̂ٱ=%Шpjݾbjjom{XT&-Gr%[GQ=\fG&~ ꉙgf"An6íQL*Qhm>{󞷵FnzgQz>: S i|_C PK!H! apidaora-0.10.0.dist-info/RECORDDzXཟbA D uAD<\xy6_uݧA[Ġ_AP9HO>j4WnDꮙ%zơxBE|?-ж؟ W.#*Q5+fęZ AO&l"73GuҞ&vPYSZb|.w{(zXw1y`y*X=w4{]zdD*ccrA趹%P42I0eh1A ي@;(y4"7FO-O@tv{t@<6ˣy?J]d4'](:}]0C Æ&ǹ$v Ta. G$>hۙA4jj !uq(=nj4Oǹ3D}_ݾat$ +0wDmtmQܐ~v]t֓A8kK}r&"%&YMdR;Ș'>y0|.VOp`p[5Q*`}Ma?==LJ|$)`Ů]Ӳ"$Yzx{,^ogF ۘ m32Y(?'YtJyL^eee{)j 9VɚϖM>3O(ޟSp$8^ٮܖ; ق%qSUu{UXu}tڬzд*FD/pو%o_4 ]$[(E'p.9w:imqm@>N#',᧌Umc~7jiõR7D31oٕA[x-S_@QnVDic,Eh7&;~g~\Ƀ{e=)F..;ŷd`t$A4MQMMPKrTO2Yapidaora/__init__.pyPKSO*'00apidaora/app.pyPKjTO%)'apidaora/bodies.pyPK5O^0O apidaora/content.pyPKSOH apidaora/exceptions.pyPKI{GOyHapidaora/header.pyPKS 1Oc:#apidaora/method.pyPKȾGO_~l((apidaora/responses.pyPKFOapidaora/asgi/__init__.pyPKSORapidaora/asgi/app.pyPKI{GOm`!apidaora/asgi/base.pyPKSO6$apidaora/asgi/controller.pyPKȾGO#` ` o$apidaora/asgi/responses.pyPKSOr}  2apidaora/asgi/router.pyPKSO Q?apidaora/controllers/__init__.pyPKSO̝ a#a#'?apidaora/controllers/background_task.pyPKFO5capidaora/route/__init__.pyPKcSO]$49%mcapidaora/route/annotations_getters.pyPKȾGOtD& "Wjapidaora/route/controller_input.pyPKSONwapidaora/route/decorator.pyPKjTO+7J+#+#r~apidaora/route/factory.pyPKϺ0O?,,!ԡapidaora-0.10.0.dist-info/LICENSEPK!HPO?apidaora-0.10.0.dist-info/WHEELPK!H^ V"̦apidaora-0.10.0.dist-info/METADATAPK!H! apidaora-0.10.0.dist-info/RECORDPKe