PKj~僋񻜸fastapi/__init__.py"""FastAPI framework, high performance, easy to learn, fast to code, ready for production""" __version__ = "0.11.0" from starlette.background import BackgroundTasks from .applications import FastAPI from .datastructures import UploadFile from .exceptions import HTTPException from .params import Body, Cookie, Depends, File, Form, Header, Path, Query, Security from .routing import APIRouter PK}晆N材蜫U7U7fastapi/applications.pyfrom typing import Any, Callable, Dict, List, Optional, Type from fastapi import routing from fastapi.openapi.docs import get_redoc_html, get_swagger_ui_html from fastapi.openapi.utils import get_openapi from pydantic import BaseModel from starlette.applications import Starlette from starlette.exceptions import ExceptionMiddleware, HTTPException from starlette.middleware.errors import ServerErrorMiddleware from starlette.requests import Request from starlette.responses import JSONResponse, Response from starlette.routing import BaseRoute async def http_exception(request: Request, exc: HTTPException) -> JSONResponse: headers = getattr(exc, "headers", None) if headers: return JSONResponse( {"detail": exc.detail}, status_code=exc.status_code, headers=headers ) else: return JSONResponse({"detail": exc.detail}, status_code=exc.status_code) class FastAPI(Starlette): def __init__( self, debug: bool = False, routes: List[BaseRoute] = None, template_directory: str = None, title: str = "Fast API", description: str = "", version: str = "0.1.0", openapi_url: Optional[str] = "/openapi.json", openapi_prefix: str = "", docs_url: Optional[str] = "/docs", redoc_url: Optional[str] = "/redoc", **extra: Dict[str, Any], ) -> None: self._debug = debug self.router: routing.APIRouter = routing.APIRouter(routes) self.exception_middleware = ExceptionMiddleware(self.router, debug=debug) self.error_middleware = ServerErrorMiddleware( self.exception_middleware, debug=debug ) self.title = title self.description = description self.version = version self.openapi_url = openapi_url self.openapi_prefix = openapi_prefix.rstrip("/") self.docs_url = docs_url self.redoc_url = redoc_url self.extra = extra self.openapi_version = "3.0.2" if self.openapi_url: assert self.title, "A title must be provided for OpenAPI, e.g.: 'My API'" assert self.version, "A version must be provided for OpenAPI, e.g.: '2.1.0'" if self.docs_url or self.redoc_url: assert self.openapi_url, "The openapi_url is required for the docs" self.openapi_schema: Optional[Dict[str, Any]] = None self.setup() def openapi(self) -> Dict: if not self.openapi_schema: self.openapi_schema = get_openapi( title=self.title, version=self.version, openapi_version=self.openapi_version, description=self.description, routes=self.routes, openapi_prefix=self.openapi_prefix, ) return self.openapi_schema def setup(self) -> None: if self.openapi_url: self.add_route( self.openapi_url, lambda req: JSONResponse(self.openapi()), include_in_schema=False, ) if self.openapi_url and self.docs_url: self.add_route( self.docs_url, lambda r: get_swagger_ui_html( openapi_url=self.openapi_prefix + self.openapi_url, title=self.title + " - Swagger UI", ), include_in_schema=False, ) if self.openapi_url and self.redoc_url: self.add_route( self.redoc_url, lambda r: get_redoc_html( openapi_url=self.openapi_prefix + self.openapi_url, title=self.title + " - ReDoc", ), include_in_schema=False, ) self.add_exception_handler(HTTPException, http_exception) def add_api_route( self, path: str, endpoint: Callable, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, methods: List[str] = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, name: str = None, ) -> None: self.router.add_api_route( path, endpoint=endpoint, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, methods=methods, operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, name=name, ) def api_route( self, path: str, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, methods: List[str] = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, name: str = None, ) -> Callable: def decorator(func: Callable) -> Callable: self.router.add_api_route( path, func, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, methods=methods, operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, name=name, ) return func return decorator def include_router( self, router: routing.APIRouter, *, prefix: str = "", tags: List[str] = None ) -> None: self.router.include_router(router, prefix=prefix, tags=tags) def get( self, path: str, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, name: str = None, ) -> Callable: return self.router.get( path, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, name=name, ) def put( self, path: str, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, name: str = None, ) -> Callable: return self.router.put( path, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, name=name, ) def post( self, path: str, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, name: str = None, ) -> Callable: return self.router.post( path, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, name=name, ) def delete( self, path: str, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, name: str = None, ) -> Callable: return self.router.delete( path, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, name=name, ) def options( self, path: str, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, name: str = None, ) -> Callable: return self.router.options( path, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, name=name, ) def head( self, path: str, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, name: str = None, ) -> Callable: return self.router.head( path, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, name=name, ) def patch( self, path: str, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, name: str = None, ) -> Callable: return self.router.patch( path, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, name=name, ) def trace( self, path: str, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, name: str = None, ) -> Callable: return self.router.trace( path, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, name=name, ) PK暕pN件銆fastapi/datastructures.pyfrom typing import Any, Callable, Iterable, Type from starlette.datastructures import UploadFile as StarletteUploadFile class UploadFile(StarletteUploadFile): @classmethod def __get_validators__(cls: Type["UploadFile"]) -> Iterable[Callable]: yield cls.validate @classmethod def validate(cls: Type["UploadFile"], v: Any) -> Any: if not isinstance(v, StarletteUploadFile): raise ValueError(f"Expected UploadFile, received: {type(v)}") return v PKWNO悩 fastapi/encoders.pyfrom enum import Enum from types import GeneratorType from typing import Any, List, Set from pydantic import BaseModel from pydantic.json import ENCODERS_BY_TYPE def jsonable_encoder( obj: Any, include: Set[str] = None, exclude: Set[str] = set(), by_alias: bool = False, include_none: bool = True, custom_encoder: dict = {}, sqlalchemy_safe: bool = True, ) -> Any: if isinstance(obj, BaseModel): encoder = getattr(obj.Config, "json_encoders", custom_encoder) return jsonable_encoder( obj.dict(include=include, exclude=exclude, by_alias=by_alias), include_none=include_none, custom_encoder=encoder, sqlalchemy_safe=sqlalchemy_safe, ) if isinstance(obj, Enum): return obj.value if isinstance(obj, (str, int, float, type(None))): return obj if isinstance(obj, dict): encoded_dict = {} for key, value in obj.items(): if ( ( not sqlalchemy_safe or (not isinstance(key, str)) or (not key.startswith("_sa")) ) and (value is not None or include_none) and ((include and key in include) or key not in exclude) ): encoded_key = jsonable_encoder( key, by_alias=by_alias, include_none=include_none, custom_encoder=custom_encoder, sqlalchemy_safe=sqlalchemy_safe, ) encoded_value = jsonable_encoder( value, by_alias=by_alias, include_none=include_none, custom_encoder=custom_encoder, sqlalchemy_safe=sqlalchemy_safe, ) encoded_dict[encoded_key] = encoded_value return encoded_dict if isinstance(obj, (list, set, frozenset, GeneratorType, tuple)): encoded_list = [] for item in obj: encoded_list.append( jsonable_encoder( item, include=include, exclude=exclude, by_alias=by_alias, include_none=include_none, custom_encoder=custom_encoder, sqlalchemy_safe=sqlalchemy_safe, ) ) return encoded_list errors: List[Exception] = [] try: if custom_encoder and type(obj) in custom_encoder: encoder = custom_encoder[type(obj)] else: encoder = ENCODERS_BY_TYPE[type(obj)] return encoder(obj) except KeyError as e: errors.append(e) try: data = dict(obj) except Exception as e: errors.append(e) try: data = vars(obj) except Exception as e: errors.append(e) raise ValueError(errors) return jsonable_encoder( data, by_alias=by_alias, include_none=include_none, custom_encoder=custom_encoder, sqlalchemy_safe=sqlalchemy_safe, ) PKhYTN患'魾Bfastapi/exceptions.pyfrom starlette.exceptions import HTTPException as StarletteHTTPException class HTTPException(StarletteHTTPException): def __init__( self, status_code: int, detail: str = None, headers: dict = None ) -> None: super().__init__(status_code=status_code, detail=detail) self.headers = headers PKhYTNVP扖fastapi/params.pyfrom enum import Enum from typing import Any, Callable, Sequence from pydantic import Schema class ParamTypes(Enum): query = "query" header = "header" path = "path" cookie = "cookie" class Param(Schema): in_: ParamTypes def __init__( self, default: Any, *, alias: str = None, title: str = None, description: str = None, gt: float = None, ge: float = None, lt: float = None, le: float = None, min_length: int = None, max_length: int = None, regex: str = None, deprecated: bool = None, **extra: Any, ): self.deprecated = deprecated super().__init__( default, alias=alias, title=title, description=description, gt=gt, ge=ge, lt=lt, le=le, min_length=min_length, max_length=max_length, regex=regex, **extra, ) class Path(Param): in_ = ParamTypes.path def __init__( self, default: Any, *, alias: str = None, title: str = None, description: str = None, gt: float = None, ge: float = None, lt: float = None, le: float = None, min_length: int = None, max_length: int = None, regex: str = None, deprecated: bool = None, **extra: Any, ): self.in_ = self.in_ super().__init__( ..., alias=alias, title=title, description=description, gt=gt, ge=ge, lt=lt, le=le, min_length=min_length, max_length=max_length, regex=regex, deprecated=deprecated, **extra, ) class Query(Param): in_ = ParamTypes.query def __init__( self, default: Any, *, alias: str = None, title: str = None, description: str = None, gt: float = None, ge: float = None, lt: float = None, le: float = None, min_length: int = None, max_length: int = None, regex: str = None, deprecated: bool = None, **extra: Any, ): super().__init__( default, alias=alias, title=title, description=description, gt=gt, ge=ge, lt=lt, le=le, min_length=min_length, max_length=max_length, regex=regex, deprecated=deprecated, **extra, ) class Header(Param): in_ = ParamTypes.header def __init__( self, default: Any, *, alias: str = None, convert_underscores: bool = True, title: str = None, description: str = None, gt: float = None, ge: float = None, lt: float = None, le: float = None, min_length: int = None, max_length: int = None, regex: str = None, deprecated: bool = None, **extra: Any, ): self.convert_underscores = convert_underscores super().__init__( default, alias=alias, title=title, description=description, gt=gt, ge=ge, lt=lt, le=le, min_length=min_length, max_length=max_length, regex=regex, deprecated=deprecated, **extra, ) class Cookie(Param): in_ = ParamTypes.cookie def __init__( self, default: Any, *, alias: str = None, title: str = None, description: str = None, gt: float = None, ge: float = None, lt: float = None, le: float = None, min_length: int = None, max_length: int = None, regex: str = None, deprecated: bool = None, **extra: Any, ): super().__init__( default, alias=alias, title=title, description=description, gt=gt, ge=ge, lt=lt, le=le, min_length=min_length, max_length=max_length, regex=regex, deprecated=deprecated, **extra, ) class Body(Schema): def __init__( self, default: Any, *, embed: bool = False, media_type: str = "application/json", alias: str = None, title: str = None, description: str = None, gt: float = None, ge: float = None, lt: float = None, le: float = None, min_length: int = None, max_length: int = None, regex: str = None, **extra: Any, ): self.embed = embed self.media_type = media_type super().__init__( default, alias=alias, title=title, description=description, gt=gt, ge=ge, lt=lt, le=le, min_length=min_length, max_length=max_length, regex=regex, **extra, ) class Form(Body): def __init__( self, default: Any, *, media_type: str = "application/x-www-form-urlencoded", alias: str = None, title: str = None, description: str = None, gt: float = None, ge: float = None, lt: float = None, le: float = None, min_length: int = None, max_length: int = None, regex: str = None, **extra: Any, ): super().__init__( default, embed=True, media_type=media_type, alias=alias, title=title, description=description, gt=gt, ge=ge, lt=lt, le=le, min_length=min_length, max_length=max_length, regex=regex, **extra, ) class File(Form): def __init__( self, default: Any, *, media_type: str = "multipart/form-data", alias: str = None, title: str = None, description: str = None, gt: float = None, ge: float = None, lt: float = None, le: float = None, min_length: int = None, max_length: int = None, regex: str = None, **extra: Any, ): super().__init__( default, media_type=media_type, alias=alias, title=title, description=description, gt=gt, ge=ge, lt=lt, le=le, min_length=min_length, max_length=max_length, regex=regex, **extra, ) class Depends: def __init__(self, dependency: Callable = None): self.dependency = dependency class Security(Depends): def __init__(self, dependency: Callable = None, scopes: Sequence[str] = None): self.scopes = scopes or [] super().__init__(dependency=dependency) PKWN 焻H匟fastapi/routing.pyimport asyncio import inspect import logging from typing import Any, Callable, List, Optional, Type from fastapi import params from fastapi.dependencies.models import Dependant from fastapi.dependencies.utils import get_body_field, get_dependant, solve_dependencies from fastapi.encoders import jsonable_encoder from fastapi.utils import UnconstrainedConfig from pydantic import BaseModel, Schema from pydantic.error_wrappers import ErrorWrapper, ValidationError from pydantic.fields import Field from pydantic.utils import lenient_issubclass from starlette import routing from starlette.concurrency import run_in_threadpool from starlette.exceptions import HTTPException from starlette.requests import Request from starlette.responses import JSONResponse, Response from starlette.routing import compile_path, get_name, request_response from starlette.status import HTTP_422_UNPROCESSABLE_ENTITY def serialize_response(*, field: Field = None, response: Response) -> Any: encoded = jsonable_encoder(response) if field: errors = [] value, errors_ = field.validate(encoded, {}, loc=("response",)) if isinstance(errors_, ErrorWrapper): errors.append(errors_) elif isinstance(errors_, list): errors.extend(errors_) if errors: raise ValidationError(errors) return jsonable_encoder(value) else: return encoded def get_app( dependant: Dependant, body_field: Field = None, status_code: int = 200, content_type: Type[Response] = JSONResponse, response_field: Field = None, ) -> Callable: assert dependant.call is not None, "dependant.call must me a function" is_coroutine = asyncio.iscoroutinefunction(dependant.call) is_body_form = body_field and isinstance(body_field.schema, params.Form) async def app(request: Request) -> Response: try: body = None if body_field: if is_body_form: raw_body = await request.form() form_fields = {} for field, value in raw_body.items(): form_fields[field] = value if form_fields: body = form_fields else: body_bytes = await request.body() if body_bytes: body = await request.json() except Exception as e: logging.error("Error getting request body", e) raise HTTPException( status_code=400, detail="There was an error parsing the body" ) values, errors, background_tasks = await solve_dependencies( request=request, dependant=dependant, body=body ) if errors: errors_out = ValidationError(errors) raise HTTPException( status_code=HTTP_422_UNPROCESSABLE_ENTITY, detail=errors_out.errors() ) else: assert dependant.call is not None, "dependant.call must me a function" if is_coroutine: raw_response = await dependant.call(**values) else: raw_response = await run_in_threadpool(dependant.call, **values) if isinstance(raw_response, Response): if raw_response.background is None: raw_response.background = background_tasks return raw_response response_data = serialize_response( field=response_field, response=raw_response ) return content_type( content=response_data, status_code=status_code, background=background_tasks, ) return app class APIRoute(routing.Route): def __init__( self, path: str, endpoint: Callable, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, name: str = None, methods: List[str] = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, ) -> None: assert path.startswith("/"), "Routed paths must always start with '/'" self.path = path self.endpoint = endpoint self.name = get_name(endpoint) if name is None else name self.response_model = response_model if self.response_model: assert lenient_issubclass( content_type, JSONResponse ), "To declare a type the response must be a JSON response" response_name = "Response_" + self.name self.response_field: Optional[Field] = Field( name=response_name, type_=self.response_model, class_validators=[], default=None, required=False, model_config=UnconstrainedConfig, schema=Schema(None), ) else: self.response_field = None self.status_code = status_code self.tags = tags or [] self.summary = summary self.description = description or self.endpoint.__doc__ self.response_description = response_description self.deprecated = deprecated if methods is None: methods = ["GET"] self.methods = methods self.operation_id = operation_id self.include_in_schema = include_in_schema self.content_type = content_type self.path_regex, self.path_format, self.param_convertors = compile_path(path) assert inspect.isfunction(endpoint) or inspect.ismethod( endpoint ), f"An endpoint must be a function or method" self.dependant = get_dependant(path=path, call=self.endpoint) self.body_field = get_body_field(dependant=self.dependant, name=self.name) self.app = request_response( get_app( dependant=self.dependant, body_field=self.body_field, status_code=self.status_code, content_type=self.content_type, response_field=self.response_field, ) ) class APIRouter(routing.Router): def add_api_route( self, path: str, endpoint: Callable, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, methods: List[str] = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, name: str = None, ) -> None: route = APIRoute( path, endpoint=endpoint, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, methods=methods, operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, name=name, ) self.routes.append(route) def api_route( self, path: str, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, methods: List[str] = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, name: str = None, ) -> Callable: def decorator(func: Callable) -> Callable: self.add_api_route( path, func, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, methods=methods, operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, name=name, ) return func return decorator def include_router( self, router: "APIRouter", *, prefix: str = "", tags: List[str] = None ) -> None: if prefix: assert prefix.startswith("/"), "A path prefix must start with '/'" assert not prefix.endswith( "/" ), "A path prefix must not end with '/', as the routes will start with '/'" for route in router.routes: if isinstance(route, APIRoute): self.add_api_route( prefix + route.path, route.endpoint, response_model=route.response_model, status_code=route.status_code, tags=(route.tags or []) + (tags or []), summary=route.summary, description=route.description, response_description=route.response_description, deprecated=route.deprecated, methods=route.methods, operation_id=route.operation_id, include_in_schema=route.include_in_schema, content_type=route.content_type, name=route.name, ) elif isinstance(route, routing.Route): self.add_route( prefix + route.path, route.endpoint, methods=route.methods, include_in_schema=route.include_in_schema, name=route.name, ) elif isinstance(route, routing.WebSocketRoute): self.add_websocket_route( prefix + route.path, route.endpoint, name=route.name ) def get( self, path: str, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, name: str = None, ) -> Callable: return self.api_route( path=path, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, methods=["GET"], operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, name=name, ) def put( self, path: str, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, name: str = None, ) -> Callable: return self.api_route( path=path, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, methods=["PUT"], operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, name=name, ) def post( self, path: str, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, name: str = None, ) -> Callable: return self.api_route( path=path, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, methods=["POST"], operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, name=name, ) def delete( self, path: str, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, name: str = None, ) -> Callable: return self.api_route( path=path, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, methods=["DELETE"], operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, name=name, ) def options( self, path: str, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, name: str = None, ) -> Callable: return self.api_route( path=path, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, methods=["OPTIONS"], operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, name=name, ) def head( self, path: str, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, name: str = None, ) -> Callable: return self.api_route( path=path, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, methods=["HEAD"], operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, name=name, ) def patch( self, path: str, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, name: str = None, ) -> Callable: return self.api_route( path=path, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, methods=["PATCH"], operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, name=name, ) def trace( self, path: str, *, response_model: Type[BaseModel] = None, status_code: int = 200, tags: List[str] = None, summary: str = None, description: str = None, response_description: str = "Successful Response", deprecated: bool = None, operation_id: str = None, include_in_schema: bool = True, content_type: Type[Response] = JSONResponse, name: str = None, ) -> Callable: return self.api_route( path=path, response_model=response_model, status_code=status_code, tags=tags or [], summary=summary, description=description, response_description=response_description, deprecated=deprecated, methods=["TRACE"], operation_id=operation_id, include_in_schema=include_in_schema, content_type=content_type, name=name, ) PK踭iNlcs闌@fastapi/utils.pyimport re from typing import Any, Dict, List, Sequence, Set, Type from fastapi import routing from fastapi.openapi.constants import REF_PREFIX from pydantic import BaseConfig, BaseModel from pydantic.fields import Field from pydantic.schema import get_flat_models_from_fields, model_process_schema from starlette.routing import BaseRoute class UnconstrainedConfig(BaseConfig): min_anystr_length = None max_anystr_length = None def get_flat_models_from_routes( routes: Sequence[Type[BaseRoute]] ) -> Set[Type[BaseModel]]: body_fields_from_routes: List[Field] = [] responses_from_routes: List[Field] = [] for route in routes: if getattr(route, "include_in_schema", None) and isinstance( route, routing.APIRoute ): if route.body_field: assert isinstance( route.body_field, Field ), "A request body must be a Pydantic Field" body_fields_from_routes.append(route.body_field) if route.response_field: responses_from_routes.append(route.response_field) flat_models = get_flat_models_from_fields( body_fields_from_routes + responses_from_routes ) return flat_models def get_model_definitions( *, flat_models: Set[Type[BaseModel]], model_name_map: Dict[Type[BaseModel], str] ) -> Dict[str, Any]: definitions: Dict[str, Dict] = {} for model in flat_models: m_schema, m_definitions = model_process_schema( model, model_name_map=model_name_map, ref_prefix=REF_PREFIX ) definitions.update(m_definitions) model_name = model_name_map[model] definitions[model_name] = m_schema return definitions def get_path_param_names(path: str) -> Set[str]: return {item.strip("{}") for item in re.findall("{[^}]*}", path)} PKhYTN fastapi/dependencies/__init__.pyPKWND浙Afastapi/dependencies/models.pyfrom typing import Callable, List, Sequence from fastapi.security.base import SecurityBase from pydantic.fields import Field param_supported_types = (str, int, float, bool) class SecurityRequirement: def __init__(self, security_scheme: SecurityBase, scopes: Sequence[str] = None): self.security_scheme = security_scheme self.scopes = scopes class Dependant: def __init__( self, *, path_params: List[Field] = None, query_params: List[Field] = None, header_params: List[Field] = None, cookie_params: List[Field] = None, body_params: List[Field] = None, dependencies: List["Dependant"] = None, security_schemes: List[SecurityRequirement] = None, name: str = None, call: Callable = None, request_param_name: str = None, background_tasks_param_name: str = None, ) -> None: self.path_params = path_params or [] self.query_params = query_params or [] self.header_params = header_params or [] self.cookie_params = cookie_params or [] self.body_params = body_params or [] self.dependencies = dependencies or [] self.security_requirements = security_schemes or [] self.request_param_name = request_param_name self.background_tasks_param_name = background_tasks_param_name self.name = name self.call = call PKWNb邀5=5=fastapi/dependencies/utils.pyimport asyncio import inspect from copy import deepcopy from datetime import date, datetime, time, timedelta from decimal import Decimal from typing import ( Any, Callable, Dict, List, Mapping, Optional, Sequence, Tuple, Type, Union, ) from uuid import UUID from fastapi import params from fastapi.dependencies.models import Dependant, SecurityRequirement from fastapi.security.base import SecurityBase from fastapi.utils import UnconstrainedConfig, get_path_param_names from pydantic import Schema, create_model from pydantic.error_wrappers import ErrorWrapper from pydantic.errors import MissingError from pydantic.fields import Field, Required, Shape from pydantic.schema import get_annotation_from_schema from pydantic.utils import lenient_issubclass from starlette.background import BackgroundTasks from starlette.concurrency import run_in_threadpool from starlette.datastructures import UploadFile from starlette.requests import Headers, QueryParams, Request param_supported_types = ( str, int, float, bool, UUID, date, datetime, time, timedelta, Decimal, ) def get_sub_dependant(*, param: inspect.Parameter, path: str) -> Dependant: depends: params.Depends = param.default if depends.dependency: dependency = depends.dependency else: dependency = param.annotation sub_dependant = get_dependant(path=path, call=dependency, name=param.name) if isinstance(depends, params.Security) and isinstance(dependency, SecurityBase): security_requirement = SecurityRequirement( security_scheme=dependency, scopes=depends.scopes ) sub_dependant.security_requirements.append(security_requirement) return sub_dependant def get_flat_dependant(dependant: Dependant) -> Dependant: flat_dependant = Dependant( path_params=dependant.path_params.copy(), query_params=dependant.query_params.copy(), header_params=dependant.header_params.copy(), cookie_params=dependant.cookie_params.copy(), body_params=dependant.body_params.copy(), security_schemes=dependant.security_requirements.copy(), ) for sub_dependant in dependant.dependencies: flat_sub = get_flat_dependant(sub_dependant) flat_dependant.path_params.extend(flat_sub.path_params) flat_dependant.query_params.extend(flat_sub.query_params) flat_dependant.header_params.extend(flat_sub.header_params) flat_dependant.cookie_params.extend(flat_sub.cookie_params) flat_dependant.body_params.extend(flat_sub.body_params) flat_dependant.security_requirements.extend(flat_sub.security_requirements) return flat_dependant def get_dependant(*, path: str, call: Callable, name: str = None) -> Dependant: path_param_names = get_path_param_names(path) endpoint_signature = inspect.signature(call) signature_params = endpoint_signature.parameters dependant = Dependant(call=call, name=name) for param_name in signature_params: param = signature_params[param_name] if isinstance(param.default, params.Depends): sub_dependant = get_sub_dependant(param=param, path=path) dependant.dependencies.append(sub_dependant) for param_name in signature_params: param = signature_params[param_name] if ( (param.default == param.empty) or isinstance(param.default, params.Path) ) and (param_name in path_param_names): assert ( lenient_issubclass(param.annotation, param_supported_types) or param.annotation == param.empty ), f"Path params must be of one of the supported types" add_param_to_fields( param=param, dependant=dependant, default_schema=params.Path, force_type=params.ParamTypes.path, ) elif ( param.default == param.empty or param.default is None or isinstance(param.default, param_supported_types) ) and ( param.annotation == param.empty or lenient_issubclass(param.annotation, param_supported_types) ): add_param_to_fields( param=param, dependant=dependant, default_schema=params.Query ) elif isinstance(param.default, params.Param): if param.annotation != param.empty: origin = getattr(param.annotation, "__origin__", None) param_all_types = param_supported_types + (list, tuple, set) if isinstance(param.default, (params.Query, params.Header)): assert lenient_issubclass( param.annotation, param_all_types ) or lenient_issubclass( origin, param_all_types ), f"Parameters for Query and Header must be of type str, int, float, bool, list, tuple or set: {param}" else: assert lenient_issubclass( param.annotation, param_supported_types ), f"Parameters for Path and Cookies must be of type str, int, float, bool: {param}" add_param_to_fields( param=param, dependant=dependant, default_schema=params.Query ) elif lenient_issubclass(param.annotation, Request): dependant.request_param_name = param_name elif lenient_issubclass(param.annotation, BackgroundTasks): dependant.background_tasks_param_name = param_name elif not isinstance(param.default, params.Depends): add_param_to_body_fields(param=param, dependant=dependant) return dependant def add_param_to_fields( *, param: inspect.Parameter, dependant: Dependant, default_schema: Type[Schema] = params.Param, force_type: params.ParamTypes = None, ) -> None: default_value = Required if not param.default == param.empty: default_value = param.default if isinstance(default_value, params.Param): schema = default_value default_value = schema.default if getattr(schema, "in_", None) is None: schema.in_ = default_schema.in_ if force_type: schema.in_ = force_type else: schema = default_schema(default_value) required = default_value == Required annotation: Any = Any if not param.annotation == param.empty: annotation = param.annotation annotation = get_annotation_from_schema(annotation, schema) if not schema.alias and getattr(schema, "convert_underscores", None): alias = param.name.replace("_", "-") else: alias = schema.alias or param.name field = Field( name=param.name, type_=annotation, default=None if required else default_value, alias=alias, required=required, model_config=UnconstrainedConfig, class_validators=[], schema=schema, ) if schema.in_ == params.ParamTypes.path: dependant.path_params.append(field) elif schema.in_ == params.ParamTypes.query: dependant.query_params.append(field) elif schema.in_ == params.ParamTypes.header: dependant.header_params.append(field) else: assert ( schema.in_ == params.ParamTypes.cookie ), f"non-body parameters must be in path, query, header or cookie: {param.name}" dependant.cookie_params.append(field) def add_param_to_body_fields(*, param: inspect.Parameter, dependant: Dependant) -> None: default_value = Required if not param.default == param.empty: default_value = param.default if isinstance(default_value, Schema): schema = default_value default_value = schema.default else: schema = Schema(default_value) required = default_value == Required annotation = get_annotation_from_schema(param.annotation, schema) field = Field( name=param.name, type_=annotation, default=None if required else default_value, alias=schema.alias or param.name, required=required, model_config=UnconstrainedConfig, class_validators=[], schema=schema, ) dependant.body_params.append(field) def is_coroutine_callable(call: Callable) -> bool: if inspect.isfunction(call): return asyncio.iscoroutinefunction(call) if inspect.isclass(call): return False call = getattr(call, "__call__", None) return asyncio.iscoroutinefunction(call) async def solve_dependencies( *, request: Request, dependant: Dependant, body: Dict[str, Any] = None, background_tasks: BackgroundTasks = None, ) -> Tuple[Dict[str, Any], List[ErrorWrapper], Optional[BackgroundTasks]]: values: Dict[str, Any] = {} errors: List[ErrorWrapper] = [] for sub_dependant in dependant.dependencies: sub_values, sub_errors, background_tasks = await solve_dependencies( request=request, dependant=sub_dependant, body=body, background_tasks=background_tasks, ) if sub_errors: errors.extend(sub_errors) continue assert sub_dependant.call is not None, "sub_dependant.call must be a function" if is_coroutine_callable(sub_dependant.call): solved = await sub_dependant.call(**sub_values) else: solved = await run_in_threadpool(sub_dependant.call, **sub_values) assert sub_dependant.name is not None, "Subdependants always have a name" values[sub_dependant.name] = solved path_values, path_errors = request_params_to_args( dependant.path_params, request.path_params ) query_values, query_errors = request_params_to_args( dependant.query_params, request.query_params ) header_values, header_errors = request_params_to_args( dependant.header_params, request.headers ) cookie_values, cookie_errors = request_params_to_args( dependant.cookie_params, request.cookies ) values.update(path_values) values.update(query_values) values.update(header_values) values.update(cookie_values) errors += path_errors + query_errors + header_errors + cookie_errors if dependant.body_params: body_values, body_errors = await request_body_to_args( # type: ignore # body_params checked above dependant.body_params, body ) values.update(body_values) errors.extend(body_errors) if dependant.request_param_name: values[dependant.request_param_name] = request if dependant.background_tasks_param_name: if background_tasks is None: background_tasks = BackgroundTasks() values[dependant.background_tasks_param_name] = background_tasks return values, errors, background_tasks def request_params_to_args( required_params: Sequence[Field], received_params: Union[Mapping[str, Any], QueryParams, Headers], ) -> Tuple[Dict[str, Any], List[ErrorWrapper]]: values = {} errors = [] for field in required_params: if field.shape in {Shape.LIST, Shape.SET, Shape.TUPLE} and isinstance( received_params, (QueryParams, Headers) ): value = received_params.getlist(field.alias) else: value = received_params.get(field.alias) schema: params.Param = field.schema assert isinstance(schema, params.Param), "Params must be subclasses of Param" if value is None: if field.required: errors.append( ErrorWrapper( MissingError(), loc=(schema.in_.value, field.alias), config=UnconstrainedConfig, ) ) else: values[field.name] = deepcopy(field.default) continue v_, errors_ = field.validate(value, values, loc=(schema.in_.value, field.alias)) if isinstance(errors_, ErrorWrapper): errors.append(errors_) elif isinstance(errors_, list): errors.extend(errors_) else: values[field.name] = v_ return values, errors async def request_body_to_args( required_params: List[Field], received_body: Dict[str, Any] ) -> Tuple[Dict[str, Any], List[ErrorWrapper]]: values = {} errors = [] if required_params: field = required_params[0] embed = getattr(field.schema, "embed", None) if len(required_params) == 1 and not embed: received_body = {field.alias: received_body} elif received_body is None: received_body = {} for field in required_params: value = received_body.get(field.alias) if value is None or (isinstance(field.schema, params.Form) and value == ""): if field.required: errors.append( ErrorWrapper( MissingError(), loc=("body", field.alias), config=UnconstrainedConfig, ) ) else: values[field.name] = deepcopy(field.default) continue if ( isinstance(field.schema, params.File) and lenient_issubclass(field.type_, bytes) and isinstance(value, UploadFile) ): value = await value.read() v_, errors_ = field.validate(value, values, loc=("body", field.alias)) if isinstance(errors_, ErrorWrapper): errors.append(errors_) elif isinstance(errors_, list): errors.extend(errors_) else: values[field.name] = v_ return values, errors def get_schema_compatible_field(*, field: Field) -> Field: if lenient_issubclass(field.type_, UploadFile): return Field( name=field.name, type_=bytes, class_validators=field.class_validators, model_config=field.model_config, default=field.default, required=field.required, alias=field.alias, schema=field.schema, ) return field def get_body_field(*, dependant: Dependant, name: str) -> Field: flat_dependant = get_flat_dependant(dependant) if not flat_dependant.body_params: return None first_param = flat_dependant.body_params[0] embed = getattr(first_param.schema, "embed", None) if len(flat_dependant.body_params) == 1 and not embed: return get_schema_compatible_field(field=first_param) model_name = "Body_" + name BodyModel = create_model(model_name) for f in flat_dependant.body_params: BodyModel.__fields__[f.name] = get_schema_compatible_field(field=f) required = any(True for f in flat_dependant.body_params if f.required) if any(isinstance(f.schema, params.File) for f in flat_dependant.body_params): BodySchema: Type[params.Body] = params.File elif any(isinstance(f.schema, params.Form) for f in flat_dependant.body_params): BodySchema = params.Form else: BodySchema = params.Body field = Field( name="body", type_=BodyModel, default=None, required=required, model_config=UnconstrainedConfig, class_validators=[], alias="body", schema=BodySchema(None), ) return field PKhYTNfastapi/openapi/__init__.pyPKhYTN弿maafastapi/openapi/constants.pyMETHODS_WITH_BODY = set(("POST", "PUT", "DELETE", "PATCH")) REF_PREFIX = "#/components/schemas/" PK XNoJ<fastapi/openapi/docs.pyfrom starlette.responses import HTMLResponse def get_swagger_ui_html(*, openapi_url: str, title: str) -> HTMLResponse: return HTMLResponse( """ """ + title + """
""" ) def get_redoc_html(*, openapi_url: str, title: str) -> HTMLResponse: return HTMLResponse( """ """ + title + """ """ ) PKWN$iQ))fastapi/openapi/models.pyimport logging from enum import Enum from typing import Any, Dict, List, Optional, Union from pydantic import BaseModel, Schema as PSchema from pydantic.types import UrlStr try: import email_validator assert email_validator # make autoflake ignore the unused import from pydantic.types import EmailStr # type: ignore except ImportError: # pragma: no cover logging.warning( "email-validator not installed, email fields will be treated as str.\n" + "To install, run: pip install email-validator" ) class EmailStr(str): # type: ignore pass class Contact(BaseModel): name: Optional[str] = None url: Optional[UrlStr] = None email: Optional[EmailStr] = None class License(BaseModel): name: str url: Optional[UrlStr] = None class Info(BaseModel): title: str description: Optional[str] = None termsOfService: Optional[str] = None contact: Optional[Contact] = None license: Optional[License] = None version: str class ServerVariable(BaseModel): enum: Optional[List[str]] = None default: str description: Optional[str] = None class Server(BaseModel): url: UrlStr description: Optional[str] = None variables: Optional[Dict[str, ServerVariable]] = None class Reference(BaseModel): ref: str = PSchema(..., alias="$ref") # type: ignore class Discriminator(BaseModel): propertyName: str mapping: Optional[Dict[str, str]] = None class XML(BaseModel): name: Optional[str] = None namespace: Optional[str] = None prefix: Optional[str] = None attribute: Optional[bool] = None wrapped: Optional[bool] = None class ExternalDocumentation(BaseModel): description: Optional[str] = None url: UrlStr class SchemaBase(BaseModel): ref: Optional[str] = PSchema(None, alias="$ref") # type: ignore title: Optional[str] = None multipleOf: Optional[float] = None maximum: Optional[float] = None exclusiveMaximum: Optional[float] = None minimum: Optional[float] = None exclusiveMinimum: Optional[float] = None maxLength: Optional[int] = PSchema(None, gte=0) # type: ignore minLength: Optional[int] = PSchema(None, gte=0) # type: ignore pattern: Optional[str] = None maxItems: Optional[int] = PSchema(None, gte=0) # type: ignore minItems: Optional[int] = PSchema(None, gte=0) # type: ignore uniqueItems: Optional[bool] = None maxProperties: Optional[int] = PSchema(None, gte=0) # type: ignore minProperties: Optional[int] = PSchema(None, gte=0) # type: ignore required: Optional[List[str]] = None enum: Optional[List[str]] = None type: Optional[str] = None allOf: Optional[List[Any]] = None oneOf: Optional[List[Any]] = None anyOf: Optional[List[Any]] = None not_: Optional[List[Any]] = PSchema(None, alias="not") # type: ignore items: Optional[Any] = None properties: Optional[Dict[str, Any]] = None additionalProperties: Optional[Union[Dict[str, Any], bool]] = None description: Optional[str] = None format: Optional[str] = None default: Optional[Any] = None nullable: Optional[bool] = None discriminator: Optional[Discriminator] = None readOnly: Optional[bool] = None writeOnly: Optional[bool] = None xml: Optional[XML] = None externalDocs: Optional[ExternalDocumentation] = None example: Optional[Any] = None deprecated: Optional[bool] = None class Schema(SchemaBase): allOf: Optional[List[SchemaBase]] = None oneOf: Optional[List[SchemaBase]] = None anyOf: Optional[List[SchemaBase]] = None not_: Optional[List[SchemaBase]] = PSchema(None, alias="not") # type: ignore items: Optional[SchemaBase] = None properties: Optional[Dict[str, SchemaBase]] = None additionalProperties: Optional[Union[SchemaBase, bool]] = None class Example(BaseModel): summary: Optional[str] = None description: Optional[str] = None value: Optional[Any] = None externalValue: Optional[UrlStr] = None class ParameterInType(Enum): query = "query" header = "header" path = "path" cookie = "cookie" class Encoding(BaseModel): contentType: Optional[str] = None # Workaround OpenAPI recursive reference, using Any headers: Optional[Dict[str, Union[Any, Reference]]] = None style: Optional[str] = None explode: Optional[bool] = None allowReserved: Optional[bool] = None class MediaType(BaseModel): schema_: Optional[Union[Schema, Reference]] = PSchema( None, alias="schema" ) # type: ignore example: Optional[Any] = None examples: Optional[Dict[str, Union[Example, Reference]]] = None encoding: Optional[Dict[str, Encoding]] = None class ParameterBase(BaseModel): description: Optional[str] = None required: Optional[bool] = None deprecated: Optional[bool] = None # Serialization rules for simple scenarios style: Optional[str] = None explode: Optional[bool] = None allowReserved: Optional[bool] = None schema_: Optional[Union[Schema, Reference]] = PSchema( None, alias="schema" ) # type: ignore example: Optional[Any] = None examples: Optional[Dict[str, Union[Example, Reference]]] = None # Serialization rules for more complex scenarios content: Optional[Dict[str, MediaType]] = None class Parameter(ParameterBase): name: str in_: ParameterInType = PSchema(..., alias="in") # type: ignore class Header(ParameterBase): pass # Workaround OpenAPI recursive reference class EncodingWithHeaders(Encoding): headers: Optional[Dict[str, Union[Header, Reference]]] = None class RequestBody(BaseModel): description: Optional[str] = None content: Dict[str, MediaType] required: Optional[bool] = None class Link(BaseModel): operationRef: Optional[str] = None operationId: Optional[str] = None parameters: Optional[Dict[str, Union[Any, str]]] = None requestBody: Optional[Union[Any, str]] = None description: Optional[str] = None server: Optional[Server] = None class Response(BaseModel): description: str headers: Optional[Dict[str, Union[Header, Reference]]] = None content: Optional[Dict[str, MediaType]] = None links: Optional[Dict[str, Union[Link, Reference]]] = None class Responses(BaseModel): default: Response class Operation(BaseModel): tags: Optional[List[str]] = None summary: Optional[str] = None description: Optional[str] = None externalDocs: Optional[ExternalDocumentation] = None operationId: Optional[str] = None parameters: Optional[List[Union[Parameter, Reference]]] = None requestBody: Optional[Union[RequestBody, Reference]] = None responses: Union[Responses, Dict[str, Response]] # Workaround OpenAPI recursive reference callbacks: Optional[Dict[str, Union[Dict[str, Any], Reference]]] = None deprecated: Optional[bool] = None security: Optional[List[Dict[str, List[str]]]] = None servers: Optional[List[Server]] = None class PathItem(BaseModel): ref: Optional[str] = PSchema(None, alias="$ref") # type: ignore summary: Optional[str] = None description: Optional[str] = None get: Optional[Operation] = None put: Optional[Operation] = None post: Optional[Operation] = None delete: Optional[Operation] = None options: Optional[Operation] = None head: Optional[Operation] = None patch: Optional[Operation] = None trace: Optional[Operation] = None servers: Optional[List[Server]] = None parameters: Optional[List[Union[Parameter, Reference]]] = None # Workaround OpenAPI recursive reference class OperationWithCallbacks(BaseModel): callbacks: Optional[Dict[str, Union[Dict[str, PathItem], Reference]]] = None class SecuritySchemeType(Enum): apiKey = "apiKey" http = "http" oauth2 = "oauth2" openIdConnect = "openIdConnect" class SecurityBase(BaseModel): type_: SecuritySchemeType = PSchema(..., alias="type") # type: ignore description: Optional[str] = None class APIKeyIn(Enum): query = "query" header = "header" cookie = "cookie" class APIKey(SecurityBase): type_ = PSchema(SecuritySchemeType.apiKey, alias="type") # type: ignore in_: APIKeyIn = PSchema(..., alias="in") # type: ignore name: str class HTTPBase(SecurityBase): type_ = PSchema(SecuritySchemeType.http, alias="type") # type: ignore scheme: str class HTTPBearer(HTTPBase): scheme = "bearer" bearerFormat: Optional[str] = None class OAuthFlow(BaseModel): refreshUrl: Optional[str] = None scopes: Dict[str, str] = {} class OAuthFlowImplicit(OAuthFlow): authorizationUrl: str class OAuthFlowPassword(OAuthFlow): tokenUrl: str class OAuthFlowClientCredentials(OAuthFlow): tokenUrl: str class OAuthFlowAuthorizationCode(OAuthFlow): authorizationUrl: str tokenUrl: str class OAuthFlows(BaseModel): implicit: Optional[OAuthFlowImplicit] = None password: Optional[OAuthFlowPassword] = None clientCredentials: Optional[OAuthFlowClientCredentials] = None authorizationCode: Optional[OAuthFlowAuthorizationCode] = None class OAuth2(SecurityBase): type_ = PSchema(SecuritySchemeType.oauth2, alias="type") # type: ignore flows: OAuthFlows class OpenIdConnect(SecurityBase): type_ = PSchema(SecuritySchemeType.openIdConnect, alias="type") # type: ignore openIdConnectUrl: str SecurityScheme = Union[APIKey, HTTPBase, OAuth2, OpenIdConnect, HTTPBearer] class Components(BaseModel): schemas: Optional[Dict[str, Union[Schema, Reference]]] = None responses: Optional[Dict[str, Union[Response, Reference]]] = None parameters: Optional[Dict[str, Union[Parameter, Reference]]] = None examples: Optional[Dict[str, Union[Example, Reference]]] = None requestBodies: Optional[Dict[str, Union[RequestBody, Reference]]] = None headers: Optional[Dict[str, Union[Header, Reference]]] = None securitySchemes: Optional[Dict[str, Union[SecurityScheme, Reference]]] = None links: Optional[Dict[str, Union[Link, Reference]]] = None callbacks: Optional[Dict[str, Union[Dict[str, PathItem], Reference]]] = None class Tag(BaseModel): name: str description: Optional[str] = None externalDocs: Optional[ExternalDocumentation] = None class OpenAPI(BaseModel): openapi: str info: Info servers: Optional[List[Server]] = None paths: Dict[str, PathItem] components: Optional[Components] = None security: Optional[List[Dict[str, List[str]]]] = None tags: Optional[List[Tag]] = None externalDocs: Optional[ExternalDocumentation] = None PK踭iN(&&fastapi/openapi/utils.pyfrom typing import Any, Dict, List, Optional, Sequence, Tuple, Type from fastapi import routing from fastapi.dependencies.models import Dependant from fastapi.dependencies.utils import get_flat_dependant from fastapi.encoders import jsonable_encoder from fastapi.openapi.constants import METHODS_WITH_BODY, REF_PREFIX from fastapi.openapi.models import OpenAPI from fastapi.params import Body, Param from fastapi.utils import get_flat_models_from_routes, get_model_definitions from pydantic.fields import Field from pydantic.schema import Schema, field_schema, get_model_name_map from pydantic.utils import lenient_issubclass from starlette.responses import JSONResponse from starlette.routing import BaseRoute from starlette.status import HTTP_422_UNPROCESSABLE_ENTITY validation_error_definition = { "title": "ValidationError", "type": "object", "properties": { "loc": {"title": "Location", "type": "array", "items": {"type": "string"}}, "msg": {"title": "Message", "type": "string"}, "type": {"title": "Error Type", "type": "string"}, }, "required": ["loc", "msg", "type"], } validation_error_response_definition = { "title": "HTTPValidationError", "type": "object", "properties": { "detail": { "title": "Detail", "type": "array", "items": {"$ref": REF_PREFIX + "ValidationError"}, } }, } def get_openapi_params(dependant: Dependant) -> List[Field]: flat_dependant = get_flat_dependant(dependant) return ( flat_dependant.path_params + flat_dependant.query_params + flat_dependant.header_params + flat_dependant.cookie_params ) def get_openapi_security_definitions(flat_dependant: Dependant) -> Tuple[Dict, List]: security_definitions = {} operation_security = [] for security_requirement in flat_dependant.security_requirements: security_definition = jsonable_encoder( security_requirement.security_scheme.model, by_alias=True, include_none=False, ) security_name = security_requirement.security_scheme.scheme_name security_definitions[security_name] = security_definition operation_security.append({security_name: security_requirement.scopes}) return security_definitions, operation_security def get_openapi_operation_parameters( all_route_params: Sequence[Field] ) -> Tuple[Dict[str, Dict], List[Dict[str, Any]]]: definitions: Dict[str, Dict] = {} parameters = [] for param in all_route_params: schema: Param = param.schema if "ValidationError" not in definitions: definitions["ValidationError"] = validation_error_definition definitions["HTTPValidationError"] = validation_error_response_definition parameter = { "name": param.alias, "in": schema.in_.value, "required": param.required, "schema": field_schema(param, model_name_map={})[0], } if schema.description: parameter["description"] = schema.description if schema.deprecated: parameter["deprecated"] = schema.deprecated parameters.append(parameter) return definitions, parameters def get_openapi_operation_request_body( *, body_field: Field, model_name_map: Dict[Type, str] ) -> Optional[Dict]: if not body_field: return None assert isinstance(body_field, Field) body_schema, _ = field_schema( body_field, model_name_map=model_name_map, ref_prefix=REF_PREFIX ) schema: Schema = body_field.schema if isinstance(schema, Body): request_media_type = schema.media_type else: # Includes not declared media types (Schema) request_media_type = "application/json" required = body_field.required request_body_oai: Dict[str, Any] = {} if required: request_body_oai["required"] = required request_body_oai["content"] = {request_media_type: {"schema": body_schema}} return request_body_oai def generate_operation_id(*, route: routing.APIRoute, method: str) -> str: if route.operation_id: return route.operation_id path: str = route.path operation_id = route.name + path operation_id = operation_id.replace("{", "_").replace("}", "_").replace("/", "_") operation_id = operation_id + "_" + method.lower() return operation_id def generate_operation_summary(*, route: routing.APIRoute, method: str) -> str: if route.summary: return route.summary return route.name.replace("_", " ").title() + " " + method.title() def get_openapi_operation_metadata(*, route: routing.APIRoute, method: str) -> Dict: operation: Dict[str, Any] = {} if route.tags: operation["tags"] = route.tags operation["summary"] = generate_operation_summary(route=route, method=method) if route.description: operation["description"] = route.description operation["operationId"] = generate_operation_id(route=route, method=method) if route.deprecated: operation["deprecated"] = route.deprecated return operation def get_openapi_path( *, route: routing.APIRoute, model_name_map: Dict[Type, str] ) -> Tuple[Dict, Dict, Dict]: path = {} security_schemes: Dict[str, Any] = {} definitions: Dict[str, Any] = {} assert route.methods is not None, "Methods must be a list" if route.include_in_schema: for method in route.methods: operation = get_openapi_operation_metadata(route=route, method=method) parameters: List[Dict] = [] flat_dependant = get_flat_dependant(route.dependant) security_definitions, operation_security = get_openapi_security_definitions( flat_dependant=flat_dependant ) if operation_security: operation.setdefault("security", []).extend(operation_security) if security_definitions: security_schemes.update(security_definitions) all_route_params = get_openapi_params(route.dependant) validation_definitions, operation_parameters = get_openapi_operation_parameters( all_route_params=all_route_params ) definitions.update(validation_definitions) parameters.extend(operation_parameters) if parameters: operation["parameters"] = parameters if method in METHODS_WITH_BODY: request_body_oai = get_openapi_operation_request_body( body_field=route.body_field, model_name_map=model_name_map ) if request_body_oai: operation["requestBody"] = request_body_oai if "ValidationError" not in definitions: definitions["ValidationError"] = validation_error_definition definitions[ "HTTPValidationError" ] = validation_error_response_definition status_code = str(route.status_code) response_schema = {"type": "string"} if lenient_issubclass(route.content_type, JSONResponse): if route.response_field: response_schema, _ = field_schema( route.response_field, model_name_map=model_name_map, ref_prefix=REF_PREFIX, ) else: response_schema = {} content = {route.content_type.media_type: {"schema": response_schema}} operation["responses"] = { status_code: { "description": route.response_description, "content": content, } } if all_route_params or route.body_field: operation["responses"][str(HTTP_422_UNPROCESSABLE_ENTITY)] = { "description": "Validation Error", "content": { "application/json": { "schema": {"$ref": REF_PREFIX + "HTTPValidationError"} } }, } path[method.lower()] = operation return path, security_schemes, definitions def get_openapi( *, title: str, version: str, openapi_version: str = "3.0.2", description: str = None, routes: Sequence[BaseRoute], openapi_prefix: str = "" ) -> Dict: info = {"title": title, "version": version} if description: info["description"] = description output = {"openapi": openapi_version, "info": info} components: Dict[str, Dict] = {} paths: Dict[str, Dict] = {} flat_models = get_flat_models_from_routes(routes) model_name_map = get_model_name_map(flat_models) definitions = get_model_definitions( flat_models=flat_models, model_name_map=model_name_map ) for route in routes: if isinstance(route, routing.APIRoute): result = get_openapi_path(route=route, model_name_map=model_name_map) if result: path, security_schemes, path_definitions = result if path: paths.setdefault(openapi_prefix + route.path, {}).update(path) if security_schemes: components.setdefault("securitySchemes", {}).update( security_schemes ) if path_definitions: definitions.update(path_definitions) if definitions: components.setdefault("schemas", {}).update(definitions) if components: output["components"] = components output["paths"] = paths return jsonable_encoder(OpenAPI(**output), by_alias=True, include_none=False) PK}晆N訛鍢99fastapi/security/__init__.pyfrom .api_key import APIKeyCookie, APIKeyHeader, APIKeyQuery from .http import ( HTTPAuthorizationCredentials, HTTPBasic, HTTPBasicCredentials, HTTPBearer, HTTPDigest, ) from .oauth2 import OAuth2, OAuth2PasswordBearer, OAuth2PasswordRequestForm from .open_id_connect_url import OpenIdConnect PK!~僋嘛$+8 8 fastapi/security/api_key.pyfrom typing import Optional from fastapi.openapi.models import APIKey, APIKeyIn from fastapi.security.base import SecurityBase from starlette.exceptions import HTTPException from starlette.requests import Request from starlette.status import HTTP_403_FORBIDDEN class APIKeyBase(SecurityBase): pass class APIKeyQuery(APIKeyBase): def __init__(self, *, name: str, scheme_name: str = None, auto_error: bool = True): self.model: APIKey = APIKey(**{"in": APIKeyIn.query}, name=name) self.scheme_name = scheme_name or self.__class__.__name__ self.auto_error = auto_error async def __call__(self, request: Request) -> Optional[str]: api_key: str = request.query_params.get(self.model.name) if not api_key: if self.auto_error: raise HTTPException( status_code=HTTP_403_FORBIDDEN, detail="Not authenticated" ) else: return None return api_key class APIKeyHeader(APIKeyBase): def __init__(self, *, name: str, scheme_name: str = None, auto_error: bool = True): self.model: APIKey = APIKey(**{"in": APIKeyIn.header}, name=name) self.scheme_name = scheme_name or self.__class__.__name__ self.auto_error = auto_error async def __call__(self, request: Request) -> Optional[str]: api_key: str = request.headers.get(self.model.name) if not api_key: if self.auto_error: raise HTTPException( status_code=HTTP_403_FORBIDDEN, detail="Not authenticated" ) else: return None return api_key class APIKeyCookie(APIKeyBase): def __init__(self, *, name: str, scheme_name: str = None, auto_error: bool = True): self.model: APIKey = APIKey(**{"in": APIKeyIn.cookie}, name=name) self.scheme_name = scheme_name or self.__class__.__name__ self.auto_error = auto_error async def __call__(self, request: Request) -> Optional[str]: api_key: str = request.cookies.get(self.model.name) if not api_key: if self.auto_error: raise HTTPException( status_code=HTTP_403_FORBIDDEN, detail="Not authenticated" ) else: return None return api_key PKhYTNr(詬fastapi/security/base.pyfrom fastapi.openapi.models import SecurityBase as SecurityBaseModel class SecurityBase: model: SecurityBaseModel scheme_name: str PK!~僋"RQttfastapi/security/http.pyimport binascii from base64 import b64decode from typing import Optional from fastapi.openapi.models import ( HTTPBase as HTTPBaseModel, HTTPBearer as HTTPBearerModel, ) from fastapi.security.base import SecurityBase from fastapi.security.utils import get_authorization_scheme_param from pydantic import BaseModel from starlette.exceptions import HTTPException from starlette.requests import Request from starlette.status import HTTP_403_FORBIDDEN class HTTPBasicCredentials(BaseModel): username: str password: str class HTTPAuthorizationCredentials(BaseModel): scheme: str credentials: str class HTTPBase(SecurityBase): def __init__( self, *, scheme: str, scheme_name: str = None, auto_error: bool = True ): self.model = HTTPBaseModel(scheme=scheme) self.scheme_name = scheme_name or self.__class__.__name__ self.auto_error = auto_error async def __call__( self, request: Request ) -> Optional[HTTPAuthorizationCredentials]: authorization: str = request.headers.get("Authorization") scheme, credentials = get_authorization_scheme_param(authorization) if not (authorization and scheme and credentials): if self.auto_error: raise HTTPException( status_code=HTTP_403_FORBIDDEN, detail="Not authenticated" ) else: return None return HTTPAuthorizationCredentials(scheme=scheme, credentials=credentials) class HTTPBasic(HTTPBase): def __init__( self, *, scheme_name: str = None, realm: str = None, auto_error: bool = True ): self.model = HTTPBaseModel(scheme="basic") self.scheme_name = scheme_name or self.__class__.__name__ self.realm = realm self.auto_error = auto_error async def __call__(self, request: Request) -> Optional[HTTPBasicCredentials]: authorization: str = request.headers.get("Authorization") scheme, param = get_authorization_scheme_param(authorization) # before implementing headers with 401 errors, wait for: https://github.com/encode/starlette/issues/295 # unauthorized_headers = {"WWW-Authenticate": "Basic"} invalid_user_credentials_exc = HTTPException( status_code=HTTP_403_FORBIDDEN, detail="Invalid authentication credentials" ) if not authorization or scheme.lower() != "basic": if self.auto_error: raise HTTPException( status_code=HTTP_403_FORBIDDEN, detail="Not authenticated" ) else: return None try: data = b64decode(param).decode("ascii") except (ValueError, UnicodeDecodeError, binascii.Error): raise invalid_user_credentials_exc username, separator, password = data.partition(":") if not (separator): raise invalid_user_credentials_exc return HTTPBasicCredentials(username=username, password=password) class HTTPBearer(HTTPBase): def __init__( self, *, bearerFormat: str = None, scheme_name: str = None, auto_error: bool = True ): self.model = HTTPBearerModel(bearerFormat=bearerFormat) self.scheme_name = scheme_name or self.__class__.__name__ self.auto_error = auto_error async def __call__( self, request: Request ) -> Optional[HTTPAuthorizationCredentials]: authorization: str = request.headers.get("Authorization") scheme, credentials = get_authorization_scheme_param(authorization) if not (authorization and scheme and credentials): if self.auto_error: raise HTTPException( status_code=HTTP_403_FORBIDDEN, detail="Not authenticated" ) else: return None if scheme.lower() != "bearer": raise HTTPException( status_code=HTTP_403_FORBIDDEN, detail="Invalid authentication credentials", ) return HTTPAuthorizationCredentials(scheme=scheme, credentials=credentials) class HTTPDigest(HTTPBase): def __init__(self, *, scheme_name: str = None, auto_error: bool = True): self.model = HTTPBaseModel(scheme="digest") self.scheme_name = scheme_name or self.__class__.__name__ self.auto_error = auto_error async def __call__( self, request: Request ) -> Optional[HTTPAuthorizationCredentials]: authorization: str = request.headers.get("Authorization") scheme, credentials = get_authorization_scheme_param(authorization) if not (authorization and scheme and credentials): if self.auto_error: raise HTTPException( status_code=HTTP_403_FORBIDDEN, detail="Not authenticated" ) else: return None if scheme.lower() != "digest": raise HTTPException( status_code=HTTP_403_FORBIDDEN, detail="Invalid authentication credentials", ) return HTTPAuthorizationCredentials(scheme=scheme, credentials=credentials) PK!~僋.宜"fastapi/security/oauth2.pyfrom typing import Optional from fastapi.openapi.models import OAuth2 as OAuth2Model, OAuthFlows as OAuthFlowsModel from fastapi.params import Form from fastapi.security.base import SecurityBase from fastapi.security.utils import get_authorization_scheme_param from starlette.exceptions import HTTPException from starlette.requests import Request from starlette.status import HTTP_403_FORBIDDEN class OAuth2PasswordRequestForm: """ This is a dependency class, use it like: @app.post("/login") def login(form_data: Oauth2PasswordRequestForm = Depends()): data = form_data.parse() print(data.username) print(data.password) for scope in data.scopes: print(scope) if data.client_id: print(data.client_id) if data.client_secret: print(data.client_secret) return data It creates the following Form request parameters in your endpoint: grant_type: the OAuth2 spec says it is required and MUST be the fixed string "password". Nevertheless, this dependency class is permissive and allows not passing it. If you want to enforce it, use instead the OAuth2PasswordRequestFormStrict dependency. username: username string. The OAuth2 spec requires the exact field name "username". password: password string. The OAuth2 spec requires the exact field name "password". scope: Optional string. Several scopes (each one a string) separated by spaces. E.g. "items:read items:write users:read profile openid" client_id: optional string. OAuth2 recommends sending the client_id and client_secret (if any) using HTTP Basic auth, as: client_id:client_secret client_secret: optional string. OAuth2 recommends sending the client_id and client_secret (if any) using HTTP Basic auth, as: client_id:client_secret """ def __init__( self, grant_type: str = Form(None, regex="password"), username: str = Form(...), password: str = Form(...), scope: str = Form(""), client_id: Optional[str] = Form(None), client_secret: Optional[str] = Form(None), ): self.grant_type = grant_type self.username = username self.password = password self.scopes = scope.split() self.client_id = client_id self.client_secret = client_secret class OAuth2PasswordRequestFormStrict(OAuth2PasswordRequestForm): """ This is a dependency class, use it like: @app.post("/login") def login(form_data: Oauth2PasswordRequestFormStrict = Depends()): data = form_data.parse() print(data.username) print(data.password) for scope in data.scopes: print(scope) if data.client_id: print(data.client_id) if data.client_secret: print(data.client_secret) return data It creates the following Form request parameters in your endpoint: grant_type: the OAuth2 spec says it is required and MUST be the fixed string "password". This dependency is strict about it. If you want to be permissive, use instead the OAuth2PasswordRequestFormStrict dependency class. username: username string. The OAuth2 spec requires the exact field name "username". password: password string. The OAuth2 spec requires the exact field name "password". scope: Optional string. Several scopes (each one a string) separated by spaces. E.g. "items:read items:write users:read profile openid" client_id: optional string. OAuth2 recommends sending the client_id and client_secret (if any) using HTTP Basic auth, as: client_id:client_secret client_secret: optional string. OAuth2 recommends sending the client_id and client_secret (if any) using HTTP Basic auth, as: client_id:client_secret """ def __init__( self, grant_type: str = Form(..., regex="password"), username: str = Form(...), password: str = Form(...), scope: str = Form(""), client_id: Optional[str] = Form(None), client_secret: Optional[str] = Form(None), ): super().__init__( grant_type=grant_type, username=username, password=password, scope=scope, client_id=client_id, client_secret=client_secret, ) class OAuth2(SecurityBase): def __init__( self, *, flows: OAuthFlowsModel = OAuthFlowsModel(), scheme_name: str = None, auto_error: bool = True ): self.model = OAuth2Model(flows=flows) self.scheme_name = scheme_name or self.__class__.__name__ self.auto_error = auto_error async def __call__(self, request: Request) -> Optional[str]: authorization: str = request.headers.get("Authorization") if not authorization: if self.auto_error: raise HTTPException( status_code=HTTP_403_FORBIDDEN, detail="Not authenticated" ) else: return None return authorization class OAuth2PasswordBearer(OAuth2): def __init__( self, tokenUrl: str, scheme_name: str = None, scopes: dict = None, auto_error: bool = True, ): if not scopes: scopes = {} flows = OAuthFlowsModel(password={"tokenUrl": tokenUrl, "scopes": scopes}) super().__init__(flows=flows, scheme_name=scheme_name, auto_error=auto_error) async def __call__(self, request: Request) -> Optional[str]: authorization: str = request.headers.get("Authorization") scheme, param = get_authorization_scheme_param(authorization) if not authorization or scheme.lower() != "bearer": if self.auto_error: raise HTTPException( status_code=HTTP_403_FORBIDDEN, detail="Not authenticated" ) else: return None return param PK!~僋翃堏'fastapi/security/open_id_connect_url.pyfrom typing import Optional from fastapi.openapi.models import OpenIdConnect as OpenIdConnectModel from fastapi.security.base import SecurityBase from starlette.exceptions import HTTPException from starlette.requests import Request from starlette.status import HTTP_403_FORBIDDEN class OpenIdConnect(SecurityBase): def __init__( self, *, openIdConnectUrl: str, scheme_name: str = None, auto_error: bool = True ): self.model = OpenIdConnectModel(openIdConnectUrl=openIdConnectUrl) self.scheme_name = scheme_name or self.__class__.__name__ self.auto_error = auto_error async def __call__(self, request: Request) -> Optional[str]: authorization: str = request.headers.get("Authorization") if not authorization: if self.auto_error: raise HTTPException( status_code=HTTP_403_FORBIDDEN, detail="Not authenticated" ) else: return None return authorization PKhYTN蘩岖  fastapi/security/utils.pyfrom typing import Tuple def get_authorization_scheme_param(authorization_header_value: str) -> Tuple[str, str]: if not authorization_header_value: return "", "" scheme, _, param = authorization_header_value.partition(" ") return scheme, param PKYqcM恵曪>> fastapi-0.11.0.dist-info/LICENSEThe MIT License (MIT) Copyright (c) 2018 Sebasti谩n Ram铆rez Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!H卖POfastapi-0.11.0.dist-info/WHEEL 螲M脱 K-*翁铣R03鄏O蚄-J,/睷H松,s彗楄zユd&Y)暒r$)T腈彗リ&鎁rPK!H* >!fastapi-0.11.0.dist-info/METADATA誟閞蹻揀彠╩韟绒>HJ<\姃ej%QCRp84P輂".囤~榼}妝饼2 @歃|L:l$豺2+馞朅擜o残*K帕`遻$騊]跪闓{凖瀢U%IP坯 8}w.鑗YqS5檴\悻H4=!=e&biニ0嬸皭A4 y慐UXb飮,扆<榾卛Y纡p8湪rZa K,螁幗*敥覃饷[锎*q($隉搳 騡/揁艊聧鯒 ]羽蕙黅〣掰9(辫撱莾o肩R噮蕢傀Y枛2-硸騭9 鲒詔騸樰m%M纟|+(论璗Jg媧箹j濝%軤"k轷8埫㎜动嘊#倛?%U薀V還擔(貌5躪V浼 :|貾5p駟統"条倖堛晎P凑{Y竢弜窦儴Iz蹵h亗uN胝偐;蘞藷v蹬2h8u螘8>OCxX$24%鋩$殽庢"eU)鲻䞍(舆罾)g抎<4猊剹枛 ɡ虼b'r 十P)QEBEV鑱栩H吏*出寱嫺/t曠ㄝhT +cy 3qCK p'!>H.lJ釾]Qb99珶@0鋲s舼6晠堊Rc3T"! , 羛b助眾楥穉5抲6e韆(寫@4#]!We暔%飰埅$駼R秉2gbVe掴瘮鳐a9>_遪$虠_*軆>/gQ U>)侶鯀媁泫篐芻 G昜 齄趙{螇覷殥杣UMIzB铍杏亁*鹻忄蒮畔闏:麗蚕7:裔8'鑃覻爴騋8\_鍏 閐應J遜1:幉,!n禨嬲齋欻瞡R:d癍潙Dh衞毯聣峔譓苑爞圃*鞂d&,Ddi m"韘狼pW诡s鈰逐9sネn淉蜨.# 钀:抭裫S9c{麔绝q韹@r+&['UeIp裃x礐@繯#愙榵':j憘) 肤BK7蛏/]葥f!忞匑姗郢榧,锚划攮蜾 攆#H壀敧-e#橆Ac辈排".I洫4珖N翔fR]`T2楜u踌'蹅hmj斁A(<鉊SV4{ 尡0kmUqk57I麶幀熽:淟鮈簀僙窙!扽鎷C%:韂'B栣P罡JM蹛8 9K`湛帋孳b!H=e逃Bs菭埦$$砮鹳(4胉@U{畧",b洋^U簂f飗[u耼峡茾 S鵵皕ˇ詭茝儝┍爞SQ 餣"|^d2 鎢こ疵5畤泲e粖望d媾@5歱VW5o櫼#fh` 夘沼K!I逓H箵吼({.7GkPdォ蚈eLCY&兌U邀N.).* -J慣zTJ蠸x_\袘EjD輵枺R膗( 寫寀晐7Ck50魥 耥V鵱!%宇鯻M毒杄稪謼?`糁S(\襖PW'劗禱垻琫砎$%'0~#'Y6娖,X柏釘瞘,>聦7V毊a摶n>Ey:O蘅?緍鞴鞹1諹叨3g[$i>,ml#寖礫{淋祆鰛鎏h(oDs/ΘS;s 嗋i)&皪[c捚!攖莋絠`\$ c3.`从敹7愕莂飞'患g[3= 芦iVo 7u愪芔躪馀枼 霜 肆0tDV?逿sgY魂滹臣凷m仭涢缻#&火{v綆Q7 2ο擠8s暀聹LE唟載姰輥簇LDr孚拧E⿹ sz飙!瓂种iK拓pU趻R蚇P枀 $嵴姃 !7 9崹蘨p嘬类~轟:膝渵o 囍昤[殝爒k弮Q霥朕囃A声4擇炌4 Q篤sN庨`锛擌日rbe#v#T!q泂~邏2帵}珼dk`=&2GGo 痪豽髗l溭絿貄V賦:m£;R|dbN篵z:-Mnj.来7; 09o穾榔宫2:.柉溸牙D癀<觖|h伴mVw:_,枷i|!I3(奂a;脭蹨逈娐>軼 瓕?躨覙~峪酭飾1kYG0溏Wv>UbX悋擡6鞙\/轘\}YAI\略绞3 pqg+'魥eP歋- }雟 啝蕱淨皝f/!+蔯Vu耇蓊岳瞡棦qh 浌爊!2巘轃捻隴n貔6(働J芸k0糶@}R魵鈍昑蒑,覫9)g 9憻筬J峿坺 6X拾整畭_!町dX獪3[衂$膆&珤靽狠 鋩1 7y晋p嵳苁$< 萯7AtG=Z垰鋄fD З⒏h9=瓦懋耢 .65-d4yL渔\璳>坼)銚惠rt晠窉龇均掹G綜JDZ蝝隟 慊鄋a栭N麟盐..痁儧L{ 7:趟p棆蒿&7J#凰ジ嗕^$琩圚i8Ym4m趎E暒$ c碲阏*C#歩啅1-+ 悃O緔">仉顴酇捎荙=~覐F8z*魁_Mg蛆+z霕A軼耨详阆2?]d_H瀲盓#C 嫿唍礣囊2庛畇夀浕恸頏靦醁3#,4i昔U嬴Ow0?汴挓#\8(U8\氱7 ざ=O3┯D鍭璥珦{zhO绻I┫渡;PC\p`搮Q'%`w硇茁y-I}!o餃tl鼙nkm疷k92=ボqh鈴)-o闆嬚2乭Yu璕se供數-TG橙氓C馐t溙偞t眱搆約气堰&x舂猴搭c=8#>U8.鞩繅 跪笔褿`[岕PK!HE颰炕yfastapi-0.11.0.dist-info/RECORD澱K砪h饈N粨,腯DB軖峳"圼颧35}庮s常J顸^iA 钧>帑o鮵懺_"箔wK麵4}万歪%緐aEy衬6壹'勶萶T卸垈4u酷4燐:妻@T_爵礬VO樚減牣J犜脄諐0v蓶春羬峁 H宖狎a/C;访╇e"_v厛蓊L戗墦nK軩鏏尓XO鷁X h瘟琵娞e羅骱託埭@8哠%恣wH妮牌E蓃xk<穛喵hA焲誧v昤L e鰩礎芊 咝唘/陻\綑%勏7驏m%傒,;纥I5 D(.u8:跴>e鴷導颚鷃\墯ラT*鴥7蝕磲躣蕫淉疪姠鎥X孴U=ekxoG柷瑛u*Q靵b蟑d&u糿$齏#h^0趥戺-孱陇%樺澰彮贽$<繉 !_5責喤N3鎉p;Q蓎骢脽3B8奐;蛸^抬(鰇6YL1M謃ヤqw謌痗J N>珍@翑璧⒅EG%)辏{旺鋙>鼹?纇悦vL彴L轭zn馋⒏`=x'睳苳0\(烤yf"hG"&趈'峩S韐,实襄瞓隩漪ppy┳戁綖;鋧 '/m鎁PGa,w+F瑏G/嘺甦囈嘟)aXE萠僡咱gI湩gp驀鶒鄴xRB6wR已Ag.螚1粡$;0,_C;e11p.X苇!f 騨G絯@6俻 s今薲賽~飆笖:袠bBHv 芮F'矂d衵 熗0钃 WD;l蜤嘿y嘬9:6Ls{靣冒N=圀NT粝聱偎嚒輝醠戳lI4廍d,V旒栄%詼auG蓄灹枫d9a@漩@|沗rlC&碶編菛 lm= 䦟BqD3悂m啊#:UQ(蓔A 鲎I4鴆Wm鹹皙,b饓菕瑐鐦TG暀4D玤f _弭k浝ayh⊿)1-鬖馏{O窫[嗑 篓廊娋!o1詉骫S8A?ぞ"*.櫬篚矤*gz]皦陮EB}1耤,毪`W珃%-綸_傽Q62)u┖#xi2:4(堔= 顺鱼>怬n铱n> '1蓵4晨"(!胷夠苖椳睨@w2y7PKj~僋񻜸fastapi/__init__.pyPK}晆N材蜫U7U7fastapi/applications.pyPK暕pN件銆F9fastapi/datastructures.pyPKWNO悩 p;fastapi/encoders.pyPKhYTN患'魾BEHfastapi/exceptions.pyPKhYTNVP扖篒fastapi/params.pyPKWN 焻H匟{ffastapi/routing.pyPK踭iNlcs闌@0fastapi/utils.pyPKhYTN 灦fastapi/dependencies/__init__.pyPKWND浙A芏fastapi/dependencies/models.pyPKWNb邀5=5=fastapi/dependencies/utils.pyPKhYTNfastapi/openapi/__init__.pyPKhYTN弿maaOfastapi/openapi/constants.pyPK XNoJ<犏fastapi/openapi/docs.pyPKWN$iQ))fastapi/openapi/models.pyPK踭iN(&&,fastapi/openapi/utils.pyPK}晆N訛鍢99wSfastapi/security/__init__.pyPK!~僋嘛$+8 8 闠fastapi/security/api_key.pyPKhYTNr(詬[^fastapi/security/base.pyPK!~僋"RQtt_fastapi/security/http.pyPK!~僋.宜"萻fastapi/security/oauth2.pyPK!~僋翃堏'fastapi/security/open_id_connect_url.pyPKhYTN蘩岖  Hfastapi/security/utils.pyPKYqcM恵曪>> 墤fastapi-0.11.0.dist-info/LICENSEPK!H卖POfastapi-0.11.0.dist-info/WHEELPK!H* >!憱fastapi-0.11.0.dist-info/METADATAPK!HE颰炕y柅fastapi-0.11.0.dist-info/RECORDPK幇