PK MQ# fastapi/__init__.py"""FastAPI framework, high performance, easy to learn, fast to code, ready for production""" __version__ = "0.1.6" from .applications import FastAPI from .routing import APIRouter from .params import Body, Path, Query, Header, Cookie, Form, File, Security, Depends PK ]MH"5 5 fastapi/applications.pyfrom typing import Any, Callable, Dict, List, Optional, Type from fastapi import routing from fastapi.openapi.docs import get_redoc_html, get_swagger_ui_html from fastapi.openapi.utils import get_openapi from pydantic import BaseModel from starlette.applications import Starlette from starlette.exceptions import ExceptionMiddleware, HTTPException from starlette.middleware.errors import ServerErrorMiddleware from starlette.middleware.lifespan import LifespanMiddleware from starlette.requests import Request from starlette.responses import JSONResponse, Response async def http_exception(request: Request, exc: HTTPException) -> JSONResponse: return JSONResponse({"detail": exc.detail}, status_code=exc.status_code) class FastAPI(Starlette): def __init__( self, debug: bool = False, template_directory: str = None, title: str = "Fast API", description: str = "", version: str = "0.1.0", openapi_url: str = "/openapi.json", swagger_ui_url: str = "/docs", redoc_url: str = "/redoc", **extra: Dict[str, Any], ) -> None: self._debug = debug self.router: routing.APIRouter = routing.APIRouter() self.exception_middleware = ExceptionMiddleware(self.router, debug=debug) self.error_middleware = ServerErrorMiddleware( self.exception_middleware, debug=debug ) self.lifespan_middleware = LifespanMiddleware(self.error_middleware) self.schema_generator = None self.template_env = self.load_template_env(template_directory) self.title = title self.description = description self.version = version self.openapi_url = openapi_url self.swagger_ui_url = swagger_ui_url self.redoc_url = redoc_url self.extra = extra self.openapi_version = "3.0.2" if self.openapi_url: assert self.title, "A title must be provided for OpenAPI, e.g.: 'My API'" assert self.version, "A version must be provided for OpenAPI, e.g.: '2.1.0'" if self.swagger_ui_url or self.redoc_url: assert self.openapi_url, "The openapi_url is required for the docs" self.openapi_schema: Optional[Dict[str, Any]] = None self.setup() def openapi(self) -> Dict: if not self.openapi_schema: self.openapi_schema = get_openapi( title=self.title, version=self.version, openapi_version=self.openapi_version, description=self.description, routes=self.routes, ) return self.openapi_schema def setup(self) -> None: if self.openapi_url: self.add_route( self.openapi_url, lambda req: JSONResponse(self.openapi()), include_in_schema=False, ) if self.swagger_ui_url: self.add_route( self.swagger_ui_url, lambda r: get_swagger_ui_html( openapi_url=self.openapi_url, title=self.title + " - Swagger UI" ), include_in_schema=False, ) if self.redoc_url: self.add_route( self.redoc_url, lambda r: get_redoc_html( openapi_url=self.openapi_url, title=self.title + " - ReDoc" ), include_in_schema=False, ) self.add_exception_handler(HTTPException, http_exception) def add_api_route( self, path: str, endpoint: Callable, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, name: str = None, methods: List[str] = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, ) -> None: self.router.add_api_route( path, endpoint=endpoint, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, name=name, methods=methods, operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, ) def api_route( self, path: str, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, name: str = None, methods: List[str] = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, ) -> Callable: def decorator(func: Callable) -> Callable: self.router.add_api_route( path, func, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, name=name, methods=methods, operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, ) return func return decorator def include_router(self, router: routing.APIRouter, *, prefix: str = "") -> None: self.router.include_router(router, prefix=prefix) def get( self, path: str, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, name: str = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, ) -> Callable: return self.router.get( path, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, name=name, operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, ) def put( self, path: str, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, name: str = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, ) -> Callable: return self.router.put( path, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, name=name, operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, ) def post( self, path: str, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, name: str = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, ) -> Callable: return self.router.post( path, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, name=name, operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, ) def delete( self, path: str, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, name: str = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, ) -> Callable: return self.router.delete( path, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, name=name, operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, ) def options( self, path: str, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, name: str = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, ) -> Callable: return self.router.options( path, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, name=name, operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, ) def head( self, path: str, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, name: str = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, ) -> Callable: return self.router.head( path, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, name=name, operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, ) def patch( self, path: str, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, name: str = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, ) -> Callable: return self.router.patch( path, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, name=name, operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, ) def trace( self, path: str, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, name: str = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, ) -> Callable: return self.router.trace( path, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, name=name, operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, ) PK M4? ? fastapi/encoders.pyfrom enum import Enum from types import GeneratorType from typing import Any, Set from pydantic import BaseModel from pydantic.json import pydantic_encoder def jsonable_encoder( obj: Any, include: Set[str] = None, exclude: Set[str] = set(), by_alias: bool = False, include_none: bool = True, ) -> Any: if isinstance(obj, BaseModel): return jsonable_encoder( obj.dict(include=include, exclude=exclude, by_alias=by_alias), include_none=include_none, ) if isinstance(obj, Enum): return obj.value if isinstance(obj, (str, int, float, type(None))): return obj if isinstance(obj, dict): return { jsonable_encoder( key, by_alias=by_alias, include_none=include_none ): jsonable_encoder(value, by_alias=by_alias, include_none=include_none) for key, value in obj.items() if value is not None or include_none } if isinstance(obj, (list, set, frozenset, GeneratorType, tuple)): return [ jsonable_encoder( item, include=include, exclude=exclude, by_alias=by_alias, include_none=include_none, ) for item in obj ] return pydantic_encoder(obj) PK M+͖ fastapi/params.pyfrom enum import Enum from typing import Any, Callable, Sequence from pydantic import Schema class ParamTypes(Enum): query = "query" header = "header" path = "path" cookie = "cookie" class Param(Schema): in_: ParamTypes def __init__( self, default: Any, *, deprecated: bool = None, alias: str = None, title: str = None, description: str = None, gt: float = None, ge: float = None, lt: float = None, le: float = None, min_length: int = None, max_length: int = None, regex: str = None, **extra: Any, ): self.deprecated = deprecated super().__init__( default, alias=alias, title=title, description=description, gt=gt, ge=ge, lt=lt, le=le, min_length=min_length, max_length=max_length, regex=regex, **extra, ) class Path(Param): in_ = ParamTypes.path def __init__( self, default: Any, *, deprecated: bool = None, alias: str = None, title: str = None, description: str = None, gt: float = None, ge: float = None, lt: float = None, le: float = None, min_length: int = None, max_length: int = None, regex: str = None, **extra: Any, ): self.description = description self.deprecated = deprecated self.in_ = self.in_ super().__init__( default, alias=alias, title=title, description=description, gt=gt, ge=ge, lt=lt, le=le, min_length=min_length, max_length=max_length, regex=regex, **extra, ) class Query(Param): in_ = ParamTypes.query def __init__( self, default: Any, *, deprecated: bool = None, alias: str = None, title: str = None, description: str = None, gt: float = None, ge: float = None, lt: float = None, le: float = None, min_length: int = None, max_length: int = None, regex: str = None, **extra: Any, ): self.description = description self.deprecated = deprecated super().__init__( default, alias=alias, title=title, description=description, gt=gt, ge=ge, lt=lt, le=le, min_length=min_length, max_length=max_length, regex=regex, **extra, ) class Header(Param): in_ = ParamTypes.header def __init__( self, default: Any, *, deprecated: bool = None, alias: str = None, convert_underscores: bool = True, title: str = None, description: str = None, gt: float = None, ge: float = None, lt: float = None, le: float = None, min_length: int = None, max_length: int = None, regex: str = None, **extra: Any, ): self.description = description self.deprecated = deprecated self.convert_underscores = convert_underscores super().__init__( default, alias=alias, title=title, description=description, gt=gt, ge=ge, lt=lt, le=le, min_length=min_length, max_length=max_length, regex=regex, **extra, ) class Cookie(Param): in_ = ParamTypes.cookie def __init__( self, default: Any, *, deprecated: bool = None, alias: str = None, title: str = None, description: str = None, gt: float = None, ge: float = None, lt: float = None, le: float = None, min_length: int = None, max_length: int = None, regex: str = None, **extra: Any, ): self.description = description self.deprecated = deprecated super().__init__( default, alias=alias, title=title, description=description, gt=gt, ge=ge, lt=lt, le=le, min_length=min_length, max_length=max_length, regex=regex, **extra, ) class Body(Schema): def __init__( self, default: Any, *, embed: bool = False, media_type: str = "application/json", alias: str = None, title: str = None, description: str = None, gt: float = None, ge: float = None, lt: float = None, le: float = None, min_length: int = None, max_length: int = None, regex: str = None, **extra: Any, ): self.embed = embed self.media_type = media_type super().__init__( default, alias=alias, title=title, description=description, gt=gt, ge=ge, lt=lt, le=le, min_length=min_length, max_length=max_length, regex=regex, **extra, ) class Form(Body): def __init__( self, default: Any, *, sub_key: bool = False, media_type: str = "application/x-www-form-urlencoded", alias: str = None, title: str = None, description: str = None, gt: float = None, ge: float = None, lt: float = None, le: float = None, min_length: int = None, max_length: int = None, regex: str = None, **extra: Any, ): super().__init__( default, embed=sub_key, media_type=media_type, alias=alias, title=title, description=description, gt=gt, ge=ge, lt=lt, le=le, min_length=min_length, max_length=max_length, regex=regex, **extra, ) class File(Form): def __init__( self, default: Any, *, sub_key: bool = False, media_type: str = "multipart/form-data", alias: str = None, title: str = None, description: str = None, gt: float = None, ge: float = None, lt: float = None, le: float = None, min_length: int = None, max_length: int = None, regex: str = None, **extra: Any, ): super().__init__( default, embed=sub_key, media_type=media_type, alias=alias, title=title, description=description, gt=gt, ge=ge, lt=lt, le=le, min_length=min_length, max_length=max_length, regex=regex, **extra, ) class Depends: def __init__(self, dependency: Callable = None): self.dependency = dependency class Security(Depends): def __init__(self, dependency: Callable = None, scopes: Sequence[str] = None): self.scopes = scopes or [] super().__init__(dependency=dependency) PK M oK K fastapi/routing.pyimport asyncio import inspect import logging from typing import Any, Callable, List, Optional, Type from fastapi import params from fastapi.dependencies.models import Dependant from fastapi.dependencies.utils import get_body_field, get_dependant, solve_dependencies from fastapi.encoders import jsonable_encoder from pydantic import BaseConfig, BaseModel, Schema from pydantic.error_wrappers import ErrorWrapper, ValidationError from pydantic.fields import Field from pydantic.utils import lenient_issubclass from starlette import routing from starlette.concurrency import run_in_threadpool from starlette.exceptions import HTTPException from starlette.formparsers import UploadFile from starlette.requests import Request from starlette.responses import JSONResponse, Response from starlette.routing import get_name, request_response from starlette.status import HTTP_422_UNPROCESSABLE_ENTITY def serialize_response(*, field: Field = None, response: Response) -> Any: if field: errors = [] value, errors_ = field.validate(response, {}, loc=("response",)) if isinstance(errors_, ErrorWrapper): errors.append(errors_) elif isinstance(errors_, list): errors.extend(errors_) if errors: raise ValidationError(errors) return jsonable_encoder(value) else: return jsonable_encoder(response) def get_app( dependant: Dependant, body_field: Field = None, status_code: int = 200, content_type: Type[Response] = JSONResponse, response_field: Field = None, ) -> Callable: assert dependant.call is not None, "dependant.call must me a function" is_coroutine = asyncio.iscoroutinefunction(dependant.call) is_body_form = body_field and isinstance(body_field.schema, params.Form) async def app(request: Request) -> Response: try: body = None if body_field: if is_body_form: raw_body = await request.form() body = {} for field, value in raw_body.items(): if isinstance(value, UploadFile): body[field] = await value.read() else: body[field] = value else: body = await request.json() except Exception as e: logging.error("Error getting request body", e) raise HTTPException( status_code=400, detail="There was an error parsing the body" ) try: values, errors = await solve_dependencies( request=request, dependant=dependant, body=body ) except Exception as e: logging.error("Error solving dependencies", e) raise HTTPException(status_code=400, detail="Error processing request") if errors: errors_out = ValidationError(errors) raise HTTPException( status_code=HTTP_422_UNPROCESSABLE_ENTITY, detail=errors_out.errors() ) else: assert dependant.call is not None, "dependant.call must me a function" if is_coroutine: raw_response = await dependant.call(**values) else: raw_response = await run_in_threadpool(dependant.call, **values) if isinstance(raw_response, Response): return raw_response if isinstance(raw_response, BaseModel): return content_type( content=serialize_response( field=response_field, response=raw_response ), status_code=status_code, ) errors = [] try: return content_type( content=serialize_response( field=response_field, response=raw_response ), status_code=status_code, ) except Exception as e: errors.append(e) try: response = dict(raw_response) return content_type( content=serialize_response(field=response_field, response=response), status_code=status_code, ) except Exception as e: errors.append(e) try: response = vars(raw_response) return content_type( content=serialize_response(field=response_field, response=response), status_code=status_code, ) except Exception as e: errors.append(e) raise ValueError(errors) return app class APIRoute(routing.Route): def __init__( self, path: str, endpoint: Callable, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, name: str = None, methods: List[str] = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, ) -> None: assert path.startswith("/"), "Routed paths must always start with '/'" self.path = path self.endpoint = endpoint self.name = get_name(endpoint) if name is None else name self.response_model = response_model if self.response_model: assert lenient_issubclass( content_type, JSONResponse ), "To declare a type the response must be a JSON response" response_name = "Response_" + self.name self.response_field: Optional[Field] = Field( name=response_name, type_=self.response_model, class_validators=[], default=None, required=False, model_config=BaseConfig(), schema=Schema(None), ) else: self.response_field = None self.status_code = status_code self.tags = tags or [] self.summary = summary self.description = description or self.endpoint.__doc__ self.response_description = response_description self.deprecated = deprecated if methods is None: methods = ["GET"] self.methods = methods self.operation_id = operation_id self.include_in_schema = include_in_schema self.content_type = content_type self.path_regex, self.path_format, self.param_convertors = self.compile_path( path ) assert inspect.isfunction(endpoint) or inspect.ismethod( endpoint ), f"An endpoint must be a function or method" self.dependant = get_dependant(path=path, call=self.endpoint) self.body_field = get_body_field(dependant=self.dependant, name=self.name) self.app = request_response( get_app( dependant=self.dependant, body_field=self.body_field, status_code=self.status_code, content_type=self.content_type, response_field=self.response_field, ) ) class APIRouter(routing.Router): def add_api_route( self, path: str, endpoint: Callable, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, name: str = None, methods: List[str] = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, ) -> None: route = APIRoute( path, endpoint=endpoint, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, name=name, methods=methods, operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, ) self.routes.append(route) def api_route( self, path: str, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, name: str = None, methods: List[str] = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, ) -> Callable: def decorator(func: Callable) -> Callable: self.add_api_route( path, func, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, name=name, methods=methods, operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, ) return func return decorator def include_router(self, router: "APIRouter", *, prefix: str = "") -> None: if prefix: assert prefix.startswith("/"), "A path prefix must start with '/'" assert not prefix.endswith( "/" ), "A path prefix must not end with '/', as the routes will start with '/'" for route in router.routes: if isinstance(route, APIRoute): self.add_api_route( prefix + route.path, route.endpoint, response_model=route.response_model, status_code=route.status_code, tags=route.tags or [], summary=route.summary, description=route.description, response_description=route.response_description, deprecated=route.deprecated, name=route.name, methods=route.methods, operation_id=route.operation_id, include_in_schema=route.include_in_schema, content_type=route.content_type, ) elif isinstance(route, routing.Route): self.add_route( prefix + route.path, route.endpoint, methods=route.methods, name=route.name, include_in_schema=route.include_in_schema, ) def get( self, path: str, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, name: str = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, ) -> Callable: return self.api_route( path=path, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, name=name, methods=["GET"], operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, ) def put( self, path: str, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, name: str = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, ) -> Callable: return self.api_route( path=path, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, name=name, methods=["PUT"], operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, ) def post( self, path: str, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, name: str = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, ) -> Callable: return self.api_route( path=path, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, name=name, methods=["POST"], operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, ) def delete( self, path: str, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, name: str = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, ) -> Callable: return self.api_route( path=path, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, name=name, methods=["DELETE"], operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, ) def options( self, path: str, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, name: str = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, ) -> Callable: return self.api_route( path=path, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, name=name, methods=["OPTIONS"], operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, ) def head( self, path: str, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, name: str = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, ) -> Callable: return self.api_route( path=path, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, name=name, methods=["HEAD"], operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, ) def patch( self, path: str, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, name: str = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, ) -> Callable: return self.api_route( path=path, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, name=name, methods=["PATCH"], operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, ) def trace( self, path: str, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, name: str = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, ) -> Callable: return self.api_route( path=path, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, name=name, methods=["TRACE"], operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, ) PK M. fastapi/utils.pyimport re from typing import Any, Dict, List, Sequence, Set, Type from fastapi import routing from fastapi.openapi.constants import REF_PREFIX from pydantic import BaseModel from pydantic.fields import Field from pydantic.schema import get_flat_models_from_fields, model_process_schema from starlette.routing import BaseRoute def get_flat_models_from_routes( routes: Sequence[Type[BaseRoute]] ) -> Set[Type[BaseModel]]: body_fields_from_routes: List[Field] = [] responses_from_routes: List[Field] = [] for route in routes: if getattr(route, "include_in_schema", None) and isinstance( route, routing.APIRoute ): if route.body_field: assert isinstance( route.body_field, Field ), "A request body must be a Pydantic Field" body_fields_from_routes.append(route.body_field) if route.response_field: responses_from_routes.append(route.response_field) flat_models = get_flat_models_from_fields( body_fields_from_routes + responses_from_routes ) return flat_models def get_model_definitions( *, flat_models: Set[Type[BaseModel]], model_name_map: Dict[Type[BaseModel], str] ) -> Dict[str, Any]: definitions: Dict[str, Dict] = {} for model in flat_models: m_schema, m_definitions = model_process_schema( model, model_name_map=model_name_map, ref_prefix=REF_PREFIX ) definitions.update(m_definitions) model_name = model_name_map[model] definitions[model_name] = m_schema return definitions def get_path_param_names(path: str) -> Set[str]: return {item.strip("{}") for item in re.findall("{[^}]*}", path)} PK ρM fastapi/dependencies/__init__.pyPK M.pQF F fastapi/dependencies/models.pyfrom typing import Any, Callable, Dict, List, Sequence, Tuple from fastapi.security.base import SecurityBase from pydantic import BaseConfig, Schema from pydantic.error_wrappers import ErrorWrapper from pydantic.errors import MissingError from pydantic.fields import Field, Required from pydantic.schema import get_annotation_from_schema from starlette.concurrency import run_in_threadpool from starlette.requests import Request param_supported_types = (str, int, float, bool) class SecurityRequirement: def __init__(self, security_scheme: SecurityBase, scopes: Sequence[str] = None): self.security_scheme = security_scheme self.scopes = scopes class Dependant: def __init__( self, *, path_params: List[Field] = None, query_params: List[Field] = None, header_params: List[Field] = None, cookie_params: List[Field] = None, body_params: List[Field] = None, dependencies: List["Dependant"] = None, security_schemes: List[SecurityRequirement] = None, name: str = None, call: Callable = None, request_param_name: str = None, ) -> None: self.path_params = path_params or [] self.query_params = query_params or [] self.header_params = header_params or [] self.cookie_params = cookie_params or [] self.body_params = body_params or [] self.dependencies = dependencies or [] self.security_requirements = security_schemes or [] self.request_param_name = request_param_name self.name = name self.call = call PK ݐM9r2 2 fastapi/dependencies/utils.pyimport asyncio import inspect from copy import deepcopy from typing import Any, Callable, Dict, List, Mapping, Sequence, Tuple, Type from fastapi import params from fastapi.dependencies.models import Dependant, SecurityRequirement from fastapi.security.base import SecurityBase from fastapi.utils import get_path_param_names from pydantic import BaseConfig, Schema, create_model from pydantic.error_wrappers import ErrorWrapper from pydantic.errors import MissingError from pydantic.fields import Field, Required from pydantic.schema import get_annotation_from_schema from pydantic.utils import lenient_issubclass from starlette.concurrency import run_in_threadpool from starlette.requests import Request param_supported_types = (str, int, float, bool) def get_sub_dependant(*, param: inspect.Parameter, path: str) -> Dependant: depends: params.Depends = param.default if depends.dependency: dependency = depends.dependency else: dependency = param.annotation sub_dependant = get_dependant(path=path, call=dependency, name=param.name) if isinstance(depends, params.Security) and isinstance(dependency, SecurityBase): security_requirement = SecurityRequirement( security_scheme=dependency, scopes=depends.scopes ) sub_dependant.security_requirements.append(security_requirement) return sub_dependant def get_flat_dependant(dependant: Dependant) -> Dependant: flat_dependant = Dependant( path_params=dependant.path_params.copy(), query_params=dependant.query_params.copy(), header_params=dependant.header_params.copy(), cookie_params=dependant.cookie_params.copy(), body_params=dependant.body_params.copy(), security_schemes=dependant.security_requirements.copy(), ) for sub_dependant in dependant.dependencies: if sub_dependant is dependant: raise ValueError("recursion", dependant.dependencies) flat_sub = get_flat_dependant(sub_dependant) flat_dependant.path_params.extend(flat_sub.path_params) flat_dependant.query_params.extend(flat_sub.query_params) flat_dependant.header_params.extend(flat_sub.header_params) flat_dependant.cookie_params.extend(flat_sub.cookie_params) flat_dependant.body_params.extend(flat_sub.body_params) flat_dependant.security_requirements.extend(flat_sub.security_requirements) return flat_dependant def get_dependant(*, path: str, call: Callable, name: str = None) -> Dependant: path_param_names = get_path_param_names(path) endpoint_signature = inspect.signature(call) signature_params = endpoint_signature.parameters dependant = Dependant(call=call, name=name) for param_name in signature_params: param = signature_params[param_name] if isinstance(param.default, params.Depends): sub_dependant = get_sub_dependant(param=param, path=path) dependant.dependencies.append(sub_dependant) for param_name in signature_params: param = signature_params[param_name] if ( (param.default == param.empty) or isinstance(param.default, params.Path) ) and (param_name in path_param_names): assert ( lenient_issubclass(param.annotation, param_supported_types) or param.annotation == param.empty ), f"Path params must be of type str, int, float or boot: {param}" param = signature_params[param_name] add_param_to_fields( param=param, dependant=dependant, default_schema=params.Path, force_type=params.ParamTypes.path, ) elif ( param.default == param.empty or param.default is None or type(param.default) in param_supported_types ) and ( param.annotation == param.empty or lenient_issubclass(param.annotation, param_supported_types) ): add_param_to_fields( param=param, dependant=dependant, default_schema=params.Query ) elif isinstance(param.default, params.Param): if param.annotation != param.empty: assert lenient_issubclass( param.annotation, param_supported_types ), f"Parameters for Path, Query, Header and Cookies must be of type str, int, float or bool: {param}" add_param_to_fields( param=param, dependant=dependant, default_schema=params.Query ) elif lenient_issubclass(param.annotation, Request): dependant.request_param_name = param_name elif not isinstance(param.default, params.Depends): add_param_to_body_fields(param=param, dependant=dependant) return dependant def add_param_to_fields( *, param: inspect.Parameter, dependant: Dependant, default_schema: Type[Schema] = params.Param, force_type: params.ParamTypes = None, ) -> None: default_value = Required if not param.default == param.empty: default_value = param.default if isinstance(default_value, params.Param): schema = default_value default_value = schema.default if getattr(schema, "in_", None) is None: schema.in_ = default_schema.in_ if force_type: schema.in_ = force_type else: schema = default_schema(default_value) required = default_value == Required annotation: Any = Any if not param.annotation == param.empty: annotation = param.annotation annotation = get_annotation_from_schema(annotation, schema) if not schema.alias and getattr(schema, "convert_underscores", None): alias = param.name.replace("_", "-") else: alias = schema.alias or param.name field = Field( name=param.name, type_=annotation, default=None if required else default_value, alias=alias, required=required, model_config=BaseConfig(), class_validators=[], schema=schema, ) if schema.in_ == params.ParamTypes.path: dependant.path_params.append(field) elif schema.in_ == params.ParamTypes.query: dependant.query_params.append(field) elif schema.in_ == params.ParamTypes.header: dependant.header_params.append(field) else: assert ( schema.in_ == params.ParamTypes.cookie ), f"non-body parameters must be in path, query, header or cookie: {param.name}" dependant.cookie_params.append(field) def add_param_to_body_fields(*, param: inspect.Parameter, dependant: Dependant) -> None: default_value = Required if not param.default == param.empty: default_value = param.default if isinstance(default_value, Schema): schema = default_value default_value = schema.default else: schema = Schema(default_value) required = default_value == Required annotation = get_annotation_from_schema(param.annotation, schema) field = Field( name=param.name, type_=annotation, default=None if required else default_value, alias=schema.alias or param.name, required=required, model_config=BaseConfig, class_validators=[], schema=schema, ) dependant.body_params.append(field) def is_coroutine_callable(call: Callable = None) -> bool: if not call: return False if inspect.isfunction(call): return asyncio.iscoroutinefunction(call) if inspect.isclass(call): return False call = getattr(call, "__call__", None) if not call: return False return asyncio.iscoroutinefunction(call) async def solve_dependencies( *, request: Request, dependant: Dependant, body: Dict[str, Any] = None ) -> Tuple[Dict[str, Any], List[ErrorWrapper]]: values: Dict[str, Any] = {} errors: List[ErrorWrapper] = [] for sub_dependant in dependant.dependencies: sub_values, sub_errors = await solve_dependencies( request=request, dependant=sub_dependant, body=body ) if sub_errors: return {}, errors assert sub_dependant.call is not None, "sub_dependant.call must be a function" if is_coroutine_callable(sub_dependant.call): solved = await sub_dependant.call(**sub_values) else: solved = await run_in_threadpool(sub_dependant.call, **sub_values) assert sub_dependant.name is not None, "Subdependants always have a name" values[sub_dependant.name] = solved path_values, path_errors = request_params_to_args( dependant.path_params, request.path_params ) query_values, query_errors = request_params_to_args( dependant.query_params, request.query_params ) header_values, header_errors = request_params_to_args( dependant.header_params, request.headers ) cookie_values, cookie_errors = request_params_to_args( dependant.cookie_params, request.cookies ) values.update(path_values) values.update(query_values) values.update(header_values) values.update(cookie_values) errors = path_errors + query_errors + header_errors + cookie_errors if dependant.body_params: body_values, body_errors = await request_body_to_args( # type: ignore # body_params checked above dependant.body_params, body ) values.update(body_values) errors.extend(body_errors) if dependant.request_param_name: values[dependant.request_param_name] = request return values, errors def request_params_to_args( required_params: Sequence[Field], received_params: Mapping[str, Any] ) -> Tuple[Dict[str, Any], List[ErrorWrapper]]: values = {} errors = [] for field in required_params: value = received_params.get(field.alias) if value is None: if field.required: errors.append( ErrorWrapper(MissingError(), loc=field.alias, config=BaseConfig) ) else: values[field.name] = deepcopy(field.default) continue schema: params.Param = field.schema assert isinstance(schema, params.Param), "Params must be subclasses of Param" v_, errors_ = field.validate(value, values, loc=(schema.in_.value, field.alias)) if isinstance(errors_, ErrorWrapper): errors.append(errors_) elif isinstance(errors_, list): errors.extend(errors_) else: values[field.name] = v_ return values, errors async def request_body_to_args( required_params: List[Field], received_body: Dict[str, Any] ) -> Tuple[Dict[str, Any], List[ErrorWrapper]]: values = {} errors = [] if required_params: field = required_params[0] embed = getattr(field.schema, "embed", None) if len(required_params) == 1 and not embed: received_body = {field.alias: received_body} for field in required_params: value = received_body.get(field.alias) if value is None: if field.required: errors.append( ErrorWrapper( MissingError(), loc=("body", field.alias), config=BaseConfig ) ) else: values[field.name] = deepcopy(field.default) continue v_, errors_ = field.validate(value, values, loc=("body", field.alias)) if isinstance(errors_, ErrorWrapper): errors.append(errors_) elif isinstance(errors_, list): errors.extend(errors_) else: values[field.name] = v_ return values, errors def get_body_field(*, dependant: Dependant, name: str) -> Field: flat_dependant = get_flat_dependant(dependant) if not flat_dependant.body_params: return None first_param = flat_dependant.body_params[0] embed = getattr(first_param.schema, "embed", None) if len(flat_dependant.body_params) == 1 and not embed: return first_param model_name = "Body_" + name BodyModel = create_model(model_name) for f in flat_dependant.body_params: BodyModel.__fields__[f.name] = f required = any(True for f in flat_dependant.body_params if f.required) if any(isinstance(f.schema, params.File) for f in flat_dependant.body_params): BodySchema: Type[params.Body] = params.File elif any(isinstance(f.schema, params.Form) for f in flat_dependant.body_params): BodySchema = params.Form else: BodySchema = params.Body field = Field( name="body", type_=BodyModel, default=None, required=required, model_config=BaseConfig, class_validators=[], alias="body", schema=BodySchema(None), ) return field PK YM fastapi/openapi/__init__.pyPK YMma a fastapi/openapi/constants.pyMETHODS_WITH_BODY = set(("POST", "PUT", "DELETE", "PATCH")) REF_PREFIX = "#/components/schemas/" PK ɕMGO fastapi/openapi/docs.pyfrom starlette.responses import HTMLResponse def get_swagger_ui_html(*, openapi_url: str, title: str) -> HTMLResponse: return HTMLResponse( """